001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
020
021import java.io.IOException;
022import java.util.LinkedList;
023
024import org.apache.activemq.command.ActiveMQDestination;
025import org.apache.activemq.command.ActiveMQMessage;
026import org.apache.activemq.command.ConsumerControl;
027import org.apache.activemq.command.ConsumerId;
028import org.apache.activemq.command.ConsumerInfo;
029import org.apache.activemq.command.ExceptionResponse;
030import org.apache.activemq.command.LocalTransactionId;
031import org.apache.activemq.command.MessageAck;
032import org.apache.activemq.command.MessageDispatch;
033import org.apache.activemq.command.MessagePull;
034import org.apache.activemq.command.RemoveInfo;
035import org.apache.activemq.command.RemoveSubscriptionInfo;
036import org.apache.activemq.command.Response;
037import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
038import org.apache.activemq.transport.amqp.ResponseHandler;
039import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
040import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
041import org.apache.activemq.transport.amqp.message.EncodedMessage;
042import org.apache.activemq.transport.amqp.message.OutboundTransformer;
043import org.apache.qpid.proton.amqp.messaging.Accepted;
044import org.apache.qpid.proton.amqp.messaging.Modified;
045import org.apache.qpid.proton.amqp.messaging.Outcome;
046import org.apache.qpid.proton.amqp.messaging.Rejected;
047import org.apache.qpid.proton.amqp.messaging.Released;
048import org.apache.qpid.proton.amqp.transaction.TransactionalState;
049import org.apache.qpid.proton.amqp.transport.AmqpError;
050import org.apache.qpid.proton.amqp.transport.DeliveryState;
051import org.apache.qpid.proton.amqp.transport.ErrorCondition;
052import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
053import org.apache.qpid.proton.engine.Delivery;
054import org.apache.qpid.proton.engine.Sender;
055import org.fusesource.hawtbuf.Buffer;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059/**
060 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer
061 * which holds the corresponding Receiver which receives messages transfered
062 * across the link from the Broker.
063 *
064 * An AmqpSender is in turn a message consumer subscribed to some destination
065 * on the broker.  As messages are dispatched to this sender that are sent on
066 * to the remote Receiver end of the lin.
067 */
068public class AmqpSender extends AmqpAbstractLink<Sender> {
069
070    private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class);
071
072    private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
073
074    private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
075    private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator();
076    private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
077    private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
078    private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
079
080    private final ConsumerInfo consumerInfo;
081    private final boolean presettle;
082
083    private int currentCredit;
084    private boolean draining;
085    private long lastDeliveredSequenceId;
086
087    private Buffer currentBuffer;
088    private Delivery currentDelivery;
089
090    /**
091     * Creates a new AmqpSender instance that manages the given Sender
092     *
093     * @param session
094     *        the AmqpSession object that is the parent of this instance.
095     * @param endpoint
096     *        the AMQP Sender instance that this class manages.
097     * @param consumerInfo
098     *        the ConsumerInfo instance that holds configuration for this sender.
099     */
100    public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
101        super(session, endpoint);
102
103        this.currentCredit = endpoint.getRemoteCredit();
104        this.consumerInfo = consumerInfo;
105        this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
106    }
107
108    @Override
109    public void open() {
110        if (!isClosed()) {
111            session.registerSender(getConsumerId(), this);
112        }
113
114        super.open();
115    }
116
117    @Override
118    public void detach() {
119        if (!isClosed() && isOpened()) {
120            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
121            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
122            sendToActiveMQ(removeCommand, null);
123
124            session.unregisterSender(getConsumerId());
125        }
126
127        super.detach();
128    }
129
130    @Override
131    public void close() {
132        if (!isClosed() && isOpened()) {
133            RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
134            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
135            sendToActiveMQ(removeCommand, null);
136
137            if (consumerInfo.isDurable()) {
138                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
139                rsi.setConnectionId(session.getConnection().getConnectionId());
140                rsi.setSubscriptionName(getEndpoint().getName());
141                rsi.setClientId(session.getConnection().getClientId());
142
143                sendToActiveMQ(rsi, null);
144            }
145
146            session.unregisterSender(getConsumerId());
147        }
148
149        super.close();
150    }
151
152    @Override
153    public void flow() throws Exception {
154        int updatedCredit = getEndpoint().getCredit();
155
156        LOG.trace("Flow: drain={} credit={}, remoteCredit={}",
157                  getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit());
158
159        if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
160            currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
161            draining = true;
162
163            // Revert to a pull consumer.
164            ConsumerControl control = new ConsumerControl();
165            control.setConsumerId(getConsumerId());
166            control.setDestination(getDestination());
167            control.setPrefetch(0);
168            sendToActiveMQ(control, null);
169
170            // Now request dispatch of the drain amount, we request immediate
171            // timeout and an completion message regardless so that we can know
172            // when we should marked the link as drained.
173            MessagePull pullRequest = new MessagePull();
174            pullRequest.setConsumerId(getConsumerId());
175            pullRequest.setDestination(getDestination());
176            pullRequest.setTimeout(-1);
177            pullRequest.setAlwaysSignalDone(true);
178            pullRequest.setQuantity(currentCredit);
179            sendToActiveMQ(pullRequest, null);
180        } else if (updatedCredit != currentCredit) {
181            currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
182            ConsumerControl control = new ConsumerControl();
183            control.setConsumerId(getConsumerId());
184            control.setDestination(getDestination());
185            control.setPrefetch(currentCredit);
186            sendToActiveMQ(control, null);
187        }
188    }
189
190    @Override
191    public void delivery(Delivery delivery) throws Exception {
192        MessageDispatch md = (MessageDispatch) delivery.getContext();
193        DeliveryState state = delivery.getRemoteState();
194
195        if (state instanceof TransactionalState) {
196            TransactionalState txState = (TransactionalState) state;
197            LOG.trace("onDelivery: TX delivery state = {}", state);
198            if (txState.getOutcome() != null) {
199                Outcome outcome = txState.getOutcome();
200                if (outcome instanceof Accepted) {
201                    if (!delivery.remotelySettled()) {
202                        TransactionalState txAccepted = new TransactionalState();
203                        txAccepted.setOutcome(Accepted.getInstance());
204                        txAccepted.setTxnId(((TransactionalState) state).getTxnId());
205
206                        delivery.disposition(txAccepted);
207                    }
208                    settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
209                }
210            }
211        } else {
212            if (state instanceof Accepted) {
213                LOG.trace("onDelivery: accepted state = {}", state);
214                if (!delivery.remotelySettled()) {
215                    delivery.disposition(new Accepted());
216                }
217                settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
218            } else if (state instanceof Rejected) {
219                // re-deliver /w incremented delivery counter.
220                md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
221                LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
222                settle(delivery, -1);
223            } else if (state instanceof Released) {
224                LOG.trace("onDelivery: Released state = {}", state);
225                // re-deliver && don't increment the counter.
226                settle(delivery, -1);
227            } else if (state instanceof Modified) {
228                Modified modified = (Modified) state;
229                if (modified.getDeliveryFailed()) {
230                    // increment delivery counter..
231                    md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
232                }
233                LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
234                byte ackType = -1;
235                Boolean undeliverableHere = modified.getUndeliverableHere();
236                if (undeliverableHere != null && undeliverableHere) {
237                    // receiver does not want the message..
238                    // perhaps we should DLQ it?
239                    ackType = MessageAck.POSION_ACK_TYPE;
240                }
241                settle(delivery, ackType);
242            }
243        }
244
245        pumpOutbound();
246    }
247
248    @Override
249    public void commit() throws Exception {
250        if (!dispatchedInTx.isEmpty()) {
251            for (MessageDispatch md : dispatchedInTx) {
252                MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1);
253                pendingTxAck.setFirstMessageId(md.getMessage().getMessageId());
254                pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
255
256                LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
257
258                sendToActiveMQ(pendingTxAck, new ResponseHandler() {
259                    @Override
260                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
261                        if (response.isException()) {
262                            if (response.isException()) {
263                                Throwable exception = ((ExceptionResponse) response).getException();
264                                exception.printStackTrace();
265                                getEndpoint().close();
266                            }
267                        }
268                        session.pumpProtonToSocket();
269                    }
270                });
271            }
272
273            dispatchedInTx.clear();
274        }
275    }
276
277    @Override
278    public void rollback() throws Exception {
279        synchronized (outbound) {
280
281            LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
282
283            for (MessageDispatch dispatch : dispatchedInTx) {
284                dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1);
285                dispatch.getMessage().setTransactionId(null);
286                outbound.addFirst(dispatch);
287            }
288
289            dispatchedInTx.clear();
290        }
291    }
292
293    /**
294     * Event point for incoming message from ActiveMQ on this Sender's
295     * corresponding subscription.
296     *
297     * @param dispatch
298     *        the MessageDispatch to process and send across the link.
299     *
300     * @throws Exception if an error occurs while encoding the message for send.
301     */
302    public void onMessageDispatch(MessageDispatch dispatch) throws Exception {
303        if (!isClosed()) {
304            // Lock to prevent stepping on TX redelivery
305            synchronized (outbound) {
306                outbound.addLast(dispatch);
307            }
308            pumpOutbound();
309            session.pumpProtonToSocket();
310        }
311    }
312
313    /**
314     * Called when the Broker sends a ConsumerControl command to the Consumer that
315     * this sender creates to obtain messages to dispatch via the sender for this
316     * end of the open link.
317     *
318     * @param control
319     *        The ConsumerControl command to process.
320     */
321    public void onConsumerControl(ConsumerControl control) {
322        if (control.isClose()) {
323            close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed"));
324            session.pumpProtonToSocket();
325        }
326    }
327
328    @Override
329    public String toString() {
330        return "AmqpSender {" + getConsumerId() + "}";
331    }
332
333    //----- Property getters and setters -------------------------------------//
334
335    public ConsumerId getConsumerId() {
336        return consumerInfo.getConsumerId();
337    }
338
339    @Override
340    public ActiveMQDestination getDestination() {
341        return consumerInfo.getDestination();
342    }
343
344    @Override
345    public void setDestination(ActiveMQDestination destination) {
346        consumerInfo.setDestination(destination);
347    }
348
349    //----- Internal Implementation ------------------------------------------//
350
351    public void pumpOutbound() throws Exception {
352        while (!isClosed()) {
353            while (currentBuffer != null) {
354                int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
355                if (sent > 0) {
356                    currentBuffer.moveHead(sent);
357                    if (currentBuffer.length == 0) {
358                        if (presettle) {
359                            settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
360                        } else {
361                            getEndpoint().advance();
362                        }
363                        currentBuffer = null;
364                        currentDelivery = null;
365                    }
366                } else {
367                    return;
368                }
369            }
370
371            if (outbound.isEmpty()) {
372                return;
373            }
374
375            final MessageDispatch md = outbound.removeFirst();
376            try {
377
378                ActiveMQMessage temp = null;
379                if (md.getMessage() != null) {
380
381                    // Topics can dispatch the same Message to more than one consumer
382                    // so we must copy to prevent concurrent read / write to the same
383                    // message object.
384                    if (md.getDestination().isTopic()) {
385                        synchronized (md.getMessage()) {
386                            temp = (ActiveMQMessage) md.getMessage().copy();
387                        }
388                    } else {
389                        temp = (ActiveMQMessage) md.getMessage();
390                    }
391
392                    if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) {
393                        temp.setProperty(MESSAGE_FORMAT_KEY, 0);
394                    }
395                }
396
397                final ActiveMQMessage jms = temp;
398                if (jms == null) {
399                    LOG.trace("Sender:[{}] browse done.", getEndpoint().getName());
400                    // It's the end of browse signal in response to a MessagePull
401                    getEndpoint().drained();
402                    draining = false;
403                    currentCredit = 0;
404                } else {
405                    jms.setRedeliveryCounter(md.getRedeliveryCounter());
406                    jms.setReadOnlyBody(true);
407                    final EncodedMessage amqp = outboundTransformer.transform(jms);
408                    if (amqp != null && amqp.getLength() > 0) {
409                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
410                        if (presettle) {
411                            currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0);
412                        } else {
413                            final byte[] tag = tagCache.getNextTag();
414                            currentDelivery = getEndpoint().delivery(tag, 0, tag.length);
415                        }
416                        currentDelivery.setContext(md);
417                    } else {
418                        // TODO: message could not be generated what now?
419                    }
420                }
421            } catch (Exception e) {
422                LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage());
423            }
424        }
425    }
426
427    private void settle(final Delivery delivery, final int ackType) throws Exception {
428        byte[] tag = delivery.getTag();
429        if (tag != null && tag.length > 0 && delivery.remotelySettled()) {
430            tagCache.returnTag(tag);
431        }
432
433        if (ackType == -1) {
434            // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
435            delivery.settle();
436            onMessageDispatch((MessageDispatch) delivery.getContext());
437        } else {
438            MessageDispatch md = (MessageDispatch) delivery.getContext();
439            lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
440            MessageAck ack = new MessageAck();
441            ack.setConsumerId(getConsumerId());
442            ack.setFirstMessageId(md.getMessage().getMessageId());
443            ack.setLastMessageId(md.getMessage().getMessageId());
444            ack.setMessageCount(1);
445            ack.setAckType((byte) ackType);
446            ack.setDestination(md.getDestination());
447
448            DeliveryState remoteState = delivery.getRemoteState();
449            if (remoteState != null && remoteState instanceof TransactionalState) {
450                TransactionalState s = (TransactionalState) remoteState;
451                long txid = toLong(s.getTxnId());
452                LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid);
453                ack.setTransactionId(localTxId);
454
455                // Store the message sent in this TX we might need to
456                // re-send on rollback
457                md.getMessage().setTransactionId(localTxId);
458                dispatchedInTx.addFirst(md);
459            }
460
461            LOG.trace("Sending Ack to ActiveMQ: {}", ack);
462
463            sendToActiveMQ(ack, new ResponseHandler() {
464                @Override
465                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
466                    if (response.isException()) {
467                        if (response.isException()) {
468                            Throwable exception = ((ExceptionResponse) response).getException();
469                            exception.printStackTrace();
470                            getEndpoint().close();
471                        }
472                    } else {
473                        delivery.settle();
474                    }
475                    session.pumpProtonToSocket();
476                }
477            });
478        }
479    }
480}