001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import org.apache.activemq.broker.ProducerBrokerExchange;
020import org.apache.activemq.broker.region.Destination;
021import org.apache.activemq.broker.region.DestinationFilter;
022import org.apache.activemq.broker.region.Topic;
023import org.apache.activemq.command.ActiveMQDestination;
024import org.apache.activemq.command.ActiveMQQueue;
025import org.apache.activemq.command.Message;
026import org.apache.activemq.util.LRUCache;
027
028/**
029 * A Destination which implements <a href="http://activemq.org/site/virtual-destinations.html">Virtual Topic</a>
030 */
031public class VirtualTopicInterceptor extends DestinationFilter {
032
033    private final String prefix;
034    private final String postfix;
035    private final boolean local;
036    private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
037
038    public VirtualTopicInterceptor(Destination next, String prefix, String postfix, boolean local) {
039        super(next);
040        this.prefix = prefix;
041        this.postfix = postfix;
042        this.local = local;
043    }
044
045    public Topic getTopic() {
046        return (Topic) this.next;
047    }
048
049    @Override
050    public void send(ProducerBrokerExchange context, Message message) throws Exception {
051        if (!message.isAdvisory() && !(local && message.getBrokerPath() != null)) {
052            ActiveMQDestination queueConsumers = getQueueConsumersWildcard(message.getDestination());
053            send(context, message, queueConsumers);
054        }
055        super.send(context, message);
056    }
057
058    protected ActiveMQDestination getQueueConsumersWildcard(ActiveMQDestination original) {
059        ActiveMQQueue queue;
060        synchronized (cache) {
061            queue = cache.get(original);
062            if (queue == null) {
063                queue = new ActiveMQQueue(prefix + original.getPhysicalName() + postfix);
064                cache.put(original, queue);
065            }
066        }
067        return queue;
068    }
069}