001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.InterruptedIOException;
020import java.util.ArrayList;
021import java.util.Arrays;
022import java.util.HashMap;
023import java.util.List;
024
025import javax.jms.JMSException;
026import javax.jms.TransactionInProgressException;
027import javax.jms.TransactionRolledBackException;
028import javax.transaction.xa.XAException;
029import javax.transaction.xa.XAResource;
030import javax.transaction.xa.Xid;
031
032import org.apache.activemq.command.Command;
033import org.apache.activemq.command.ConnectionId;
034import org.apache.activemq.command.DataArrayResponse;
035import org.apache.activemq.command.DataStructure;
036import org.apache.activemq.command.IntegerResponse;
037import org.apache.activemq.command.LocalTransactionId;
038import org.apache.activemq.command.Response;
039import org.apache.activemq.command.TransactionId;
040import org.apache.activemq.command.TransactionInfo;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.transaction.Synchronization;
043import org.apache.activemq.transport.failover.FailoverTransport;
044import org.apache.activemq.util.JMSExceptionSupport;
045import org.apache.activemq.util.LongSequenceGenerator;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
 * A TransactionContext provides the means to control a JMS transaction. It
 * provides a local transaction interface and also an XAResource interface.
 * <p>
 * An application server controls the transactional assignment of an XASession
 * by obtaining its XAResource. It uses the XAResource to assign the session to
 * a transaction, prepare and commit work on the transaction, and so on.
 * <p>
 * An XAResource provides some fairly sophisticated facilities for interleaving
 * work on multiple transactions, recovering a list of transactions in progress,
 * and so on. A JTA-aware JMS provider must fully implement this functionality.
 * This could be done by using the services of a database that supports XA, or a
 * JMS provider may choose to implement this functionality from scratch.
 *
062 * @see javax.jms.Session
063 * @see javax.jms.QueueSession
064 * @see javax.jms.TopicSession
065 * @see javax.jms.XASession
066 */
067public class TransactionContext implements XAResource {
068
069    public static final String xaErrorCodeMarker = "xaErrorCode:";
070    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
071
072    // XATransactionId -> ArrayList of TransactionContext objects
073    private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
074                new HashMap<TransactionId, List<TransactionContext>>();
075
076    private ActiveMQConnection connection;
077    private final LongSequenceGenerator localTransactionIdGenerator;
078    private List<Synchronization> synchronizations;
079
080    // To track XA transactions.
081    private Xid associatedXid;
082    private TransactionId transactionId;
083    private LocalTransactionEventListener localTransactionEventListener;
084    private int beforeEndIndex;
085
086    // for RAR recovery
087    public TransactionContext() {
088        localTransactionIdGenerator = null;
089    }
090
091    public TransactionContext(ActiveMQConnection connection) {
092        this.connection = connection;
093        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
094    }
095
096    public boolean isInXATransaction() {
097        if (transactionId != null && transactionId.isXATransaction()) {
098                return true;
099        } else {
100                if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
101                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
102                                for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
103                                        if (transactions.contains(this)) {
104                                                return true;
105                                        }
106                                }
107                        }
108                }
109        }
110
111        return false;
112    }
113
114    public boolean isInLocalTransaction() {
115        return transactionId != null && transactionId.isLocalTransaction();
116    }
117
118    public boolean isInTransaction() {
119        return transactionId != null;
120    }
121
122    /**
123     * @return Returns the localTransactionEventListener.
124     */
125    public LocalTransactionEventListener getLocalTransactionEventListener() {
126        return localTransactionEventListener;
127    }
128
129    /**
130     * Used by the resource adapter to listen to transaction events.
131     *
132     * @param localTransactionEventListener The localTransactionEventListener to
133     *                set.
134     */
135    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
136        this.localTransactionEventListener = localTransactionEventListener;
137    }
138
139    // ///////////////////////////////////////////////////////////
140    //
141    // Methods that work with the Synchronization objects registered with
142    // the transaction.
143    //
144    // ///////////////////////////////////////////////////////////
145
146    public void addSynchronization(Synchronization s) {
147        if (synchronizations == null) {
148            synchronizations = new ArrayList<Synchronization>(10);
149        }
150        synchronizations.add(s);
151    }
152
153    private void afterRollback() throws JMSException {
154        if (synchronizations == null) {
155            return;
156        }
157
158        Throwable firstException = null;
159        int size = synchronizations.size();
160        for (int i = 0; i < size; i++) {
161            try {
162                synchronizations.get(i).afterRollback();
163            } catch (Throwable t) {
164                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
165                if (firstException == null) {
166                    firstException = t;
167                }
168            }
169        }
170        synchronizations = null;
171        if (firstException != null) {
172            throw JMSExceptionSupport.create(firstException);
173        }
174    }
175
176    private void afterCommit() throws JMSException {
177        if (synchronizations == null) {
178            return;
179        }
180
181        Throwable firstException = null;
182        int size = synchronizations.size();
183        for (int i = 0; i < size; i++) {
184            try {
185                synchronizations.get(i).afterCommit();
186            } catch (Throwable t) {
187                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
188                if (firstException == null) {
189                    firstException = t;
190                }
191            }
192        }
193        synchronizations = null;
194        if (firstException != null) {
195            throw JMSExceptionSupport.create(firstException);
196        }
197    }
198
199    private void beforeEnd() throws JMSException {
200        if (synchronizations == null) {
201            return;
202        }
203
204        int size = synchronizations.size();
205        try {
206            for (;beforeEndIndex < size;) {
207                synchronizations.get(beforeEndIndex++).beforeEnd();
208            }
209        } catch (JMSException e) {
210            throw e;
211        } catch (Throwable e) {
212            throw JMSExceptionSupport.create(e);
213        }
214    }
215
216    public TransactionId getTransactionId() {
217        return transactionId;
218    }
219
220    // ///////////////////////////////////////////////////////////
221    //
222    // Local transaction interface.
223    //
224    // ///////////////////////////////////////////////////////////
225
226    /**
227     * Start a local transaction.
228     * @throws javax.jms.JMSException on internal error
229     */
230    public void begin() throws JMSException {
231
232        if (isInXATransaction()) {
233            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
234        }
235
236        if (transactionId == null) {
237            synchronizations = null;
238            beforeEndIndex = 0;
239            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
240            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
241            this.connection.ensureConnectionInfoSent();
242            this.connection.asyncSendPacket(info);
243
244            // Notify the listener that the tx was started.
245            if (localTransactionEventListener != null) {
246                localTransactionEventListener.beginEvent();
247            }
248            if (LOG.isDebugEnabled()) {
249                LOG.debug("Begin:" + transactionId);
250            }
251        }
252
253    }
254
255    /**
256     * Rolls back any work done in this transaction and releases any locks
257     * currently held.
258     *
259     * @throws JMSException if the JMS provider fails to roll back the
260     *                 transaction due to some internal error.
261     * @throws javax.jms.IllegalStateException if the method is not called by a
262     *                 transacted session.
263     */
264    public void rollback() throws JMSException {
265        if (isInXATransaction()) {
266            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
267        }
268
269        try {
270            beforeEnd();
271        } catch (TransactionRolledBackException canOcurrOnFailover) {
272            LOG.warn("rollback processing error", canOcurrOnFailover);
273        }
274        if (transactionId != null) {
275            if (LOG.isDebugEnabled()) {
276                LOG.debug("Rollback: "  + transactionId
277                + " syncCount: "
278                + (synchronizations != null ? synchronizations.size() : 0));
279            }
280
281            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
282            this.transactionId = null;
283            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
284            this.connection.syncSendPacket(info);
285            // Notify the listener that the tx was rolled back
286            if (localTransactionEventListener != null) {
287                localTransactionEventListener.rollbackEvent();
288            }
289        }
290
291        afterRollback();
292    }
293
294    /**
295     * Commits all work done in this transaction and releases any locks
296     * currently held.
297     *
298     * @throws JMSException if the JMS provider fails to commit the transaction
299     *                 due to some internal error.
300     * @throws javax.jms.IllegalStateException if the method is not called by a
301     *                 transacted session.
302     */
303    public void commit() throws JMSException {
304        if (isInXATransaction()) {
305            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
306        }
307
308        try {
309            beforeEnd();
310        } catch (JMSException e) {
311            rollback();
312            throw e;
313        }
314
315        // Only send commit if the transaction was started.
316        if (transactionId != null) {
317            if (LOG.isDebugEnabled()) {
318                LOG.debug("Commit: "  + transactionId
319                        + " syncCount: "
320                        + (synchronizations != null ? synchronizations.size() : 0));
321            }
322
323            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
324            this.transactionId = null;
325            // Notify the listener that the tx was committed back
326            try {
327                syncSendPacketWithInterruptionHandling(info);
328                if (localTransactionEventListener != null) {
329                    localTransactionEventListener.commitEvent();
330                }
331                afterCommit();
332            } catch (JMSException cause) {
333                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
334                if (localTransactionEventListener != null) {
335                    localTransactionEventListener.rollbackEvent();
336                }
337                afterRollback();
338                throw cause;
339            }
340
341        }
342    }
343
344    // ///////////////////////////////////////////////////////////
345    //
346    // XAResource Implementation
347    //
348    // ///////////////////////////////////////////////////////////
349    /**
350     * Associates a transaction with the resource.
351     */
352    public void start(Xid xid, int flags) throws XAException {
353
354        if (LOG.isDebugEnabled()) {
355            LOG.debug("Start: " + xid + ", flags:" + flags);
356        }
357        if (isInLocalTransaction()) {
358            throw new XAException(XAException.XAER_PROTO);
359        }
360        // Are we already associated?
361        if (associatedXid != null) {
362            throw new XAException(XAException.XAER_PROTO);
363        }
364
365        // if ((flags & TMJOIN) == TMJOIN) {
366        // TODO: verify that the server has seen the xid
367        // // }
368        // if ((flags & TMJOIN) == TMRESUME) {
369        // // TODO: verify that the xid was suspended.
370        // }
371
372        // associate
373        synchronizations = null;
374        beforeEndIndex = 0;
375        setXid(xid);
376    }
377
378    /**
379     * @return connectionId for connection
380     */
381    private ConnectionId getConnectionId() {
382        return connection.getConnectionInfo().getConnectionId();
383    }
384
385    public void end(Xid xid, int flags) throws XAException {
386
387        if (LOG.isDebugEnabled()) {
388            LOG.debug("End: " + xid + ", flags:" + flags);
389        }
390
391        if (isInLocalTransaction()) {
392            throw new XAException(XAException.XAER_PROTO);
393        }
394
395        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
396            // You can only suspend the associated xid.
397            if (!equals(associatedXid, xid)) {
398                throw new XAException(XAException.XAER_PROTO);
399            }
400
401            // TODO: we may want to put the xid in a suspended list.
402            try {
403                beforeEnd();
404            } catch (JMSException e) {
405                throw toXAException(e);
406            } finally {
407                setXid(null);
408            }
409        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
410            // set to null if this is the current xid.
411            // otherwise this could be an asynchronous success call
412            if (equals(associatedXid, xid)) {
413                try {
414                    beforeEnd();
415                } catch (JMSException e) {
416                    throw toXAException(e);
417                } finally {
418                    setXid(null);
419                }
420            }
421        } else {
422            throw new XAException(XAException.XAER_INVAL);
423        }
424    }
425
426    private boolean equals(Xid xid1, Xid xid2) {
427        if (xid1 == xid2) {
428            return true;
429        }
430        if (xid1 == null ^ xid2 == null) {
431            return false;
432        }
433        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
434               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
435    }
436
437    public int prepare(Xid xid) throws XAException {
438        if (LOG.isDebugEnabled()) {
439            LOG.debug("Prepare: " + xid);
440        }
441
442        // We allow interleaving multiple transactions, so
443        // we don't limit prepare to the associated xid.
444        XATransactionId x;
445        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
446        // called first
447        if (xid == null || (equals(associatedXid, xid))) {
448            throw new XAException(XAException.XAER_PROTO);
449        } else {
450            // TODO: cache the known xids so we don't keep recreating this one??
451            x = new XATransactionId(xid);
452        }
453
454        try {
455            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
456
457            // Find out if the server wants to commit or rollback.
458            IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info);
459            if (XAResource.XA_RDONLY == response.getResult()) {
460                // transaction stops now, may be syncs that need a callback
461                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
462                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
463                        if (l != null && !l.isEmpty()) {
464                            if (LOG.isDebugEnabled()) {
465                                LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
466                            }
467                            for (TransactionContext ctx : l) {
468                                ctx.afterCommit();
469                            }
470                        }
471                        }
472            }
473            return response.getResult();
474
475        } catch (JMSException e) {
476            LOG.warn("prepare of: " + x + " failed with: " + e, e);
477                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
478                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
479                    if (l != null && !l.isEmpty()) {
480                        for (TransactionContext ctx : l) {
481                            try {
482                                ctx.afterRollback();
483                            } catch (Throwable ignored) {
484                                if (LOG.isDebugEnabled()) {
485                                    LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " +
486                                                  x + ", context: " + ctx, ignored);
487                                }
488                            }
489                        }
490                    }
491                }
492            throw toXAException(e);
493        }
494    }
495
496    public void rollback(Xid xid) throws XAException {
497
498        if (LOG.isDebugEnabled()) {
499            LOG.debug("Rollback: " + xid);
500        }
501
502        // We allow interleaving multiple transactions, so
503        // we don't limit rollback to the associated xid.
504        XATransactionId x;
505        if (xid == null) {
506            throw new XAException(XAException.XAER_PROTO);
507        }
508        if (equals(associatedXid, xid)) {
509            // I think this can happen even without an end(xid) call. Need to
510            // check spec.
511            x = (XATransactionId)transactionId;
512        } else {
513            x = new XATransactionId(xid);
514        }
515
516        try {
517            this.connection.checkClosedOrFailed();
518            this.connection.ensureConnectionInfoSent();
519
520            // Let the server know that the tx is rollback.
521            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
522            syncSendPacketWithInterruptionHandling(info);
523
524                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
525                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
526                    if (l != null && !l.isEmpty()) {
527                        for (TransactionContext ctx : l) {
528                            ctx.afterRollback();
529                        }
530                    }
531                }
532        } catch (JMSException e) {
533            throw toXAException(e);
534        }
535    }
536
537    // XAResource interface
538    public void commit(Xid xid, boolean onePhase) throws XAException {
539
540        if (LOG.isDebugEnabled()) {
541            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
542        }
543
544        // We allow interleaving multiple transactions, so
545        // we don't limit commit to the associated xid.
546        XATransactionId x;
547        if (xid == null || (equals(associatedXid, xid))) {
548            // should never happen, end(xid,TMSUCCESS) must have been previously
549            // called
550            throw new XAException(XAException.XAER_PROTO);
551        } else {
552            x = new XATransactionId(xid);
553        }
554
555        try {
556            this.connection.checkClosedOrFailed();
557            this.connection.ensureConnectionInfoSent();
558
559            // Notify the server that the tx was committed back
560            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
561
562            syncSendPacketWithInterruptionHandling(info);
563
564                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
565                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
566                    if (l != null && !l.isEmpty()) {
567                        for (TransactionContext ctx : l) {
568                            try {
569                                ctx.afterCommit();
570                            } catch (Exception ignored) {
571                                LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
572                            }
573                        }
574                    }
575                }
576
577        } catch (JMSException e) {
578            LOG.warn("commit of: " + x + " failed with: " + e, e);
579            if (onePhase) {
580                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
581                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
582                        if (l != null && !l.isEmpty()) {
583                            for (TransactionContext ctx : l) {
584                                try {
585                                    ctx.afterRollback();
586                                } catch (Throwable ignored) {
587                                    if (LOG.isDebugEnabled()) {
588                                        LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
589                                    }
590                                }
591                            }
592                        }
593                        }
594            }
595            throw toXAException(e);
596        }
597
598    }
599
600    public void forget(Xid xid) throws XAException {
601        if (LOG.isDebugEnabled()) {
602            LOG.debug("Forget: " + xid);
603        }
604
605        // We allow interleaving multiple transactions, so
606        // we don't limit forget to the associated xid.
607        XATransactionId x;
608        if (xid == null) {
609            throw new XAException(XAException.XAER_PROTO);
610        }
611        if (equals(associatedXid, xid)) {
612            // TODO determine if this can happen... I think not.
613            x = (XATransactionId)transactionId;
614        } else {
615            x = new XATransactionId(xid);
616        }
617
618        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
619
620        try {
621            // Tell the server to forget the transaction.
622            syncSendPacketWithInterruptionHandling(info);
623        } catch (JMSException e) {
624            throw toXAException(e);
625        }
626        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
627                ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
628        }
629    }
630
631    public boolean isSameRM(XAResource xaResource) throws XAException {
632        if (xaResource == null) {
633            return false;
634        }
635        if (!(xaResource instanceof TransactionContext)) {
636            return false;
637        }
638        TransactionContext xar = (TransactionContext)xaResource;
639        try {
640            return getResourceManagerId().equals(xar.getResourceManagerId());
641        } catch (Throwable e) {
642            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
643        }
644    }
645
646    public Xid[] recover(int flag) throws XAException {
647        LOG.debug("recover({})", flag);
648
649        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
650        try {
651            this.connection.checkClosedOrFailed();
652            this.connection.ensureConnectionInfoSent();
653
654            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
655            DataStructure[] data = receipt.getData();
656            XATransactionId[] answer;
657            if (data instanceof XATransactionId[]) {
658                answer = (XATransactionId[])data;
659            } else {
660                answer = new XATransactionId[data.length];
661                System.arraycopy(data, 0, answer, 0, data.length);
662            }
663            LOG.debug("recover({})={}", flag, answer);
664            return answer;
665        } catch (JMSException e) {
666            throw toXAException(e);
667        }
668    }
669
670    public int getTransactionTimeout() throws XAException {
671        return 0;
672    }
673
674    public boolean setTransactionTimeout(int seconds) throws XAException {
675        return false;
676    }
677
678    // ///////////////////////////////////////////////////////////
679    //
680    // Helper methods.
681    //
682    // ///////////////////////////////////////////////////////////
683    protected String getResourceManagerId() throws JMSException {
684        return this.connection.getResourceManagerId();
685    }
686
687    private void setXid(Xid xid) throws XAException {
688
689        try {
690            this.connection.checkClosedOrFailed();
691            this.connection.ensureConnectionInfoSent();
692        } catch (JMSException e) {
693            disassociate();
694            throw toXAException(e);
695        }
696
697        if (xid != null) {
698            // associate
699            associatedXid = xid;
700            transactionId = new XATransactionId(xid);
701
702            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
703            try {
704                this.connection.asyncSendPacket(info);
705                if (LOG.isDebugEnabled()) {
706                    LOG.debug("{} started XA transaction {} ", this, transactionId);
707                }
708            } catch (JMSException e) {
709                disassociate();
710                throw toXAException(e);
711            }
712
713        } else {
714
715            if (transactionId != null) {
716                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
717                try {
718                    syncSendPacketWithInterruptionHandling(info);
719                    if (LOG.isDebugEnabled()) {
720                        LOG.debug("{} ended XA transaction {}", this, transactionId);
721                    }
722                } catch (JMSException e) {
723                    disassociate();
724                    throw toXAException(e);
725                }
726
727                // Add our self to the list of contexts that are interested in
728                // post commit/rollback events.
729                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
730                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
731                        if (l == null) {
732                            l = new ArrayList<TransactionContext>(3);
733                            ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
734                            l.add(this);
735                        } else if (!l.contains(this)) {
736                            l.add(this);
737                        }
738                        }
739            }
740
741            disassociate();
742        }
743    }
744
745    private void disassociate() {
746         // dis-associate
747         associatedXid = null;
748         transactionId = null;
749    }
750
751    /**
752     * Sends the given command. Also sends the command in case of interruption,
753     * so that important commands like rollback and commit are never interrupted.
754     * If interruption occurred, set the interruption state of the current
755     * after performing the action again.
756     *
757     * @return the response
758     */
759    private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException {
760        try {
761            return this.connection.syncSendPacket(command);
762        } catch (JMSException e) {
763            if (e.getLinkedException() instanceof InterruptedIOException) {
764                try {
765                    Thread.interrupted();
766                    return this.connection.syncSendPacket(command);
767                } finally {
768                    Thread.currentThread().interrupt();
769                }
770            }
771
772            throw e;
773        }
774    }
775
776    /**
777     * Converts a JMSException from the server to an XAException. if the
778     * JMSException contained a linked XAException that is returned instead.
779     *
780     * @param e JMSException to convert
781     * @return XAException wrapping original exception or its message
782     */
783    private XAException toXAException(JMSException e) {
784        if (e.getCause() != null && e.getCause() instanceof XAException) {
785            XAException original = (XAException)e.getCause();
786            XAException xae = new XAException(original.getMessage());
787            xae.errorCode = original.errorCode;
788            if (xae.errorCode == XA_OK) {
789                // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
790                xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
791            }
792            xae.initCause(original);
793            return xae;
794        }
795
796        XAException xae = new XAException(e.getMessage());
797        xae.errorCode = XAException.XAER_RMFAIL;
798        xae.initCause(e);
799        return xae;
800    }
801
802    private int parseFromMessageOr(String message, int fallbackCode) {
803        final String marker = "xaErrorCode:";
804        final int index = message.lastIndexOf(marker);
805        if (index > -1) {
806            try {
807                return Integer.parseInt(message.substring(index + marker.length()));
808            } catch (Exception ignored) {}
809        }
810        return fallbackCode;
811    }
812
813    public ActiveMQConnection getConnection() {
814        return connection;
815    }
816
817
818    // for RAR xa recovery where xaresource connection is per request
819    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
820        ActiveMQConnection existing = this.connection;
821        this.connection = connection;
822        return existing;
823    }
824
825    public void cleanup() {
826        associatedXid = null;
827        transactionId = null;
828    }
829
830    @Override
831    public String toString() {
832        return "TransactionContext{" +
833                "transactionId=" + transactionId +
834                ",connection=" + connection +
835                '}';
836    }
837}