001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.ws.jetty9; 018 019import org.apache.activemq.broker.BrokerService; 020import org.apache.activemq.broker.BrokerServiceAware; 021import org.apache.activemq.command.Command; 022import org.apache.activemq.transport.TransportSupport; 023import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; 024import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; 025import org.apache.activemq.transport.mqtt.MQTTTransport; 026import org.apache.activemq.transport.mqtt.MQTTWireFormat; 027import org.apache.activemq.util.ByteSequence; 028import org.apache.activemq.util.IOExceptionSupport; 029import org.apache.activemq.util.ServiceStopper; 030import org.eclipse.jetty.websocket.api.Session; 031import org.eclipse.jetty.websocket.api.WebSocketListener; 032import org.fusesource.mqtt.codec.DISCONNECT; 033import org.fusesource.mqtt.codec.MQTTFrame; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037import java.io.IOException; 038import java.nio.ByteBuffer; 039import java.security.cert.X509Certificate; 040import java.util.concurrent.CountDownLatch; 041 042public class MQTTSocket extends TransportSupport implements WebSocketListener, MQTTTransport, BrokerServiceAware { 043 044 private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); 045 Session session; 046 MQTTProtocolConverter protocolConverter = null; 047 MQTTWireFormat wireFormat = new MQTTWireFormat(); 048 private final CountDownLatch socketTransportStarted = new CountDownLatch(1); 049 private BrokerService brokerService; 050 051 private MQTTProtocolConverter getProtocolConverter() { 052 if( protocolConverter == null ) { 053 protocolConverter = new MQTTProtocolConverter(this, brokerService); 054 } 055 return protocolConverter; 056 } 057 058 protected void doStart() throws Exception { 059 socketTransportStarted.countDown(); 060 } 061 062 @Override 063 protected void doStop(ServiceStopper stopper) throws Exception { 064 } 065 066 private boolean transportStartedAtLeastOnce() { 067 return socketTransportStarted.getCount() == 0; 068 } 069 070 @Override 071 public int getReceiveCounter() { 072 return 0; 073 } 074 075 @Override 076 public String getRemoteAddress() { 077 return "MQTTSocket_" + this.hashCode(); 078 } 079 080 @Override 081 public void oneway(Object command) throws IOException { 082 try { 083 getProtocolConverter().onActiveMQCommand((Command) command); 084 } catch (Exception e) { 085 onException(IOExceptionSupport.create(e)); 086 } 087 } 088 089 @Override 090 public void sendToActiveMQ(Command command) { 091 doConsume(command); 092 } 093 094 @Override 095 public void sendToMQTT(MQTTFrame command) throws IOException { 096 ByteSequence bytes = wireFormat.marshal(command); 097 session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength())); 098 } 099 100 @Override 101 public X509Certificate[] getPeerCertificates() { 102 return new X509Certificate[0]; 103 } 104 105 @Override 106 public MQTTInactivityMonitor getInactivityMonitor() { 107 return null; 108 } 109 110 @Override 111 public MQTTWireFormat getWireFormat() { 112 return wireFormat; 113 } 114 115 @Override 116 public void setBrokerService(BrokerService brokerService) { 117 this.brokerService = brokerService; 118 } 119 120 @Override 121 public void onWebSocketBinary(byte[] bytes, int offset, int length) { 122 if (!transportStartedAtLeastOnce()) { 123 LOG.debug("Waiting for StompSocket to be properly started..."); 124 try { 125 socketTransportStarted.await(); 126 } catch (InterruptedException e) { 127 LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); 128 } 129 } 130 131 try { 132 MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); 133 getProtocolConverter().onMQTTCommand(frame); 134 } catch (Exception e) { 135 onException(IOExceptionSupport.create(e)); 136 } 137 } 138 139 @Override 140 public void onWebSocketClose(int arg0, String arg1) { 141 try { 142 getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); 143 } catch (Exception e) { 144 LOG.warn("Failed to close WebSocket", e); 145 } 146 } 147 148 @Override 149 public void onWebSocketConnect(Session session) { 150 this.session = session; 151 } 152 153 @Override 154 public void onWebSocketError(Throwable arg0) { 155 156 } 157 158 @Override 159 public void onWebSocketText(String arg0) { 160 } 161}