001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt.strategy;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.List;
024import java.util.Set;
025
026import org.apache.activemq.ActiveMQPrefetchPolicy;
027import org.apache.activemq.broker.region.DurableTopicSubscription;
028import org.apache.activemq.broker.region.RegionBroker;
029import org.apache.activemq.broker.region.TopicRegion;
030import org.apache.activemq.command.ActiveMQDestination;
031import org.apache.activemq.command.ActiveMQTopic;
032import org.apache.activemq.command.ConsumerInfo;
033import org.apache.activemq.command.RemoveSubscriptionInfo;
034import org.apache.activemq.command.Response;
035import org.apache.activemq.command.SubscriptionInfo;
036import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
037import org.apache.activemq.transport.mqtt.MQTTProtocolException;
038import org.apache.activemq.transport.mqtt.MQTTProtocolSupport;
039import org.apache.activemq.transport.mqtt.MQTTSubscription;
040import org.apache.activemq.transport.mqtt.ResponseHandler;
041import org.fusesource.mqtt.client.QoS;
042import org.fusesource.mqtt.client.Topic;
043import org.fusesource.mqtt.codec.CONNECT;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * Default implementation that uses unmapped topic subscriptions.
049 */
050public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStrategy {
051
052    private static final Logger LOG = LoggerFactory.getLogger(MQTTDefaultSubscriptionStrategy.class);
053
054    private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
055
056    @Override
057    public void onConnect(CONNECT connect) throws MQTTProtocolException {
058        List<SubscriptionInfo> subs = lookupSubscription(protocol.getClientId());
059
060        if (connect.cleanSession()) {
061            deleteDurableSubs(subs);
062        } else {
063            restoreDurableSubs(subs);
064        }
065    }
066
067    @Override
068    public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException {
069        ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
070
071        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
072        consumerInfo.setDestination(destination);
073        consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_TOPIC_PREFETCH);
074        consumerInfo.setRetroactive(true);
075        consumerInfo.setDispatchAsync(true);
076        // create durable subscriptions only when clean session is false
077        if (!protocol.isCleanSession() && protocol.getClientId() != null && requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
078            consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
079            consumerInfo.setPrefetchSize(ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH);
080        }
081
082        if (protocol.getActiveMQSubscriptionPrefetch() > 0) {
083            consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
084        }
085
086        return doSubscribe(consumerInfo, topicName, requestedQoS);
087    }
088
089    @Override
090    public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException {
091
092        ActiveMQDestination destination = mqttSubscription.getDestination();
093
094        // check whether the Topic has been recovered in restoreDurableSubs
095        // mark subscription available for recovery for duplicate subscription
096        if (restoredSubs.remove(destination.getPhysicalName())) {
097            return;
098        }
099
100        super.onReSubscribe(mqttSubscription);
101    }
102
103    @Override
104    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
105        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
106        if (subscription != null) {
107            doUnSubscribe(subscription);
108
109            // check if the durable sub also needs to be removed
110            if (subscription.getConsumerInfo().getSubscriptionName() != null) {
111                // also remove it from restored durable subscriptions set
112                restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
113
114                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
115                rsi.setConnectionId(protocol.getConnectionId());
116                rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
117                rsi.setClientId(protocol.getClientId());
118                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
119                    @Override
120                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
121                        // ignore failures..
122                    }
123                });
124            }
125        }
126    }
127
128    private void deleteDurableSubs(List<SubscriptionInfo> subs) {
129        try {
130            for (SubscriptionInfo sub : subs) {
131                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
132                rsi.setConnectionId(protocol.getConnectionId());
133                rsi.setSubscriptionName(sub.getSubcriptionName());
134                rsi.setClientId(sub.getClientId());
135                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
136                    @Override
137                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
138                        // ignore failures..
139                    }
140                });
141            }
142        } catch (Throwable e) {
143            LOG.warn("Could not delete the MQTT durable subs.", e);
144        }
145    }
146
147    private void restoreDurableSubs(List<SubscriptionInfo> subs) {
148        try {
149            for (SubscriptionInfo sub : subs) {
150                String name = sub.getSubcriptionName();
151                String[] split = name.split(":", 2);
152                QoS qoS = QoS.valueOf(split[0]);
153                onSubscribe(new Topic(split[1], qoS));
154                // mark this durable subscription as restored by Broker
155                restoredSubs.add(split[1]);
156            }
157        } catch (IOException e) {
158            LOG.warn("Could not restore the MQTT durable subs.", e);
159        }
160    }
161
162    List<SubscriptionInfo> lookupSubscription(String clientId) throws MQTTProtocolException {
163        List<SubscriptionInfo> result = new ArrayList<SubscriptionInfo>();
164        RegionBroker regionBroker;
165
166        try {
167            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
168        } catch (Exception e) {
169            throw new MQTTProtocolException("Error recovering durable subscriptions: " + e.getMessage(), false, e);
170        }
171
172        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
173        List<DurableTopicSubscription> subscriptions = topicRegion.lookupSubscriptions(clientId);
174        if (subscriptions != null) {
175            for (DurableTopicSubscription subscription : subscriptions) {
176                LOG.debug("Recovered durable sub:{} on connect", subscription);
177
178                SubscriptionInfo info = new SubscriptionInfo();
179
180                info.setDestination(subscription.getActiveMQDestination());
181                info.setSubcriptionName(subscription.getSubscriptionKey().getSubscriptionName());
182                info.setClientId(clientId);
183
184                result.add(info);
185            }
186        }
187
188        return result;
189    }
190}