001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toBytes;
020import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
021
022import java.io.IOException;
023
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.ConnectionId;
026import org.apache.activemq.command.ExceptionResponse;
027import org.apache.activemq.command.LocalTransactionId;
028import org.apache.activemq.command.Response;
029import org.apache.activemq.command.TransactionInfo;
030import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
031import org.apache.activemq.transport.amqp.ResponseHandler;
032import org.apache.qpid.proton.Proton;
033import org.apache.qpid.proton.amqp.Binary;
034import org.apache.qpid.proton.amqp.Symbol;
035import org.apache.qpid.proton.amqp.messaging.Accepted;
036import org.apache.qpid.proton.amqp.messaging.AmqpValue;
037import org.apache.qpid.proton.amqp.messaging.Rejected;
038import org.apache.qpid.proton.amqp.transaction.Declare;
039import org.apache.qpid.proton.amqp.transaction.Declared;
040import org.apache.qpid.proton.amqp.transaction.Discharge;
041import org.apache.qpid.proton.amqp.transport.ErrorCondition;
042import org.apache.qpid.proton.engine.Delivery;
043import org.apache.qpid.proton.engine.Receiver;
044import org.apache.qpid.proton.message.Message;
045import org.fusesource.hawtbuf.Buffer;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049/**
050 * Implements the AMQP Transaction Coordinator support to manage local
051 * transactions between an AMQP client and the broker.
052 */
053public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
054
055    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
056
057    private long nextTransactionId;
058
059    /**
060     * Creates a new Transaction coordinator used to manage AMQP transactions.
061     *
062     * @param session
063     *        the AmqpSession under which the coordinator was created.
064     * @param receiver
065     *        the AMQP receiver link endpoint for this coordinator.
066     */
067    public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) {
068        super(session, endpoint);
069    }
070
071    @Override
072    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
073        Message message = Proton.message();
074        int offset = deliveryBytes.offset;
075        int len = deliveryBytes.length;
076
077        while (len > 0) {
078            final int decoded = message.decode(deliveryBytes.data, offset, len);
079            assert decoded > 0 : "Make progress decoding the message";
080            offset += decoded;
081            len -= decoded;
082        }
083
084        final AmqpSession session = (AmqpSession) getEndpoint().getSession().getContext();
085        ConnectionId connectionId = session.getConnection().getConnectionId();
086        final Object action = ((AmqpValue) message.getBody()).getValue();
087
088        LOG.debug("COORDINATOR received: {}, [{}]", action, deliveryBytes);
089        if (action instanceof Declare) {
090            Declare declare = (Declare) action;
091            if (declare.getGlobalId() != null) {
092                throw new Exception("don't know how to handle a declare /w a set GlobalId");
093            }
094
095            long txid = getNextTransactionId();
096            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
097            sendToActiveMQ(txinfo, null);
098            LOG.trace("started transaction {}", txid);
099
100            Declared declared = new Declared();
101            declared.setTxnId(new Binary(toBytes(txid)));
102            delivery.disposition(declared);
103            delivery.settle();
104        } else if (action instanceof Discharge) {
105            Discharge discharge = (Discharge) action;
106            long txid = toLong(discharge.getTxnId());
107
108            final byte operation;
109            if (discharge.getFail()) {
110                LOG.trace("rollback transaction {}", txid);
111                operation = TransactionInfo.ROLLBACK;
112            } else {
113                LOG.trace("commit transaction {}", txid);
114                operation = TransactionInfo.COMMIT_ONE_PHASE;
115            }
116
117            if (operation == TransactionInfo.ROLLBACK) {
118                session.rollback();
119            } else {
120                session.commit();
121            }
122
123            TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
124            sendToActiveMQ(txinfo, new ResponseHandler() {
125                @Override
126                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
127                    if (response.isException()) {
128                        ExceptionResponse er = (ExceptionResponse) response;
129                        Rejected rejected = new Rejected();
130                        rejected.setError(new ErrorCondition(Symbol.valueOf("failed"), er.getException().getMessage()));
131                        delivery.disposition(rejected);
132                    } else {
133                        delivery.disposition(Accepted.getInstance());
134                    }
135                    LOG.debug("TX: {} settling {}", operation, action);
136                    delivery.settle();
137                    session.pumpProtonToSocket();
138                }
139            });
140
141            if (operation == TransactionInfo.ROLLBACK) {
142                session.flushPendingMessages();
143            }
144
145        } else {
146            throw new Exception("Expected coordinator message type: " + action.getClass());
147        }
148
149        replenishCredit();
150    }
151
152    private void replenishCredit() {
153        if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
154            LOG.debug("Sending more credit ({}) to transaction coordinator on session {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), session.getSessionId());
155            getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
156            session.pumpProtonToSocket();
157        }
158    }
159
160    private long getNextTransactionId() {
161        return ++nextTransactionId;
162    }
163
164    @Override
165    public ActiveMQDestination getDestination() {
166        return null;
167    }
168
169    @Override
170    public void setDestination(ActiveMQDestination destination) {
171    }
172}