001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.vm; 018 019import java.io.IOException; 020import java.io.InterruptedIOException; 021import java.net.URI; 022import java.util.concurrent.BlockingQueue; 023import java.util.concurrent.LinkedBlockingQueue; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.atomic.AtomicBoolean; 026import java.util.concurrent.atomic.AtomicLong; 027 028import org.apache.activemq.command.ShutdownInfo; 029import org.apache.activemq.thread.Task; 030import org.apache.activemq.thread.TaskRunner; 031import org.apache.activemq.thread.TaskRunnerFactory; 032import org.apache.activemq.transport.FutureResponse; 033import org.apache.activemq.transport.ResponseCallback; 034import org.apache.activemq.transport.Transport; 035import org.apache.activemq.transport.TransportDisposedIOException; 036import org.apache.activemq.transport.TransportListener; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A Transport implementation that uses direct method invocations. 042 */ 043public class VMTransport implements Transport, Task { 044 protected static final Logger LOG = LoggerFactory.getLogger(VMTransport.class); 045 046 private static final AtomicLong NEXT_ID = new AtomicLong(0); 047 048 // Transport Configuration 049 protected VMTransport peer; 050 protected TransportListener transportListener; 051 protected boolean marshal; 052 protected boolean network; 053 protected boolean async = true; 054 protected int asyncQueueDepth = 2000; 055 protected final URI location; 056 protected final long id; 057 058 // Implementation 059 private volatile LinkedBlockingQueue<Object> messageQueue; 060 private volatile TaskRunnerFactory taskRunnerFactory; 061 private volatile TaskRunner taskRunner; 062 063 // Transport State 064 protected final AtomicBoolean started = new AtomicBoolean(); 065 protected final AtomicBoolean disposed = new AtomicBoolean(); 066 067 private volatile int receiveCounter; 068 069 public VMTransport(URI location) { 070 this.location = location; 071 this.id = NEXT_ID.getAndIncrement(); 072 } 073 074 public void setPeer(VMTransport peer) { 075 this.peer = peer; 076 } 077 078 @Override 079 public void oneway(Object command) throws IOException { 080 081 if (disposed.get()) { 082 throw new TransportDisposedIOException("Transport disposed."); 083 } 084 085 if (peer == null) { 086 throw new IOException("Peer not connected."); 087 } 088 089 try { 090 091 if (peer.disposed.get()) { 092 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed."); 093 } 094 095 if (peer.async) { 096 peer.getMessageQueue().put(command); 097 peer.wakeup(); 098 return; 099 } 100 101 if (!peer.started.get()) { 102 LinkedBlockingQueue<Object> pending = peer.getMessageQueue(); 103 int sleepTimeMillis; 104 boolean accepted = false; 105 do { 106 sleepTimeMillis = 0; 107 // the pending queue is drained on start so we need to ensure we add before 108 // the drain commences, otherwise we never get the command dispatched! 109 synchronized (peer.started) { 110 if (!peer.started.get()) { 111 accepted = pending.offer(command); 112 if (!accepted) { 113 sleepTimeMillis = 500; 114 } 115 } 116 } 117 // give start thread a chance if we will loop 118 TimeUnit.MILLISECONDS.sleep(sleepTimeMillis); 119 120 } while (!accepted && !peer.started.get()); 121 if (accepted) { 122 return; 123 } 124 } 125 } catch (InterruptedException e) { 126 InterruptedIOException iioe = new InterruptedIOException(e.getMessage()); 127 iioe.initCause(e); 128 throw iioe; 129 } 130 131 dispatch(peer, peer.messageQueue, command); 132 } 133 134 public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) { 135 TransportListener transportListener = transport.getTransportListener(); 136 if (transportListener != null) { 137 // Lock here on the target transport's started since we want to wait for its start() 138 // method to finish dispatching out of the queue before we do our own. 139 synchronized (transport.started) { 140 141 // Ensure that no additional commands entered the queue in the small time window 142 // before the start method locks the dispatch lock and the oneway method was in 143 // an put operation. 144 while(pending != null && !pending.isEmpty() && !transport.isDisposed()) { 145 doDispatch(transport, transportListener, pending.poll()); 146 } 147 148 // We are now in sync mode and won't enqueue any more commands to the target 149 // transport so lets clean up its resources. 150 transport.messageQueue = null; 151 152 // Don't dispatch if either end was disposed already. 153 if (command != null && !this.disposed.get() && !transport.isDisposed()) { 154 doDispatch(transport, transportListener, command); 155 } 156 } 157 } 158 } 159 160 public void doDispatch(VMTransport transport, TransportListener transportListener, Object command) { 161 transport.receiveCounter++; 162 transportListener.onCommand(command); 163 } 164 165 @Override 166 public void start() throws Exception { 167 168 if (transportListener == null) { 169 throw new IOException("TransportListener not set."); 170 } 171 172 // If we are not in async mode we lock the dispatch lock here and then start to 173 // prevent any sync dispatches from occurring until we dispatch the pending messages 174 // to maintain delivery order. When async this happens automatically so just set 175 // started and wakeup the task runner. 176 if (!async) { 177 synchronized (started) { 178 if (started.compareAndSet(false, true)) { 179 LinkedBlockingQueue<Object> mq = getMessageQueue(); 180 Object command; 181 while ((command = mq.poll()) != null && !disposed.get() ) { 182 receiveCounter++; 183 doDispatch(this, transportListener, command); 184 } 185 } 186 } 187 } else { 188 if (started.compareAndSet(false, true)) { 189 wakeup(); 190 } 191 } 192 } 193 194 @Override 195 public void stop() throws Exception { 196 // Only need to do this once, all future oneway calls will now 197 // fail as will any asnyc jobs in the task runner. 198 if (disposed.compareAndSet(false, true)) { 199 200 TaskRunner tr = taskRunner; 201 LinkedBlockingQueue<Object> mq = this.messageQueue; 202 203 taskRunner = null; 204 messageQueue = null; 205 206 if (mq != null) { 207 mq.clear(); 208 } 209 210 // don't wait for completion 211 if (tr != null) { 212 try { 213 tr.shutdown(1); 214 } catch(Exception e) { 215 } 216 tr = null; 217 } 218 219 if (peer.transportListener != null) { 220 // let the peer know that we are disconnecting after attempting 221 // to cleanly shutdown the async tasks so that this is the last 222 // command it see's. 223 try { 224 peer.transportListener.onCommand(new ShutdownInfo()); 225 } catch (Exception ignore) { 226 } 227 228 // let any requests pending a response see an exception 229 try { 230 peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped.")); 231 } catch (Exception ignore) { 232 } 233 } 234 235 // shutdown task runner factory 236 if (taskRunnerFactory != null) { 237 taskRunnerFactory.shutdownNow(); 238 taskRunnerFactory = null; 239 } 240 } 241 } 242 243 protected void wakeup() { 244 if (async && started.get()) { 245 try { 246 getTaskRunner().wakeup(); 247 } catch (InterruptedException e) { 248 Thread.currentThread().interrupt(); 249 } catch (TransportDisposedIOException e) { 250 } 251 } 252 } 253 254 /** 255 * @see org.apache.activemq.thread.Task#iterate() 256 */ 257 @Override 258 public boolean iterate() { 259 260 final TransportListener tl = transportListener; 261 262 LinkedBlockingQueue<Object> mq; 263 try { 264 mq = getMessageQueue(); 265 } catch (TransportDisposedIOException e) { 266 return false; 267 } 268 269 Object command = mq.poll(); 270 if (command != null && !disposed.get()) { 271 tl.onCommand(command); 272 return !mq.isEmpty() && !disposed.get(); 273 } else { 274 if(disposed.get()) { 275 mq.clear(); 276 } 277 return false; 278 } 279 } 280 281 @Override 282 public void setTransportListener(TransportListener commandListener) { 283 this.transportListener = commandListener; 284 } 285 286 public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException { 287 LinkedBlockingQueue<Object> result = messageQueue; 288 if (result == null) { 289 synchronized (this) { 290 result = messageQueue; 291 if (result == null) { 292 if (disposed.get()) { 293 throw new TransportDisposedIOException("The Transport has been disposed"); 294 } 295 296 messageQueue = result = new LinkedBlockingQueue<Object>(this.asyncQueueDepth); 297 } 298 } 299 } 300 return result; 301 } 302 303 protected TaskRunner getTaskRunner() throws TransportDisposedIOException { 304 TaskRunner result = taskRunner; 305 if (result == null) { 306 synchronized (this) { 307 result = taskRunner; 308 if (result == null) { 309 if (disposed.get()) { 310 throw new TransportDisposedIOException("The Transport has been disposed"); 311 } 312 313 String name = "ActiveMQ VMTransport: " + toString(); 314 if (taskRunnerFactory == null) { 315 taskRunnerFactory = new TaskRunnerFactory(name); 316 taskRunnerFactory.init(); 317 } 318 taskRunner = result = taskRunnerFactory.createTaskRunner(this, name); 319 } 320 } 321 } 322 return result; 323 } 324 325 @Override 326 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 327 throw new AssertionError("Unsupported Method"); 328 } 329 330 @Override 331 public Object request(Object command) throws IOException { 332 throw new AssertionError("Unsupported Method"); 333 } 334 335 @Override 336 public Object request(Object command, int timeout) throws IOException { 337 throw new AssertionError("Unsupported Method"); 338 } 339 340 @Override 341 public TransportListener getTransportListener() { 342 return transportListener; 343 } 344 345 @Override 346 public <T> T narrow(Class<T> target) { 347 if (target.isAssignableFrom(getClass())) { 348 return target.cast(this); 349 } 350 return null; 351 } 352 353 public boolean isMarshal() { 354 return marshal; 355 } 356 357 public void setMarshal(boolean marshal) { 358 this.marshal = marshal; 359 } 360 361 public boolean isNetwork() { 362 return network; 363 } 364 365 public void setNetwork(boolean network) { 366 this.network = network; 367 } 368 369 @Override 370 public String toString() { 371 return location + "#" + id; 372 } 373 374 @Override 375 public String getRemoteAddress() { 376 if (peer != null) { 377 return peer.toString(); 378 } 379 return null; 380 } 381 382 /** 383 * @return the async 384 */ 385 public boolean isAsync() { 386 return async; 387 } 388 389 /** 390 * @param async the async to set 391 */ 392 public void setAsync(boolean async) { 393 this.async = async; 394 } 395 396 /** 397 * @return the asyncQueueDepth 398 */ 399 public int getAsyncQueueDepth() { 400 return asyncQueueDepth; 401 } 402 403 /** 404 * @param asyncQueueDepth the asyncQueueDepth to set 405 */ 406 public void setAsyncQueueDepth(int asyncQueueDepth) { 407 this.asyncQueueDepth = asyncQueueDepth; 408 } 409 410 @Override 411 public boolean isFaultTolerant() { 412 return false; 413 } 414 415 @Override 416 public boolean isDisposed() { 417 return disposed.get(); 418 } 419 420 @Override 421 public boolean isConnected() { 422 return !disposed.get(); 423 } 424 425 @Override 426 public void reconnect(URI uri) throws IOException { 427 throw new IOException("Transport reconnect is not supported"); 428 } 429 430 @Override 431 public boolean isReconnectSupported() { 432 return false; 433 } 434 435 @Override 436 public boolean isUpdateURIsSupported() { 437 return false; 438 } 439 440 @Override 441 public void updateURIs(boolean reblance,URI[] uris) throws IOException { 442 throw new IOException("URI update feature not supported"); 443 } 444 445 @Override 446 public int getReceiveCounter() { 447 return receiveCounter; 448 } 449}