001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.virtual;
018
019import java.util.regex.Matcher;
020import java.util.regex.Pattern;
021
022import org.apache.activemq.broker.Broker;
023import org.apache.activemq.broker.ConnectionContext;
024import org.apache.activemq.broker.region.Destination;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQQueue;
027import org.apache.activemq.command.ActiveMQTopic;
028import org.apache.activemq.filter.DestinationFilter;
029
030/**
031 * Creates <a href="http://activemq.org/site/virtual-destinations.html">Virtual
032 * Topics</a> using a prefix and postfix. The virtual destination creates a
033 * wildcard that is then used to look up all active queue subscriptions which
034 * match.
035 *
036 * @org.apache.xbean.XBean
037 */
038public class VirtualTopic implements VirtualDestination {
039
040    private String prefix = "Consumer.*.";
041    private String postfix = "";
042    private String name = ">";
043    private boolean selectorAware = false;
044    private boolean local = false;
045
046    @Override
047    public ActiveMQDestination getVirtualDestination() {
048        return new ActiveMQTopic(getName());
049    }
050
051    @Override
052    public Destination intercept(Destination destination) {
053        return selectorAware ? new SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(), isLocal()) : new VirtualTopicInterceptor(
054            destination, getPrefix(), getPostfix(), isLocal());
055    }
056
057    @Override
058    public ActiveMQDestination getMappedDestinations() {
059        return new ActiveMQQueue(prefix + name + postfix);
060    }
061
062    @Override
063    public Destination interceptMappedDestination(Destination destination) {
064        // do a reverse map from destination to get actual virtual destination
065        final String physicalName = destination.getActiveMQDestination().getPhysicalName();
066        final Pattern pattern = Pattern.compile(getRegex(prefix) + "(.*)" + getRegex(postfix));
067        final Matcher matcher = pattern.matcher(physicalName);
068        if (matcher.matches()) {
069            final String virtualName = matcher.group(1);
070            return new MappedQueueFilter(new ActiveMQTopic(virtualName), destination);
071        }
072        return destination;
073    }
074
075    private String getRegex(String part) {
076        StringBuilder builder = new StringBuilder();
077        for (char c : part.toCharArray()) {
078            switch (c) {
079                case '.':
080                    builder.append("\\.");
081                    break;
082                case '*':
083                    builder.append("[^\\.]*");
084                    break;
085                default:
086                    builder.append(c);
087            }
088        }
089        return builder.toString();
090    }
091
092    @Override
093    public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
094        if (destination.isQueue() && destination.isPattern()) {
095            DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(prefix + DestinationFilter.ANY_DESCENDENT));
096            if (filter.matches(destination)) {
097                broker.addDestination(context, destination, false);
098
099            }
100        }
101    }
102
103    @Override
104    public void remove(Destination destination) {
105    }
106
107    // Properties
108    // -------------------------------------------------------------------------
109
110    public String getPostfix() {
111        return postfix;
112    }
113
114    /**
115     * Sets any postix used to identify the queue consumers
116     */
117    public void setPostfix(String postfix) {
118        this.postfix = postfix;
119    }
120
121    public String getPrefix() {
122        return prefix;
123    }
124
125    /**
126     * Sets the prefix wildcard used to identify the queue consumers for a given
127     * topic
128     */
129    public void setPrefix(String prefix) {
130        this.prefix = prefix;
131    }
132
133    public String getName() {
134        return name;
135    }
136
137    public void setName(String name) {
138        this.name = name;
139    }
140
141    /**
142     * Indicates whether the selectors of consumers are used to determine
143     * dispatch to a virtual destination, when true only messages matching an
144     * existing consumer will be dispatched.
145     *
146     * @param selectorAware
147     *            when true take consumer selectors into consideration
148     */
149    public void setSelectorAware(boolean selectorAware) {
150        this.selectorAware = selectorAware;
151    }
152
153    public boolean isSelectorAware() {
154        return selectorAware;
155    }
156
157    public boolean isLocal() {
158        return local;
159    }
160
161    public void setLocal(boolean local) {
162        this.local = local;
163    }
164
165    @Override
166    public String toString() {
167        return new StringBuilder("VirtualTopic:").append(prefix).append(',').append(name).append(',').
168                                                  append(postfix).append(',').append(selectorAware).
169                                                  append(',').append(local).toString();
170    }
171}