001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.failover;
018
019import java.io.BufferedReader;
020import java.io.FileReader;
021import java.io.IOException;
022import java.io.InputStreamReader;
023import java.io.InterruptedIOException;
024import java.net.InetAddress;
025import java.net.MalformedURLException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.URL;
029import java.util.ArrayList;
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.Iterator;
033import java.util.LinkedHashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.StringTokenizer;
037import java.util.concurrent.CopyOnWriteArrayList;
038import java.util.concurrent.atomic.AtomicReference;
039
040import org.apache.activemq.broker.SslContext;
041import org.apache.activemq.command.Command;
042import org.apache.activemq.command.ConnectionControl;
043import org.apache.activemq.command.ConnectionId;
044import org.apache.activemq.command.MessageDispatch;
045import org.apache.activemq.command.MessagePull;
046import org.apache.activemq.command.RemoveInfo;
047import org.apache.activemq.command.Response;
048import org.apache.activemq.state.ConnectionStateTracker;
049import org.apache.activemq.state.Tracked;
050import org.apache.activemq.thread.Task;
051import org.apache.activemq.thread.TaskRunner;
052import org.apache.activemq.thread.TaskRunnerFactory;
053import org.apache.activemq.transport.CompositeTransport;
054import org.apache.activemq.transport.DefaultTransportListener;
055import org.apache.activemq.transport.FutureResponse;
056import org.apache.activemq.transport.ResponseCallback;
057import org.apache.activemq.transport.Transport;
058import org.apache.activemq.transport.TransportFactory;
059import org.apache.activemq.transport.TransportListener;
060import org.apache.activemq.util.IOExceptionSupport;
061import org.apache.activemq.util.ServiceSupport;
062import org.apache.activemq.util.URISupport;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066/**
067 * A Transport that is made reliable by being able to fail over to another
068 * transport when a transport failure is detected.
069 */
070public class FailoverTransport implements CompositeTransport {
071
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransport.class);
    // Starting value for both initialReconnectDelay and reconnectDelay; presumably milliseconds (TODO confirm at the delay site).
    private static final int DEFAULT_INITIAL_RECONNECT_DELAY = 10;
    // Sentinel meaning "no limit", used for timeout and the reconnect-attempt limits.
    private static final int INFINITE = -1;
    // Upstream listener; commands and interruption/resume events are forwarded to it.
    private TransportListener transportListener;
    // Set by stop(); once true, oneway() stops retrying and the reconnect task will not reconnect.
    // NOTE(review): read without holding reconnectMutex (e.g. by the reconnect task) and not volatile — confirm visibility is acceptable.
    private boolean disposed;
    // Statically configured broker URIs (see add/remove).
    private final CopyOnWriteArrayList<URI> uris = new CopyOnWriteArrayList<URI>();
    // Runtime-updated broker URIs; when non-empty, getConnectList() returns this list instead of uris.
    // Presumably populated by updateURIs() (not visible here).
    private final CopyOnWriteArrayList<URI> updated = new CopyOnWriteArrayList<URI>();

    // Guards connection state: start(), stop(), reconnect(), oneway() and handleTransportFailure() synchronize on it,
    // and oneway() waits on it while no transport is connected.
    private final Object reconnectMutex = new Object();
    // Serializes the reconnect task's reconnect/backup work with stop()'s backup teardown.
    private final Object backupMutex = new Object();
    // Notified by stop(); presumably waited on while sleeping between reconnect attempts (wait site not visible here).
    private final Object sleepMutex = new Object();
    // Guards assignment of transportListener; setTransportListener() notifies waiters on it.
    private final Object listenerMutex = new Object();
    // Tracks connection state so it can be restored onto a (re)connected transport.
    private final ConnectionStateTracker stateTracker = new ConnectionStateTracker();
    // Outstanding requests keyed by command id; matched against incoming responses in the transport listener.
    // Accessed under synchronized (requestMap).
    private final Map<Integer, Command> requestMap = new LinkedHashMap<Integer, Command>();

    // URI of the currently connected broker; set to null by handleTransportFailure().
    private URI connectedTransportURI;
    // URI of the most recently failed connection; getConnectList() moves it to the end of the candidate list.
    private URI failedConnectTransportURI;
    // The live transport. Swapped atomically so a failure can clear it without holding reconnectMutex.
    private final AtomicReference<Transport> connectedTransport = new AtomicReference<Transport>();
    // Factory and runner for the asynchronous reconnect task created in the constructor.
    private final TaskRunnerFactory reconnectTaskFactory;
    private final TaskRunner reconnectTask;
    // Set by start(), cleared by stop(); the reconnect task does nothing while false.
    private boolean started;
    private long initialReconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    private long maxReconnectDelay = 1000 * 30;
    // Exposed through get/setReconnectDelayExponent().
    private double backOffMultiplier = 2d;
    // Maximum time in milliseconds oneway() waits for a reconnect before failing a message send; INFINITE waits forever.
    private long timeout = INFINITE;
    private boolean useExponentialBackOff = true;
    // When true, getConnectList() shuffles the configured URIs.
    private boolean randomize = true;
    private int maxReconnectAttempts = INFINITE;
    private int startupMaxReconnectAttempts = INFINITE;
    private int connectFailures;
    private int warnAfterReconnectAttempts = 10;
    private long reconnectDelay = DEFAULT_INITIAL_RECONNECT_DELAY;
    // Terminal connection error; when non-null, oneway() stops waiting and rethrows it.
    private Exception connectionFailure;
    private boolean firstConnection = true;
    // optionally always have a backup created
    private boolean backup = false;
    private final List<BackupTransport> backups = new CopyOnWriteArrayList<BackupTransport>();
    private int backupPoolSize = 1;
    // ConnectionStateTracker options; applied in start().
    private boolean trackMessages = false;
    private boolean trackTransactionProducers = true;
    private int maxCacheSize = 128 * 1024;
    // Placeholder listener installed on transports before they are disposed (DefaultTransportListener; presumably a no-op).
    private final TransportListener disposedListener = new DefaultTransportListener() {
    };
    // This transport's own listener; oneway() feeds it synthetic responses while disconnected.
    // Presumably installed on each connected transport (installation not visible here).
    private final TransportListener myTransportListener = createTransportListener();
    private boolean updateURIsSupported = true;
    private boolean reconnectSupported = true;
    // remember for reconnect thread
    private SslContext brokerSslContext;
    private String updateURIsURL = null;
    private boolean rebalanceUpdateURIs = true;
    // Set by reconnect(true); makes the reconnect task reconnect even while a transport is connected.
    private boolean doRebalance = false;
    // Cleared by handleTransportFailure(); with priorityBackup enabled the reconnect task keeps cycling while false.
    private boolean connectedToPriority = false;

    private boolean priorityBackup = false;
    // URIs parsed by setPriorityURIs().
    private final ArrayList<URI> priorityList = new ArrayList<URI>();
    private boolean priorityBackupAvailable = false;
    private String nestedExtraQueryOptions;
    // Set once a ShutdownInfo has been sent; handleTransportFailure() then ignores failures and lets close finish.
    private boolean shuttingDown = false;
130
    /**
     * Creates the failover transport and the task runner used to reconnect asynchronously.
     * <p>
     * Captures the current {@link SslContext} so the reconnect thread can use it, and enables
     * transaction tracking on the state tracker. The task's {@code iterate()} does nothing until
     * {@link #start()} has set {@code started}.
     *
     * @throws InterruptedIOException declared by the signature; no statement in this body visibly
     *         throws it (presumably kept for compatibility — confirm)
     */
    public FailoverTransport() throws InterruptedIOException {
        brokerSslContext = SslContext.getCurrentSslContext();
        stateTracker.setTrackTransactions(true);
        // Set up a task that reconnects the connection asynchronously.
        reconnectTaskFactory = new TaskRunnerFactory();
        reconnectTaskFactory.init();
        reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
            /**
             * One pass of the reconnect worker.
             * <p>
             * Under backupMutex, reconnects when there is no connected transport, a rebalance was
             * requested, or a priority backup is available (and the transport is not disposed).
             * Otherwise it builds backups and, if priorityBackup is enabled but not connected to a
             * priority broker, delays and wakes itself for another pass.
             *
             * @return the result of doReconnect() when a reconnect was attempted; true if
             *         reconnectTask has not been assigned yet; otherwise false.
             *         NOTE(review): meaning of the return value is defined by the TaskRunner contract — confirm.
             */
            @Override
            public boolean iterate() {
                boolean result = false;
                if (!started) {
                    return result;
                }
                boolean buildBackup = true;
                synchronized (backupMutex) {
                    if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
                        result = doReconnect();
                        buildBackup = false;
                    }
                }
                if (buildBackup) {
                    buildBackups();
                    if (priorityBackup && !connectedToPriority) {
                        try {
                            doDelay();
                            // reconnectTask is final but may still be unassigned if this runs before the constructor completes.
                            if (reconnectTask == null) {
                                return true;
                            }
                            reconnectTask.wakeup();
                        } catch (InterruptedException e) {
                            LOG.debug("Reconnect task has been interrupted.", e);
                        }
                    }
                } else {
                    // build backups on the next iteration
                    // NOTE(review): this assignment has no effect — buildBackup is a local that is not read again;
                    // the next pass re-initializes it, and the wakeup below is what triggers that pass.
                    buildBackup = true;
                    try {
                        if (reconnectTask == null) {
                            return true;
                        }
                        reconnectTask.wakeup();
                    } catch (InterruptedException e) {
                        LOG.debug("Reconnect task has been interrupted.", e);
                    }
                }
                return result;
            }

        }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
    }
181
182    TransportListener createTransportListener() {
183        return new TransportListener() {
184            @Override
185            public void onCommand(Object o) {
186                Command command = (Command) o;
187                if (command == null) {
188                    return;
189                }
190                if (command.isResponse()) {
191                    Object object = null;
192                    synchronized (requestMap) {
193                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
194                    }
195                    if (object != null && object.getClass() == Tracked.class) {
196                        ((Tracked) object).onResponses(command);
197                    }
198                }
199
200                if (command.isConnectionControl()) {
201                    handleConnectionControl((ConnectionControl) command);
202                }
203                if (transportListener != null) {
204                    transportListener.onCommand(command);
205                }
206            }
207
208            @Override
209            public void onException(IOException error) {
210                try {
211                    handleTransportFailure(error);
212                } catch (InterruptedException e) {
213                    Thread.currentThread().interrupt();
214                    transportListener.onException(new InterruptedIOException());
215                }
216            }
217
218            @Override
219            public void transportInterupted() {
220                if (transportListener != null) {
221                    transportListener.transportInterupted();
222                }
223            }
224
225            @Override
226            public void transportResumed() {
227                if (transportListener != null) {
228                    transportListener.transportResumed();
229                }
230            }
231        };
232    }
233
234    public final void disposeTransport(Transport transport) {
235        transport.setTransportListener(disposedListener);
236        ServiceSupport.dispose(transport);
237    }
238
    /**
     * Reacts to a failure of the connected transport.
     * <p>
     * Does nothing once a ShutdownInfo has been sent ({@code shuttingDown}). Otherwise the connected
     * transport is atomically cleared and disposed. If this call obtained a non-null transport and no
     * replacement has been installed meanwhile, the failure is logged. Then either a reconnect is
     * scheduled (after notifying the upstream listener via {@code transportInterupted()}), or, when
     * reconnecting is not possible and the transport is not disposed, the failure is propagated to
     * the exception listener.
     *
     * @param e the failure reported by the transport
     * @throws InterruptedException presumably from {@code reconnectTask.wakeup()}, which is declared to throw it
     */
    public final void handleTransportFailure(IOException e) throws InterruptedException {
        if (shuttingDown) {
            // shutdown info sent and remote socket closed and we see that before a local close
            // let the close do the work
            return;
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace(this + " handleTransportFailure: " + e, e);
        }

        // could be blocked in write with the reconnectMutex held, but still needs to be whacked
        Transport transport = connectedTransport.getAndSet(null);
        if (transport != null) {
            disposeTransport(transport);
        }

        synchronized (reconnectMutex) {
            // Only the caller that actually cleared a transport handles the failure, and only if
            // no new transport has been connected in the meantime.
            if (transport != null && connectedTransport.get() == null) {

                boolean reconnectOk = false;

                if (canReconnect()) {
                    reconnectOk = true;
                }
                 LOG.warn("Transport (" + connectedTransportURI + ") failed"
                        + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);

                // Remember the failed URI so getConnectList() tries it last.
                failedConnectTransportURI = connectedTransportURI;
                connectedTransportURI = null;
                connectedToPriority = false;

                if (reconnectOk) {
                    // notify before any reconnect attempt so ack state can be whacked
                    if (transportListener != null) {
                        transportListener.transportInterupted();
                    }

                    // Drop the failed URI from the runtime-updated list so getConnectList() no longer offers it from there.
                    updated.remove(failedConnectTransportURI);
                    reconnectTask.wakeup();
                } else if (!isDisposed()) {
                    propagateFailureToExceptionListener(e);
                }
            }
        }
    }
285
286    private boolean canReconnect() {
287        return started && 0 != calculateReconnectAttemptLimit();
288    }
289
290    public final void handleConnectionControl(ConnectionControl control) {
291        String reconnectStr = control.getReconnectTo();
292        if (LOG.isTraceEnabled()) {
293            LOG.trace("Received ConnectionControl: {}", control);
294        }
295
296        if (reconnectStr != null) {
297            reconnectStr = reconnectStr.trim();
298            if (reconnectStr.length() > 0) {
299                try {
300                    URI uri = new URI(reconnectStr);
301                    if (isReconnectSupported()) {
302                        reconnect(uri);
303                        LOG.info("Reconnected to: " + uri);
304                    }
305                } catch (Exception e) {
306                    LOG.error("Failed to handle ConnectionControl reconnect to " + reconnectStr, e);
307                }
308            }
309        }
310        processNewTransports(control.isRebalanceConnection(), control.getConnectedBrokers());
311    }
312
313    private final void processNewTransports(boolean rebalance, String newTransports) {
314        if (newTransports != null) {
315            newTransports = newTransports.trim();
316            if (newTransports.length() > 0 && isUpdateURIsSupported()) {
317                List<URI> list = new ArrayList<URI>();
318                StringTokenizer tokenizer = new StringTokenizer(newTransports, ",");
319                while (tokenizer.hasMoreTokens()) {
320                    String str = tokenizer.nextToken();
321                    try {
322                        URI uri = new URI(str);
323                        list.add(uri);
324                    } catch (Exception e) {
325                        LOG.error("Failed to parse broker address: " + str, e);
326                    }
327                }
328                if (list.isEmpty() == false) {
329                    try {
330                        updateURIs(rebalance, list.toArray(new URI[list.size()]));
331                    } catch (IOException e) {
332                        LOG.error("Failed to update transport URI's from: " + newTransports, e);
333                    }
334                }
335            }
336        }
337    }
338
339    @Override
340    public void start() throws Exception {
341        synchronized (reconnectMutex) {
342            if (LOG.isDebugEnabled()) {
343                LOG.debug("Started " + this);
344            }
345            if (started) {
346                return;
347            }
348            started = true;
349            stateTracker.setMaxCacheSize(getMaxCacheSize());
350            stateTracker.setTrackMessages(isTrackMessages());
351            stateTracker.setTrackTransactionProducers(isTrackTransactionProducers());
352            if (connectedTransport.get() != null) {
353                stateTracker.restore(connectedTransport.get());
354            } else {
355                reconnect(false);
356            }
357        }
358    }
359
360    @Override
361    public void stop() throws Exception {
362        Transport transportToStop = null;
363        List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
364
365        try {
366            synchronized (reconnectMutex) {
367                if (LOG.isDebugEnabled()) {
368                    LOG.debug("Stopped " + this);
369                }
370                if (!started) {
371                    return;
372                }
373                started = false;
374                disposed = true;
375
376                if (connectedTransport.get() != null) {
377                    transportToStop = connectedTransport.getAndSet(null);
378                }
379                reconnectMutex.notifyAll();
380            }
381            synchronized (sleepMutex) {
382                sleepMutex.notifyAll();
383            }
384        } finally {
385            reconnectTask.shutdown();
386            reconnectTaskFactory.shutdownNow();
387        }
388
389        synchronized(backupMutex) {
390            for (BackupTransport backup : backups) {
391                backup.setDisposed(true);
392                Transport transport = backup.getTransport();
393                if (transport != null) {
394                    transport.setTransportListener(disposedListener);
395                    backupsToStop.add(transport);
396                }
397            }
398            backups.clear();
399        }
400        for (Transport transport : backupsToStop) {
401            try {
402                if (LOG.isTraceEnabled()) {
403                    LOG.trace("Stopped backup: " + transport);
404                }
405                disposeTransport(transport);
406            } catch (Exception e) {
407            }
408        }
409        if (transportToStop != null) {
410            transportToStop.stop();
411        }
412    }
413
414    public long getInitialReconnectDelay() {
415        return initialReconnectDelay;
416    }
417
418    public void setInitialReconnectDelay(long initialReconnectDelay) {
419        this.initialReconnectDelay = initialReconnectDelay;
420    }
421
422    public long getMaxReconnectDelay() {
423        return maxReconnectDelay;
424    }
425
426    public void setMaxReconnectDelay(long maxReconnectDelay) {
427        this.maxReconnectDelay = maxReconnectDelay;
428    }
429
430    public long getReconnectDelay() {
431        return reconnectDelay;
432    }
433
434    public void setReconnectDelay(long reconnectDelay) {
435        this.reconnectDelay = reconnectDelay;
436    }
437
438    public double getReconnectDelayExponent() {
439        return backOffMultiplier;
440    }
441
442    public void setReconnectDelayExponent(double reconnectDelayExponent) {
443        this.backOffMultiplier = reconnectDelayExponent;
444    }
445
446    public Transport getConnectedTransport() {
447        return connectedTransport.get();
448    }
449
450    public URI getConnectedTransportURI() {
451        return connectedTransportURI;
452    }
453
454    public int getMaxReconnectAttempts() {
455        return maxReconnectAttempts;
456    }
457
458    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
459        this.maxReconnectAttempts = maxReconnectAttempts;
460    }
461
462    public int getStartupMaxReconnectAttempts() {
463        return this.startupMaxReconnectAttempts;
464    }
465
466    public void setStartupMaxReconnectAttempts(int startupMaxReconnectAttempts) {
467        this.startupMaxReconnectAttempts = startupMaxReconnectAttempts;
468    }
469
470    public long getTimeout() {
471        return timeout;
472    }
473
474    public void setTimeout(long timeout) {
475        this.timeout = timeout;
476    }
477
478    /**
479     * @return Returns the randomize.
480     */
481    public boolean isRandomize() {
482        return randomize;
483    }
484
485    /**
486     * @param randomize The randomize to set.
487     */
488    public void setRandomize(boolean randomize) {
489        this.randomize = randomize;
490    }
491
492    public boolean isBackup() {
493        return backup;
494    }
495
496    public void setBackup(boolean backup) {
497        this.backup = backup;
498    }
499
500    public int getBackupPoolSize() {
501        return backupPoolSize;
502    }
503
504    public void setBackupPoolSize(int backupPoolSize) {
505        this.backupPoolSize = backupPoolSize;
506    }
507
508    public int getCurrentBackups() {
509        return this.backups.size();
510    }
511
512    public boolean isTrackMessages() {
513        return trackMessages;
514    }
515
516    public void setTrackMessages(boolean trackMessages) {
517        this.trackMessages = trackMessages;
518    }
519
520    public boolean isTrackTransactionProducers() {
521        return this.trackTransactionProducers;
522    }
523
524    public void setTrackTransactionProducers(boolean trackTransactionProducers) {
525        this.trackTransactionProducers = trackTransactionProducers;
526    }
527
528    public int getMaxCacheSize() {
529        return maxCacheSize;
530    }
531
532    public void setMaxCacheSize(int maxCacheSize) {
533        this.maxCacheSize = maxCacheSize;
534    }
535
536    public boolean isPriorityBackup() {
537        return priorityBackup;
538    }
539
540    public void setPriorityBackup(boolean priorityBackup) {
541        this.priorityBackup = priorityBackup;
542    }
543
544    public void setPriorityURIs(String priorityURIs) {
545        StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
546        while (tokenizer.hasMoreTokens()) {
547            String str = tokenizer.nextToken();
548            try {
549                URI uri = new URI(str);
550                priorityList.add(uri);
551            } catch (Exception e) {
552                LOG.error("Failed to parse broker address: " + str, e);
553            }
554        }
555    }
556
557    @Override
558    public void oneway(Object o) throws IOException {
559
560        Command command = (Command) o;
561        Exception error = null;
562        try {
563
564            synchronized (reconnectMutex) {
565
566                if (command != null && connectedTransport.get() == null) {
567                    if (command.isShutdownInfo()) {
568                        // Skipping send of ShutdownInfo command when not connected.
569                        return;
570                    } else if (command instanceof RemoveInfo || command.isMessageAck()) {
571                        // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
572                        stateTracker.track(command);
573                        if (command.isResponseRequired()) {
574                            Response response = new Response();
575                            response.setCorrelationId(command.getCommandId());
576                            myTransportListener.onCommand(response);
577                        }
578                        return;
579                    } else if (command instanceof MessagePull) {
580                        // Simulate response to MessagePull if timed as we can't honor that now.
581                        MessagePull pullRequest = (MessagePull) command;
582                        if (pullRequest.getTimeout() != 0) {
583                            MessageDispatch dispatch = new MessageDispatch();
584                            dispatch.setConsumerId(pullRequest.getConsumerId());
585                            dispatch.setDestination(pullRequest.getDestination());
586                            myTransportListener.onCommand(dispatch);
587                        }
588                        return;
589                    }
590                }
591
592                // Keep trying until the message is sent.
593                for (int i = 0; !disposed; i++) {
594                    try {
595
596                        // Wait for transport to be connected.
597                        Transport transport = connectedTransport.get();
598                        long start = System.currentTimeMillis();
599                        boolean timedout = false;
600                        while (transport == null && !disposed && connectionFailure == null
601                                && !Thread.currentThread().isInterrupted()) {
602                            if (LOG.isTraceEnabled()) {
603                                LOG.trace("Waiting for transport to reconnect..: " + command);
604                            }
605                            long end = System.currentTimeMillis();
606                            if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
607                                timedout = true;
608                                if (LOG.isInfoEnabled()) {
609                                    LOG.info("Failover timed out after " + (end - start) + "ms");
610                                }
611                                break;
612                            }
613                            try {
614                                reconnectMutex.wait(100);
615                            } catch (InterruptedException e) {
616                                Thread.currentThread().interrupt();
617                                if (LOG.isDebugEnabled()) {
618                                    LOG.debug("Interupted: " + e, e);
619                                }
620                            }
621                            transport = connectedTransport.get();
622                        }
623
624                        if (transport == null) {
625                            // Previous loop may have exited due to use being
626                            // disposed.
627                            if (disposed) {
628                                error = new IOException("Transport disposed.");
629                            } else if (connectionFailure != null) {
630                                error = connectionFailure;
631                            } else if (timedout == true) {
632                                error = new IOException("Failover timeout of " + timeout + " ms reached.");
633                            } else {
634                                error = new IOException("Unexpected failure.");
635                            }
636                            break;
637                        }
638
639                        Tracked tracked = null;
640                        try {
641                            tracked = stateTracker.track(command);
642                        } catch (IOException ioe) {
643                            LOG.debug("Cannot track the command " + command, ioe);
644                        }
645                        // If it was a request and it was not being tracked by
646                        // the state tracker,
647                        // then hold it in the requestMap so that we can replay
648                        // it later.
649                        synchronized (requestMap) {
650                            if (tracked != null && tracked.isWaitingForResponse()) {
651                                requestMap.put(Integer.valueOf(command.getCommandId()), tracked);
652                            } else if (tracked == null && command.isResponseRequired()) {
653                                requestMap.put(Integer.valueOf(command.getCommandId()), command);
654                            }
655                        }
656
657                        // Send the message.
658                        try {
659                            transport.oneway(command);
660                            stateTracker.trackBack(command);
661                            if (command.isShutdownInfo()) {
662                                shuttingDown = true;
663                            }
664                        } catch (IOException e) {
665
666                            // If the command was not tracked.. we will retry in
667                            // this method
668                            if (tracked == null) {
669
670                                // since we will retry in this method.. take it
671                                // out of the request
672                                // map so that it is not sent 2 times on
673                                // recovery
674                                if (command.isResponseRequired()) {
675                                    requestMap.remove(Integer.valueOf(command.getCommandId()));
676                                }
677
678                                // Rethrow the exception so it will handled by
679                                // the outer catch
680                                throw e;
681                            } else {
682                                // Handle the error but allow the method to return since the
683                                // tracked commands are replayed on reconnect.
684                                if (LOG.isDebugEnabled()) {
685                                    LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
686                                }
687                                handleTransportFailure(e);
688                            }
689                        }
690
691                        return;
692
693                    } catch (IOException e) {
694                        if (LOG.isDebugEnabled()) {
695                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
696                        }
697                        handleTransportFailure(e);
698                    }
699                }
700            }
701        } catch (InterruptedException e) {
702            // Some one may be trying to stop our thread.
703            Thread.currentThread().interrupt();
704            throw new InterruptedIOException();
705        }
706
707        if (!disposed) {
708            if (error != null) {
709                if (error instanceof IOException) {
710                    throw (IOException) error;
711                }
712                throw IOExceptionSupport.create(error);
713            }
714        }
715    }
716
717    @Override
718    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
719        throw new AssertionError("Unsupported Method");
720    }
721
722    @Override
723    public Object request(Object command) throws IOException {
724        throw new AssertionError("Unsupported Method");
725    }
726
727    @Override
728    public Object request(Object command, int timeout) throws IOException {
729        throw new AssertionError("Unsupported Method");
730    }
731
732    @Override
733    public void add(boolean rebalance, URI u[]) {
734        boolean newURI = false;
735        for (URI uri : u) {
736            if (!contains(uri)) {
737                uris.add(uri);
738                newURI = true;
739            }
740        }
741        if (newURI) {
742            reconnect(rebalance);
743        }
744    }
745
746    @Override
747    public void remove(boolean rebalance, URI u[]) {
748        for (URI uri : u) {
749            uris.remove(uri);
750        }
751        // rebalance is automatic if any connected to removed/stopped broker
752    }
753
754    public void add(boolean rebalance, String u) {
755        try {
756            URI newURI = new URI(u);
757            if (contains(newURI) == false) {
758                uris.add(newURI);
759                reconnect(rebalance);
760            }
761
762        } catch (Exception e) {
763            LOG.error("Failed to parse URI: " + u);
764        }
765    }
766
767    public void reconnect(boolean rebalance) {
768        synchronized (reconnectMutex) {
769            if (started) {
770                if (rebalance) {
771                    doRebalance = true;
772                }
773                LOG.debug("Waking up reconnect task");
774                try {
775                    reconnectTask.wakeup();
776                } catch (InterruptedException e) {
777                    Thread.currentThread().interrupt();
778                }
779            } else {
780                LOG.debug("Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.");
781            }
782        }
783    }
784
785    private List<URI> getConnectList() {
786        if (!updated.isEmpty()) {
787            return updated;
788        }
789        ArrayList<URI> l = new ArrayList<URI>(uris);
790        boolean removed = false;
791        if (failedConnectTransportURI != null) {
792            removed = l.remove(failedConnectTransportURI);
793        }
794        if (randomize) {
795            // Randomly, reorder the list by random swapping
796            for (int i = 0; i < l.size(); i++) {
797                // meed parenthesis due other JDKs (see AMQ-4826)
798                int p = ((int) (Math.random() * 100)) % l.size();
799                URI t = l.get(p);
800                l.set(p, l.get(i));
801                l.set(i, t);
802            }
803        }
804        if (removed) {
805            l.add(failedConnectTransportURI);
806        }
807        if (LOG.isDebugEnabled()) {
808            LOG.debug("urlList connectionList:" + l + ", from: " + uris);
809        }
810        return l;
811    }
812
813    @Override
814    public TransportListener getTransportListener() {
815        return transportListener;
816    }
817
818    @Override
819    public void setTransportListener(TransportListener commandListener) {
820        synchronized (listenerMutex) {
821            this.transportListener = commandListener;
822            listenerMutex.notifyAll();
823        }
824    }
825
826    @Override
827    public <T> T narrow(Class<T> target) {
828
829        if (target.isAssignableFrom(getClass())) {
830            return target.cast(this);
831        }
832        Transport transport = connectedTransport.get();
833        if (transport != null) {
834            return transport.narrow(target);
835        }
836        return null;
837
838    }
839
840    protected void restoreTransport(Transport t) throws Exception, IOException {
841        t.start();
842        // send information to the broker - informing it we are an ft client
843        ConnectionControl cc = new ConnectionControl();
844        cc.setFaultTolerant(true);
845        t.oneway(cc);
846        stateTracker.restore(t);
847        Map<Integer, Command> tmpMap = null;
848        synchronized (requestMap) {
849            tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
850        }
851        for (Command command : tmpMap.values()) {
852            if (LOG.isTraceEnabled()) {
853                LOG.trace("restore requestMap, replay: " + command);
854            }
855            t.oneway(command);
856        }
857    }
858
859    public boolean isUseExponentialBackOff() {
860        return useExponentialBackOff;
861    }
862
863    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
864        this.useExponentialBackOff = useExponentialBackOff;
865    }
866
867    @Override
868    public String toString() {
869        return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
870    }
871
872    @Override
873    public String getRemoteAddress() {
874        Transport transport = connectedTransport.get();
875        if (transport != null) {
876            return transport.getRemoteAddress();
877        }
878        return null;
879    }
880
881    @Override
882    public boolean isFaultTolerant() {
883        return true;
884    }
885
886    private void doUpdateURIsFromDisk() {
887        // If updateURIsURL is specified, read the file and add any new
888        // transport URI's to this FailOverTransport.
889        // Note: Could track file timestamp to avoid unnecessary reading.
890        String fileURL = getUpdateURIsURL();
891        if (fileURL != null) {
892            BufferedReader in = null;
893            String newUris = null;
894            StringBuffer buffer = new StringBuffer();
895
896            try {
897                in = new BufferedReader(getURLStream(fileURL));
898                while (true) {
899                    String line = in.readLine();
900                    if (line == null) {
901                        break;
902                    }
903                    buffer.append(line);
904                }
905                newUris = buffer.toString();
906            } catch (IOException ioe) {
907                LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
908            } finally {
909                if (in != null) {
910                    try {
911                        in.close();
912                    } catch (IOException ioe) {
913                        // ignore
914                    }
915                }
916            }
917
918            processNewTransports(isRebalanceUpdateURIs(), newUris);
919        }
920    }
921
922    final boolean doReconnect() {
923        Exception failure = null;
924        synchronized (reconnectMutex) {
925
926            // First ensure we are up to date.
927            doUpdateURIsFromDisk();
928
929            if (disposed || connectionFailure != null) {
930                reconnectMutex.notifyAll();
931            }
932            if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
933                return false;
934            } else {
935                List<URI> connectList = getConnectList();
936                if (connectList.isEmpty()) {
937                    failure = new IOException("No uris available to connect to.");
938                } else {
939                    if (doRebalance) {
940                        if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
941                            // already connected to first in the list, no need to rebalance
942                            doRebalance = false;
943                            return false;
944                        } else {
945                            if (LOG.isDebugEnabled()) {
946                                LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
947                            }
948
949                            try {
950                                Transport transport = this.connectedTransport.getAndSet(null);
951                                if (transport != null) {
952                                    disposeTransport(transport);
953                                }
954                            } catch (Exception e) {
955                                if (LOG.isDebugEnabled()) {
956                                    LOG.debug("Caught an exception stopping existing transport for rebalance", e);
957                                }
958                            }
959                        }
960                        doRebalance = false;
961                    }
962
963                    resetReconnectDelay();
964
965                    Transport transport = null;
966                    URI uri = null;
967
968                    // If we have a backup already waiting lets try it.
969                    synchronized (backupMutex) {
970                        if ((priorityBackup || backup) && !backups.isEmpty()) {
971                            ArrayList<BackupTransport> l = new ArrayList<BackupTransport>(backups);
972                            if (randomize) {
973                                Collections.shuffle(l);
974                            }
975                            BackupTransport bt = l.remove(0);
976                            backups.remove(bt);
977                            transport = bt.getTransport();
978                            uri = bt.getUri();
979                            if (priorityBackup && priorityBackupAvailable) {
980                                Transport old = this.connectedTransport.getAndSet(null);
981                                if (old != null) {
982                                    disposeTransport(old);
983                                }
984                                priorityBackupAvailable = false;
985                            }
986                        }
987                    }
988
989                    // Sleep for the reconnectDelay if there's no backup and we aren't trying
990                    // for the first time, or we were disposed for some reason.
991                    if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
992                        synchronized (sleepMutex) {
993                            if (LOG.isDebugEnabled()) {
994                                LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
995                            }
996                            try {
997                                sleepMutex.wait(reconnectDelay);
998                            } catch (InterruptedException e) {
999                                Thread.currentThread().interrupt();
1000                            }
1001                        }
1002                    }
1003
1004                    Iterator<URI> iter = connectList.iterator();
1005                    while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
1006
1007                        try {
1008                            SslContext.setCurrentSslContext(brokerSslContext);
1009
1010                            // We could be starting with a backup and if so we wait to grab a
1011                            // URI from the pool until next time around.
1012                            if (transport == null) {
1013                                uri = addExtraQueryOptions(iter.next());
1014                                transport = TransportFactory.compositeConnect(uri);
1015                            }
1016
1017                            if (LOG.isDebugEnabled()) {
1018                                LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
1019                            }
1020                            transport.setTransportListener(myTransportListener);
1021                            transport.start();
1022
1023                            if (started &&  !firstConnection) {
1024                                restoreTransport(transport);
1025                            }
1026
1027                            if (LOG.isDebugEnabled()) {
1028                                LOG.debug("Connection established");
1029                            }
1030                            reconnectDelay = initialReconnectDelay;
1031                            connectedTransportURI = uri;
1032                            connectedTransport.set(transport);
1033                            connectedToPriority = isPriority(connectedTransportURI);
1034                            reconnectMutex.notifyAll();
1035                            connectFailures = 0;
1036
1037                            // Make sure on initial startup, that the transportListener
1038                            // has been initialized for this instance.
1039                            synchronized (listenerMutex) {
1040                                if (transportListener == null) {
1041                                    try {
1042                                        // if it isn't set after 2secs - it probably never will be
1043                                        listenerMutex.wait(2000);
1044                                    } catch (InterruptedException ex) {
1045                                    }
1046                                }
1047                            }
1048
1049                            if (transportListener != null) {
1050                                transportListener.transportResumed();
1051                            } else {
1052                                if (LOG.isDebugEnabled()) {
1053                                    LOG.debug("transport resumed by transport listener not set");
1054                                }
1055                            }
1056
1057                            if (firstConnection) {
1058                                firstConnection = false;
1059                                LOG.info("Successfully connected to " + uri);
1060                            } else {
1061                                LOG.info("Successfully reconnected to " + uri);
1062                            }
1063
1064                            return false;
1065                        } catch (Exception e) {
1066                            failure = e;
1067                            if (LOG.isDebugEnabled()) {
1068                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
1069                            }
1070                            if (transport != null) {
1071                                try {
1072                                    transport.stop();
1073                                    transport = null;
1074                                } catch (Exception ee) {
1075                                    if (LOG.isDebugEnabled()) {
1076                                        LOG.debug("Stop of failed transport: " + transport +
1077                                                  " failed with reason: " + ee);
1078                                    }
1079                                }
1080                            }
1081                        } finally {
1082                            SslContext.setCurrentSslContext(null);
1083                        }
1084                    }
1085                }
1086            }
1087
1088            int reconnectLimit = calculateReconnectAttemptLimit();
1089
1090            connectFailures++;
1091            if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
1092                LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
1093                connectionFailure = failure;
1094
1095                // Make sure on initial startup, that the transportListener has been
1096                // initialized for this instance.
1097                synchronized (listenerMutex) {
1098                    if (transportListener == null) {
1099                        try {
1100                            listenerMutex.wait(2000);
1101                        } catch (InterruptedException ex) {
1102                        }
1103                    }
1104                }
1105
1106                propagateFailureToExceptionListener(connectionFailure);
1107                return false;
1108            }
1109
1110            int warnInterval = getWarnAfterReconnectAttempts();
1111            if (warnInterval > 0 && (connectFailures % warnInterval) == 0) {
1112                LOG.warn("Failed to connect to {} after: {} attempt(s) continuing to retry.",
1113                         uris, connectFailures);
1114            }
1115        }
1116
1117        if (!disposed) {
1118            doDelay();
1119        }
1120
1121        return !disposed;
1122    }
1123
1124    private void doDelay() {
1125        if (reconnectDelay > 0) {
1126            synchronized (sleepMutex) {
1127                if (LOG.isDebugEnabled()) {
1128                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
1129                }
1130                try {
1131                    sleepMutex.wait(reconnectDelay);
1132                } catch (InterruptedException e) {
1133                    Thread.currentThread().interrupt();
1134                }
1135            }
1136        }
1137
1138        if (useExponentialBackOff) {
1139            // Exponential increment of reconnect delay.
1140            reconnectDelay *= backOffMultiplier;
1141            if (reconnectDelay > maxReconnectDelay) {
1142                reconnectDelay = maxReconnectDelay;
1143            }
1144        }
1145    }
1146
1147    private void resetReconnectDelay() {
1148        if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
1149            reconnectDelay = initialReconnectDelay;
1150        }
1151    }
1152
1153    /*
1154      * called with reconnectMutex held
1155     */
1156    private void propagateFailureToExceptionListener(Exception exception) {
1157        if (transportListener != null) {
1158            if (exception instanceof IOException) {
1159                transportListener.onException((IOException)exception);
1160            } else {
1161                transportListener.onException(IOExceptionSupport.create(exception));
1162            }
1163        }
1164        reconnectMutex.notifyAll();
1165    }
1166
1167    private int calculateReconnectAttemptLimit() {
1168        int maxReconnectValue = this.maxReconnectAttempts;
1169        if (firstConnection && this.startupMaxReconnectAttempts != INFINITE) {
1170            maxReconnectValue = this.startupMaxReconnectAttempts;
1171        }
1172        return maxReconnectValue;
1173    }
1174
1175    private boolean shouldBuildBackups() {
1176       return (backup && backups.size() < backupPoolSize) || (priorityBackup && !(priorityBackupAvailable || connectedToPriority));
1177    }
1178
1179    final boolean buildBackups() {
1180        synchronized (backupMutex) {
1181            if (!disposed && shouldBuildBackups()) {
1182                ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
1183                List<URI> connectList = getConnectList();
1184                for (URI uri: connectList) {
1185                    if (!backupList.contains(uri)) {
1186                        backupList.add(uri);
1187                    }
1188                }
1189                // removed disposed backups
1190                List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
1191                for (BackupTransport bt : backups) {
1192                    if (bt.isDisposed()) {
1193                        disposedList.add(bt);
1194                    }
1195                }
1196                backups.removeAll(disposedList);
1197                disposedList.clear();
1198                for (Iterator<URI> iter = backupList.iterator(); !disposed && iter.hasNext() && shouldBuildBackups(); ) {
1199                    URI uri = addExtraQueryOptions(iter.next());
1200                    if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
1201                        try {
1202                            SslContext.setCurrentSslContext(brokerSslContext);
1203                            BackupTransport bt = new BackupTransport(this);
1204                            bt.setUri(uri);
1205                            if (!backups.contains(bt)) {
1206                                Transport t = TransportFactory.compositeConnect(uri);
1207                                t.setTransportListener(bt);
1208                                t.start();
1209                                bt.setTransport(t);
1210                                if (priorityBackup && isPriority(uri)) {
1211                                   priorityBackupAvailable = true;
1212                                   backups.add(0, bt);
1213                                   // if this priority backup overflows the pool
1214                                   // remove the backup with the lowest priority
1215                                   if (backups.size() > backupPoolSize) {
1216                                       BackupTransport disposeTransport = backups.remove(backups.size() - 1);
1217                                       disposeTransport.setDisposed(true);
1218                                       Transport transport = disposeTransport.getTransport();
1219                                       if (transport != null) {
1220                                           transport.setTransportListener(disposedListener);
1221                                           disposeTransport(transport);
1222                                       }
1223                                   }
1224                                } else {
1225                                    backups.add(bt);
1226                                }
1227                            }
1228                        } catch (Exception e) {
1229                            LOG.debug("Failed to build backup ", e);
1230                        } finally {
1231                            SslContext.setCurrentSslContext(null);
1232                        }
1233                    }
1234                }
1235            }
1236        }
1237        return false;
1238    }
1239
1240    protected boolean isPriority(URI uri) {
1241        if (!priorityBackup) {
1242            return false;
1243        }
1244
1245        if (!priorityList.isEmpty()) {
1246            return priorityList.contains(uri);
1247        }
1248        return uris.indexOf(uri) == 0;
1249    }
1250
1251    @Override
1252    public boolean isDisposed() {
1253        return disposed;
1254    }
1255
1256    @Override
1257    public boolean isConnected() {
1258        return connectedTransport.get() != null;
1259    }
1260
1261    @Override
1262    public void reconnect(URI uri) throws IOException {
1263        add(true, new URI[]{uri});
1264    }
1265
1266    @Override
1267    public boolean isReconnectSupported() {
1268        return this.reconnectSupported;
1269    }
1270
1271    public void setReconnectSupported(boolean value) {
1272        this.reconnectSupported = value;
1273    }
1274
1275    @Override
1276    public boolean isUpdateURIsSupported() {
1277        return this.updateURIsSupported;
1278    }
1279
1280    public void setUpdateURIsSupported(boolean value) {
1281        this.updateURIsSupported = value;
1282    }
1283
1284    @Override
1285    public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
1286        if (isUpdateURIsSupported()) {
1287            HashSet<URI> copy = new HashSet<URI>();
1288            synchronized (reconnectMutex) {
1289                copy.addAll(this.updated);
1290                updated.clear();
1291                if (updatedURIs != null && updatedURIs.length > 0) {
1292                    for (URI uri : updatedURIs) {
1293                        if (uri != null && !updated.contains(uri)) {
1294                            updated.add(uri);
1295                        }
1296                    }
1297                }
1298            }
1299            if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(new HashSet<URI>(updated))) {
1300                buildBackups();
1301                reconnect(rebalance);
1302            }
1303        }
1304    }
1305
1306    /**
1307     * @return the updateURIsURL
1308     */
1309    public String getUpdateURIsURL() {
1310        return this.updateURIsURL;
1311    }
1312
1313    /**
1314     * @param updateURIsURL the updateURIsURL to set
1315     */
1316    public void setUpdateURIsURL(String updateURIsURL) {
1317        this.updateURIsURL = updateURIsURL;
1318    }
1319
1320    /**
1321     * @return the rebalanceUpdateURIs
1322     */
1323    public boolean isRebalanceUpdateURIs() {
1324        return this.rebalanceUpdateURIs;
1325    }
1326
1327    /**
1328     * @param rebalanceUpdateURIs the rebalanceUpdateURIs to set
1329     */
1330    public void setRebalanceUpdateURIs(boolean rebalanceUpdateURIs) {
1331        this.rebalanceUpdateURIs = rebalanceUpdateURIs;
1332    }
1333
1334    @Override
1335    public int getReceiveCounter() {
1336        Transport transport = connectedTransport.get();
1337        if (transport == null) {
1338            return 0;
1339        }
1340        return transport.getReceiveCounter();
1341    }
1342
1343    public int getConnectFailures() {
1344        return connectFailures;
1345    }
1346
1347    public void connectionInterruptProcessingComplete(ConnectionId connectionId) {
1348        synchronized (reconnectMutex) {
1349            stateTracker.connectionInterruptProcessingComplete(this, connectionId);
1350        }
1351    }
1352
1353    public ConnectionStateTracker getStateTracker() {
1354        return stateTracker;
1355    }
1356
1357    private boolean contains(URI newURI) {
1358        boolean result = false;
1359        for (URI uri : uris) {
1360            if (compareURIs(newURI, uri)) {
1361                result = true;
1362                break;
1363            }
1364        }
1365
1366        return result;
1367    }
1368
1369    private boolean compareURIs(final URI first, final URI second) {
1370
1371        boolean result = false;
1372        if (first == null || second == null) {
1373            return result;
1374        }
1375
1376        if (first.getPort() == second.getPort()) {
1377            InetAddress firstAddr = null;
1378            InetAddress secondAddr = null;
1379            try {
1380                firstAddr = InetAddress.getByName(first.getHost());
1381                secondAddr = InetAddress.getByName(second.getHost());
1382
1383                if (firstAddr.equals(secondAddr)) {
1384                    result = true;
1385                }
1386
1387            } catch(IOException e) {
1388
1389                if (firstAddr == null) {
1390                    LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
1391                } else {
1392                    LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
1393                }
1394
1395                if (first.getHost().equalsIgnoreCase(second.getHost())) {
1396                    result = true;
1397                }
1398            }
1399        }
1400
1401        return result;
1402    }
1403
1404    private InputStreamReader getURLStream(String path) throws IOException {
1405        InputStreamReader result = null;
1406        URL url = null;
1407        try {
1408            url = new URL(path);
1409            result = new InputStreamReader(url.openStream());
1410        } catch (MalformedURLException e) {
1411            // ignore - it could be a path to a a local file
1412        }
1413        if (result == null) {
1414            result = new FileReader(path);
1415        }
1416        return result;
1417    }
1418
1419    private URI addExtraQueryOptions(URI uri) {
1420        try {
1421            if( nestedExtraQueryOptions!=null && !nestedExtraQueryOptions.isEmpty() ) {
1422                if( uri.getQuery() == null ) {
1423                    uri = URISupport.createURIWithQuery(uri, nestedExtraQueryOptions);
1424                } else {
1425                    uri = URISupport.createURIWithQuery(uri, uri.getQuery()+"&"+nestedExtraQueryOptions);
1426                }
1427            }
1428        } catch (URISyntaxException e) {
1429            throw new RuntimeException(e);
1430        }
1431        return uri;
1432    }
1433
1434    public void setNestedExtraQueryOptions(String nestedExtraQueryOptions) {
1435        this.nestedExtraQueryOptions = nestedExtraQueryOptions;
1436    }
1437
1438    public int getWarnAfterReconnectAttempts() {
1439        return warnAfterReconnectAttempts;
1440    }
1441
1442    /**
1443     * Sets the number of Connect / Reconnect attempts that must occur before a warn message
1444     * is logged indicating that the transport is not connected.  This can be useful when the
1445     * client is running inside some container or service as it give an indication of some
1446     * problem with the client connection that might not otherwise be visible.  To disable the
1447     * log messages this value should be set to a value @{code attempts <= 0}
1448     *
1449     * @param warnAfterReconnectAttempts
1450     *      The number of failed connection attempts that must happen before a warning is logged.
1451     */
1452    public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
1453        this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
1454    }
1455
1456}