001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.ws.jetty8;
018
019import org.apache.activemq.broker.BrokerService;
020import org.apache.activemq.broker.BrokerServiceAware;
021import org.apache.activemq.command.Command;
022import org.apache.activemq.transport.TransportSupport;
023import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
024import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
025import org.apache.activemq.transport.mqtt.MQTTTransport;
026import org.apache.activemq.transport.mqtt.MQTTWireFormat;
027import org.apache.activemq.util.ByteSequence;
028import org.apache.activemq.util.IOExceptionSupport;
029import org.apache.activemq.util.ServiceStopper;
030import org.eclipse.jetty.websocket.WebSocket;
031import org.fusesource.mqtt.codec.DISCONNECT;
032import org.fusesource.mqtt.codec.MQTTFrame;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import java.io.IOException;
037import java.security.cert.X509Certificate;
038import java.util.concurrent.CountDownLatch;
039
040public class MQTTSocket  extends TransportSupport implements WebSocket.OnBinaryMessage, MQTTTransport, BrokerServiceAware {
041
042    private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
043    Connection outbound;
044    MQTTProtocolConverter protocolConverter = null;
045    MQTTWireFormat wireFormat = new MQTTWireFormat();
046    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
047    private BrokerService brokerService;
048
049    @Override
050    public void onMessage(byte[] bytes, int offset, int length) {
051        if (!transportStartedAtLeastOnce()) {
052            LOG.debug("Waiting for StompSocket to be properly started...");
053            try {
054                socketTransportStarted.await();
055            } catch (InterruptedException e) {
056                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
057            }
058        }
059
060        try {
061            MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
062            getProtocolConverter().onMQTTCommand(frame);
063        } catch (Exception e) {
064            onException(IOExceptionSupport.create(e));
065        }
066    }
067
068    private MQTTProtocolConverter getProtocolConverter() {
069        if( protocolConverter == null ) {
070            protocolConverter = new MQTTProtocolConverter(this, brokerService);
071        }
072        return protocolConverter;
073    }
074
075    @Override
076    public void onOpen(Connection connection) {
077        this.outbound = connection;
078    }
079
080    @Override
081    public void onClose(int closeCode, String message) {
082        try {
083            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
084        } catch (Exception e) {
085            LOG.warn("Failed to close WebSocket", e);
086        }
087    }
088
089    protected void doStart() throws Exception {
090        socketTransportStarted.countDown();
091    }
092
093    @Override
094    protected void doStop(ServiceStopper stopper) throws Exception {
095    }
096
097    private boolean transportStartedAtLeastOnce() {
098        return socketTransportStarted.getCount() == 0;
099    }
100
101    @Override
102    public int getReceiveCounter() {
103        return 0;
104    }
105
106    @Override
107    public String getRemoteAddress() {
108        return "MQTTSocket_" + this.hashCode();
109    }
110
111    @Override
112    public void oneway(Object command) throws IOException {
113        try {
114            getProtocolConverter().onActiveMQCommand((Command) command);
115        } catch (Exception e) {
116            onException(IOExceptionSupport.create(e));
117        }
118    }
119
120    @Override
121    public void sendToActiveMQ(Command command) {
122        doConsume(command);
123    }
124
125    @Override
126    public void sendToMQTT(MQTTFrame command) throws IOException {
127        ByteSequence bytes = wireFormat.marshal(command);
128        outbound.sendMessage(bytes.getData(), 0, bytes.getLength());
129    }
130
131    @Override
132    public X509Certificate[] getPeerCertificates() {
133        return new X509Certificate[0];
134    }
135
136    @Override
137    public MQTTInactivityMonitor getInactivityMonitor() {
138        return null;
139    }
140
141    @Override
142    public MQTTWireFormat getWireFormat() {
143        return wireFormat;
144    }
145
146    @Override
147    public void setBrokerService(BrokerService brokerService) {
148        this.brokerService = brokerService;
149    }
150}