/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.virtual;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
import org.apache.activemq.plugin.SubQueueSelectorCacheBroker;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.util.LRUCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Set;

/**
 * A {@link VirtualTopicInterceptor} that only forwards a message to a target
 * destination when a selector associated with that destination accepts the
 * message: either the selector of a currently attached consumer or, failing
 * that, a selector remembered by the {@link SubQueueSelectorCacheBroker}
 * plugin (when that plugin is installed on the broker).
 */
public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicInterceptor {
    private static final Logger LOG = LoggerFactory.getLogger(SelectorAwareVirtualTopicInterceptor.class);

    /**
     * Parsed selector expressions keyed by their selector string, so a given
     * selector is only parsed once while it stays in the cache. All access in
     * this class is performed while synchronized on this object.
     */
    LRUCache<String,BooleanExpression> expressionCache = new LRUCache<String,BooleanExpression>();

    /** Lazily resolved selector cache plugin; stays null while the broker offers none. */
    private SubQueueSelectorCacheBroker selectorCachePlugin;

    public SelectorAwareVirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
        super(next, prefix, postfix, local);
    }

    /**
     * Respect the selectors of the subscriptions to ensure only matched messages are dispatched to
     * the virtual queues, hence there is no build up of unmatched messages on these destinations
     *
     * @param context     the producer exchange the message arrived on
     * @param message     the message to forward; every matching destination receives its own copy
     * @param destination the destination (pattern) used to look up the target destinations
     * @throws Exception if evaluating a subscription or sending to a destination fails
     */
    @Override
    protected void send(ProducerBrokerExchange context, Message message, ActiveMQDestination destination) throws Exception {
        Broker broker = context.getConnectionContext().getBroker();
        Set<Destination> destinations = broker.getDestinations(destination);

        for (Destination dest : destinations) {
            if (matchesSomeConsumer(broker, message, dest)) {
                dest.send(context, message.copy());
            }
        }
    }

    /**
     * Decides whether the message should be delivered to the given destination.
     *
     * @param broker  the broker used to locate the selector cache plugin
     * @param message the message under evaluation
     * @param dest    the candidate target destination
     * @return true if an attached consumer matches the message, otherwise the
     *         result of consulting the cached selectors
     * @throws IOException if a subscription fails while evaluating the message
     */
    private boolean matchesSomeConsumer(final Broker broker, Message message, Destination dest) throws IOException {
        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
        msgContext.setDestination(dest.getActiveMQDestination());
        msgContext.setMessageReference(message);

        List<Subscription> subs = dest.getConsumers();
        for (Subscription sub : subs) {
            if (sub.matches(message, msgContext)) {
                return true;
            }
        }

        // No attached consumer wants the message; fall back to remembered selectors.
        return tryMatchingCachedSubs(broker, dest, msgContext);
    }

    /**
     * Evaluates the message against the selectors that the selector cache
     * plugin holds for the destination.
     *
     * @param broker     the broker used to locate the selector cache plugin
     * @param dest       the candidate target destination
     * @param msgContext evaluation context already populated with the message
     * @return true as soon as one cached selector matches; false when the
     *         plugin is unavailable, holds no selectors for the destination,
     *         or none of them matches
     */
    private boolean tryMatchingCachedSubs(final Broker broker, Destination dest, MessageEvaluationContext msgContext) {
        LOG.debug("No active consumer match found. Will try cache if configured...");

        //retrieve the specific plugin class and lookup the selector for the destination.
        final SubQueueSelectorCacheBroker cache = getSubQueueSelectorCacheBrokerPlugin(broker);
        if (cache == null) {
            return false;
        }

        final Set<String> selectors = cache.getSelector(dest.getActiveMQDestination().getQualifiedName());
        if (selectors == null) {
            // Nothing recorded for this destination; iterating a null set
            // would throw a NullPointerException and fail the whole send.
            return false;
        }

        for (String selector : selectors) {
            try {
                final BooleanExpression expression = getExpression(selector);
                if (expression.matches(msgContext)) {
                    return true;
                }
            } catch (Exception e) {
                // A selector that cannot be parsed or evaluated must not stop
                // the remaining selectors from being tried.
                LOG.error(e.getMessage(), e);
            }
        }
        return false;
    }

    /**
     * Returns the parsed form of a selector, parsing and caching it on first use.
     *
     * @param selector the JMS selector string
     * @return the parsed expression for the selector
     * @throws Exception if the selector cannot be parsed
     */
    private BooleanExpression getExpression(String selector) throws Exception{
        synchronized(expressionCache){
            BooleanExpression result = expressionCache.get(selector);
            if (result == null){
                result = compileSelector(selector);
                expressionCache.put(selector,result);
            }
            return result;
        }
    }

    /**
     * Looks the plugin up through the broker's adaptor mechanism on first use
     * and remembers a non-null result for later calls.
     *
     * @return The SubQueueSelectorCacheBroker instance or null if no such broker is available.
     */
    private SubQueueSelectorCacheBroker getSubQueueSelectorCacheBrokerPlugin(final Broker broker) {
        if (selectorCachePlugin == null) {
            selectorCachePlugin = (SubQueueSelectorCacheBroker) broker.getAdaptor(SubQueueSelectorCacheBroker.class);
        }

        return selectorCachePlugin;
    }

    /**
     * Pre-compile the JMS selector.
     *
     * @param selectorExpression The non-null JMS selector expression.
     * @return the parsed selector expression
     * @throws Exception if the selector cannot be parsed
     */
    private BooleanExpression compileSelector(final String selectorExpression) throws Exception {
        return SelectorParser.parse(selectorExpression);
    }
}