001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.ws.jetty8; 018 019import org.apache.activemq.broker.BrokerService; 020import org.apache.activemq.broker.BrokerServiceAware; 021import org.apache.activemq.command.Command; 022import org.apache.activemq.transport.TransportSupport; 023import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor; 024import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; 025import org.apache.activemq.transport.mqtt.MQTTTransport; 026import org.apache.activemq.transport.mqtt.MQTTWireFormat; 027import org.apache.activemq.util.ByteSequence; 028import org.apache.activemq.util.IOExceptionSupport; 029import org.apache.activemq.util.ServiceStopper; 030import org.eclipse.jetty.websocket.WebSocket; 031import org.fusesource.mqtt.codec.DISCONNECT; 032import org.fusesource.mqtt.codec.MQTTFrame; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import java.io.IOException; 037import java.security.cert.X509Certificate; 038import java.util.concurrent.CountDownLatch; 039 040public class MQTTSocket extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware { 041 042 private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class); 043 Connection outbound; 044 MQTTProtocolConverter protocolConverter = null; 045 MQTTWireFormat wireFormat = new MQTTWireFormat(); 046 private final CountDownLatch socketTransportStarted = new CountDownLatch(1); 047 private BrokerService brokerService; 048 049 @Override 050 public void onMessage(byte[] bytes, int offset, int length) { 051 if (!transportStartedAtLeastOnce()) { 052 LOG.debug("Waiting for StompSocket to be properly started..."); 053 try { 054 socketTransportStarted.await(); 055 } catch (InterruptedException e) { 056 LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions..."); 057 } 058 } 059 060 try { 061 MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length)); 062 getProtocolConverter().onMQTTCommand(frame); 063 } catch (Exception e) { 064 onException(IOExceptionSupport.create(e)); 065 } 066 } 067 068 private MQTTProtocolConverter getProtocolConverter() { 069 if( protocolConverter == null ) { 070 protocolConverter = new MQTTProtocolConverter(this, brokerService); 071 } 072 return protocolConverter; 073 } 074 075 @Override 076 public void onOpen(Connection connection) { 077 this.outbound = connection; 078 } 079 080 @Override 081 public void onClose(int closeCode, String message) { 082 try { 083 getProtocolConverter().onMQTTCommand(new DISCONNECT().encode()); 084 } catch (Exception e) { 085 LOG.warn("Failed to close WebSocket", e); 086 } 087 } 088 089 protected void doStart() throws Exception { 090 socketTransportStarted.countDown(); 091 } 092 093 @Override 094 protected void doStop(ServiceStopper stopper) throws Exception { 095 } 096 097 private boolean transportStartedAtLeastOnce() { 098 return socketTransportStarted.getCount() == 0; 099 } 100 101 @Override 102 public int getReceiveCounter() { 103 return 0; 104 } 105 106 @Override 107 public String getRemoteAddress() { 108 return "MQTTSocket_" + this.hashCode(); 109 } 110 111 @Override 112 public void oneway(Object command) throws IOException { 113 try { 114 getProtocolConverter().onActiveMQCommand((Command) command); 115 } catch (Exception e) { 116 onException(IOExceptionSupport.create(e)); 117 } 118 } 119 120 @Override 121 public void sendToActiveMQ(Command command) { 122 doConsume(command); 123 } 124 125 @Override 126 public void sendToMQTT(MQTTFrame command) throws IOException { 127 ByteSequence bytes = wireFormat.marshal(command); 128 outbound.sendMessage(bytes.getData(), 0, bytes.getLength()); 129 } 130 131 @Override 132 public X509Certificate[] getPeerCertificates() { 133 return new X509Certificate[0]; 134 } 135 136 @Override 137 public MQTTInactivityMonitor getInactivityMonitor() { 138 return null; 139 } 140 141 @Override 142 public MQTTWireFormat getWireFormat() { 143 return wireFormat; 144 } 145 146 @Override 147 public void setBrokerService(BrokerService brokerService) { 148 this.brokerService = brokerService; 149 } 150}