001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020import java.util.Map;
021import java.util.concurrent.ConcurrentHashMap;
022import java.util.concurrent.ConcurrentMap;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.zip.DataFormatException;
025import java.util.zip.Inflater;
026
027import javax.jms.Destination;
028import javax.jms.InvalidClientIDException;
029import javax.jms.JMSException;
030import javax.jms.Message;
031import javax.security.auth.login.CredentialException;
032
033import org.apache.activemq.broker.BrokerService;
034import org.apache.activemq.broker.BrokerServiceAware;
035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
036import org.apache.activemq.command.ActiveMQBytesMessage;
037import org.apache.activemq.command.ActiveMQDestination;
038import org.apache.activemq.command.ActiveMQMapMessage;
039import org.apache.activemq.command.ActiveMQMessage;
040import org.apache.activemq.command.ActiveMQTextMessage;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionError;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.ConnectionInfo;
045import org.apache.activemq.command.ExceptionResponse;
046import org.apache.activemq.command.MessageAck;
047import org.apache.activemq.command.MessageDispatch;
048import org.apache.activemq.command.MessageId;
049import org.apache.activemq.command.ProducerId;
050import org.apache.activemq.command.ProducerInfo;
051import org.apache.activemq.command.Response;
052import org.apache.activemq.command.SessionId;
053import org.apache.activemq.command.SessionInfo;
054import org.apache.activemq.command.ShutdownInfo;
055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy;
056import org.apache.activemq.util.ByteArrayOutputStream;
057import org.apache.activemq.util.ByteSequence;
058import org.apache.activemq.util.FactoryFinder;
059import org.apache.activemq.util.IOExceptionSupport;
060import org.apache.activemq.util.IdGenerator;
061import org.apache.activemq.util.JMSExceptionSupport;
062import org.apache.activemq.util.LRUCache;
063import org.apache.activemq.util.LongSequenceGenerator;
064import org.fusesource.hawtbuf.Buffer;
065import org.fusesource.hawtbuf.UTF8Buffer;
066import org.fusesource.mqtt.client.QoS;
067import org.fusesource.mqtt.client.Topic;
068import org.fusesource.mqtt.codec.CONNACK;
069import org.fusesource.mqtt.codec.CONNECT;
070import org.fusesource.mqtt.codec.DISCONNECT;
071import org.fusesource.mqtt.codec.MQTTFrame;
072import org.fusesource.mqtt.codec.PINGREQ;
073import org.fusesource.mqtt.codec.PINGRESP;
074import org.fusesource.mqtt.codec.PUBACK;
075import org.fusesource.mqtt.codec.PUBCOMP;
076import org.fusesource.mqtt.codec.PUBLISH;
077import org.fusesource.mqtt.codec.PUBREC;
078import org.fusesource.mqtt.codec.PUBREL;
079import org.fusesource.mqtt.codec.SUBACK;
080import org.fusesource.mqtt.codec.SUBSCRIBE;
081import org.fusesource.mqtt.codec.UNSUBACK;
082import org.fusesource.mqtt.codec.UNSUBSCRIBE;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086public class MQTTProtocolConverter {
087
088    private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class);
089
090    public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
091    public static final int V3_1 = 3;
092    public static final int V3_1_1 = 4;
093
094    private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
095    private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
096    private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5;
097    static final int DEFAULT_CACHE_SIZE = 5000;
098
099    private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
100    private final SessionId sessionId = new SessionId(connectionId, -1);
101    private final ProducerId producerId = new ProducerId(sessionId, 1);
102    private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
103
104    private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
105    private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE);
106    private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE);
107
108    private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
109    private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE);
110
111    private final MQTTTransport mqttTransport;
112    private final BrokerService brokerService;
113
114    private final Object commnadIdMutex = new Object();
115    private int lastCommandId;
116    private final AtomicBoolean connected = new AtomicBoolean(false);
117    private final ConnectionInfo connectionInfo = new ConnectionInfo();
118    private CONNECT connect;
119    private String clientId;
120    private long defaultKeepAlive;
121    private int activeMQSubscriptionPrefetch = -1;
122    private final MQTTPacketIdGenerator packetIdGenerator;
123    private boolean publishDollarTopics;
124
125    public int version;
126
127    private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/");
128
129    /*
130     * Subscription strategy configuration element.
131     *   > mqtt-default-subscriptions
132     *   > mqtt-virtual-topic-subscriptions
133     */
134    private String subscriptionStrategyName = "mqtt-default-subscriptions";
135    private MQTTSubscriptionStrategy subsciptionStrategy;
136
137    public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) {
138        this.mqttTransport = mqttTransport;
139        this.brokerService = brokerService;
140        this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService);
141        this.defaultKeepAlive = 0;
142    }
143
144    int generateCommandId() {
145        synchronized (commnadIdMutex) {
146            return lastCommandId++;
147        }
148    }
149
150    public void sendToActiveMQ(Command command, ResponseHandler handler) {
151
152        // Lets intercept message send requests..
153        if (command instanceof ActiveMQMessage) {
154            ActiveMQMessage msg = (ActiveMQMessage) command;
155            try {
156                if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) {
157                    // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
158                    // specification requirements for system assigned destinations.
159                    if (handler != null) {
160                        try {
161                            handler.onResponse(this, new Response());
162                        } catch (IOException e) {
163                            e.printStackTrace();
164                        }
165                    }
166                    return;
167                }
168            } catch (IOException e) {
169                e.printStackTrace();
170            }
171        }
172
173        command.setCommandId(generateCommandId());
174        if (handler != null) {
175            command.setResponseRequired(true);
176            resposeHandlers.put(command.getCommandId(), handler);
177        }
178        getMQTTTransport().sendToActiveMQ(command);
179    }
180
181    void sendToMQTT(MQTTFrame frame) {
182        try {
183            mqttTransport.sendToMQTT(frame);
184        } catch (IOException e) {
185            LOG.warn("Failed to send frame " + frame, e);
186        }
187    }
188
189    /**
190     * Convert a MQTT command
191     */
192    public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
193        switch (frame.messageType()) {
194            case PINGREQ.TYPE:
195                LOG.debug("Received a ping from client: " + getClientId());
196                sendToMQTT(PING_RESP_FRAME);
197                LOG.debug("Sent Ping Response to " + getClientId());
198                break;
199            case CONNECT.TYPE:
200                CONNECT connect = new CONNECT().decode(frame);
201                onMQTTConnect(connect);
202                LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version());
203                break;
204            case DISCONNECT.TYPE:
205                LOG.debug("MQTT Client {} disconnecting", getClientId());
206                onMQTTDisconnect();
207                break;
208            case SUBSCRIBE.TYPE:
209                onSubscribe(new SUBSCRIBE().decode(frame));
210                break;
211            case UNSUBSCRIBE.TYPE:
212                onUnSubscribe(new UNSUBSCRIBE().decode(frame));
213                break;
214            case PUBLISH.TYPE:
215                onMQTTPublish(new PUBLISH().decode(frame));
216                break;
217            case PUBACK.TYPE:
218                onMQTTPubAck(new PUBACK().decode(frame));
219                break;
220            case PUBREC.TYPE:
221                onMQTTPubRec(new PUBREC().decode(frame));
222                break;
223            case PUBREL.TYPE:
224                onMQTTPubRel(new PUBREL().decode(frame));
225                break;
226            case PUBCOMP.TYPE:
227                onMQTTPubComp(new PUBCOMP().decode(frame));
228                break;
229            default:
230                handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame);
231        }
232    }
233
234    void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
235        if (connected.get()) {
236            throw new MQTTProtocolException("Already connected.");
237        }
238        this.connect = connect;
239
240        String clientId = "";
241        if (connect.clientId() != null) {
242            clientId = connect.clientId().toString();
243        }
244
245        String userName = null;
246        if (connect.userName() != null) {
247            userName = connect.userName().toString();
248        }
249        String passswd = null;
250        if (connect.password() != null) {
251            passswd = connect.password().toString();
252        }
253
254        version = connect.version();
255
256        configureInactivityMonitor(connect.keepAlive());
257
258        connectionInfo.setConnectionId(connectionId);
259        if (clientId != null && !clientId.isEmpty()) {
260            connectionInfo.setClientId(clientId);
261        } else {
262            // Clean Session MUST be set for 0 length Client Id
263            if (!connect.cleanSession()) {
264                CONNACK ack = new CONNACK();
265                ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
266                try {
267                    getMQTTTransport().sendToMQTT(ack.encode());
268                    getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null));
269                } catch (IOException e) {
270                    getMQTTTransport().onException(IOExceptionSupport.create(e));
271                }
272                return;
273            }
274            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
275        }
276
277        connectionInfo.setResponseRequired(true);
278        connectionInfo.setUserName(userName);
279        connectionInfo.setPassword(passswd);
280        connectionInfo.setTransportContext(mqttTransport.getPeerCertificates());
281
282        sendToActiveMQ(connectionInfo, new ResponseHandler() {
283            @Override
284            public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
285
286                if (response.isException()) {
287                    // If the connection attempt fails we close the socket.
288                    Throwable exception = ((ExceptionResponse) response).getException();
289                    //let the client know
290                    CONNACK ack = new CONNACK();
291                    if (exception instanceof InvalidClientIDException) {
292                        ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
293                    } else if (exception instanceof SecurityException) {
294                        ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED);
295                    } else if (exception instanceof CredentialException) {
296                        ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
297                    } else {
298                        ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
299                    }
300                    getMQTTTransport().sendToMQTT(ack.encode());
301                    getMQTTTransport().onException(IOExceptionSupport.create(exception));
302                    return;
303                }
304
305                final SessionInfo sessionInfo = new SessionInfo(sessionId);
306                sendToActiveMQ(sessionInfo, null);
307
308                final ProducerInfo producerInfo = new ProducerInfo(producerId);
309                sendToActiveMQ(producerInfo, new ResponseHandler() {
310                    @Override
311                    public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
312
313                        if (response.isException()) {
314                            // If the connection attempt fails we close the socket.
315                            Throwable exception = ((ExceptionResponse) response).getException();
316                            CONNACK ack = new CONNACK();
317                            ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD);
318                            getMQTTTransport().sendToMQTT(ack.encode());
319                            getMQTTTransport().onException(IOExceptionSupport.create(exception));
320                            return;
321                        }
322
323                        CONNACK ack = new CONNACK();
324                        ack.code(CONNACK.Code.CONNECTION_ACCEPTED);
325                        connected.set(true);
326                        getMQTTTransport().sendToMQTT(ack.encode());
327
328                        if (connect.cleanSession()) {
329                            packetIdGenerator.stopClientSession(getClientId());
330                        } else {
331                            packetIdGenerator.startClientSession(getClientId());
332                        }
333
334                        findSubscriptionStrategy().onConnect(connect);
335                    }
336                });
337            }
338        });
339    }
340
341    void onMQTTDisconnect() throws MQTTProtocolException {
342        if (connected.get()) {
343            connected.set(false);
344            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
345            sendToActiveMQ(new ShutdownInfo(), null);
346        }
347        stopTransport();
348    }
349
350    void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException {
351        checkConnected();
352        LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}",
353                  command.messageId(), clientId, connectionInfo.getConnectionId());
354        Topic[] topics = command.topics();
355        if (topics != null) {
356            byte[] qos = new byte[topics.length];
357            for (int i = 0; i < topics.length; i++) {
358                try {
359                    qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]);
360                } catch (IOException e) {
361                    throw new MQTTProtocolException("Failed to process subscription request", true, e);
362                }
363            }
364            SUBACK ack = new SUBACK();
365            ack.messageId(command.messageId());
366            ack.grantedQos(qos);
367            try {
368                getMQTTTransport().sendToMQTT(ack.encode());
369            } catch (IOException e) {
370                LOG.warn("Couldn't send SUBACK for " + command, e);
371            }
372        } else {
373            LOG.warn("No topics defined for Subscription " + command);
374        }
375    }
376
377    public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
378        checkConnected();
379        UTF8Buffer[] topics = command.topics();
380        if (topics != null) {
381            for (UTF8Buffer topic : topics) {
382                try {
383                    findSubscriptionStrategy().onUnSubscribe(topic.toString());
384                } catch (IOException e) {
385                    throw new MQTTProtocolException("Failed to process unsubscribe request", true, e);
386                }
387            }
388        }
389        UNSUBACK ack = new UNSUBACK();
390        ack.messageId(command.messageId());
391        sendToMQTT(ack.encode());
392    }
393
394    /**
395     * Dispatch an ActiveMQ command
396     */
397    public void onActiveMQCommand(Command command) throws Exception {
398        if (command.isResponse()) {
399            Response response = (Response) command;
400            ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
401            if (rh != null) {
402                rh.onResponse(this, response);
403            } else {
404                // Pass down any unexpected errors. Should this close the connection?
405                if (response.isException()) {
406                    Throwable exception = ((ExceptionResponse) response).getException();
407                    handleException(exception, null);
408                }
409            }
410        } else if (command.isMessageDispatch()) {
411            MessageDispatch md = (MessageDispatch) command;
412            MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId());
413            if (sub != null) {
414                MessageAck ack = sub.createMessageAck(md);
415                PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
416                switch (publish.qos()) {
417                    case AT_LEAST_ONCE:
418                    case EXACTLY_ONCE:
419                        publish.dup(publish.dup() ? true : md.getMessage().isRedelivered());
420                    case AT_MOST_ONCE:
421                }
422                if (ack != null && sub.expectAck(publish)) {
423                    synchronized (consumerAcks) {
424                        consumerAcks.put(publish.messageId(), ack);
425                    }
426                }
427                LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}",
428                          publish.messageId(), clientId, connectionInfo.getConnectionId());
429                getMQTTTransport().sendToMQTT(publish.encode());
430                if (ack != null && !sub.expectAck(publish)) {
431                    getMQTTTransport().sendToActiveMQ(ack);
432                }
433            }
434        } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
435            // Pass down any unexpected async errors. Should this close the connection?
436            Throwable exception = ((ConnectionError) command).getException();
437            handleException(exception, null);
438        } else if (command.isBrokerInfo()) {
439            //ignore
440        } else {
441            LOG.debug("Do not know how to process ActiveMQ Command {}", command);
442        }
443    }
444
445    void onMQTTPublish(PUBLISH command) throws IOException, JMSException {
446        checkConnected();
447        LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}",
448                  command.messageId(), clientId, connectionInfo.getConnectionId());
449        ActiveMQMessage message = convertMessage(command);
450        message.setProducerId(producerId);
451        message.onSend();
452        sendToActiveMQ(message, createResponseHandler(command));
453    }
454
455    void onMQTTPubAck(PUBACK command) {
456        short messageId = command.messageId();
457        LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}",
458                  messageId, clientId, connectionInfo.getConnectionId());
459        packetIdGenerator.ackPacketId(getClientId(), messageId);
460        MessageAck ack;
461        synchronized (consumerAcks) {
462            ack = consumerAcks.remove(messageId);
463        }
464        if (ack != null) {
465            getMQTTTransport().sendToActiveMQ(ack);
466        }
467    }
468
469    void onMQTTPubRec(PUBREC commnand) {
470        //from a subscriber - send a PUBREL in response
471        PUBREL pubrel = new PUBREL();
472        pubrel.messageId(commnand.messageId());
473        sendToMQTT(pubrel.encode());
474    }
475
476    void onMQTTPubRel(PUBREL command) {
477        PUBREC ack;
478        synchronized (publisherRecs) {
479            ack = publisherRecs.remove(command.messageId());
480        }
481        if (ack == null) {
482            LOG.warn("Unknown PUBREL: {} received", command.messageId());
483        }
484        PUBCOMP pubcomp = new PUBCOMP();
485        pubcomp.messageId(command.messageId());
486        sendToMQTT(pubcomp.encode());
487    }
488
489    void onMQTTPubComp(PUBCOMP command) {
490        short messageId = command.messageId();
491        packetIdGenerator.ackPacketId(getClientId(), messageId);
492        MessageAck ack;
493        synchronized (consumerAcks) {
494            ack = consumerAcks.remove(messageId);
495        }
496        if (ack != null) {
497            getMQTTTransport().sendToActiveMQ(ack);
498        }
499    }
500
501    ActiveMQMessage convertMessage(PUBLISH command) throws JMSException {
502        ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
503
504        msg.setProducerId(producerId);
505        MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId());
506        msg.setMessageId(id);
507        LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
508                command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId());
509        msg.setTimestamp(System.currentTimeMillis());
510        msg.setPriority((byte) Message.DEFAULT_PRIORITY);
511        msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain());
512        msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal());
513        if (command.retain()) {
514            msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
515        }
516
517        ActiveMQDestination destination;
518        synchronized (activeMQDestinationMap) {
519            destination = activeMQDestinationMap.get(command.topicName());
520            if (destination == null) {
521                String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString());
522                try {
523                    destination = findSubscriptionStrategy().onSend(topicName);
524                } catch (IOException e) {
525                    throw JMSExceptionSupport.create(e);
526                }
527
528                activeMQDestinationMap.put(command.topicName().toString(), destination);
529            }
530        }
531
532        msg.setJMSDestination(destination);
533        msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length);
534        return msg;
535    }
536
537    public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException {
538        PUBLISH result = new PUBLISH();
539        // packet id is set in MQTTSubscription
540        QoS qoS;
541        if (message.propertyExists(QOS_PROPERTY_NAME)) {
542            int ordinal = message.getIntProperty(QOS_PROPERTY_NAME);
543            qoS = QoS.values()[ordinal];
544
545        } else {
546            qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE;
547        }
548        result.qos(qoS);
549        if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) {
550            result.retain(true);
551        }
552
553        String topicName;
554        synchronized (mqttTopicMap) {
555            topicName = mqttTopicMap.get(message.getJMSDestination());
556            if (topicName == null) {
557                String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination());
558                topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName);
559                mqttTopicMap.put(message.getJMSDestination(), topicName);
560            }
561        }
562        result.topicName(new UTF8Buffer(topicName));
563
564        if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
565            ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
566            msg.setReadOnlyBody(true);
567            String messageText = msg.getText();
568            if (messageText != null) {
569                result.payload(new Buffer(messageText.getBytes("UTF-8")));
570            }
571        } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) {
572            ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy();
573            msg.setReadOnlyBody(true);
574            byte[] data = new byte[(int) msg.getBodyLength()];
575            msg.readBytes(data);
576            result.payload(new Buffer(data));
577        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
578            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
579            msg.setReadOnlyBody(true);
580            Map<String, Object> map = msg.getContentMap();
581            if (map != null) {
582                result.payload(new Buffer(map.toString().getBytes("UTF-8")));
583            }
584        } else {
585            ByteSequence byteSequence = message.getContent();
586            if (byteSequence != null && byteSequence.getLength() > 0) {
587                if (message.isCompressed()) {
588                    Inflater inflater = new Inflater();
589                    inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length);
590                    byte[] data = new byte[4096];
591                    int read;
592                    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
593                    while ((read = inflater.inflate(data)) != 0) {
594                        bytesOut.write(data, 0, read);
595                    }
596                    byteSequence = bytesOut.toByteSequence();
597                    bytesOut.close();
598                }
599                result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length));
600            }
601        }
602        LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}",
603                result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId());
604        return result;
605    }
606
607    public MQTTTransport getMQTTTransport() {
608        return mqttTransport;
609    }
610
611    boolean willSent = false;
612    public void onTransportError() {
613        if (connect != null) {
614            if (connected.get()) {
615                if (connect.willTopic() != null && connect.willMessage() != null && !willSent) {
616                    willSent = true;
617                    try {
618                        PUBLISH publish = new PUBLISH();
619                        publish.topicName(connect.willTopic());
620                        publish.qos(connect.willQos());
621                        publish.messageId(packetIdGenerator.getNextSequenceId(getClientId()));
622                        publish.payload(connect.willMessage());
623                        ActiveMQMessage message = convertMessage(publish);
624                        message.setProducerId(producerId);
625                        message.onSend();
626
627                        sendToActiveMQ(message, null);
628                    } catch (Exception e) {
629                        LOG.warn("Failed to publish Will Message " + connect.willMessage());
630                    }
631                }
632                // remove connection info
633                sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
634            }
635        }
636    }
637
638    void configureInactivityMonitor(short keepAliveSeconds) {
639        MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor();
640
641        // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false,
642        // then ignore configuring it because it won't exist
643        if (monitor == null) {
644            return;
645        }
646
647        // Client has sent a valid CONNECT frame, we can stop the connect checker.
648        monitor.stopConnectChecker();
649
650        long keepAliveMS = keepAliveSeconds * 1000;
651
652        LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS);
653
654        try {
655            // if we have a default keep-alive value, and the client is trying to turn off keep-alive,
656
657            // we'll observe the server-side configured default value (note, no grace period)
658            if (keepAliveMS == 0 && defaultKeepAlive > 0) {
659                keepAliveMS = defaultKeepAlive;
660            }
661
662            long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD);
663
664            monitor.setProtocolConverter(this);
665            monitor.setReadKeepAliveTime(keepAliveMS);
666            monitor.setReadGraceTime(readGracePeriod);
667            monitor.startReadChecker();
668
669            LOG.debug("MQTT Client {} established heart beat of  {} ms ({} ms + {} ms grace period)",
670                      new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod });
671        } catch (Exception ex) {
672            LOG.warn("Failed to start MQTT InactivityMonitor ", ex);
673        }
674    }
675
676    void handleException(Throwable exception, MQTTFrame command) {
677        LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString());
678        LOG.debug("Exception detail", exception);
679
680        if (connected.get() && connectionInfo != null) {
681            connected.set(false);
682            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
683        }
684        stopTransport();
685    }
686
687    void checkConnected() throws MQTTProtocolException {
688        if (!connected.get()) {
689            throw new MQTTProtocolException("Not connected.");
690        }
691    }
692
693    private void stopTransport() {
694        try {
695            getMQTTTransport().stop();
696        } catch (Throwable e) {
697            LOG.debug("Failed to stop MQTT transport ", e);
698        }
699    }
700
701    ResponseHandler createResponseHandler(final PUBLISH command) {
702        if (command != null) {
703            switch (command.qos()) {
704                case AT_LEAST_ONCE:
705                    return new ResponseHandler() {
706                        @Override
707                        public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
708                            if (response.isException()) {
709                                LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
710                            } else {
711                                PUBACK ack = new PUBACK();
712                                ack.messageId(command.messageId());
713                                LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
714                                          command.messageId(), clientId, connectionInfo.getConnectionId());
715                                converter.getMQTTTransport().sendToMQTT(ack.encode());
716                            }
717                        }
718                    };
719                case EXACTLY_ONCE:
720                    return new ResponseHandler() {
721                        @Override
722                        public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException {
723                            if (response.isException()) {
724                                LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException());
725                            } else {
726                                PUBREC ack = new PUBREC();
727                                ack.messageId(command.messageId());
728                                synchronized (publisherRecs) {
729                                    publisherRecs.put(command.messageId(), ack);
730                                }
731                                LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}",
732                                          command.messageId(), clientId, connectionInfo.getConnectionId());
733                                converter.getMQTTTransport().sendToMQTT(ack.encode());
734                            }
735                        }
736                    };
737                case AT_MOST_ONCE:
738                    break;
739            }
740        }
741        return null;
742    }
743
744    public long getDefaultKeepAlive() {
745        return defaultKeepAlive;
746    }
747
748    /**
749     * Set the default keep alive time (in milliseconds) that would be used if configured on server side
750     * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame
751     * @param keepAlive the keepAlive in milliseconds
752     */
753    public void setDefaultKeepAlive(long keepAlive) {
754        this.defaultKeepAlive = keepAlive;
755    }
756
757    public int getActiveMQSubscriptionPrefetch() {
758        return activeMQSubscriptionPrefetch;
759    }
760
761    /**
762     * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
763     * The default = 1
764     *
765     * @param activeMQSubscriptionPrefetch
766     *        set the prefetch for the corresponding ActiveMQ subscription
767     */
768    public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
769        this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
770    }
771
772    public MQTTPacketIdGenerator getPacketIdGenerator() {
773        return packetIdGenerator;
774    }
775
776    public void setPublishDollarTopics(boolean publishDollarTopics) {
777        this.publishDollarTopics = publishDollarTopics;
778    }
779
780    public boolean getPublishDollarTopics() {
781        return publishDollarTopics;
782    }
783
784    public ConnectionId getConnectionId() {
785        return connectionId;
786    }
787
788    public SessionId getSessionId() {
789        return sessionId;
790    }
791
792    public boolean isCleanSession() {
793        return this.connect.cleanSession();
794    }
795
796    public String getSubscriptionStrategy() {
797        return subscriptionStrategyName;
798    }
799
800    public void setSubscriptionStrategy(String name) {
801        this.subscriptionStrategyName = name;
802    }
803
804    public String getClientId() {
805        if (clientId == null) {
806            if (connect != null && connect.clientId() != null) {
807                clientId = connect.clientId().toString();
808            } else {
809                clientId = "";
810            }
811        }
812        return clientId;
813    }
814
815    protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException {
816        if (subsciptionStrategy == null) {
817            synchronized (STRATAGY_FINDER) {
818                if (subsciptionStrategy != null) {
819                    return subsciptionStrategy;
820                }
821
822                MQTTSubscriptionStrategy strategy = null;
823                if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) {
824                    try {
825                        strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName);
826                        LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName);
827                        if (strategy instanceof BrokerServiceAware) {
828                            ((BrokerServiceAware)strategy).setBrokerService(brokerService);
829                        }
830                        strategy.initialize(this);
831                    } catch (Exception e) {
832                        throw IOExceptionSupport.create(e);
833                    }
834                } else {
835                    throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName);
836                }
837
838                this.subsciptionStrategy = strategy;
839            }
840        }
841        return subsciptionStrategy;
842    }
843}