001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.ws.jetty9;
018
019import org.apache.activemq.broker.BrokerService;
020import org.apache.activemq.broker.BrokerServiceAware;
021import org.apache.activemq.command.Command;
022import org.apache.activemq.transport.TransportSupport;
023import org.apache.activemq.transport.mqtt.MQTTInactivityMonitor;
024import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
025import org.apache.activemq.transport.mqtt.MQTTTransport;
026import org.apache.activemq.transport.mqtt.MQTTWireFormat;
027import org.apache.activemq.util.ByteSequence;
028import org.apache.activemq.util.IOExceptionSupport;
029import org.apache.activemq.util.ServiceStopper;
030import org.eclipse.jetty.websocket.api.Session;
031import org.eclipse.jetty.websocket.api.WebSocketListener;
032import org.fusesource.mqtt.codec.DISCONNECT;
033import org.fusesource.mqtt.codec.MQTTFrame;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037import java.io.IOException;
038import java.nio.ByteBuffer;
039import java.security.cert.X509Certificate;
040import java.util.concurrent.CountDownLatch;
041
042public class MQTTSocket  extends TransportSupport implements WebSocketListener, MQTTTransport, BrokerServiceAware {
043
044    private static final Logger LOG = LoggerFactory.getLogger(MQTTSocket.class);
045    Session session;
046    MQTTProtocolConverter protocolConverter = null;
047    MQTTWireFormat wireFormat = new MQTTWireFormat();
048    private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
049    private BrokerService brokerService;
050
051    private MQTTProtocolConverter getProtocolConverter() {
052        if( protocolConverter == null ) {
053            protocolConverter = new MQTTProtocolConverter(this, brokerService);
054        }
055        return protocolConverter;
056    }
057
058    protected void doStart() throws Exception {
059        socketTransportStarted.countDown();
060    }
061
062    @Override
063    protected void doStop(ServiceStopper stopper) throws Exception {
064    }
065
066    private boolean transportStartedAtLeastOnce() {
067        return socketTransportStarted.getCount() == 0;
068    }
069
070    @Override
071    public int getReceiveCounter() {
072        return 0;
073    }
074
075    @Override
076    public String getRemoteAddress() {
077        return "MQTTSocket_" + this.hashCode();
078    }
079
080    @Override
081    public void oneway(Object command) throws IOException {
082        try {
083            getProtocolConverter().onActiveMQCommand((Command) command);
084        } catch (Exception e) {
085            onException(IOExceptionSupport.create(e));
086        }
087    }
088
089    @Override
090    public void sendToActiveMQ(Command command) {
091        doConsume(command);
092    }
093
094    @Override
095    public void sendToMQTT(MQTTFrame command) throws IOException {
096        ByteSequence bytes = wireFormat.marshal(command);
097        session.getRemote().sendBytes(ByteBuffer.wrap(bytes.getData(), 0, bytes.getLength()));
098    }
099
100    @Override
101    public X509Certificate[] getPeerCertificates() {
102        return new X509Certificate[0];
103    }
104
105    @Override
106    public MQTTInactivityMonitor getInactivityMonitor() {
107        return null;
108    }
109
110    @Override
111    public MQTTWireFormat getWireFormat() {
112        return wireFormat;
113    }
114
115    @Override
116    public void setBrokerService(BrokerService brokerService) {
117        this.brokerService = brokerService;
118    }
119
120    @Override
121    public void onWebSocketBinary(byte[] bytes, int offset, int length) {
122        if (!transportStartedAtLeastOnce()) {
123            LOG.debug("Waiting for StompSocket to be properly started...");
124            try {
125                socketTransportStarted.await();
126            } catch (InterruptedException e) {
127                LOG.warn("While waiting for StompSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
128            }
129        }
130
131        try {
132            MQTTFrame frame = (MQTTFrame)wireFormat.unmarshal(new ByteSequence(bytes, offset, length));
133            getProtocolConverter().onMQTTCommand(frame);
134        } catch (Exception e) {
135            onException(IOExceptionSupport.create(e));
136        }
137    }
138
139    @Override
140    public void onWebSocketClose(int arg0, String arg1) {
141        try {
142            getProtocolConverter().onMQTTCommand(new DISCONNECT().encode());
143        } catch (Exception e) {
144            LOG.warn("Failed to close WebSocket", e);
145        }        
146    }
147
148    @Override
149    public void onWebSocketConnect(Session session) {
150        this.session = session;
151    }
152
153    @Override
154    public void onWebSocketError(Throwable arg0) {
155        
156    }
157
158    @Override
159    public void onWebSocketText(String arg0) {        
160    }
161}