001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt.strategy; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Set; 025 026import org.apache.activemq.ActiveMQPrefetchPolicy; 027import org.apache.activemq.broker.region.DurableTopicSubscription; 028import org.apache.activemq.broker.region.RegionBroker; 029import org.apache.activemq.broker.region.TopicRegion; 030import org.apache.activemq.command.ActiveMQDestination; 031import org.apache.activemq.command.ActiveMQTopic; 032import org.apache.activemq.command.ConsumerInfo; 033import org.apache.activemq.command.RemoveSubscriptionInfo; 034import org.apache.activemq.command.Response; 035import org.apache.activemq.command.SubscriptionInfo; 036import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; 037import org.apache.activemq.transport.mqtt.MQTTProtocolException; 038import org.apache.activemq.transport.mqtt.MQTTProtocolSupport; 039import org.apache.activemq.transport.mqtt.MQTTSubscription; 040import org.apache.activemq.transport.mqtt.ResponseHandler; 041import org.fusesource.mqtt.client.QoS; 042import org.fusesource.mqtt.client.Topic; 043import org.fusesource.mqtt.codec.CONNECT; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * Default implementation that uses unmapped topic subscriptions. 049 */ 050public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy { 051 052 private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class); 053 054 private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>()); 055 056 @Override 057 public void onConnect(CONNECT connect) throws MQTTProtocolException { 058 List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId()); 059 060 if (connect.cleanSession()) { 061 deleteDurableSubs(subs); 062 } else { 063 restoreDurableSubs(subs); 064 } 065 } 066 067 @Override 068 public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException { 069 ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName)); 070 071 ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); 072 consumerInfo.setDestination(destination); 073 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH); 074 consumerInfo.setRetroactive(true); 075 consumerInfo.setDispatchAsync(true); 076 // create durable subscriptions only when clean session is false 077 if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) { 078 consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); 079 consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH); 080 } 081 082 if (protocol.getActiveMQSubscriptionPrefetch() > 0) { 083 consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); 084 } 085 086 return doSubscribe(consumerInfo, topicName, requestedQoS); 087 } 088 089 @Override 090 public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException { 091 092 ActiveMQDestination destination = mqttSubscription.getDestination(); 093 094 // check whether the Topic has been recovered in restoreDurableSubs 095 // mark subscription available for recovery for duplicate subscription 096 if (restoredSubs.remove(destination.getPhysicalName())) { 097 return; 098 } 099 100 super.onReSubscribe(mqttSubscription); 101 } 102 103 @Override 104 public void onUnSubscribe(String topicName) throws MQTTProtocolException { 105 MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName); 106 if (subscription != null) { 107 doUnSubscribe(subscription); 108 109 // check if the durable sub also needs to be removed 110 if (subscription.getConsumerInfo().getSubscriptionName() != null) { 111 // also remove it from restored durable subscriptions set 112 restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); 113 114 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 115 rsi.setConnectionId(protocol.getConnectionId()); 116 rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName()); 117 rsi.setClientId(protocol.getClientId()); 118 protocol.sendToActiveMQ(rsi, new ResponseHandler() { 119 @Override 120 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 121 // ignore failures.. 122 } 123 }); 124 } 125 } 126 } 127 128 private void deleteDurableSubs(List<SubscriptionInfo> subs) { 129 try { 130 for (SubscriptionInfo sub : subs) { 131 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 132 rsi.setConnectionId(protocol.getConnectionId()); 133 rsi.setSubscriptionName(sub.getSubcriptionName()); 134 rsi.setClientId(sub.getClientId()); 135 protocol.sendToActiveMQ(rsi, new ResponseHandler() { 136 @Override 137 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 138 // ignore failures.. 139 } 140 }); 141 } 142 } catch (Throwable e) { 143 LOG.warn("Could not delete the MQTT durable subs.", e); 144 } 145 } 146 147 private void restoreDurableSubs(List<SubscriptionInfo> subs) { 148 try { 149 for (SubscriptionInfo sub : subs) { 150 String name = sub.getSubcriptionName(); 151 String[] split = name.split(":", 2); 152 QoS qoS = QoS.valueOf(split[0]); 153 onSubscribe(new Topic(split[1], qoS)); 154 // mark this durable subscription as restored by Broker 155 restoredSubs.add(split[1]); 156 } 157 } catch (IOException e) { 158 LOG.warn("Could not restore the MQTT durable subs.", e); 159 } 160 } 161 162 List<SubscriptionInfo> lookupSubscription(String clientId) throws MQTTProtocolException { 163 List<SubscriptionInfo> result = new ArrayList<SubscriptionInfo>(); 164 RegionBroker regionBroker; 165 166 try { 167 regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); 168 } catch (Exception e) { 169 throw new MQTTProtocolException("Error recovering durable subscriptions: " + e.getMessage(), false, e); 170 } 171 172 final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion(); 173 List<DurableTopicSubscription> subscriptions = topicRegion.lookupSubscriptions(clientId); 174 if (subscriptions != null) { 175 for (DurableTopicSubscription subscription : subscriptions) { 176 LOG.debug("Recovered durable sub:{} on connect", subscription); 177 178 SubscriptionInfo info = new SubscriptionInfo(); 179 180 info.setDestination(subscription.getActiveMQDestination()); 181 info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName()); 182 info.setClientId(clientId); 183 184 result.add(info); 185 } 186 } 187 188 return result; 189 } 190}