/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.mqtt.strategy;

import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertActiveMQToMQTT;
import static org.apache.activemq.transport.mqtt.MQTTProtocolSupport.convertMQTTToActiveMQ;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.region.QueueRegion;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
import org.apache.activemq.transport.mqtt.MQTTProtocolException;
import org.apache.activemq.transport.mqtt.MQTTSubscription;
import org.apache.activemq.transport.mqtt.ResponseHandler;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.codec.CONNECT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Subscription strategy that converts all MQTT subscribes that would be durable to
 * Virtual Topic Queue subscriptions.  Also maps all publish requests to be prefixed
 * with the VirtualTopic. prefix unless already present.
 * <p>
 * Durable subscriptions are backed by queues named
 * {@code Consumer.<clientId>:<QoS>.VirtualTopic.<topic>}; the same layout is relied
 * upon when queues are looked up and restored on connect.
 */
public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {

    private static final String VIRTUALTOPIC_PREFIX = "VirtualTopic.";
    private static final String VIRTUALTOPIC_CONSUMER_PREFIX = "Consumer.";

    private static final Logger LOG = LoggerFactory.getLogger(MQTTVirtualTopicSubscriptionStrategy.class);

    private final Set<ActiveMQQueue> restoredQueues = Collections.synchronizedSet(new HashSet<ActiveMQQueue>());

    /**
     * Looks up the queues backing this client's durable subscriptions and either
     * removes them (clean session) or re-attaches consumers to them.
     *
     * @param connect the CONNECT frame received from the client.
     *
     * @throws MQTTProtocolException if the broker's queue region cannot be accessed.
     */
    @Override
    public void onConnect(CONNECT connect) throws MQTTProtocolException {
        List<ActiveMQQueue> queues = lookupQueues(protocol.getClientId());

        if (connect.cleanSession()) {
            deleteDurableQueues(queues);
        } else {
            restoreDurableQueue(queues);
        }
    }

    /**
     * Creates the consumer for an MQTT subscription.  A subscription on a non-clean
     * session with a client id and QoS of at least AT_LEAST_ONCE is mapped to a
     * Virtual Topic consumer queue, everything else to a plain topic under the
     * VirtualTopic. prefix.
     *
     * @param topicName the MQTT topic filter being subscribed to.
     * @param requestedQoS the QoS requested by the client.
     *
     * @return the value returned by {@code doSubscribe} for the new consumer.
     *
     * @throws MQTTProtocolException if the subscribe operation fails.
     */
    @Override
    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
        final ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
        final String converted = convertMQTTToActiveMQ(topicName);
        final ActiveMQDestination destination;

        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
            // The VirtualTopic. prefix is always added here, even when the topic already
            // carries it, so that restoreDurableQueue can recover the exact MQTT topic name.
            destination = new ActiveMQQueue(
                durableQueuePrefix(protocol.getClientId()) + requestedQoS + "." + VIRTUALTOPIC_PREFIX + converted);
            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
        } else {
            destination = new ActiveMQTopic(
                converted.startsWith(VIRTUALTOPIC_PREFIX) ? converted : VIRTUALTOPIC_PREFIX + converted);
            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
        }

        consumerInfo.setDestination(destination);
        // A prefetch configured on the transport overrides the per-destination-type default.
        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
            consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
        }
        consumerInfo.setRetroactive(true);
        consumerInfo.setDispatchAsync(true);

        return doSubscribe(consumerInfo, topicName, requestedQoS);
    }

    /**
     * Handles a repeated SUBSCRIBE for a topic that already has a subscription.
     * A queue subscription that was just restored in {@link #onConnect(CONNECT)} is
     * left untouched (once); otherwise topic subscriptions are delegated to the
     * parent class and queue subscriptions are replaced by a new consumer.
     *
     * @param mqttSubscription the existing subscription being re-subscribed.
     *
     * @throws MQTTProtocolException if the unsubscribe / subscribe operations fail.
     */
    @Override
    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
        ActiveMQDestination destination = mqttSubscription.getDestination();

        // check whether the queue has been recovered in restoreDurableQueue; removing
        // the marker makes any later duplicate subscription go through the normal path
        if (restoredQueues.remove(destination)) {
            return;
        }

        if (destination.isTopic()) {
            super.onReSubscribe(mqttSubscription);
        } else {
            doUnSubscribe(mqttSubscription);
            ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
            consumerInfo.setConsumerId(getNextConsumerId());
            doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
        }
    }

    /**
     * Removes the subscription registered for the given topic, if any.  When the
     * subscription was backed by a queue the queue itself is removed as well.
     *
     * @param topicName the MQTT topic filter being unsubscribed.
     *
     * @throws MQTTProtocolException if the unsubscribe operation fails.
     */
    @Override
    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
        if (subscription == null) {
            return;
        }

        doUnSubscribe(subscription);
        if (subscription.getDestination().isQueue()) {
            removeDestination(subscription.getDestination());
        }
    }

    /**
     * Maps the name of a topic being published to onto a Virtual Topic.
     *
     * @param topicName the destination name of the publish.
     *
     * @return a topic whose name starts with the VirtualTopic. prefix.
     */
    @Override
    public ActiveMQDestination onSend(String topicName) {
        if (!topicName.startsWith(VIRTUALTOPIC_PREFIX)) {
            return new ActiveMQTopic(VIRTUALTOPIC_PREFIX + topicName);
        } else {
            return new ActiveMQTopic(topicName);
        }
    }

    /**
     * Maps a broker destination back to the name used on the MQTT side by dropping
     * everything up to and including the first VirtualTopic. prefix found in the
     * physical name.  Names that do not contain the prefix are returned unchanged.
     *
     * @param destination the destination a message is being dispatched from.
     *
     * @return the destination name with the Virtual Topic prefix removed.
     */
    @Override
    public String onSend(ActiveMQDestination destination) {
        String destinationName = destination.getPhysicalName();
        int position = destinationName.indexOf(VIRTUALTOPIC_PREFIX);
        if (position >= 0) {
            destinationName = destinationName.substring(position + VIRTUALTOPIC_PREFIX.length());
        }
        return destinationName;
    }

    /**
     * @param destination the destination to check.
     *
     * @return true if the physical name starts with "$", with or without the
     *         VirtualTopic. prefix in front of it.
     */
    @Override
    public boolean isControlTopic(ActiveMQDestination destination) {
        String destinationName = destination.getPhysicalName();
        return destinationName.startsWith("$") || destinationName.startsWith(VIRTUALTOPIC_PREFIX + "$");
    }

    /**
     * Builds the name prefix shared by every durable subscription queue of a client.
     * The trailing ':' terminates the client id so that one client's prefix can not
     * match the queues of another client whose id merely starts with the same text.
     */
    private static String durableQueuePrefix(String clientId) {
        return VIRTUALTOPIC_CONSUMER_PREFIX + clientId + ":";
    }

    /**
     * Asks the broker to remove the given destination.  The response is ignored.
     */
    private void removeDestination(ActiveMQDestination destination) {
        DestinationInfo removeAction = new DestinationInfo();
        removeAction.setConnectionId(protocol.getConnectionId());
        removeAction.setDestination(destination);
        removeAction.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);

        protocol.sendToActiveMQ(removeAction, new ResponseHandler() {
            @Override
            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
                // ignore failures..
            }
        });
    }

    /**
     * Best-effort removal of the given queues.  The first failure is logged and
     * stops the removal of the remaining queues.
     */
    private void deleteDurableQueues(List<ActiveMQQueue> queues) {
        try {
            for (ActiveMQQueue queue : queues) {
                LOG.debug("Removing subscription for {} ", queue.getPhysicalName());
                removeDestination(queue);
            }
        } catch (Exception e) {
            // Exception rather than Throwable: JVM errors must not be swallowed here.
            LOG.warn("Could not delete the MQTT durable subs.", e);
        }
    }

    /**
     * Re-creates a consumer for each of the given durable subscription queues and
     * records the queue as restored.  The MQTT topic name and QoS are recovered from
     * the queue name; queues whose name does not have the expected
     * {@code Consumer.<clientId>:<QoS>.VirtualTopic.<topic>} layout are logged and
     * skipped.  An I/O failure while subscribing is logged and stops the restore of
     * the remaining queues.
     */
    private void restoreDurableQueue(List<ActiveMQQueue> queues) {
        // Strip the known prefix instead of tokenizing on ':' and '.', so that client
        // ids containing either character are still parsed correctly.
        final String queuePrefix = durableQueuePrefix(protocol.getClientId());
        try {
            for (ActiveMQQueue queue : queues) {
                final String physicalName = queue.getPhysicalName();
                if (!physicalName.startsWith(queuePrefix)) {
                    LOG.warn("Skipping restore of {}: name does not start with {}", physicalName, queuePrefix);
                    continue;
                }

                // What is left is expected to be <QoS>.VirtualTopic.<topic>
                final String remainder = physicalName.substring(queuePrefix.length());
                final int qosEnd = remainder.indexOf('.');
                if (qosEnd < 0 || !remainder.startsWith(VIRTUALTOPIC_PREFIX, qosEnd + 1)) {
                    LOG.warn("Skipping restore of {}: not a durable subscription queue name", physicalName);
                    continue;
                }

                QoS qoS;
                try {
                    qoS = QoS.valueOf(remainder.substring(0, qosEnd));
                } catch (IllegalArgumentException e) {
                    LOG.warn("Skipping restore of {}: unknown QoS in queue name", physicalName);
                    continue;
                }

                String topicName = convertActiveMQToMQTT(
                    remainder.substring(qosEnd + 1 + VIRTUALTOPIC_PREFIX.length()));
                LOG.trace("Restoring subscription: {}:{}", topicName, qoS);

                ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
                consumerInfo.setDestination(queue);
                consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH);
                if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
                    consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
                }
                consumerInfo.setRetroactive(true);
                consumerInfo.setDispatchAsync(true);

                doSubscribe(consumerInfo, topicName, qoS);

                // mark this durable subscription as restored by Broker
                restoredQueues.add(queue);
            }
        } catch (IOException e) {
            LOG.warn("Could not restore the MQTT durable subs.", e);
        }
    }

    /**
     * Finds the non-temporary queues in the broker's queue region that back the
     * durable subscriptions of the given client.
     *
     * @param clientId the MQTT client id; a null id yields an empty result.
     *
     * @return the matching queues, never null.
     *
     * @throws MQTTProtocolException if the RegionBroker cannot be obtained.
     */
    List<ActiveMQQueue> lookupQueues(String clientId) throws MQTTProtocolException {
        List<ActiveMQQueue> result = new ArrayList<ActiveMQQueue>();
        if (clientId == null) {
            // No client id means no durable queues; avoid searching for "Consumer.null".
            return result;
        }

        RegionBroker regionBroker;
        try {
            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
        } catch (Exception e) {
            throw new MQTTProtocolException("Error recovering queues: " + e.getMessage(), false, e);
        }

        // Matching up to and including the ':' keeps client "foo" from picking up
        // (and then deleting or consuming from) the queues of client "foobar".
        final String queuePrefix = durableQueuePrefix(clientId);
        final QueueRegion queueRegion = (QueueRegion) regionBroker.getQueueRegion();
        for (ActiveMQDestination destination : queueRegion.getDestinationMap().keySet()) {
            if (destination.isQueue() && !destination.isTemporary()
                && destination.getPhysicalName().startsWith(queuePrefix)) {
                LOG.debug("Recovered client sub: {} on connect", destination.getPhysicalName());
                result.add((ActiveMQQueue) destination);
            }
        }

        return result;
    }
}