001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.mqtt; 018 019import java.io.IOException; 020import java.security.cert.X509Certificate; 021import java.util.concurrent.atomic.AtomicBoolean; 022 023import javax.jms.JMSException; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.command.Command; 027import org.apache.activemq.transport.Transport; 028import org.apache.activemq.transport.TransportFilter; 029import org.apache.activemq.transport.TransportListener; 030import org.apache.activemq.transport.tcp.SslTransport; 031import org.apache.activemq.util.IOExceptionSupport; 032import org.apache.activemq.wireformat.WireFormat; 033import org.fusesource.mqtt.codec.CONNACK; 034import org.fusesource.mqtt.codec.CONNECT; 035import org.fusesource.mqtt.codec.DISCONNECT; 036import org.fusesource.mqtt.codec.MQTTFrame; 037import org.fusesource.mqtt.codec.PINGREQ; 038import org.fusesource.mqtt.codec.PINGRESP; 039import org.fusesource.mqtt.codec.PUBACK; 040import org.fusesource.mqtt.codec.PUBCOMP; 041import org.fusesource.mqtt.codec.PUBLISH; 042import org.fusesource.mqtt.codec.PUBREC; 043import org.fusesource.mqtt.codec.PUBREL; 044import org.fusesource.mqtt.codec.SUBACK; 045import org.fusesource.mqtt.codec.SUBSCRIBE; 046import org.fusesource.mqtt.codec.UNSUBSCRIBE; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050/** 051 * The MQTTTransportFilter normally sits on top of a TcpTransport that has been 052 * configured with the StompWireFormat and is used to convert MQTT commands to 053 * ActiveMQ commands. All of the conversion work is done by delegating to the 054 * MQTTProtocolConverter 055 */ 056public class MQTTTransportFilter extends TransportFilter implements MQTTTransport { 057 private static final Logger LOG = LoggerFactory.getLogger(MQTTTransportFilter.class); 058 private static final Logger TRACE = LoggerFactory.getLogger(MQTTTransportFilter.class.getPackage().getName() + ".MQTTIO"); 059 private final MQTTProtocolConverter protocolConverter; 060 private MQTTInactivityMonitor monitor; 061 private MQTTWireFormat wireFormat; 062 private final AtomicBoolean stopped = new AtomicBoolean(); 063 private long connectAttemptTimeout = MQTTWireFormat.DEFAULT_CONNECTION_TIMEOUT; 064 065 private boolean trace; 066 private final Object sendLock = new Object(); 067 068 public MQTTTransportFilter(Transport next, WireFormat wireFormat, BrokerService brokerService) { 069 super(next); 070 this.protocolConverter = new MQTTProtocolConverter(this, brokerService); 071 072 if (wireFormat instanceof MQTTWireFormat) { 073 this.wireFormat = (MQTTWireFormat) wireFormat; 074 } 075 } 076 077 @Override 078 public void oneway(Object o) throws IOException { 079 try { 080 final Command command = (Command) o; 081 protocolConverter.onActiveMQCommand(command); 082 } catch (Exception e) { 083 throw IOExceptionSupport.create(e); 084 } 085 } 086 087 @Override 088 public void onCommand(Object command) { 089 try { 090 MQTTFrame frame = (MQTTFrame) command; 091 if (trace) { 092 TRACE.trace("Received: " + toString(frame)); 093 } 094 protocolConverter.onMQTTCommand(frame); 095 } catch (IOException e) { 096 onException(e); 097 } catch (JMSException e) { 098 onException(IOExceptionSupport.create(e)); 099 } 100 } 101 102 @Override 103 public void sendToActiveMQ(Command command) { 104 TransportListener l = transportListener; 105 if (l != null) { 106 l.onCommand(command); 107 } 108 } 109 110 @Override 111 public void sendToMQTT(MQTTFrame command) throws IOException { 112 if( !stopped.get() ) { 113 if (trace) { 114 TRACE.trace("Sending : " + toString(command)); 115 } 116 Transport n = next; 117 if (n != null) { 118 // sync access to underlying transport buffer 119 synchronized (sendLock) { 120 n.oneway(command); 121 } 122 } 123 } 124 } 125 126 static private String toString(MQTTFrame frame) { 127 if( frame == null ) 128 return null; 129 try { 130 switch (frame.messageType()) { 131 case PINGREQ.TYPE: return new PINGREQ().decode(frame).toString(); 132 case PINGRESP.TYPE: return new PINGRESP().decode(frame).toString(); 133 case CONNECT.TYPE: return new CONNECT().decode(frame).toString(); 134 case DISCONNECT.TYPE: return new DISCONNECT().decode(frame).toString(); 135 case SUBSCRIBE.TYPE: return new SUBSCRIBE().decode(frame).toString(); 136 case UNSUBSCRIBE.TYPE: return new UNSUBSCRIBE().decode(frame).toString(); 137 case PUBLISH.TYPE: return new PUBLISH().decode(frame).toString(); 138 case PUBACK.TYPE: return new PUBACK().decode(frame).toString(); 139 case PUBREC.TYPE: return new PUBREC().decode(frame).toString(); 140 case PUBREL.TYPE: return new PUBREL().decode(frame).toString(); 141 case PUBCOMP.TYPE: return new PUBCOMP().decode(frame).toString(); 142 case CONNACK.TYPE: return new CONNACK().decode(frame).toString(); 143 case SUBACK.TYPE: return new SUBACK().decode(frame).toString(); 144 default: return frame.toString(); 145 } 146 } catch (Throwable e) { 147 e.printStackTrace(); 148 return frame.toString(); 149 } 150 } 151 152 @Override 153 public void start() throws Exception { 154 if (monitor != null) { 155 monitor.startConnectChecker(getConnectAttemptTimeout()); 156 } 157 super.start(); 158 } 159 160 @Override 161 public void stop() throws Exception { 162 if (stopped.compareAndSet(false, true)) { 163 super.stop(); 164 } 165 } 166 167 @Override 168 public X509Certificate[] getPeerCertificates() { 169 if (next instanceof SslTransport) { 170 X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates(); 171 if (trace && peerCerts != null) { 172 LOG.debug("Peer Identity has been verified\n"); 173 } 174 return peerCerts; 175 } 176 return null; 177 } 178 179 public boolean isTrace() { 180 return trace; 181 } 182 183 public void setTrace(boolean trace) { 184 this.trace = trace; 185 } 186 187 @Override 188 public MQTTInactivityMonitor getInactivityMonitor() { 189 return monitor; 190 } 191 192 public void setInactivityMonitor(MQTTInactivityMonitor monitor) { 193 this.monitor = monitor; 194 } 195 196 @Override 197 public MQTTWireFormat getWireFormat() { 198 return this.wireFormat; 199 } 200 201 @Override 202 public void onException(IOException error) { 203 protocolConverter.onTransportError(); 204 super.onException(error); 205 } 206 207 public long getDefaultKeepAlive() { 208 return protocolConverter != null ? protocolConverter.getDefaultKeepAlive() : -1; 209 } 210 211 public void setDefaultKeepAlive(long defaultHeartBeat) { 212 protocolConverter.setDefaultKeepAlive(defaultHeartBeat); 213 } 214 215 /** 216 * @return the timeout value used to fail a connection if no CONNECT frame read. 217 */ 218 public long getConnectAttemptTimeout() { 219 return connectAttemptTimeout; 220 } 221 222 /** 223 * Sets the timeout value used to fail a connection if no CONNECT frame is read 224 * in the given interval. 225 * 226 * @param connectTimeout 227 * the connection frame received timeout value. 228 */ 229 public void setConnectAttemptTimeout(long connectTimeout) { 230 this.connectAttemptTimeout = connectTimeout; 231 } 232 233 public boolean getPublishDollarTopics() { 234 return protocolConverter != null && protocolConverter.getPublishDollarTopics(); 235 } 236 237 public void setPublishDollarTopics(boolean publishDollarTopics) { 238 protocolConverter.setPublishDollarTopics(publishDollarTopics); 239 } 240 241 public String getSubscriptionStrategy() { 242 return protocolConverter != null ? protocolConverter.getSubscriptionStrategy() : "default"; 243 } 244 245 public void setSubscriptionStrategy(String name) { 246 protocolConverter.setSubscriptionStrategy(name); 247 } 248 249 public int getActiveMQSubscriptionPrefetch() { 250 return protocolConverter.getActiveMQSubscriptionPrefetch(); 251 } 252 253 /** 254 * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one 255 * The default = 1 256 * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription 257 */ 258 public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { 259 protocolConverter.setActiveMQSubscriptionPrefetch(activeMQSubscriptionPrefetch); 260 } 261 262 /** 263 * @return the maximum number of bytes a single MQTT message frame is allowed to be. 264 */ 265 public int getMaxFrameSize() { 266 return wireFormat.getMaxFrameSize(); 267 } 268 269 /** 270 * Sets the maximum frame size for an incoming MQTT frame. The protocl limit is 271 * 256 megabytes and this value cannot be set higher. 272 * 273 * @param maxFrameSize 274 * the maximum allowed frame size for a single MQTT frame. 275 */ 276 public void setMaxFrameSize(int maxFrameSize) { 277 wireFormat.setMaxFrameSize(maxFrameSize); 278 } 279}