001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.failover; 018 019import java.io.BufferedReader; 020import java.io.FileReader; 021import java.io.IOException; 022import java.io.InputStreamReader; 023import java.io.InterruptedIOException; 024import java.net.InetAddress; 025import java.net.MalformedURLException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.URL; 029import java.util.ArrayList; 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.Iterator; 033import java.util.LinkedHashMap; 034import java.util.List; 035import java.util.Map; 036import java.util.StringTokenizer; 037import java.util.concurrent.CopyOnWriteArrayList; 038import java.util.concurrent.atomic.AtomicReference; 039 040import org.apache.activemq.broker.SslContext; 041import org.apache.activemq.command.Command; 042import org.apache.activemq.command.ConnectionControl; 043import org.apache.activemq.command.ConnectionId; 044import org.apache.activemq.command.MessageDispatch; 045import org.apache.activemq.command.MessagePull; 046import org.apache.activemq.command.RemoveInfo; 047import org.apache.activemq.command.Response; 048import org.apache.activemq.state.ConnectionStateTracker; 049import org.apache.activemq.state.Tracked; 050import org.apache.activemq.thread.Task; 051import org.apache.activemq.thread.TaskRunner; 052import org.apache.activemq.thread.TaskRunnerFactory; 053import org.apache.activemq.transport.CompositeTransport; 054import org.apache.activemq.transport.DefaultTransportListener; 055import org.apache.activemq.transport.FutureResponse; 056import org.apache.activemq.transport.ResponseCallback; 057import org.apache.activemq.transport.Transport; 058import org.apache.activemq.transport.TransportFactory; 059import org.apache.activemq.transport.TransportListener; 060import org.apache.activemq.util.IOExceptionSupport; 061import org.apache.activemq.util.ServiceSupport; 062import org.apache.activemq.util.URISupport; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066/** 067 * A Transport that is made reliable by being able to fail over to another 068 * transport when a transport failure is detected. 069 */ 070public class FailoverTransport implements CompositeTransport { 071 072 private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class); 073 private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10; 074 private static final int INFINITE = -1; 075 private TransportListener transportListener; 076 private boolean disposed; 077 private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>(); 078 private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>(); 079 080 private final Object reconnectMutex = new Object(); 081 private final Object backupMutex = new Object(); 082 private final Object sleepMutex = new Object(); 083 private final Object listenerMutex = new Object(); 084 private final ConnectionStateTracker stateTracker = new ConnectionStateTracker(); 085 private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>(); 086 087 private URI connectedTransportURI; 088 private URI failedConnectTransportURI; 089 private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>(); 090 private final TaskRunnerFactory reconnectTaskFactory; 091 private final TaskRunner reconnectTask; 092 private boolean started; 093 private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 094 private long maxReconnectDelay = 1000 * 30; 095 private double backOffMultiplier = 2d; 096 private long timeout = INFINITE; 097 private boolean useExponentialBackOff = true; 098 private boolean randomize = true; 099 private int maxReconnectAttempts = INFINITE; 100 private int startupMaxReconnectAttempts = INFINITE; 101 private int connectFailures; 102 private int warnAfterReconnectAttempts = 10; 103 private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY; 104 private Exception connectionFailure; 105 private boolean firstConnection = true; 106 // optionally always have a backup created 107 private boolean backup = false; 108 private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>(); 109 private int backupPoolSize = 1; 110 private boolean trackMessages = false; 111 private boolean trackTransactionProducers = true; 112 private int maxCacheSize = 128 * 1024; 113 private final TransportListener disposedListener = new DefaultTransportListener() { 114 }; 115 private final TransportListener myTransportListener = createTransportListener(); 116 private boolean updateURIsSupported = true; 117 private boolean reconnectSupported = true; 118 // remember for reconnect thread 119 private SslContext brokerSslContext; 120 private String updateURIsURL = null; 121 private boolean rebalanceUpdateURIs = true; 122 private boolean doRebalance = false; 123 private boolean connectedToPriority = false; 124 125 private boolean priorityBackup = false; 126 private final ArrayList<URI> priorityList = new ArrayList<URI>(); 127 private boolean priorityBackupAvailable = false; 128 private String nestedExtraQueryOptions; 129 private boolean shuttingDown = false; 130 131 public FailoverTransport() throws InterruptedIOException { 132 brokerSslContext = SslContext.getCurrentSslContext(); 133 stateTracker.setTrackTransactions(true); 134 // Setup a task that is used to reconnect the a connection async. 135 reconnectTaskFactory = new TaskRunnerFactory(); 136 reconnectTaskFactory.init(); 137 reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() { 138 @Override 139 public boolean iterate() { 140 boolean result = false; 141 if (!started) { 142 return result; 143 } 144 boolean buildBackup = true; 145 synchronized (backupMutex) { 146 if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) { 147 result = doReconnect(); 148 buildBackup = false; 149 } 150 } 151 if (buildBackup) { 152 buildBackups(); 153 if (priorityBackup && !connectedToPriority) { 154 try { 155 doDelay(); 156 if (reconnectTask == null) { 157 return true; 158 } 159 reconnectTask.wakeup(); 160 } catch (InterruptedException e) { 161 LOG.debug("Reconnect task has been interrupted.", e); 162 } 163 } 164 } else { 165 // build backups on the next iteration 166 buildBackup = true; 167 try { 168 if (reconnectTask == null) { 169 return true; 170 } 171 reconnectTask.wakeup(); 172 } catch (InterruptedException e) { 173 LOG.debug("Reconnect task has been interrupted.", e); 174 } 175 } 176 return result; 177 } 178 179 }, "ActiveMQ Failover Worker: " + System.identityHashCode(this)); 180 } 181 182 TransportListener createTransportListener() { 183 return new TransportListener() { 184 @Override 185 public void onCommand(Object o) { 186 Command command = (Command) o; 187 if (command == null) { 188 return; 189 } 190 if (command.isResponse()) { 191 Object object = null; 192 synchronized (requestMap) { 193 object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId())); 194 } 195 if (object != null && object.getClass() == Tracked.class) { 196 ((Tracked) object).onResponses(command); 197 } 198 } 199 200 if (command.isConnectionControl()) { 201 handleConnectionControl((ConnectionControl) command); 202 } 203 if (transportListener != null) { 204 transportListener.onCommand(command); 205 } 206 } 207 208 @Override 209 public void onException(IOException error) { 210 try { 211 handleTransportFailure(error); 212 } catch (InterruptedException e) { 213 Thread.currentThread().interrupt(); 214 transportListener.onException(new InterruptedIOException()); 215 } 216 } 217 218 @Override 219 public void transportInterupted() { 220 if (transportListener != null) { 221 transportListener.transportInterupted(); 222 } 223 } 224 225 @Override 226 public void transportResumed() { 227 if (transportListener != null) { 228 transportListener.transportResumed(); 229 } 230 } 231 }; 232 } 233 234 public final void disposeTransport(Transport transport) { 235 transport.setTransportListener(disposedListener); 236 ServiceSupport.dispose(transport); 237 } 238 239 public final void handleTransportFailure(IOException e) throws InterruptedException { 240 if (shuttingDown) { 241 // shutdown info sent and remote socket closed and we see that before a local close 242 // let the close do the work 243 return; 244 } 245 246 if (LOG.isTraceEnabled()) { 247 LOG.trace(this + " handleTransportFailure: " + e, e); 248 } 249 250 // could be blocked in write with the reconnectMutex held, but still needs to be whacked 251 Transport transport = connectedTransport.getAndSet(null); 252 if (transport != null) { 253 disposeTransport(transport); 254 } 255 256 synchronized (reconnectMutex) { 257 if (transport != null && connectedTransport.get() == null) { 258 259 boolean reconnectOk = false; 260 261 if (canReconnect()) { 262 reconnectOk = true; 263 } 264 LOG.warn("Transport (" + connectedTransportURI + ") failed" 265 + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e); 266 267 failedConnectTransportURI = connectedTransportURI; 268 connectedTransportURI = null; 269 connectedToPriority = false; 270 271 if (reconnectOk) { 272 // notify before any reconnect attempt so ack state can be whacked 273 if (transportListener != null) { 274 transportListener.transportInterupted(); 275 } 276 277 updated.remove(failedConnectTransportURI); 278 reconnectTask.wakeup(); 279 } else if (!isDisposed()) { 280 propagateFailureToExceptionListener(e); 281 } 282 } 283 } 284 } 285 286 private boolean canReconnect() { 287 return started && 0 != calculateReconnectAttemptLimit(); 288 } 289 290 public final void handleConnectionControl(ConnectionControl control) { 291 String reconnectStr = control.getReconnectTo(); 292 if (LOG.isTraceEnabled()) { 293 LOG.trace("Received ConnectionControl: {}", control); 294 } 295 296 if (reconnectStr != null) { 297 reconnectStr = reconnectStr.trim(); 298 if (reconnectStr.length() > 0) { 299 try { 300 URI uri = new URI(reconnectStr); 301 if (isReconnectSupported()) { 302 reconnect(uri); 303 LOG.info("Reconnected to: " + uri); 304 } 305 } catch (Exception e) { 306 LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e); 307 } 308 } 309 } 310 processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers()); 311 } 312 313 private final void processNewTransports(boolean rebalance, String newTransports) { 314 if (newTransports != null) { 315 newTransports = newTransports.trim(); 316 if (newTransports.length() > 0 && isUpdateURIsSupported()) { 317 List<URI> list = new ArrayList<URI>(); 318 StringTokenizer tokenizer = new StringTokenizer(newTransports, ","); 319 while (tokenizer.hasMoreTokens()) { 320 String str = tokenizer.nextToken(); 321 try { 322 URI uri = new URI(str); 323 list.add(uri); 324 } catch (Exception e) { 325 LOG.error("Failed to parse broker address: " + str, e); 326 } 327 } 328 if (list.isEmpty() == false) { 329 try { 330 updateURIs(rebalance, list.toArray(new URI[list.size()])); 331 } catch (IOException e) { 332 LOG.error("Failed to update transport URI's from: " + newTransports, e); 333 } 334 } 335 } 336 } 337 } 338 339 @Override 340 public void start() throws Exception { 341 synchronized (reconnectMutex) { 342 if (LOG.isDebugEnabled()) { 343 LOG.debug("Started " + this); 344 } 345 if (started) { 346 return; 347 } 348 started = true; 349 stateTracker.setMaxCacheSize(getMaxCacheSize()); 350 stateTracker.setTrackMessages(isTrackMessages()); 351 stateTracker.setTrackTransactionProducers(isTrackTransactionProducers()); 352 if (connectedTransport.get() != null) { 353 stateTracker.restore(connectedTransport.get()); 354 } else { 355 reconnect(false); 356 } 357 } 358 } 359 360 @Override 361 public void stop() throws Exception { 362 Transport transportToStop = null; 363 List<Transport> backupsToStop = new ArrayList<Transport>(backups.size()); 364 365 try { 366 synchronized (reconnectMutex) { 367 if (LOG.isDebugEnabled()) { 368 LOG.debug("Stopped " + this); 369 } 370 if (!started) { 371 return; 372 } 373 started = false; 374 disposed = true; 375 376 if (connectedTransport.get() != null) { 377 transportToStop = connectedTransport.getAndSet(null); 378 } 379 reconnectMutex.notifyAll(); 380 } 381 synchronized (sleepMutex) { 382 sleepMutex.notifyAll(); 383 } 384 } finally { 385 reconnectTask.shutdown(); 386 reconnectTaskFactory.shutdownNow(); 387 } 388 389 synchronized(backupMutex) { 390 for (BackupTransport backup : backups) { 391 backup.setDisposed(true); 392 Transport transport = backup.getTransport(); 393 if (transport != null) { 394 transport.setTransportListener(disposedListener); 395 backupsToStop.add(transport); 396 } 397 } 398 backups.clear(); 399 } 400 for (Transport transport : backupsToStop) { 401 try { 402 if (LOG.isTraceEnabled()) { 403 LOG.trace("Stopped backup: " + transport); 404 } 405 disposeTransport(transport); 406 } catch (Exception e) { 407 } 408 } 409 if (transportToStop != null) { 410 transportToStop.stop(); 411 } 412 } 413 414 public long getInitialReconnectDelay() { 415 return initialReconnectDelay; 416 } 417 418 public void setInitialReconnectDelay(long initialReconnectDelay) { 419 this.initialReconnectDelay = initialReconnectDelay; 420 } 421 422 public long getMaxReconnectDelay() { 423 return maxReconnectDelay; 424 } 425 426 public void setMaxReconnectDelay(long maxReconnectDelay) { 427 this.maxReconnectDelay = maxReconnectDelay; 428 } 429 430 public long getReconnectDelay() { 431 return reconnectDelay; 432 } 433 434 public void setReconnectDelay(long reconnectDelay) { 435 this.reconnectDelay = reconnectDelay; 436 } 437 438 public double getReconnectDelayExponent() { 439 return backOffMultiplier; 440 } 441 442 public void setReconnectDelayExponent(double reconnectDelayExponent) { 443 this.backOffMultiplier = reconnectDelayExponent; 444 } 445 446 public Transport getConnectedTransport() { 447 return connectedTransport.get(); 448 } 449 450 public URI getConnectedTransportURI() { 451 return connectedTransportURI; 452 } 453 454 public int getMaxReconnectAttempts() { 455 return maxReconnectAttempts; 456 } 457 458 public void setMaxReconnectAttempts(int maxReconnectAttempts) { 459 this.maxReconnectAttempts = maxReconnectAttempts; 460 } 461 462 public int getStartupMaxReconnectAttempts() { 463 return this.startupMaxReconnectAttempts; 464 } 465 466 public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) { 467 this.startupMaxReconnectAttempts = startupMaxReconnectAttempts; 468 } 469 470 public long getTimeout() { 471 return timeout; 472 } 473 474 public void setTimeout(long timeout) { 475 this.timeout = timeout; 476 } 477 478 /** 479 * @return Returns the randomize. 480 */ 481 public boolean isRandomize() { 482 return randomize; 483 } 484 485 /** 486 * @param randomize The randomize to set. 487 */ 488 public void setRandomize(boolean randomize) { 489 this.randomize = randomize; 490 } 491 492 public boolean isBackup() { 493 return backup; 494 } 495 496 public void setBackup(boolean backup) { 497 this.backup = backup; 498 } 499 500 public int getBackupPoolSize() { 501 return backupPoolSize; 502 } 503 504 public void setBackupPoolSize(int backupPoolSize) { 505 this.backupPoolSize = backupPoolSize; 506 } 507 508 public int getCurrentBackups() { 509 return this.backups.size(); 510 } 511 512 public boolean isTrackMessages() { 513 return trackMessages; 514 } 515 516 public void setTrackMessages(boolean trackMessages) { 517 this.trackMessages = trackMessages; 518 } 519 520 public boolean isTrackTransactionProducers() { 521 return this.trackTransactionProducers; 522 } 523 524 public void setTrackTransactionProducers(boolean trackTransactionProducers) { 525 this.trackTransactionProducers = trackTransactionProducers; 526 } 527 528 public int getMaxCacheSize() { 529 return maxCacheSize; 530 } 531 532 public void setMaxCacheSize(int maxCacheSize) { 533 this.maxCacheSize = maxCacheSize; 534 } 535 536 public boolean isPriorityBackup() { 537 return priorityBackup; 538 } 539 540 public void setPriorityBackup(boolean priorityBackup) { 541 this.priorityBackup = priorityBackup; 542 } 543 544 public void setPriorityURIs(String priorityURIs) { 545 StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ","); 546 while (tokenizer.hasMoreTokens()) { 547 String str = tokenizer.nextToken(); 548 try { 549 URI uri = new URI(str); 550 priorityList.add(uri); 551 } catch (Exception e) { 552 LOG.error("Failed to parse broker address: " + str, e); 553 } 554 } 555 } 556 557 @Override 558 public void oneway(Object o) throws IOException { 559 560 Command command = (Command) o; 561 Exception error = null; 562 try { 563 564 synchronized (reconnectMutex) { 565 566 if (command != null && connectedTransport.get() == null) { 567 if (command.isShutdownInfo()) { 568 // Skipping send of ShutdownInfo command when not connected. 569 return; 570 } else if (command instanceof RemoveInfo || command.isMessageAck()) { 571 // Simulate response to RemoveInfo command or MessageAck (as it will be stale) 572 stateTracker.track(command); 573 if (command.isResponseRequired()) { 574 Response response = new Response(); 575 response.setCorrelationId(command.getCommandId()); 576 myTransportListener.onCommand(response); 577 } 578 return; 579 } else if (command instanceof MessagePull) { 580 // Simulate response to MessagePull if timed as we can't honor that now. 581 MessagePull pullRequest = (MessagePull) command; 582 if (pullRequest.getTimeout() != 0) { 583 MessageDispatch dispatch = new MessageDispatch(); 584 dispatch.setConsumerId(pullRequest.getConsumerId()); 585 dispatch.setDestination(pullRequest.getDestination()); 586 myTransportListener.onCommand(dispatch); 587 } 588 return; 589 } 590 } 591 592 // Keep trying until the message is sent. 593 for (int i = 0; !disposed; i++) { 594 try { 595 596 // Wait for transport to be connected. 597 Transport transport = connectedTransport.get(); 598 long start = System.currentTimeMillis(); 599 boolean timedout = false; 600 while (transport == null && !disposed && connectionFailure == null 601 && !Thread.currentThread().isInterrupted()) { 602 if (LOG.isTraceEnabled()) { 603 LOG.trace("Waiting for transport to reconnect..: " + command); 604 } 605 long end = System.currentTimeMillis(); 606 if (command.isMessage() && timeout > 0 && (end - start > timeout)) { 607 timedout = true; 608 if (LOG.isInfoEnabled()) { 609 LOG.info("Failover timed out after " + (end - start) + "ms"); 610 } 611 break; 612 } 613 try { 614 reconnectMutex.wait(100); 615 } catch (InterruptedException e) { 616 Thread.currentThread().interrupt(); 617 if (LOG.isDebugEnabled()) { 618 LOG.debug("Interupted: " + e, e); 619 } 620 } 621 transport = connectedTransport.get(); 622 } 623 624 if (transport == null) { 625 // Previous loop may have exited due to use being 626 // disposed. 627 if (disposed) { 628 error = new IOException("Transport disposed."); 629 } else if (connectionFailure != null) { 630 error = connectionFailure; 631 } else if (timedout == true) { 632 error = new IOException("Failover timeout of " + timeout + " ms reached."); 633 } else { 634 error = new IOException("Unexpected failure."); 635 } 636 break; 637 } 638 639 Tracked tracked = null; 640 try { 641 tracked = stateTracker.track(command); 642 } catch (IOException ioe) { 643 LOG.debug("Cannot track the command " + command, ioe); 644 } 645 // If it was a request and it was not being tracked by 646 // the state tracker, 647 // then hold it in the requestMap so that we can replay 648 // it later. 649 synchronized (requestMap) { 650 if (tracked != null && tracked.isWaitingForResponse()) { 651 requestMap.put(Integer.valueOf(command.getCommandId()), tracked); 652 } else if (tracked == null && command.isResponseRequired()) { 653 requestMap.put(Integer.valueOf(command.getCommandId()), command); 654 } 655 } 656 657 // Send the message. 658 try { 659 transport.oneway(command); 660 stateTracker.trackBack(command); 661 if (command.isShutdownInfo()) { 662 shuttingDown = true; 663 } 664 } catch (IOException e) { 665 666 // If the command was not tracked.. we will retry in 667 // this method 668 if (tracked == null) { 669 670 // since we will retry in this method.. take it 671 // out of the request 672 // map so that it is not sent 2 times on 673 // recovery 674 if (command.isResponseRequired()) { 675 requestMap.remove(Integer.valueOf(command.getCommandId())); 676 } 677 678 // Rethrow the exception so it will handled by 679 // the outer catch 680 throw e; 681 } else { 682 // Handle the error but allow the method to return since the 683 // tracked commands are replayed on reconnect. 684 if (LOG.isDebugEnabled()) { 685 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 686 } 687 handleTransportFailure(e); 688 } 689 } 690 691 return; 692 693 } catch (IOException e) { 694 if (LOG.isDebugEnabled()) { 695 LOG.debug("Send oneway attempt: " + i + " failed for command:" + command); 696 } 697 handleTransportFailure(e); 698 } 699 } 700 } 701 } catch (InterruptedException e) { 702 // Some one may be trying to stop our thread. 703 Thread.currentThread().interrupt(); 704 throw new InterruptedIOException(); 705 } 706 707 if (!disposed) { 708 if (error != null) { 709 if (error instanceof IOException) { 710 throw (IOException) error; 711 } 712 throw IOExceptionSupport.create(error); 713 } 714 } 715 } 716 717 @Override 718 public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { 719 throw new AssertionError("Unsupported Method"); 720 } 721 722 @Override 723 public Object request(Object command) throws IOException { 724 throw new AssertionError("Unsupported Method"); 725 } 726 727 @Override 728 public Object request(Object command, int timeout) throws IOException { 729 throw new AssertionError("Unsupported Method"); 730 } 731 732 @Override 733 public void add(boolean rebalance, URI u[]) { 734 boolean newURI = false; 735 for (URI uri : u) { 736 if (!contains(uri)) { 737 uris.add(uri); 738 newURI = true; 739 } 740 } 741 if (newURI) { 742 reconnect(rebalance); 743 } 744 } 745 746 @Override 747 public void remove(boolean rebalance, URI u[]) { 748 for (URI uri : u) { 749 uris.remove(uri); 750 } 751 // rebalance is automatic if any connected to removed/stopped broker 752 } 753 754 public void add(boolean rebalance, String u) { 755 try { 756 URI newURI = new URI(u); 757 if (contains(newURI) == false) { 758 uris.add(newURI); 759 reconnect(rebalance); 760 } 761 762 } catch (Exception e) { 763 LOG.error("Failed to parse URI: " + u); 764 } 765 } 766 767 public void reconnect(boolean rebalance) { 768 synchronized (reconnectMutex) { 769 if (started) { 770 if (rebalance) { 771 doRebalance = true; 772 } 773 LOG.debug("Waking up reconnect task"); 774 try { 775 reconnectTask.wakeup(); 776 } catch (InterruptedException e) { 777 Thread.currentThread().interrupt(); 778 } 779 } else { 780 LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport."); 781 } 782 } 783 } 784 785 private List<URI> getConnectList() { 786 if (!updated.isEmpty()) { 787 return updated; 788 } 789 ArrayList<URI> l = new ArrayList<URI>(uris); 790 boolean removed = false; 791 if (failedConnectTransportURI != null) { 792 removed = l.remove(failedConnectTransportURI); 793 } 794 if (randomize) { 795 // Randomly, reorder the list by random swapping 796 for (int i = 0; i < l.size(); i++) { 797 // meed parenthesis due other JDKs (see AMQ-4826) 798 int p = ((int) (Math.random() * 100)) % l.size(); 799 URI t = l.get(p); 800 l.set(p, l.get(i)); 801 l.set(i, t); 802 } 803 } 804 if (removed) { 805 l.add(failedConnectTransportURI); 806 } 807 if (LOG.isDebugEnabled()) { 808 LOG.debug("urlList connectionList:" + l + ", from: " + uris); 809 } 810 return l; 811 } 812 813 @Override 814 public TransportListener getTransportListener() { 815 return transportListener; 816 } 817 818 @Override 819 public void setTransportListener(TransportListener commandListener) { 820 synchronized (listenerMutex) { 821 this.transportListener = commandListener; 822 listenerMutex.notifyAll(); 823 } 824 } 825 826 @Override 827 public <T> T narrow(Class<T> target) { 828 829 if (target.isAssignableFrom(getClass())) { 830 return target.cast(this); 831 } 832 Transport transport = connectedTransport.get(); 833 if (transport != null) { 834 return transport.narrow(target); 835 } 836 return null; 837 838 } 839 840 protected void restoreTransport(Transport t) throws Exception, IOException { 841 t.start(); 842 // send information to the broker - informing it we are an ft client 843 ConnectionControl cc = new ConnectionControl(); 844 cc.setFaultTolerant(true); 845 t.oneway(cc); 846 stateTracker.restore(t); 847 Map<Integer, Command> tmpMap = null; 848 synchronized (requestMap) { 849 tmpMap = new LinkedHashMap<Integer, Command>(requestMap); 850 } 851 for (Command command : tmpMap.values()) { 852 if (LOG.isTraceEnabled()) { 853 LOG.trace("restore requestMap, replay: " + command); 854 } 855 t.oneway(command); 856 } 857 } 858 859 public boolean isUseExponentialBackOff() { 860 return useExponentialBackOff; 861 } 862 863 public void setUseExponentialBackOff(boolean useExponentialBackOff) { 864 this.useExponentialBackOff = useExponentialBackOff; 865 } 866 867 @Override 868 public String toString() { 869 return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString(); 870 } 871 872 @Override 873 public String getRemoteAddress() { 874 Transport transport = connectedTransport.get(); 875 if (transport != null) { 876 return transport.getRemoteAddress(); 877 } 878 return null; 879 } 880 881 @Override 882 public boolean isFaultTolerant() { 883 return true; 884 } 885 886 private void doUpdateURIsFromDisk() { 887 // If updateURIsURL is specified, read the file and add any new 888 // transport URI's to this FailOverTransport. 889 // Note: Could track file timestamp to avoid unnecessary reading. 890 String fileURL = getUpdateURIsURL(); 891 if (fileURL != null) { 892 BufferedReader in = null; 893 String newUris = null; 894 StringBuffer buffer = new StringBuffer(); 895 896 try { 897 in = new BufferedReader(getURLStream(fileURL)); 898 while (true) { 899 String line = in.readLine(); 900 if (line == null) { 901 break; 902 } 903 buffer.append(line); 904 } 905 newUris = buffer.toString(); 906 } catch (IOException ioe) { 907 LOG.error("Failed to read updateURIsURL: " + fileURL, ioe); 908 } finally { 909 if (in != null) { 910 try { 911 in.close(); 912 } catch (IOException ioe) { 913 // ignore 914 } 915 } 916 } 917 918 processNewTransports(isRebalanceUpdateURIs(), newUris); 919 } 920 } 921 922 final boolean doReconnect() { 923 Exception failure = null; 924 synchronized (reconnectMutex) { 925 926 // First ensure we are up to date. 927 doUpdateURIsFromDisk(); 928 929 if (disposed || connectionFailure != null) { 930 reconnectMutex.notifyAll(); 931 } 932 if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) { 933 return false; 934 } else { 935 List<URI> connectList = getConnectList(); 936 if (connectList.isEmpty()) { 937 failure = new IOException("No uris available to connect to."); 938 } else { 939 if (doRebalance) { 940 if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) { 941 // already connected to first in the list, no need to rebalance 942 doRebalance = false; 943 return false; 944 } else { 945 if (LOG.isDebugEnabled()) { 946 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList); 947 } 948 949 try { 950 Transport transport = this.connectedTransport.getAndSet(null); 951 if (transport != null) { 952 disposeTransport(transport); 953 } 954 } catch (Exception e) { 955 if (LOG.isDebugEnabled()) { 956 LOG.debug("Caught an exception stopping existing transport for rebalance", e); 957 } 958 } 959 } 960 doRebalance = false; 961 } 962 963 resetReconnectDelay(); 964 965 Transport transport = null; 966 URI uri = null; 967 968 // If we have a backup already waiting lets try it. 969 synchronized (backupMutex) { 970 if ((priorityBackup || backup) && !backups.isEmpty()) { 971 ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups); 972 if (randomize) { 973 Collections.shuffle(l); 974 } 975 BackupTransport bt = l.remove(0); 976 backups.remove(bt); 977 transport = bt.getTransport(); 978 uri = bt.getUri(); 979 if (priorityBackup && priorityBackupAvailable) { 980 Transport old = this.connectedTransport.getAndSet(null); 981 if (old != null) { 982 disposeTransport(old); 983 } 984 priorityBackupAvailable = false; 985 } 986 } 987 } 988 989 // Sleep for the reconnectDelay if there's no backup and we aren't trying 990 // for the first time, or we were disposed for some reason. 991 if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) { 992 synchronized (sleepMutex) { 993 if (LOG.isDebugEnabled()) { 994 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. "); 995 } 996 try { 997 sleepMutex.wait(reconnectDelay); 998 } catch (InterruptedException e) { 999 Thread.currentThread().interrupt(); 1000 } 1001 } 1002 } 1003 1004 Iterator<URI> iter = connectList.iterator(); 1005 while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) { 1006 1007 try { 1008 SslContext.setCurrentSslContext(brokerSslContext); 1009 1010 // We could be starting with a backup and if so we wait to grab a 1011 // URI from the pool until next time around. 1012 if (transport == null) { 1013 uri = addExtraQueryOptions(iter.next()); 1014 transport = TransportFactory.compositeConnect(uri); 1015 } 1016 1017 if (LOG.isDebugEnabled()) { 1018 LOG.debug("Attempting " + connectFailures + "th connect to: " + uri); 1019 } 1020 transport.setTransportListener(myTransportListener); 1021 transport.start(); 1022 1023 if (started && !firstConnection) { 1024 restoreTransport(transport); 1025 } 1026 1027 if (LOG.isDebugEnabled()) { 1028 LOG.debug("Connection established"); 1029 } 1030 reconnectDelay = initialReconnectDelay; 1031 connectedTransportURI = uri; 1032 connectedTransport.set(transport); 1033 connectedToPriority = isPriority(connectedTransportURI); 1034 reconnectMutex.notifyAll(); 1035 connectFailures = 0; 1036 1037 // Make sure on initial startup, that the transportListener 1038 // has been initialized for this instance. 1039 synchronized (listenerMutex) { 1040 if (transportListener == null) { 1041 try { 1042 // if it isn't set after 2secs - it probably never will be 1043 listenerMutex.wait(2000); 1044 } catch (InterruptedException ex) { 1045 } 1046 } 1047 } 1048 1049 if (transportListener != null) { 1050 transportListener.transportResumed(); 1051 } else { 1052 if (LOG.isDebugEnabled()) { 1053 LOG.debug("transport resumed by transport listener not set"); 1054 } 1055 } 1056 1057 if (firstConnection) { 1058 firstConnection = false; 1059 LOG.info("Successfully connected to " + uri); 1060 } else { 1061 LOG.info("Successfully reconnected to " + uri); 1062 } 1063 1064 return false; 1065 } catch (Exception e) { 1066 failure = e; 1067 if (LOG.isDebugEnabled()) { 1068 LOG.debug("Connect fail to: " + uri + ", reason: " + e); 1069 } 1070 if (transport != null) { 1071 try { 1072 transport.stop(); 1073 transport = null; 1074 } catch (Exception ee) { 1075 if (LOG.isDebugEnabled()) { 1076 LOG.debug("Stop of failed transport: " + transport + 1077 " failed with reason: " + ee); 1078 } 1079 } 1080 } 1081 } finally { 1082 SslContext.setCurrentSslContext(null); 1083 } 1084 } 1085 } 1086 } 1087 1088 int reconnectLimit = calculateReconnectAttemptLimit(); 1089 1090 connectFailures++; 1091 if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) { 1092 LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)"); 1093 connectionFailure = failure; 1094 1095 // Make sure on initial startup, that the transportListener has been 1096 // initialized for this instance. 1097 synchronized (listenerMutex) { 1098 if (transportListener == null) { 1099 try { 1100 listenerMutex.wait(2000); 1101 } catch (InterruptedException ex) { 1102 } 1103 } 1104 } 1105 1106 propagateFailureToExceptionListener(connectionFailure); 1107 return false; 1108 } 1109 1110 int warnInterval = getWarnAfterReconnectAttempts(); 1111 if (warnInterval > 0 && (connectFailures % warnInterval) == 0) { 1112 LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.", 1113 uris, connectFailures); 1114 } 1115 } 1116 1117 if (!disposed) { 1118 doDelay(); 1119 } 1120 1121 return !disposed; 1122 } 1123 1124 private void doDelay() { 1125 if (reconnectDelay > 0) { 1126 synchronized (sleepMutex) { 1127 if (LOG.isDebugEnabled()) { 1128 LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection"); 1129 } 1130 try { 1131 sleepMutex.wait(reconnectDelay); 1132 } catch (InterruptedException e) { 1133 Thread.currentThread().interrupt(); 1134 } 1135 } 1136 } 1137 1138 if (useExponentialBackOff) { 1139 // Exponential increment of reconnect delay. 1140 reconnectDelay *= backOffMultiplier; 1141 if (reconnectDelay > maxReconnectDelay) { 1142 reconnectDelay = maxReconnectDelay; 1143 } 1144 } 1145 } 1146 1147 private void resetReconnectDelay() { 1148 if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) { 1149 reconnectDelay = initialReconnectDelay; 1150 } 1151 } 1152 1153 /* 1154 * called with reconnectMutex held 1155 */ 1156 private void propagateFailureToExceptionListener(Exception exception) { 1157 if (transportListener != null) { 1158 if (exception instanceof IOException) { 1159 transportListener.onException((IOException)exception); 1160 } else { 1161 transportListener.onException(IOExceptionSupport.create(exception)); 1162 } 1163 } 1164 reconnectMutex.notifyAll(); 1165 } 1166 1167 private int calculateReconnectAttemptLimit() { 1168 int maxReconnectValue = this.maxReconnectAttempts; 1169 if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) { 1170 maxReconnectValue = this.startupMaxReconnectAttempts; 1171 } 1172 return maxReconnectValue; 1173 } 1174 1175 private boolean shouldBuildBackups() { 1176 return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority)); 1177 } 1178 1179 final boolean buildBackups() { 1180 synchronized (backupMutex) { 1181 if (!disposed && shouldBuildBackups()) { 1182 ArrayList<URI> backupList = new ArrayList<URI>(priorityList); 1183 List<URI> connectList = getConnectList(); 1184 for (URI uri: connectList) { 1185 if (!backupList.contains(uri)) { 1186 backupList.add(uri); 1187 } 1188 } 1189 // removed disposed backups 1190 List<BackupTransport> disposedList = new ArrayList<BackupTransport>(); 1191 for (BackupTransport bt : backups) { 1192 if (bt.isDisposed()) { 1193 disposedList.add(bt); 1194 } 1195 } 1196 backups.removeAll(disposedList); 1197 disposedList.clear(); 1198 for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) { 1199 URI uri = addExtraQueryOptions(iter.next()); 1200 if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) { 1201 try { 1202 SslContext.setCurrentSslContext(brokerSslContext); 1203 BackupTransport bt = new BackupTransport(this); 1204 bt.setUri(uri); 1205 if (!backups.contains(bt)) { 1206 Transport t = TransportFactory.compositeConnect(uri); 1207 t.setTransportListener(bt); 1208 t.start(); 1209 bt.setTransport(t); 1210 if (priorityBackup && isPriority(uri)) { 1211 priorityBackupAvailable = true; 1212 backups.add(0, bt); 1213 // if this priority backup overflows the pool 1214 // remove the backup with the lowest priority 1215 if (backups.size() > backupPoolSize) { 1216 BackupTransport disposeTransport = backups.remove(backups.size() - 1); 1217 disposeTransport.setDisposed(true); 1218 Transport transport = disposeTransport.getTransport(); 1219 if (transport != null) { 1220 transport.setTransportListener(disposedListener); 1221 disposeTransport(transport); 1222 } 1223 } 1224 } else { 1225 backups.add(bt); 1226 } 1227 } 1228 } catch (Exception e) { 1229 LOG.debug("Failed to build backup ", e); 1230 } finally { 1231 SslContext.setCurrentSslContext(null); 1232 } 1233 } 1234 } 1235 } 1236 } 1237 return false; 1238 } 1239 1240 protected boolean isPriority(URI uri) { 1241 if (!priorityBackup) { 1242 return false; 1243 } 1244 1245 if (!priorityList.isEmpty()) { 1246 return priorityList.contains(uri); 1247 } 1248 return uris.indexOf(uri) == 0; 1249 } 1250 1251 @Override 1252 public boolean isDisposed() { 1253 return disposed; 1254 } 1255 1256 @Override 1257 public boolean isConnected() { 1258 return connectedTransport.get() != null; 1259 } 1260 1261 @Override 1262 public void reconnect(URI uri) throws IOException { 1263 add(true, new URI[]{uri}); 1264 } 1265 1266 @Override 1267 public boolean isReconnectSupported() { 1268 return this.reconnectSupported; 1269 } 1270 1271 public void setReconnectSupported(boolean value) { 1272 this.reconnectSupported = value; 1273 } 1274 1275 @Override 1276 public boolean isUpdateURIsSupported() { 1277 return this.updateURIsSupported; 1278 } 1279 1280 public void setUpdateURIsSupported(boolean value) { 1281 this.updateURIsSupported = value; 1282 } 1283 1284 @Override 1285 public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException { 1286 if (isUpdateURIsSupported()) { 1287 HashSet<URI> copy = new HashSet<URI>(); 1288 synchronized (reconnectMutex) { 1289 copy.addAll(this.updated); 1290 updated.clear(); 1291 if (updatedURIs != null && updatedURIs.length > 0) { 1292 for (URI uri : updatedURIs) { 1293 if (uri != null && !updated.contains(uri)) { 1294 updated.add(uri); 1295 } 1296 } 1297 } 1298 } 1299 if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) { 1300 buildBackups(); 1301 reconnect(rebalance); 1302 } 1303 } 1304 } 1305 1306 /** 1307 * @return the updateURIsURL 1308 */ 1309 public String getUpdateURIsURL() { 1310 return this.updateURIsURL; 1311 } 1312 1313 /** 1314 * @param updateURIsURL the updateURIsURL to set 1315 */ 1316 public void setUpdateURIsURL(String updateURIsURL) { 1317 this.updateURIsURL = updateURIsURL; 1318 } 1319 1320 /** 1321 * @return the rebalanceUpdateURIs 1322 */ 1323 public boolean isRebalanceUpdateURIs() { 1324 return this.rebalanceUpdateURIs; 1325 } 1326 1327 /** 1328 * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set 1329 */ 1330 public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) { 1331 this.rebalanceUpdateURIs = rebalanceUpdateURIs; 1332 } 1333 1334 @Override 1335 public int getReceiveCounter() { 1336 Transport transport = connectedTransport.get(); 1337 if (transport == null) { 1338 return 0; 1339 } 1340 return transport.getReceiveCounter(); 1341 } 1342 1343 public int getConnectFailures() { 1344 return connectFailures; 1345 } 1346 1347 public void connectionInterruptProcessingComplete(ConnectionId connectionId) { 1348 synchronized (reconnectMutex) { 1349 stateTracker.connectionInterruptProcessingComplete(this, connectionId); 1350 } 1351 } 1352 1353 public ConnectionStateTracker getStateTracker() { 1354 return stateTracker; 1355 } 1356 1357 private boolean contains(URI newURI) { 1358 boolean result = false; 1359 for (URI uri : uris) { 1360 if (compareURIs(newURI, uri)) { 1361 result = true; 1362 break; 1363 } 1364 } 1365 1366 return result; 1367 } 1368 1369 private boolean compareURIs(final URI first, final URI second) { 1370 1371 boolean result = false; 1372 if (first == null || second == null) { 1373 return result; 1374 } 1375 1376 if (first.getPort() == second.getPort()) { 1377 InetAddress firstAddr = null; 1378 InetAddress secondAddr = null; 1379 try { 1380 firstAddr = InetAddress.getByName(first.getHost()); 1381 secondAddr = InetAddress.getByName(second.getHost()); 1382 1383 if (firstAddr.equals(secondAddr)) { 1384 result = true; 1385 } 1386 1387 } catch(IOException e) { 1388 1389 if (firstAddr == null) { 1390 LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e); 1391 } else { 1392 LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e); 1393 } 1394 1395 if (first.getHost().equalsIgnoreCase(second.getHost())) { 1396 result = true; 1397 } 1398 } 1399 } 1400 1401 return result; 1402 } 1403 1404 private InputStreamReader getURLStream(String path) throws IOException { 1405 InputStreamReader result = null; 1406 URL url = null; 1407 try { 1408 url = new URL(path); 1409 result = new InputStreamReader(url.openStream()); 1410 } catch (MalformedURLException e) { 1411 // ignore - it could be a path to a a local file 1412 } 1413 if (result == null) { 1414 result = new FileReader(path); 1415 } 1416 return result; 1417 } 1418 1419 private URI addExtraQueryOptions(URI uri) { 1420 try { 1421 if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) { 1422 if( uri.getQuery() == null ) { 1423 uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions); 1424 } else { 1425 uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions); 1426 } 1427 } 1428 } catch (URISyntaxException e) { 1429 throw new RuntimeException(e); 1430 } 1431 return uri; 1432 } 1433 1434 public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) { 1435 this.nestedExtraQueryOptions = nestedExtraQueryOptions; 1436 } 1437 1438 public int getWarnAfterReconnectAttempts() { 1439 return warnAfterReconnectAttempts; 1440 } 1441 1442 /** 1443 * Sets the number of Connect / Reconnect attempts that must occur before a warn message 1444 * is logged indicating that the transport is not connected. This can be useful when the 1445 * client is running inside some container or service as it give an indication of some 1446 * problem with the client connection that might not otherwise be visible. To disable the 1447 * log messages this value should be set to a value @{code attempts <= 0} 1448 * 1449 * @param warnAfterReconnectAttempts 1450 * The number of failed connection attempts that must happen before a warning is logged. 1451 */ 1452 public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) { 1453 this.warnAfterReconnectAttempts = warnAfterReconnectAttempts; 1454 } 1455 1456}