001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.vm;
018
019import java.io.IOException;
020import java.io.InterruptedIOException;
021import java.net.URI;
022import java.util.concurrent.BlockingQueue;
023import java.util.concurrent.LinkedBlockingQueue;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicLong;
027
028import org.apache.activemq.command.ShutdownInfo;
029import org.apache.activemq.thread.Task;
030import org.apache.activemq.thread.TaskRunner;
031import org.apache.activemq.thread.TaskRunnerFactory;
032import org.apache.activemq.transport.FutureResponse;
033import org.apache.activemq.transport.ResponseCallback;
034import org.apache.activemq.transport.Transport;
035import org.apache.activemq.transport.TransportDisposedIOException;
036import org.apache.activemq.transport.TransportListener;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A Transport implementation that uses direct method invocations.
042 */
043public class VMTransport implements Transport, Task {
044    protected static final Logger LOG = LoggerFactory.getLogger(VMTransport.class);
045
046    private static final AtomicLong NEXT_ID = new AtomicLong(0);
047
048    // Transport Configuration
049    protected VMTransport peer;
050    protected TransportListener transportListener;
051    protected boolean marshal;
052    protected boolean network;
053    protected boolean async = true;
054    protected int asyncQueueDepth = 2000;
055    protected final URI location;
056    protected final long id;
057
058    // Implementation
059    private volatile LinkedBlockingQueue<Object> messageQueue;
060    private volatile TaskRunnerFactory taskRunnerFactory;
061    private volatile TaskRunner taskRunner;
062
063    // Transport State
064    protected final AtomicBoolean started = new AtomicBoolean();
065    protected final AtomicBoolean disposed = new AtomicBoolean();
066
067    private volatile int receiveCounter;
068
069    public VMTransport(URI location) {
070        this.location = location;
071        this.id = NEXT_ID.getAndIncrement();
072    }
073
074    public void setPeer(VMTransport peer) {
075        this.peer = peer;
076    }
077
078    @Override
079    public void oneway(Object command) throws IOException {
080
081        if (disposed.get()) {
082            throw new TransportDisposedIOException("Transport disposed.");
083        }
084
085        if (peer == null) {
086            throw new IOException("Peer not connected.");
087        }
088
089        try {
090
091            if (peer.disposed.get()) {
092                throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
093            }
094
095            if (peer.async) {
096                peer.getMessageQueue().put(command);
097                peer.wakeup();
098                return;
099            }
100
101            if (!peer.started.get()) {
102                LinkedBlockingQueue<Object> pending = peer.getMessageQueue();
103                int sleepTimeMillis;
104                boolean accepted = false;
105                do {
106                    sleepTimeMillis = 0;
107                    // the pending queue is drained on start so we need to ensure we add before
108                    // the drain commences, otherwise we never get the command dispatched!
109                    synchronized (peer.started) {
110                        if (!peer.started.get()) {
111                            accepted = pending.offer(command);
112                            if (!accepted) {
113                                sleepTimeMillis = 500;
114                            }
115                        }
116                    }
117                    // give start thread a chance if we will loop
118                    TimeUnit.MILLISECONDS.sleep(sleepTimeMillis);
119
120                } while (!accepted && !peer.started.get());
121                if (accepted) {
122                    return;
123                }
124            }
125        } catch (InterruptedException e) {
126            InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
127            iioe.initCause(e);
128            throw iioe;
129        }
130
131        dispatch(peer, peer.messageQueue, command);
132    }
133
134    public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
135        TransportListener transportListener = transport.getTransportListener();
136        if (transportListener != null) {
137            // Lock here on the target transport's started since we want to wait for its start()
138            // method to finish dispatching out of the queue before we do our own.
139            synchronized (transport.started) {
140
141                // Ensure that no additional commands entered the queue in the small time window
142                // before the start method locks the dispatch lock and the oneway method was in
143                // an put operation.
144                while(pending != null && !pending.isEmpty() && !transport.isDisposed()) {
145                    doDispatch(transport, transportListener, pending.poll());
146                }
147
148                // We are now in sync mode and won't enqueue any more commands to the target
149                // transport so lets clean up its resources.
150                transport.messageQueue = null;
151
152                // Don't dispatch if either end was disposed already.
153                if (command != null && !this.disposed.get() && !transport.isDisposed()) {
154                    doDispatch(transport, transportListener, command);
155                }
156            }
157        }
158    }
159
160    public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) {
161        transport.receiveCounter++;
162        transportListener.onCommand(command);
163    }
164
165    @Override
166    public void start() throws Exception {
167
168        if (transportListener == null) {
169            throw new IOException("TransportListener not set.");
170        }
171
172        // If we are not in async mode we lock the dispatch lock here and then start to
173        // prevent any sync dispatches from occurring until we dispatch the pending messages
174        // to maintain delivery order.  When async this happens automatically so just set
175        // started and wakeup the task runner.
176        if (!async) {
177            synchronized (started) {
178                if (started.compareAndSet(false, true)) {
179                    LinkedBlockingQueue<Object> mq = getMessageQueue();
180                    Object command;
181                    while ((command = mq.poll()) != null && !disposed.get() ) {
182                        receiveCounter++;
183                        doDispatch(this, transportListener, command);
184                    }
185                }
186            }
187        } else {
188            if (started.compareAndSet(false, true)) {
189                wakeup();
190            }
191        }
192    }
193
194    @Override
195    public void stop() throws Exception {
196        // Only need to do this once, all future oneway calls will now
197        // fail as will any asnyc jobs in the task runner.
198        if (disposed.compareAndSet(false, true)) {
199
200            TaskRunner tr = taskRunner;
201            LinkedBlockingQueue<Object> mq = this.messageQueue;
202
203            taskRunner = null;
204            messageQueue = null;
205
206            if (mq != null) {
207                mq.clear();
208            }
209
210            // don't wait for completion
211            if (tr != null) {
212                try {
213                    tr.shutdown(1);
214                } catch(Exception e) {
215                }
216                tr = null;
217            }
218
219            if (peer.transportListener != null) {
220                // let the peer know that we are disconnecting after attempting
221                // to cleanly shutdown the async tasks so that this is the last
222                // command it see's.
223                try {
224                    peer.transportListener.onCommand(new ShutdownInfo());
225                } catch (Exception ignore) {
226                }
227
228                // let any requests pending a response see an exception
229                try {
230                    peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
231                } catch (Exception ignore) {
232                }
233            }
234
235            // shutdown task runner factory
236            if (taskRunnerFactory != null) {
237                taskRunnerFactory.shutdownNow();
238                taskRunnerFactory = null;
239            }
240        }
241    }
242
243    protected void wakeup() {
244        if (async && started.get()) {
245            try {
246                getTaskRunner().wakeup();
247            } catch (InterruptedException e) {
248                Thread.currentThread().interrupt();
249            } catch (TransportDisposedIOException e) {
250            }
251        }
252    }
253
254    /**
255     * @see org.apache.activemq.thread.Task#iterate()
256     */
257    @Override
258    public boolean iterate() {
259
260        final TransportListener tl = transportListener;
261
262        LinkedBlockingQueue<Object> mq;
263        try {
264            mq = getMessageQueue();
265        } catch (TransportDisposedIOException e) {
266            return false;
267        }
268
269        Object command = mq.poll();
270        if (command != null && !disposed.get()) {
271            tl.onCommand(command);
272            return !mq.isEmpty() && !disposed.get();
273        } else {
274            if(disposed.get()) {
275                mq.clear();
276            }
277            return false;
278        }
279    }
280
281    @Override
282    public void setTransportListener(TransportListener commandListener) {
283        this.transportListener = commandListener;
284    }
285
286    public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
287        LinkedBlockingQueue<Object> result = messageQueue;
288        if (result == null) {
289            synchronized (this) {
290                result = messageQueue;
291                if (result == null) {
292                    if (disposed.get()) {
293                        throw new TransportDisposedIOException("The Transport has been disposed");
294                    }
295
296                    messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth);
297                }
298            }
299        }
300        return result;
301    }
302
303    protected TaskRunner getTaskRunner() throws TransportDisposedIOException {
304        TaskRunner result = taskRunner;
305        if (result == null) {
306            synchronized (this) {
307                result = taskRunner;
308                if (result == null) {
309                    if (disposed.get()) {
310                        throw new TransportDisposedIOException("The Transport has been disposed");
311                    }
312
313                    String name = "ActiveMQ VMTransport: " + toString();
314                    if (taskRunnerFactory == null) {
315                        taskRunnerFactory = new TaskRunnerFactory(name);
316                        taskRunnerFactory.init();
317                    }
318                    taskRunner = result = taskRunnerFactory.createTaskRunner(this, name);
319                }
320            }
321        }
322        return result;
323    }
324
325    @Override
326    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
327        throw new AssertionError("Unsupported Method");
328    }
329
330    @Override
331    public Object request(Object command) throws IOException {
332        throw new AssertionError("Unsupported Method");
333    }
334
335    @Override
336    public Object request(Object command, int timeout) throws IOException {
337        throw new AssertionError("Unsupported Method");
338    }
339
340    @Override
341    public TransportListener getTransportListener() {
342        return transportListener;
343    }
344
345    @Override
346    public <T> T narrow(Class<T> target) {
347        if (target.isAssignableFrom(getClass())) {
348            return target.cast(this);
349        }
350        return null;
351    }
352
353    public boolean isMarshal() {
354        return marshal;
355    }
356
357    public void setMarshal(boolean marshal) {
358        this.marshal = marshal;
359    }
360
361    public boolean isNetwork() {
362        return network;
363    }
364
365    public void setNetwork(boolean network) {
366        this.network = network;
367    }
368
369    @Override
370    public String toString() {
371        return location + "#" + id;
372    }
373
374    @Override
375    public String getRemoteAddress() {
376        if (peer != null) {
377            return peer.toString();
378        }
379        return null;
380    }
381
382    /**
383     * @return the async
384     */
385    public boolean isAsync() {
386        return async;
387    }
388
389    /**
390     * @param async the async to set
391     */
392    public void setAsync(boolean async) {
393        this.async = async;
394    }
395
396    /**
397     * @return the asyncQueueDepth
398     */
399    public int getAsyncQueueDepth() {
400        return asyncQueueDepth;
401    }
402
403    /**
404     * @param asyncQueueDepth the asyncQueueDepth to set
405     */
406    public void setAsyncQueueDepth(int asyncQueueDepth) {
407        this.asyncQueueDepth = asyncQueueDepth;
408    }
409
410    @Override
411    public boolean isFaultTolerant() {
412        return false;
413    }
414
415    @Override
416    public boolean isDisposed() {
417        return disposed.get();
418    }
419
420    @Override
421    public boolean isConnected() {
422        return !disposed.get();
423    }
424
425    @Override
426    public void reconnect(URI uri) throws IOException {
427        throw new IOException("Transport reconnect is not supported");
428    }
429
430    @Override
431    public boolean isReconnectSupported() {
432        return false;
433    }
434
435    @Override
436    public boolean isUpdateURIsSupported() {
437        return false;
438    }
439
440    @Override
441    public void updateURIs(boolean reblance,URI[] uris) throws IOException {
442        throw new IOException("URI update feature not supported");
443    }
444
445    @Override
446    public int getReceiveCounter() {
447        return receiveCounter;
448    }
449}