001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes; 020import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; 021 022import java.io.IOException; 023 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.command.ConnectionId; 026import org.apache.activemq.command.ExceptionResponse; 027import org.apache.activemq.command.LocalTransactionId; 028import org.apache.activemq.command.Response; 029import org.apache.activemq.command.TransactionInfo; 030import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 031import org.apache.activemq.transport.amqp.ResponseHandler; 032import org.apache.qpid.proton.Proton; 033import org.apache.qpid.proton.amqp.Binary; 034import org.apache.qpid.proton.amqp.Symbol; 035import org.apache.qpid.proton.amqp.messaging.Accepted; 036import org.apache.qpid.proton.amqp.messaging.AmqpValue; 037import org.apache.qpid.proton.amqp.messaging.Rejected; 038import org.apache.qpid.proton.amqp.transaction.Declare; 039import org.apache.qpid.proton.amqp.transaction.Declared; 040import org.apache.qpid.proton.amqp.transaction.Discharge; 041import org.apache.qpid.proton.amqp.transport.ErrorCondition; 042import org.apache.qpid.proton.engine.Delivery; 043import org.apache.qpid.proton.engine.Receiver; 044import org.apache.qpid.proton.message.Message; 045import org.fusesource.hawtbuf.Buffer; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049/** 050 * Implements the AMQP Transaction Coordinator support to manage local 051 * transactions between an AMQP client and the broker. 052 */ 053public class AmqpTransactionCoordinator extends AmqpAbstractReceiver { 054 055 private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class); 056 057 private long nextTransactionId; 058 059 /** 060 * Creates a new Transaction coordinator used to manage AMQP transactions. 061 * 062 * @param session 063 * the AmqpSession under which the coordinator was created. 064 * @param receiver 065 * the AMQP receiver link endpoint for this coordinator. 066 */ 067 public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) { 068 super(session, endpoint); 069 } 070 071 @Override 072 protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception { 073 Message message = Proton.message(); 074 int offset = deliveryBytes.offset; 075 int len = deliveryBytes.length; 076 077 while (len > 0) { 078 final int decoded = message.decode(deliveryBytes.data, offset, len); 079 assert decoded > 0 : "Make progress decoding the message"; 080 offset += decoded; 081 len -= decoded; 082 } 083 084 final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext(); 085 ConnectionId connectionId = session.getConnection().getConnectionId(); 086 final Object action = ((AmqpValue) message.getBody()).getValue(); 087 088 LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes); 089 if (action instanceof Declare) { 090 Declare declare = (Declare) action; 091 if (declare.getGlobalId() != null) { 092 throw new Exception("don't know how to handle a declare /w a set GlobalId"); 093 } 094 095 long txid = getNextTransactionId(); 096 TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN); 097 sendToActiveMQ(txinfo, null); 098 LOG.trace("started transaction {}", txid); 099 100 Declared declared = new Declared(); 101 declared.setTxnId(new Binary(toBytes(txid))); 102 delivery.disposition(declared); 103 delivery.settle(); 104 } else if (action instanceof Discharge) { 105 Discharge discharge = (Discharge) action; 106 long txid = toLong(discharge.getTxnId()); 107 108 final byte operation; 109 if (discharge.getFail()) { 110 LOG.trace("rollback transaction {}", txid); 111 operation = TransactionInfo.ROLLBACK; 112 } else { 113 LOG.trace("commit transaction {}", txid); 114 operation = TransactionInfo.COMMIT_ONE_PHASE; 115 } 116 117 if (operation == TransactionInfo.ROLLBACK) { 118 session.rollback(); 119 } else { 120 session.commit(); 121 } 122 123 TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation); 124 sendToActiveMQ(txinfo, new ResponseHandler() { 125 @Override 126 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 127 if (response.isException()) { 128 ExceptionResponse er = (ExceptionResponse) response; 129 Rejected rejected = new Rejected(); 130 rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage())); 131 delivery.disposition(rejected); 132 } else { 133 delivery.disposition(Accepted.getInstance()); 134 } 135 LOG.debug("TX: {} settling {}", operation, action); 136 delivery.settle(); 137 session.pumpProtonToSocket(); 138 } 139 }); 140 141 if (operation == TransactionInfo.ROLLBACK) { 142 session.flushPendingMessages(); 143 } 144 145 } else { 146 throw new Exception("Expected coordinator message type: " + action.getClass()); 147 } 148 149 replenishCredit(); 150 } 151 152 private void replenishCredit() { 153 if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) { 154 LOG.debug("Sending more credit ({}) to transaction coordinator on session {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), session.getSessionId()); 155 getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); 156 session.pumpProtonToSocket(); 157 } 158 } 159 160 private long getNextTransactionId() { 161 return ++nextTransactionId; 162 } 163 164 @Override 165 public ActiveMQDestination getDestination() { 166 return null; 167 } 168 169 @Override 170 public void setDestination(ActiveMQDestination destination) { 171 } 172}