001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.util.Map; 021import java.util.concurrent.ConcurrentHashMap; 022import java.util.concurrent.ConcurrentMap; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.zip.DataFormatException; 025import java.util.zip.Inflater; 026 027import javax.jms.Destination; 028import javax.jms.InvalidClientIDException; 029import javax.jms.JMSException; 030import javax.jms.Message; 031import javax.security.auth.login.CredentialException; 032 033import org.apache.activemq.broker.BrokerService; 034import org.apache.activemq.broker.BrokerServiceAware; 035import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy; 036import org.apache.activemq.command.ActiveMQBytesMessage; 037import org.apache.activemq.command.ActiveMQDestination; 038import org.apache.activemq.command.ActiveMQMapMessage; 039import org.apache.activemq.command.ActiveMQMessage; 040import org.apache.activemq.command.ActiveMQTextMessage; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionError; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.ConnectionInfo; 045import org.apache.activemq.command.ExceptionResponse; 046import org.apache.activemq.command.MessageAck; 047import org.apache.activemq.command.MessageDispatch; 048import org.apache.activemq.command.MessageId; 049import org.apache.activemq.command.ProducerId; 050import org.apache.activemq.command.ProducerInfo; 051import org.apache.activemq.command.Response; 052import org.apache.activemq.command.SessionId; 053import org.apache.activemq.command.SessionInfo; 054import org.apache.activemq.command.ShutdownInfo; 055import org.apache.activemq.transport.mqtt.strategy.MQTTSubscriptionStrategy; 056import org.apache.activemq.util.ByteArrayOutputStream; 057import org.apache.activemq.util.ByteSequence; 058import org.apache.activemq.util.FactoryFinder; 059import org.apache.activemq.util.IOExceptionSupport; 060import org.apache.activemq.util.IdGenerator; 061import org.apache.activemq.util.JMSExceptionSupport; 062import org.apache.activemq.util.LRUCache; 063import org.apache.activemq.util.LongSequenceGenerator; 064import org.fusesource.hawtbuf.Buffer; 065import org.fusesource.hawtbuf.UTF8Buffer; 066import org.fusesource.mqtt.client.QoS; 067import org.fusesource.mqtt.client.Topic; 068import org.fusesource.mqtt.codec.CONNACK; 069import org.fusesource.mqtt.codec.CONNECT; 070import org.fusesource.mqtt.codec.DISCONNECT; 071import org.fusesource.mqtt.codec.MQTTFrame; 072import org.fusesource.mqtt.codec.PINGREQ; 073import org.fusesource.mqtt.codec.PINGRESP; 074import org.fusesource.mqtt.codec.PUBACK; 075import org.fusesource.mqtt.codec.PUBCOMP; 076import org.fusesource.mqtt.codec.PUBLISH; 077import org.fusesource.mqtt.codec.PUBREC; 078import org.fusesource.mqtt.codec.PUBREL; 079import org.fusesource.mqtt.codec.SUBACK; 080import org.fusesource.mqtt.codec.SUBSCRIBE; 081import org.fusesource.mqtt.codec.UNSUBACK; 082import org.fusesource.mqtt.codec.UNSUBSCRIBE; 083import org.slf4j.Logger; 084import org.slf4j.LoggerFactory; 085 086public class MQTTProtocolConverter { 087 088 private static final Logger LOG = LoggerFactory.getLogger(MQTTProtocolConverter.class); 089 090 public static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; 091 public static final int V3_1 = 3; 092 public static final int V3_1_1 = 4; 093 094 private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator(); 095 private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); 096 private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD = 0.5; 097 static final int DEFAULT_CACHE_SIZE = 5000; 098 099 private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); 100 private final SessionId sessionId = new SessionId(connectionId, -1); 101 private final ProducerId producerId = new ProducerId(sessionId, 1); 102 private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); 103 104 private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>(); 105 private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String, ActiveMQDestination>(DEFAULT_CACHE_SIZE); 106 private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination, String>(DEFAULT_CACHE_SIZE); 107 108 private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE); 109 private final Map<Short, PUBREC> publisherRecs = new LRUCache<Short, PUBREC>(DEFAULT_CACHE_SIZE); 110 111 private final MQTTTransport mqttTransport; 112 private final BrokerService brokerService; 113 114 private final Object commnadIdMutex = new Object(); 115 private int lastCommandId; 116 private final AtomicBoolean connected = new AtomicBoolean(false); 117 private final ConnectionInfo connectionInfo = new ConnectionInfo(); 118 private CONNECT connect; 119 private String clientId; 120 private long defaultKeepAlive; 121 private int activeMQSubscriptionPrefetch = -1; 122 private final MQTTPacketIdGenerator packetIdGenerator; 123 private boolean publishDollarTopics; 124 125 public int version; 126 127 private final FactoryFinder STRATAGY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/strategies/"); 128 129 /* 130 * Subscription strategy configuration element. 131 * > mqtt-default-subscriptions 132 * > mqtt-virtual-topic-subscriptions 133 */ 134 private String subscriptionStrategyName = "mqtt-default-subscriptions"; 135 private MQTTSubscriptionStrategy subsciptionStrategy; 136 137 public MQTTProtocolConverter(MQTTTransport mqttTransport, BrokerService brokerService) { 138 this.mqttTransport = mqttTransport; 139 this.brokerService = brokerService; 140 this.packetIdGenerator = MQTTPacketIdGenerator.getMQTTPacketIdGenerator(brokerService); 141 this.defaultKeepAlive = 0; 142 } 143 144 int generateCommandId() { 145 synchronized (commnadIdMutex) { 146 return lastCommandId++; 147 } 148 } 149 150 public void sendToActiveMQ(Command command, ResponseHandler handler) { 151 152 // Lets intercept message send requests.. 153 if (command instanceof ActiveMQMessage) { 154 ActiveMQMessage msg = (ActiveMQMessage) command; 155 try { 156 if (!getPublishDollarTopics() && findSubscriptionStrategy().isControlTopic(msg.getDestination())) { 157 // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 158 // specification requirements for system assigned destinations. 159 if (handler != null) { 160 try { 161 handler.onResponse(this, new Response()); 162 } catch (IOException e) { 163 e.printStackTrace(); 164 } 165 } 166 return; 167 } 168 } catch (IOException e) { 169 e.printStackTrace(); 170 } 171 } 172 173 command.setCommandId(generateCommandId()); 174 if (handler != null) { 175 command.setResponseRequired(true); 176 resposeHandlers.put(command.getCommandId(), handler); 177 } 178 getMQTTTransport().sendToActiveMQ(command); 179 } 180 181 void sendToMQTT(MQTTFrame frame) { 182 try { 183 mqttTransport.sendToMQTT(frame); 184 } catch (IOException e) { 185 LOG.warn("Failed to send frame " + frame, e); 186 } 187 } 188 189 /** 190 * Convert a MQTT command 191 */ 192 public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException { 193 switch (frame.messageType()) { 194 case PINGREQ.TYPE: 195 LOG.debug("Received a ping from client: " + getClientId()); 196 sendToMQTT(PING_RESP_FRAME); 197 LOG.debug("Sent Ping Response to " + getClientId()); 198 break; 199 case CONNECT.TYPE: 200 CONNECT connect = new CONNECT().decode(frame); 201 onMQTTConnect(connect); 202 LOG.debug("MQTT Client {} connected. (version: {})", getClientId(), connect.version()); 203 break; 204 case DISCONNECT.TYPE: 205 LOG.debug("MQTT Client {} disconnecting", getClientId()); 206 onMQTTDisconnect(); 207 break; 208 case SUBSCRIBE.TYPE: 209 onSubscribe(new SUBSCRIBE().decode(frame)); 210 break; 211 case UNSUBSCRIBE.TYPE: 212 onUnSubscribe(new UNSUBSCRIBE().decode(frame)); 213 break; 214 case PUBLISH.TYPE: 215 onMQTTPublish(new PUBLISH().decode(frame)); 216 break; 217 case PUBACK.TYPE: 218 onMQTTPubAck(new PUBACK().decode(frame)); 219 break; 220 case PUBREC.TYPE: 221 onMQTTPubRec(new PUBREC().decode(frame)); 222 break; 223 case PUBREL.TYPE: 224 onMQTTPubRel(new PUBREL().decode(frame)); 225 break; 226 case PUBCOMP.TYPE: 227 onMQTTPubComp(new PUBCOMP().decode(frame)); 228 break; 229 default: 230 handleException(new MQTTProtocolException("Unknown MQTTFrame type: " + frame.messageType(), true), frame); 231 } 232 } 233 234 void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException { 235 if (connected.get()) { 236 throw new MQTTProtocolException("Already connected."); 237 } 238 this.connect = connect; 239 240 String clientId = ""; 241 if (connect.clientId() != null) { 242 clientId = connect.clientId().toString(); 243 } 244 245 String userName = null; 246 if (connect.userName() != null) { 247 userName = connect.userName().toString(); 248 } 249 String passswd = null; 250 if (connect.password() != null) { 251 passswd = connect.password().toString(); 252 } 253 254 version = connect.version(); 255 256 configureInactivityMonitor(connect.keepAlive()); 257 258 connectionInfo.setConnectionId(connectionId); 259 if (clientId != null && !clientId.isEmpty()) { 260 connectionInfo.setClientId(clientId); 261 } else { 262 // Clean Session MUST be set for 0 length Client Id 263 if (!connect.cleanSession()) { 264 CONNACK ack = new CONNACK(); 265 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 266 try { 267 getMQTTTransport().sendToMQTT(ack.encode()); 268 getMQTTTransport().onException(IOExceptionSupport.create("Invalid Client ID", null)); 269 } catch (IOException e) { 270 getMQTTTransport().onException(IOExceptionSupport.create(e)); 271 } 272 return; 273 } 274 connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString()); 275 } 276 277 connectionInfo.setResponseRequired(true); 278 connectionInfo.setUserName(userName); 279 connectionInfo.setPassword(passswd); 280 connectionInfo.setTransportContext(mqttTransport.getPeerCertificates()); 281 282 sendToActiveMQ(connectionInfo, new ResponseHandler() { 283 @Override 284 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 285 286 if (response.isException()) { 287 // If the connection attempt fails we close the socket. 288 Throwable exception = ((ExceptionResponse) response).getException(); 289 //let the client know 290 CONNACK ack = new CONNACK(); 291 if (exception instanceof InvalidClientIDException) { 292 ack.code(CONNACK.Code.CONNECTION_REFUSED_IDENTIFIER_REJECTED); 293 } else if (exception instanceof SecurityException) { 294 ack.code(CONNACK.Code.CONNECTION_REFUSED_NOT_AUTHORIZED); 295 } else if (exception instanceof CredentialException) { 296 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 297 } else { 298 ack.code(CONNACK.Code.CONNECTION_REFUSED_SERVER_UNAVAILABLE); 299 } 300 getMQTTTransport().sendToMQTT(ack.encode()); 301 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 302 return; 303 } 304 305 final SessionInfo sessionInfo = new SessionInfo(sessionId); 306 sendToActiveMQ(sessionInfo, null); 307 308 final ProducerInfo producerInfo = new ProducerInfo(producerId); 309 sendToActiveMQ(producerInfo, new ResponseHandler() { 310 @Override 311 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 312 313 if (response.isException()) { 314 // If the connection attempt fails we close the socket. 315 Throwable exception = ((ExceptionResponse) response).getException(); 316 CONNACK ack = new CONNACK(); 317 ack.code(CONNACK.Code.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD); 318 getMQTTTransport().sendToMQTT(ack.encode()); 319 getMQTTTransport().onException(IOExceptionSupport.create(exception)); 320 return; 321 } 322 323 CONNACK ack = new CONNACK(); 324 ack.code(CONNACK.Code.CONNECTION_ACCEPTED); 325 connected.set(true); 326 getMQTTTransport().sendToMQTT(ack.encode()); 327 328 if (connect.cleanSession()) { 329 packetIdGenerator.stopClientSession(getClientId()); 330 } else { 331 packetIdGenerator.startClientSession(getClientId()); 332 } 333 334 findSubscriptionStrategy().onConnect(connect); 335 } 336 }); 337 } 338 }); 339 } 340 341 void onMQTTDisconnect() throws MQTTProtocolException { 342 if (connected.get()) { 343 connected.set(false); 344 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 345 sendToActiveMQ(new ShutdownInfo(), null); 346 } 347 stopTransport(); 348 } 349 350 void onSubscribe(SUBSCRIBE command) throws MQTTProtocolException { 351 checkConnected(); 352 LOG.trace("MQTT SUBSCRIBE message:{} client:{} connection:{}", 353 command.messageId(), clientId, connectionInfo.getConnectionId()); 354 Topic[] topics = command.topics(); 355 if (topics != null) { 356 byte[] qos = new byte[topics.length]; 357 for (int i = 0; i < topics.length; i++) { 358 try { 359 qos[i] = findSubscriptionStrategy().onSubscribe(topics[i]); 360 } catch (IOException e) { 361 throw new MQTTProtocolException("Failed to process subscription request", true, e); 362 } 363 } 364 SUBACK ack = new SUBACK(); 365 ack.messageId(command.messageId()); 366 ack.grantedQos(qos); 367 try { 368 getMQTTTransport().sendToMQTT(ack.encode()); 369 } catch (IOException e) { 370 LOG.warn("Couldn't send SUBACK for " + command, e); 371 } 372 } else { 373 LOG.warn("No topics defined for Subscription " + command); 374 } 375 } 376 377 public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { 378 checkConnected(); 379 UTF8Buffer[] topics = command.topics(); 380 if (topics != null) { 381 for (UTF8Buffer topic : topics) { 382 try { 383 findSubscriptionStrategy().onUnSubscribe(topic.toString()); 384 } catch (IOException e) { 385 throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); 386 } 387 } 388 } 389 UNSUBACK ack = new UNSUBACK(); 390 ack.messageId(command.messageId()); 391 sendToMQTT(ack.encode()); 392 } 393 394 /** 395 * Dispatch an ActiveMQ command 396 */ 397 public void onActiveMQCommand(Command command) throws Exception { 398 if (command.isResponse()) { 399 Response response = (Response) command; 400 ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId())); 401 if (rh != null) { 402 rh.onResponse(this, response); 403 } else { 404 // Pass down any unexpected errors. Should this close the connection? 405 if (response.isException()) { 406 Throwable exception = ((ExceptionResponse) response).getException(); 407 handleException(exception, null); 408 } 409 } 410 } else if (command.isMessageDispatch()) { 411 MessageDispatch md = (MessageDispatch) command; 412 MQTTSubscription sub = findSubscriptionStrategy().getSubscription(md.getConsumerId()); 413 if (sub != null) { 414 MessageAck ack = sub.createMessageAck(md); 415 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); 416 switch (publish.qos()) { 417 case AT_LEAST_ONCE: 418 case EXACTLY_ONCE: 419 publish.dup(publish.dup() ? true : md.getMessage().isRedelivered()); 420 case AT_MOST_ONCE: 421 } 422 if (ack != null && sub.expectAck(publish)) { 423 synchronized (consumerAcks) { 424 consumerAcks.put(publish.messageId(), ack); 425 } 426 } 427 LOG.trace("MQTT Snd PUBLISH message:{} client:{} connection:{}", 428 publish.messageId(), clientId, connectionInfo.getConnectionId()); 429 getMQTTTransport().sendToMQTT(publish.encode()); 430 if (ack != null && !sub.expectAck(publish)) { 431 getMQTTTransport().sendToActiveMQ(ack); 432 } 433 } 434 } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { 435 // Pass down any unexpected async errors. Should this close the connection? 436 Throwable exception = ((ConnectionError) command).getException(); 437 handleException(exception, null); 438 } else if (command.isBrokerInfo()) { 439 //ignore 440 } else { 441 LOG.debug("Do not know how to process ActiveMQ Command {}", command); 442 } 443 } 444 445 void onMQTTPublish(PUBLISH command) throws IOException, JMSException { 446 checkConnected(); 447 LOG.trace("MQTT Rcv PUBLISH message:{} client:{} connection:{}", 448 command.messageId(), clientId, connectionInfo.getConnectionId()); 449 ActiveMQMessage message = convertMessage(command); 450 message.setProducerId(producerId); 451 message.onSend(); 452 sendToActiveMQ(message, createResponseHandler(command)); 453 } 454 455 void onMQTTPubAck(PUBACK command) { 456 short messageId = command.messageId(); 457 LOG.trace("MQTT Rcv PUBACK message:{} client:{} connection:{}", 458 messageId, clientId, connectionInfo.getConnectionId()); 459 packetIdGenerator.ackPacketId(getClientId(), messageId); 460 MessageAck ack; 461 synchronized (consumerAcks) { 462 ack = consumerAcks.remove(messageId); 463 } 464 if (ack != null) { 465 getMQTTTransport().sendToActiveMQ(ack); 466 } 467 } 468 469 void onMQTTPubRec(PUBREC commnand) { 470 //from a subscriber - send a PUBREL in response 471 PUBREL pubrel = new PUBREL(); 472 pubrel.messageId(commnand.messageId()); 473 sendToMQTT(pubrel.encode()); 474 } 475 476 void onMQTTPubRel(PUBREL command) { 477 PUBREC ack; 478 synchronized (publisherRecs) { 479 ack = publisherRecs.remove(command.messageId()); 480 } 481 if (ack == null) { 482 LOG.warn("Unknown PUBREL: {} received", command.messageId()); 483 } 484 PUBCOMP pubcomp = new PUBCOMP(); 485 pubcomp.messageId(command.messageId()); 486 sendToMQTT(pubcomp.encode()); 487 } 488 489 void onMQTTPubComp(PUBCOMP command) { 490 short messageId = command.messageId(); 491 packetIdGenerator.ackPacketId(getClientId(), messageId); 492 MessageAck ack; 493 synchronized (consumerAcks) { 494 ack = consumerAcks.remove(messageId); 495 } 496 if (ack != null) { 497 getMQTTTransport().sendToActiveMQ(ack); 498 } 499 } 500 501 ActiveMQMessage convertMessage(PUBLISH command) throws JMSException { 502 ActiveMQBytesMessage msg = new ActiveMQBytesMessage(); 503 504 msg.setProducerId(producerId); 505 MessageId id = new MessageId(producerId, publisherIdGenerator.getNextSequenceId()); 506 msg.setMessageId(id); 507 LOG.trace("MQTT-->ActiveMQ: MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 508 command.messageId(), clientId, connectionInfo.getConnectionId(), msg.getMessageId()); 509 msg.setTimestamp(System.currentTimeMillis()); 510 msg.setPriority((byte) Message.DEFAULT_PRIORITY); 511 msg.setPersistent(command.qos() != QoS.AT_MOST_ONCE && !command.retain()); 512 msg.setIntProperty(QOS_PROPERTY_NAME, command.qos().ordinal()); 513 if (command.retain()) { 514 msg.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true); 515 } 516 517 ActiveMQDestination destination; 518 synchronized (activeMQDestinationMap) { 519 destination = activeMQDestinationMap.get(command.topicName()); 520 if (destination == null) { 521 String topicName = MQTTProtocolSupport.convertMQTTToActiveMQ(command.topicName().toString()); 522 try { 523 destination = findSubscriptionStrategy().onSend(topicName); 524 } catch (IOException e) { 525 throw JMSExceptionSupport.create(e); 526 } 527 528 activeMQDestinationMap.put(command.topicName().toString(), destination); 529 } 530 } 531 532 msg.setJMSDestination(destination); 533 msg.writeBytes(command.payload().data, command.payload().offset, command.payload().length); 534 return msg; 535 } 536 537 public PUBLISH convertMessage(ActiveMQMessage message) throws IOException, JMSException, DataFormatException { 538 PUBLISH result = new PUBLISH(); 539 // packet id is set in MQTTSubscription 540 QoS qoS; 541 if (message.propertyExists(QOS_PROPERTY_NAME)) { 542 int ordinal = message.getIntProperty(QOS_PROPERTY_NAME); 543 qoS = QoS.values()[ordinal]; 544 545 } else { 546 qoS = message.isPersistent() ? QoS.AT_MOST_ONCE : QoS.AT_LEAST_ONCE; 547 } 548 result.qos(qoS); 549 if (message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY)) { 550 result.retain(true); 551 } 552 553 String topicName; 554 synchronized (mqttTopicMap) { 555 topicName = mqttTopicMap.get(message.getJMSDestination()); 556 if (topicName == null) { 557 String amqTopicName = findSubscriptionStrategy().onSend(message.getDestination()); 558 topicName = MQTTProtocolSupport.convertActiveMQToMQTT(amqTopicName); 559 mqttTopicMap.put(message.getJMSDestination(), topicName); 560 } 561 } 562 result.topicName(new UTF8Buffer(topicName)); 563 564 if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { 565 ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); 566 msg.setReadOnlyBody(true); 567 String messageText = msg.getText(); 568 if (messageText != null) { 569 result.payload(new Buffer(messageText.getBytes("UTF-8"))); 570 } 571 } else if (message.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE) { 572 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) message.copy(); 573 msg.setReadOnlyBody(true); 574 byte[] data = new byte[(int) msg.getBodyLength()]; 575 msg.readBytes(data); 576 result.payload(new Buffer(data)); 577 } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { 578 ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); 579 msg.setReadOnlyBody(true); 580 Map<String, Object> map = msg.getContentMap(); 581 if (map != null) { 582 result.payload(new Buffer(map.toString().getBytes("UTF-8"))); 583 } 584 } else { 585 ByteSequence byteSequence = message.getContent(); 586 if (byteSequence != null && byteSequence.getLength() > 0) { 587 if (message.isCompressed()) { 588 Inflater inflater = new Inflater(); 589 inflater.setInput(byteSequence.data, byteSequence.offset, byteSequence.length); 590 byte[] data = new byte[4096]; 591 int read; 592 ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); 593 while ((read = inflater.inflate(data)) != 0) { 594 bytesOut.write(data, 0, read); 595 } 596 byteSequence = bytesOut.toByteSequence(); 597 bytesOut.close(); 598 } 599 result.payload(new Buffer(byteSequence.data, byteSequence.offset, byteSequence.length)); 600 } 601 } 602 LOG.trace("ActiveMQ-->MQTT:MQTT_MSGID:{} client:{} connection:{} ActiveMQ_MSGID:{}", 603 result.messageId(), clientId, connectionInfo.getConnectionId(), message.getMessageId()); 604 return result; 605 } 606 607 public MQTTTransport getMQTTTransport() { 608 return mqttTransport; 609 } 610 611 boolean willSent = false; 612 public void onTransportError() { 613 if (connect != null) { 614 if (connected.get()) { 615 if (connect.willTopic() != null && connect.willMessage() != null && !willSent) { 616 willSent = true; 617 try { 618 PUBLISH publish = new PUBLISH(); 619 publish.topicName(connect.willTopic()); 620 publish.qos(connect.willQos()); 621 publish.messageId(packetIdGenerator.getNextSequenceId(getClientId())); 622 publish.payload(connect.willMessage()); 623 ActiveMQMessage message = convertMessage(publish); 624 message.setProducerId(producerId); 625 message.onSend(); 626 627 sendToActiveMQ(message, null); 628 } catch (Exception e) { 629 LOG.warn("Failed to publish Will Message " + connect.willMessage()); 630 } 631 } 632 // remove connection info 633 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 634 } 635 } 636 } 637 638 void configureInactivityMonitor(short keepAliveSeconds) { 639 MQTTInactivityMonitor monitor = getMQTTTransport().getInactivityMonitor(); 640 641 // If the user specifically shuts off the InactivityMonitor with transport.useInactivityMonitor=false, 642 // then ignore configuring it because it won't exist 643 if (monitor == null) { 644 return; 645 } 646 647 // Client has sent a valid CONNECT frame, we can stop the connect checker. 648 monitor.stopConnectChecker(); 649 650 long keepAliveMS = keepAliveSeconds * 1000; 651 652 LOG.debug("MQTT Client {} requests heart beat of {} ms", getClientId(), keepAliveMS); 653 654 try { 655 // if we have a default keep-alive value, and the client is trying to turn off keep-alive, 656 657 // we'll observe the server-side configured default value (note, no grace period) 658 if (keepAliveMS == 0 && defaultKeepAlive > 0) { 659 keepAliveMS = defaultKeepAlive; 660 } 661 662 long readGracePeriod = (long) (keepAliveMS * MQTT_KEEP_ALIVE_GRACE_PERIOD); 663 664 monitor.setProtocolConverter(this); 665 monitor.setReadKeepAliveTime(keepAliveMS); 666 monitor.setReadGraceTime(readGracePeriod); 667 monitor.startReadChecker(); 668 669 LOG.debug("MQTT Client {} established heart beat of {} ms ({} ms + {} ms grace period)", 670 new Object[] { getClientId(), keepAliveMS, keepAliveMS, readGracePeriod }); 671 } catch (Exception ex) { 672 LOG.warn("Failed to start MQTT InactivityMonitor ", ex); 673 } 674 } 675 676 void handleException(Throwable exception, MQTTFrame command) { 677 LOG.warn("Exception occurred processing: \n" + command + ": " + exception.toString()); 678 LOG.debug("Exception detail", exception); 679 680 if (connected.get() && connectionInfo != null) { 681 connected.set(false); 682 sendToActiveMQ(connectionInfo.createRemoveCommand(), null); 683 } 684 stopTransport(); 685 } 686 687 void checkConnected() throws MQTTProtocolException { 688 if (!connected.get()) { 689 throw new MQTTProtocolException("Not connected."); 690 } 691 } 692 693 private void stopTransport() { 694 try { 695 getMQTTTransport().stop(); 696 } catch (Throwable e) { 697 LOG.debug("Failed to stop MQTT transport ", e); 698 } 699 } 700 701 ResponseHandler createResponseHandler(final PUBLISH command) { 702 if (command != null) { 703 switch (command.qos()) { 704 case AT_LEAST_ONCE: 705 return new ResponseHandler() { 706 @Override 707 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 708 if (response.isException()) { 709 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); 710 } else { 711 PUBACK ack = new PUBACK(); 712 ack.messageId(command.messageId()); 713 LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", 714 command.messageId(), clientId, connectionInfo.getConnectionId()); 715 converter.getMQTTTransport().sendToMQTT(ack.encode()); 716 } 717 } 718 }; 719 case EXACTLY_ONCE: 720 return new ResponseHandler() { 721 @Override 722 public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { 723 if (response.isException()) { 724 LOG.warn("Failed to send MQTT Publish: ", command, ((ExceptionResponse) response).getException()); 725 } else { 726 PUBREC ack = new PUBREC(); 727 ack.messageId(command.messageId()); 728 synchronized (publisherRecs) { 729 publisherRecs.put(command.messageId(), ack); 730 } 731 LOG.trace("MQTT Snd PUBACK message:{} client:{} connection:{}", 732 command.messageId(), clientId, connectionInfo.getConnectionId()); 733 converter.getMQTTTransport().sendToMQTT(ack.encode()); 734 } 735 } 736 }; 737 case AT_MOST_ONCE: 738 break; 739 } 740 } 741 return null; 742 } 743 744 public long getDefaultKeepAlive() { 745 return defaultKeepAlive; 746 } 747 748 /** 749 * Set the default keep alive time (in milliseconds) that would be used if configured on server side 750 * and the client sends a keep-alive value of 0 (zero) on a CONNECT frame 751 * @param keepAlive the keepAlive in milliseconds 752 */ 753 public void setDefaultKeepAlive(long keepAlive) { 754 this.defaultKeepAlive = keepAlive; 755 } 756 757 public int getActiveMQSubscriptionPrefetch() { 758 return activeMQSubscriptionPrefetch; 759 } 760 761 /** 762 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one 763 * The default = 1 764 * 765 * @param activeMQSubscriptionPrefetch 766 * set the prefetch for the corresponding ActiveMQ subscription 767 */ 768 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { 769 this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; 770 } 771 772 public MQTTPacketIdGenerator getPacketIdGenerator() { 773 return packetIdGenerator; 774 } 775 776 public void setPublishDollarTopics(boolean publishDollarTopics) { 777 this.publishDollarTopics = publishDollarTopics; 778 } 779 780 public boolean getPublishDollarTopics() { 781 return publishDollarTopics; 782 } 783 784 public ConnectionId getConnectionId() { 785 return connectionId; 786 } 787 788 public SessionId getSessionId() { 789 return sessionId; 790 } 791 792 public boolean isCleanSession() { 793 return this.connect.cleanSession(); 794 } 795 796 public String getSubscriptionStrategy() { 797 return subscriptionStrategyName; 798 } 799 800 public void setSubscriptionStrategy(String name) { 801 this.subscriptionStrategyName = name; 802 } 803 804 public String getClientId() { 805 if (clientId == null) { 806 if (connect != null && connect.clientId() != null) { 807 clientId = connect.clientId().toString(); 808 } else { 809 clientId = ""; 810 } 811 } 812 return clientId; 813 } 814 815 protected MQTTSubscriptionStrategy findSubscriptionStrategy() throws IOException { 816 if (subsciptionStrategy == null) { 817 synchronized (STRATAGY_FINDER) { 818 if (subsciptionStrategy != null) { 819 return subsciptionStrategy; 820 } 821 822 MQTTSubscriptionStrategy strategy = null; 823 if (subscriptionStrategyName != null && !subscriptionStrategyName.isEmpty()) { 824 try { 825 strategy = (MQTTSubscriptionStrategy) STRATAGY_FINDER.newInstance(subscriptionStrategyName); 826 LOG.debug("MQTT Using subscription strategy: {}", subscriptionStrategyName); 827 if (strategy instanceof BrokerServiceAware) { 828 ((BrokerServiceAware)strategy).setBrokerService(brokerService); 829 } 830 strategy.initialize(this); 831 } catch (Exception e) { 832 throw IOExceptionSupport.create(e); 833 } 834 } else { 835 throw new IOException("Invalid subscription strategy name given: " + subscriptionStrategyName); 836 } 837 838 this.subsciptionStrategy = strategy; 839 } 840 } 841 return subsciptionStrategy; 842 } 843}