001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt.strategy;
018
019import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT;
020import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashSet;
026import java.util.List;
027import java.util.Set;
028import java.util.StringTokenizer;
029
030import org.apache.activemq.ActiveMQPrefetchPolicy;
031import org.apache.activemq.broker.region.QueueRegion;
032import org.apache.activemq.broker.region.RegionBroker;
033import org.apache.activemq.command.ActiveMQDestination;
034import org.apache.activemq.command.ActiveMQQueue;
035import org.apache.activemq.command.ActiveMQTopic;
036import org.apache.activemq.command.ConsumerInfo;
037import org.apache.activemq.command.DestinationInfo;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
040import org.apache.activemq.transport.mqtt.MQTTProtocolException;
041import org.apache.activemq.transport.mqtt.MQTTSubscription;
042import org.apache.activemq.transport.mqtt.ResponseHandler;
043import org.fusesource.mqtt.client.QoS;
044import org.fusesource.mqtt.codec.CONNECT;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Subscription strategy that converts all MQTT subscribes that would be durable to
050 * Virtual Topic Queue subscriptions.  Also maps all publish requests to be prefixed
051 * with the VirtualTopic. prefix unless already present.
052 */
053public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
054
055    private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
056    private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";
057
058    private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class);
059
060    private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>());
061
062    @Override
063    public void onConnect(CONNECT connect) throws MQTTProtocolException {
064        List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId());
065
066        if (connect.cleanSession()) {
067            deleteDurableQueues(queues);
068        } else {
069            restoreDurableQueue(queues);
070        }
071    }
072
073    @Override
074    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
075        ActiveMQDestination destination = null;
076        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
077        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
078            String converted = VIRTUALTOPIC_CONSUMER_PREFIX + protocol.getClientId() + ":" + requestedQoS + "." +
079                               VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
080            destination = new ActiveMQQueue(converted);
081            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
082        } else {
083            String converted = convertMQTTToActiveMQ(topicName);
084            if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
085                converted = VIRTUALTOPIC_PREFIX + convertMQTTToActiveMQ(topicName);
086            }
087            destination = new ActiveMQTopic(converted);
088            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
089        }
090
091        consumerInfo.setDestination(destination);
092        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
093            consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
094        }
095        consumerInfo.setRetroactive(true);
096        consumerInfo.setDispatchAsync(true);
097
098        return doSubscribe(consumerInfo, topicName, requestedQoS);
099    }
100
101    @Override
102    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
103
104        ActiveMQDestination destination = mqttSubscription.getDestination();
105
106        // check whether the Topic has been recovered in restoreDurableSubs
107        // mark subscription available for recovery for duplicate subscription
108        if (restoredQueues.remove(destination)) {
109            return;
110        }
111
112        if (mqttSubscription.getDestination().isTopic()) {
113            super.onReSubscribe(mqttSubscription);
114        } else {
115            doUnSubscribe(mqttSubscription);
116            ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
117            consumerInfo.setConsumerId(getNextConsumerId());
118            doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
119        }
120    }
121
122    @Override
123    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
124        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
125        if (subscription != null) {
126            doUnSubscribe(subscription);
127            if (subscription.getDestination().isQueue()) {
128                DestinationInfo remove = new DestinationInfo();
129                remove.setConnectionId(protocol.getConnectionId());
130                remove.setDestination(subscription.getDestination());
131                remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
132
133                protocol.sendToActiveMQ(remove, new ResponseHandler() {
134                    @Override
135                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
136                        // ignore failures..
137                    }
138                });
139            }
140        }
141    }
142
143    @Override
144    public ActiveMQDestination onSend(String topicName) {
145        if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
146            return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
147        } else {
148            return new ActiveMQTopic(topicName);
149        }
150    }
151
152    @Override
153    public String onSend(ActiveMQDestination destination) {
154        String destinationName = destination.getPhysicalName();
155        int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX);
156        if (position >= 0) {
157            destinationName = destinationName.substring(position+VIRTUALTOPIC_PREFIX.length()).substring(0);
158        }
159        return destinationName;
160    }
161
162    @Override
163    public boolean isControlTopic(ActiveMQDestination destination) {
164        String destinationName = destination.getPhysicalName();
165        if (destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$")) {
166            return true;
167        }
168        return false;
169    }
170
171    private void deleteDurableQueues(List<ActiveMQQueue> queues) {
172        try {
173            for (ActiveMQQueue queue : queues) {
174                LOG.debug("Removing subscription for {} ",queue.getPhysicalName());
175                DestinationInfo removeAction = new DestinationInfo();
176                removeAction.setConnectionId(protocol.getConnectionId());
177                removeAction.setDestination(queue);
178                removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
179
180                protocol.sendToActiveMQ(removeAction, new ResponseHandler() {
181                    @Override
182                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
183                        // ignore failures..
184                    }
185                });
186            }
187        } catch (Throwable e) {
188            LOG.warn("Could not delete the MQTT durable subs.", e);
189        }
190    }
191
192    private void restoreDurableQueue(List<ActiveMQQueue> queues) {
193        try {
194            for (ActiveMQQueue queue : queues) {
195                String name = queue.getPhysicalName().substring(VIRTUALTOPIC_CONSUMER_PREFIX.length());
196                StringTokenizer tokenizer = new StringTokenizer(name);
197                tokenizer.nextToken(":.");
198                String qosString = tokenizer.nextToken();
199                tokenizer.nextToken();
200                String topicName = convertActiveMQToMQTT(tokenizer.nextToken("").substring(1));
201                QoS qoS = QoS.valueOf(qosString);
202                LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
203
204                ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
205                consumerInfo.setDestination(queue);
206                consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
207                if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
208                    consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
209                }
210                consumerInfo.setRetroactive(true);
211                consumerInfo.setDispatchAsync(true);
212
213                doSubscribe(consumerInfo, topicName, qoS);
214
215                // mark this durable subscription as restored by Broker
216                restoredQueues.add(queue);
217            }
218        } catch (IOException e) {
219            LOG.warn("Could not restore the MQTT durable subs.", e);
220        }
221    }
222
223    List<ActiveMQQueue> lookupQueues(String clientId) throws MQTTProtocolException {
224        List<ActiveMQQueue> result = new ArrayList<ActiveMQQueue>();
225        RegionBroker regionBroker;
226
227        try {
228            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
229        } catch (Exception e) {
230            throw new MQTTProtocolException("Error recovering queues: " + e.getMessage(), false, e);
231        }
232
233        final QueueRegion queueRegion = (QueueRegion) regionBroker.getQueueRegion();
234        for (ActiveMQDestination destination : queueRegion.getDestinationMap().keySet()) {
235            if (destination.isQueue() && !destination.isTemporary()) {
236                if (destination.getPhysicalName().startsWith("Consumer." + clientId)) {
237                    LOG.debug("Recovered client sub: {} on connect", destination.getPhysicalName());
238                    result.add((ActiveMQQueue) destination);
239                }
240            }
241        }
242
243        return result;
244    }
245}