001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.InterruptedIOException; 020import java.util.ArrayList; 021import java.util.Arrays; 022import java.util.HashMap; 023import java.util.List; 024 025import javax.jms.JMSException; 026import javax.jms.TransactionInProgressException; 027import javax.jms.TransactionRolledBackException; 028import javax.transaction.xa.XAException; 029import javax.transaction.xa.XAResource; 030import javax.transaction.xa.Xid; 031 032import org.apache.activemq.command.Command; 033import org.apache.activemq.command.ConnectionId; 034import org.apache.activemq.command.DataArrayResponse; 035import org.apache.activemq.command.DataStructure; 036import org.apache.activemq.command.IntegerResponse; 037import org.apache.activemq.command.LocalTransactionId; 038import org.apache.activemq.command.Response; 039import org.apache.activemq.command.TransactionId; 040import org.apache.activemq.command.TransactionInfo; 041import org.apache.activemq.command.XATransactionId; 042import org.apache.activemq.transaction.Synchronization; 043import org.apache.activemq.transport.failover.FailoverTransport; 044import org.apache.activemq.util.JMSExceptionSupport; 045import org.apache.activemq.util.LongSequenceGenerator; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * A TransactionContext provides the means to control a JMS transaction. It 051 * provides a local transaction interface and also an XAResource interface. <p/> 052 * An application server controls the transactional assignment of an XASession 053 * by obtaining its XAResource. It uses the XAResource to assign the session to 054 * a transaction, prepare and commit work on the transaction, and so on. <p/> An 055 * XAResource provides some fairly sophisticated facilities for interleaving 056 * work on multiple transactions, recovering a list of transactions in progress, 057 * and so on. A JTA aware JMS provider must fully implement this functionality. 058 * This could be done by using the services of a database that supports XA, or a 059 * JMS provider may choose to implement this functionality from scratch. <p/> 060 * 061 * 062 * @see javax.jms.Session 063 * @see javax.jms.QueueSession 064 * @see javax.jms.TopicSession 065 * @see javax.jms.XASession 066 */ 067public class TransactionContext implements XAResource { 068 069 public static final String xaErrorCodeMarker = "xaErrorCode:"; 070 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 071 072 // XATransactionId -> ArrayList of TransactionContext objects 073 private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = 074 new HashMap<TransactionId, List<TransactionContext>>(); 075 076 private ActiveMQConnection connection; 077 private final LongSequenceGenerator localTransactionIdGenerator; 078 private List<Synchronization> synchronizations; 079 080 // To track XA transactions. 081 private Xid associatedXid; 082 private TransactionId transactionId; 083 private LocalTransactionEventListener localTransactionEventListener; 084 private int beforeEndIndex; 085 086 // for RAR recovery 087 public TransactionContext() { 088 localTransactionIdGenerator = null; 089 } 090 091 public TransactionContext(ActiveMQConnection connection) { 092 this.connection = connection; 093 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 094 } 095 096 public boolean isInXATransaction() { 097 if (transactionId != null && transactionId.isXATransaction()) { 098 return true; 099 } else { 100 if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) { 101 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 102 for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { 103 if (transactions.contains(this)) { 104 return true; 105 } 106 } 107 } 108 } 109 } 110 111 return false; 112 } 113 114 public boolean isInLocalTransaction() { 115 return transactionId != null && transactionId.isLocalTransaction(); 116 } 117 118 public boolean isInTransaction() { 119 return transactionId != null; 120 } 121 122 /** 123 * @return Returns the localTransactionEventListener. 124 */ 125 public LocalTransactionEventListener getLocalTransactionEventListener() { 126 return localTransactionEventListener; 127 } 128 129 /** 130 * Used by the resource adapter to listen to transaction events. 131 * 132 * @param localTransactionEventListener The localTransactionEventListener to 133 * set. 134 */ 135 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 136 this.localTransactionEventListener = localTransactionEventListener; 137 } 138 139 // /////////////////////////////////////////////////////////// 140 // 141 // Methods that work with the Synchronization objects registered with 142 // the transaction. 143 // 144 // /////////////////////////////////////////////////////////// 145 146 public void addSynchronization(Synchronization s) { 147 if (synchronizations == null) { 148 synchronizations = new ArrayList<Synchronization>(10); 149 } 150 synchronizations.add(s); 151 } 152 153 private void afterRollback() throws JMSException { 154 if (synchronizations == null) { 155 return; 156 } 157 158 Throwable firstException = null; 159 int size = synchronizations.size(); 160 for (int i = 0; i < size; i++) { 161 try { 162 synchronizations.get(i).afterRollback(); 163 } catch (Throwable t) { 164 LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t); 165 if (firstException == null) { 166 firstException = t; 167 } 168 } 169 } 170 synchronizations = null; 171 if (firstException != null) { 172 throw JMSExceptionSupport.create(firstException); 173 } 174 } 175 176 private void afterCommit() throws JMSException { 177 if (synchronizations == null) { 178 return; 179 } 180 181 Throwable firstException = null; 182 int size = synchronizations.size(); 183 for (int i = 0; i < size; i++) { 184 try { 185 synchronizations.get(i).afterCommit(); 186 } catch (Throwable t) { 187 LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t); 188 if (firstException == null) { 189 firstException = t; 190 } 191 } 192 } 193 synchronizations = null; 194 if (firstException != null) { 195 throw JMSExceptionSupport.create(firstException); 196 } 197 } 198 199 private void beforeEnd() throws JMSException { 200 if (synchronizations == null) { 201 return; 202 } 203 204 int size = synchronizations.size(); 205 try { 206 for (;beforeEndIndex < size;) { 207 synchronizations.get(beforeEndIndex++).beforeEnd(); 208 } 209 } catch (JMSException e) { 210 throw e; 211 } catch (Throwable e) { 212 throw JMSExceptionSupport.create(e); 213 } 214 } 215 216 public TransactionId getTransactionId() { 217 return transactionId; 218 } 219 220 // /////////////////////////////////////////////////////////// 221 // 222 // Local transaction interface. 223 // 224 // /////////////////////////////////////////////////////////// 225 226 /** 227 * Start a local transaction. 228 * @throws javax.jms.JMSException on internal error 229 */ 230 public void begin() throws JMSException { 231 232 if (isInXATransaction()) { 233 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); 234 } 235 236 if (transactionId == null) { 237 synchronizations = null; 238 beforeEndIndex = 0; 239 this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId()); 240 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 241 this.connection.ensureConnectionInfoSent(); 242 this.connection.asyncSendPacket(info); 243 244 // Notify the listener that the tx was started. 245 if (localTransactionEventListener != null) { 246 localTransactionEventListener.beginEvent(); 247 } 248 if (LOG.isDebugEnabled()) { 249 LOG.debug("Begin:" + transactionId); 250 } 251 } 252 253 } 254 255 /** 256 * Rolls back any work done in this transaction and releases any locks 257 * currently held. 258 * 259 * @throws JMSException if the JMS provider fails to roll back the 260 * transaction due to some internal error. 261 * @throws javax.jms.IllegalStateException if the method is not called by a 262 * transacted session. 263 */ 264 public void rollback() throws JMSException { 265 if (isInXATransaction()) { 266 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); 267 } 268 269 try { 270 beforeEnd(); 271 } catch (TransactionRolledBackException canOcurrOnFailover) { 272 LOG.warn("rollback processing error", canOcurrOnFailover); 273 } 274 if (transactionId != null) { 275 if (LOG.isDebugEnabled()) { 276 LOG.debug("Rollback: " + transactionId 277 + " syncCount: " 278 + (synchronizations != null ? synchronizations.size() : 0)); 279 } 280 281 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 282 this.transactionId = null; 283 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 284 this.connection.syncSendPacket(info); 285 // Notify the listener that the tx was rolled back 286 if (localTransactionEventListener != null) { 287 localTransactionEventListener.rollbackEvent(); 288 } 289 } 290 291 afterRollback(); 292 } 293 294 /** 295 * Commits all work done in this transaction and releases any locks 296 * currently held. 297 * 298 * @throws JMSException if the JMS provider fails to commit the transaction 299 * due to some internal error. 300 * @throws javax.jms.IllegalStateException if the method is not called by a 301 * transacted session. 302 */ 303 public void commit() throws JMSException { 304 if (isInXATransaction()) { 305 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); 306 } 307 308 try { 309 beforeEnd(); 310 } catch (JMSException e) { 311 rollback(); 312 throw e; 313 } 314 315 // Only send commit if the transaction was started. 316 if (transactionId != null) { 317 if (LOG.isDebugEnabled()) { 318 LOG.debug("Commit: " + transactionId 319 + " syncCount: " 320 + (synchronizations != null ? synchronizations.size() : 0)); 321 } 322 323 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 324 this.transactionId = null; 325 // Notify the listener that the tx was committed back 326 try { 327 syncSendPacketWithInterruptionHandling(info); 328 if (localTransactionEventListener != null) { 329 localTransactionEventListener.commitEvent(); 330 } 331 afterCommit(); 332 } catch (JMSException cause) { 333 LOG.info("commit failed for transaction " + info.getTransactionId(), cause); 334 if (localTransactionEventListener != null) { 335 localTransactionEventListener.rollbackEvent(); 336 } 337 afterRollback(); 338 throw cause; 339 } 340 341 } 342 } 343 344 // /////////////////////////////////////////////////////////// 345 // 346 // XAResource Implementation 347 // 348 // /////////////////////////////////////////////////////////// 349 /** 350 * Associates a transaction with the resource. 351 */ 352 public void start(Xid xid, int flags) throws XAException { 353 354 if (LOG.isDebugEnabled()) { 355 LOG.debug("Start: " + xid + ", flags:" + flags); 356 } 357 if (isInLocalTransaction()) { 358 throw new XAException(XAException.XAER_PROTO); 359 } 360 // Are we already associated? 361 if (associatedXid != null) { 362 throw new XAException(XAException.XAER_PROTO); 363 } 364 365 // if ((flags & TMJOIN) == TMJOIN) { 366 // TODO: verify that the server has seen the xid 367 // // } 368 // if ((flags & TMJOIN) == TMRESUME) { 369 // // TODO: verify that the xid was suspended. 370 // } 371 372 // associate 373 synchronizations = null; 374 beforeEndIndex = 0; 375 setXid(xid); 376 } 377 378 /** 379 * @return connectionId for connection 380 */ 381 private ConnectionId getConnectionId() { 382 return connection.getConnectionInfo().getConnectionId(); 383 } 384 385 public void end(Xid xid, int flags) throws XAException { 386 387 if (LOG.isDebugEnabled()) { 388 LOG.debug("End: " + xid + ", flags:" + flags); 389 } 390 391 if (isInLocalTransaction()) { 392 throw new XAException(XAException.XAER_PROTO); 393 } 394 395 if ((flags & (TMSUSPEND | TMFAIL)) != 0) { 396 // You can only suspend the associated xid. 397 if (!equals(associatedXid, xid)) { 398 throw new XAException(XAException.XAER_PROTO); 399 } 400 401 // TODO: we may want to put the xid in a suspended list. 402 try { 403 beforeEnd(); 404 } catch (JMSException e) { 405 throw toXAException(e); 406 } finally { 407 setXid(null); 408 } 409 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 410 // set to null if this is the current xid. 411 // otherwise this could be an asynchronous success call 412 if (equals(associatedXid, xid)) { 413 try { 414 beforeEnd(); 415 } catch (JMSException e) { 416 throw toXAException(e); 417 } finally { 418 setXid(null); 419 } 420 } 421 } else { 422 throw new XAException(XAException.XAER_INVAL); 423 } 424 } 425 426 private boolean equals(Xid xid1, Xid xid2) { 427 if (xid1 == xid2) { 428 return true; 429 } 430 if (xid1 == null ^ xid2 == null) { 431 return false; 432 } 433 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) 434 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 435 } 436 437 public int prepare(Xid xid) throws XAException { 438 if (LOG.isDebugEnabled()) { 439 LOG.debug("Prepare: " + xid); 440 } 441 442 // We allow interleaving multiple transactions, so 443 // we don't limit prepare to the associated xid. 444 XATransactionId x; 445 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been 446 // called first 447 if (xid == null || (equals(associatedXid, xid))) { 448 throw new XAException(XAException.XAER_PROTO); 449 } else { 450 // TODO: cache the known xids so we don't keep recreating this one?? 451 x = new XATransactionId(xid); 452 } 453 454 try { 455 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 456 457 // Find out if the server wants to commit or rollback. 458 IntegerResponse response = (IntegerResponse)syncSendPacketWithInterruptionHandling(info); 459 if (XAResource.XA_RDONLY == response.getResult()) { 460 // transaction stops now, may be syncs that need a callback 461 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 462 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 463 if (l != null && !l.isEmpty()) { 464 if (LOG.isDebugEnabled()) { 465 LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid); 466 } 467 for (TransactionContext ctx : l) { 468 ctx.afterCommit(); 469 } 470 } 471 } 472 } 473 return response.getResult(); 474 475 } catch (JMSException e) { 476 LOG.warn("prepare of: " + x + " failed with: " + e, e); 477 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 478 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 479 if (l != null && !l.isEmpty()) { 480 for (TransactionContext ctx : l) { 481 try { 482 ctx.afterRollback(); 483 } catch (Throwable ignored) { 484 if (LOG.isDebugEnabled()) { 485 LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " + 486 x + ", context: " + ctx, ignored); 487 } 488 } 489 } 490 } 491 } 492 throw toXAException(e); 493 } 494 } 495 496 public void rollback(Xid xid) throws XAException { 497 498 if (LOG.isDebugEnabled()) { 499 LOG.debug("Rollback: " + xid); 500 } 501 502 // We allow interleaving multiple transactions, so 503 // we don't limit rollback to the associated xid. 504 XATransactionId x; 505 if (xid == null) { 506 throw new XAException(XAException.XAER_PROTO); 507 } 508 if (equals(associatedXid, xid)) { 509 // I think this can happen even without an end(xid) call. Need to 510 // check spec. 511 x = (XATransactionId)transactionId; 512 } else { 513 x = new XATransactionId(xid); 514 } 515 516 try { 517 this.connection.checkClosedOrFailed(); 518 this.connection.ensureConnectionInfoSent(); 519 520 // Let the server know that the tx is rollback. 521 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 522 syncSendPacketWithInterruptionHandling(info); 523 524 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 525 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 526 if (l != null && !l.isEmpty()) { 527 for (TransactionContext ctx : l) { 528 ctx.afterRollback(); 529 } 530 } 531 } 532 } catch (JMSException e) { 533 throw toXAException(e); 534 } 535 } 536 537 // XAResource interface 538 public void commit(Xid xid, boolean onePhase) throws XAException { 539 540 if (LOG.isDebugEnabled()) { 541 LOG.debug("Commit: " + xid + ", onePhase=" + onePhase); 542 } 543 544 // We allow interleaving multiple transactions, so 545 // we don't limit commit to the associated xid. 546 XATransactionId x; 547 if (xid == null || (equals(associatedXid, xid))) { 548 // should never happen, end(xid,TMSUCCESS) must have been previously 549 // called 550 throw new XAException(XAException.XAER_PROTO); 551 } else { 552 x = new XATransactionId(xid); 553 } 554 555 try { 556 this.connection.checkClosedOrFailed(); 557 this.connection.ensureConnectionInfoSent(); 558 559 // Notify the server that the tx was committed back 560 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 561 562 syncSendPacketWithInterruptionHandling(info); 563 564 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 565 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 566 if (l != null && !l.isEmpty()) { 567 for (TransactionContext ctx : l) { 568 try { 569 ctx.afterCommit(); 570 } catch (Exception ignored) { 571 LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored); 572 } 573 } 574 } 575 } 576 577 } catch (JMSException e) { 578 LOG.warn("commit of: " + x + " failed with: " + e, e); 579 if (onePhase) { 580 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 581 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 582 if (l != null && !l.isEmpty()) { 583 for (TransactionContext ctx : l) { 584 try { 585 ctx.afterRollback(); 586 } catch (Throwable ignored) { 587 if (LOG.isDebugEnabled()) { 588 LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored); 589 } 590 } 591 } 592 } 593 } 594 } 595 throw toXAException(e); 596 } 597 598 } 599 600 public void forget(Xid xid) throws XAException { 601 if (LOG.isDebugEnabled()) { 602 LOG.debug("Forget: " + xid); 603 } 604 605 // We allow interleaving multiple transactions, so 606 // we don't limit forget to the associated xid. 607 XATransactionId x; 608 if (xid == null) { 609 throw new XAException(XAException.XAER_PROTO); 610 } 611 if (equals(associatedXid, xid)) { 612 // TODO determine if this can happen... I think not. 613 x = (XATransactionId)transactionId; 614 } else { 615 x = new XATransactionId(xid); 616 } 617 618 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 619 620 try { 621 // Tell the server to forget the transaction. 622 syncSendPacketWithInterruptionHandling(info); 623 } catch (JMSException e) { 624 throw toXAException(e); 625 } 626 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 627 ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 628 } 629 } 630 631 public boolean isSameRM(XAResource xaResource) throws XAException { 632 if (xaResource == null) { 633 return false; 634 } 635 if (!(xaResource instanceof TransactionContext)) { 636 return false; 637 } 638 TransactionContext xar = (TransactionContext)xaResource; 639 try { 640 return getResourceManagerId().equals(xar.getResourceManagerId()); 641 } catch (Throwable e) { 642 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 643 } 644 } 645 646 public Xid[] recover(int flag) throws XAException { 647 LOG.debug("recover({})", flag); 648 649 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 650 try { 651 this.connection.checkClosedOrFailed(); 652 this.connection.ensureConnectionInfoSent(); 653 654 DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info); 655 DataStructure[] data = receipt.getData(); 656 XATransactionId[] answer; 657 if (data instanceof XATransactionId[]) { 658 answer = (XATransactionId[])data; 659 } else { 660 answer = new XATransactionId[data.length]; 661 System.arraycopy(data, 0, answer, 0, data.length); 662 } 663 LOG.debug("recover({})={}", flag, answer); 664 return answer; 665 } catch (JMSException e) { 666 throw toXAException(e); 667 } 668 } 669 670 public int getTransactionTimeout() throws XAException { 671 return 0; 672 } 673 674 public boolean setTransactionTimeout(int seconds) throws XAException { 675 return false; 676 } 677 678 // /////////////////////////////////////////////////////////// 679 // 680 // Helper methods. 681 // 682 // /////////////////////////////////////////////////////////// 683 protected String getResourceManagerId() throws JMSException { 684 return this.connection.getResourceManagerId(); 685 } 686 687 private void setXid(Xid xid) throws XAException { 688 689 try { 690 this.connection.checkClosedOrFailed(); 691 this.connection.ensureConnectionInfoSent(); 692 } catch (JMSException e) { 693 disassociate(); 694 throw toXAException(e); 695 } 696 697 if (xid != null) { 698 // associate 699 associatedXid = xid; 700 transactionId = new XATransactionId(xid); 701 702 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 703 try { 704 this.connection.asyncSendPacket(info); 705 if (LOG.isDebugEnabled()) { 706 LOG.debug("{} started XA transaction {} ", this, transactionId); 707 } 708 } catch (JMSException e) { 709 disassociate(); 710 throw toXAException(e); 711 } 712 713 } else { 714 715 if (transactionId != null) { 716 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); 717 try { 718 syncSendPacketWithInterruptionHandling(info); 719 if (LOG.isDebugEnabled()) { 720 LOG.debug("{} ended XA transaction {}", this, transactionId); 721 } 722 } catch (JMSException e) { 723 disassociate(); 724 throw toXAException(e); 725 } 726 727 // Add our self to the list of contexts that are interested in 728 // post commit/rollback events. 729 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 730 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); 731 if (l == null) { 732 l = new ArrayList<TransactionContext>(3); 733 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); 734 l.add(this); 735 } else if (!l.contains(this)) { 736 l.add(this); 737 } 738 } 739 } 740 741 disassociate(); 742 } 743 } 744 745 private void disassociate() { 746 // dis-associate 747 associatedXid = null; 748 transactionId = null; 749 } 750 751 /** 752 * Sends the given command. Also sends the command in case of interruption, 753 * so that important commands like rollback and commit are never interrupted. 754 * If interruption occurred, set the interruption state of the current 755 * after performing the action again. 756 * 757 * @return the response 758 */ 759 private Response syncSendPacketWithInterruptionHandling(Command command) throws JMSException { 760 try { 761 return this.connection.syncSendPacket(command); 762 } catch (JMSException e) { 763 if (e.getLinkedException() instanceof InterruptedIOException) { 764 try { 765 Thread.interrupted(); 766 return this.connection.syncSendPacket(command); 767 } finally { 768 Thread.currentThread().interrupt(); 769 } 770 } 771 772 throw e; 773 } 774 } 775 776 /** 777 * Converts a JMSException from the server to an XAException. if the 778 * JMSException contained a linked XAException that is returned instead. 779 * 780 * @param e JMSException to convert 781 * @return XAException wrapping original exception or its message 782 */ 783 private XAException toXAException(JMSException e) { 784 if (e.getCause() != null && e.getCause() instanceof XAException) { 785 XAException original = (XAException)e.getCause(); 786 XAException xae = new XAException(original.getMessage()); 787 xae.errorCode = original.errorCode; 788 if (xae.errorCode == XA_OK) { 789 // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable 790 xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR); 791 } 792 xae.initCause(original); 793 return xae; 794 } 795 796 XAException xae = new XAException(e.getMessage()); 797 xae.errorCode = XAException.XAER_RMFAIL; 798 xae.initCause(e); 799 return xae; 800 } 801 802 private int parseFromMessageOr(String message, int fallbackCode) { 803 final String marker = "xaErrorCode:"; 804 final int index = message.lastIndexOf(marker); 805 if (index > -1) { 806 try { 807 return Integer.parseInt(message.substring(index + marker.length())); 808 } catch (Exception ignored) {} 809 } 810 return fallbackCode; 811 } 812 813 public ActiveMQConnection getConnection() { 814 return connection; 815 } 816 817 818 // for RAR xa recovery where xaresource connection is per request 819 public ActiveMQConnection setConnection(ActiveMQConnection connection) { 820 ActiveMQConnection existing = this.connection; 821 this.connection = connection; 822 return existing; 823 } 824 825 public void cleanup() { 826 associatedXid = null; 827 transactionId = null; 828 } 829 830 @Override 831 public String toString() { 832 return "TransactionContext{" + 833 "transactionId=" + transactionId + 834 ",connection=" + connection + 835 '}'; 836 } 837}