001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp.protocol;
018
019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
020
021import java.io.IOException;
022
023import javax.jms.Destination;
024
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQMessage;
027import org.apache.activemq.command.ExceptionResponse;
028import org.apache.activemq.command.LocalTransactionId;
029import org.apache.activemq.command.MessageId;
030import org.apache.activemq.command.ProducerId;
031import org.apache.activemq.command.ProducerInfo;
032import org.apache.activemq.command.RemoveInfo;
033import org.apache.activemq.command.Response;
034import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
035import org.apache.activemq.transport.amqp.ResponseHandler;
036import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
037import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
038import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
039import org.apache.activemq.transport.amqp.message.EncodedMessage;
040import org.apache.activemq.transport.amqp.message.InboundTransformer;
041import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer;
042import org.apache.activemq.util.LongSequenceGenerator;
043import org.apache.qpid.proton.amqp.Symbol;
044import org.apache.qpid.proton.amqp.messaging.Accepted;
045import org.apache.qpid.proton.amqp.messaging.Rejected;
046import org.apache.qpid.proton.amqp.transaction.TransactionalState;
047import org.apache.qpid.proton.amqp.transport.DeliveryState;
048import org.apache.qpid.proton.amqp.transport.ErrorCondition;
049import org.apache.qpid.proton.engine.Delivery;
050import org.apache.qpid.proton.engine.Receiver;
051import org.fusesource.hawtbuf.Buffer;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * An AmqpReceiver wraps the AMQP Receiver end of a link from the remote peer
057 * which holds the corresponding Sender which transfers message accross the
058 * link.  The AmqpReceiver handles all incoming deliveries by converting them
059 * or wrapping them into an ActiveMQ message object and forwarding that message
060 * on to the appropriate ActiveMQ Destination.
061 */
062public class AmqpReceiver extends AmqpAbstractReceiver {
063
064    private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
065
066    private final ProducerInfo producerInfo;
067    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
068
069    private InboundTransformer inboundTransformer;
070
071    /**
072     * Create a new instance of an AmqpReceiver
073     *
074     * @param session
075     *        the Session that is the parent of this AmqpReceiver instance.
076     * @param endpoint
077     *        the AMQP receiver endpoint that the class manages.
078     * @param producerInfo
079     *        the ProducerInfo instance that contains this sender's configuration.
080     */
081    public AmqpReceiver(AmqpSession session, Receiver endpoint, ProducerInfo producerInfo) {
082        super(session, endpoint);
083
084        this.producerInfo = producerInfo;
085    }
086
087    @Override
088    public void close() {
089        if (!isClosed() && isOpened()) {
090            sendToActiveMQ(new RemoveInfo(getProducerId()));
091        }
092
093        super.close();
094    }
095
096    //----- Configuration accessors ------------------------------------------//
097
098    /**
099     * @return the ActiveMQ ProducerId used to register this Receiver on the Broker.
100     */
101    public ProducerId getProducerId() {
102        return producerInfo.getProducerId();
103    }
104
105    @Override
106    public ActiveMQDestination getDestination() {
107        return producerInfo.getDestination();
108    }
109
110    @Override
111    public void setDestination(ActiveMQDestination destination) {
112        producerInfo.setDestination(destination);
113    }
114
115    /**
116     * If the Sender that initiated this Receiver endpoint did not define an address
117     * then it is using anonymous mode and message are to be routed to the address
118     * that is defined in the AMQP message 'To' field.
119     *
120     * @return true if this Receiver should operate in anonymous mode.
121     */
122    public boolean isAnonymous() {
123        return producerInfo.getDestination() == null;
124    }
125
126    //----- Internal Implementation ------------------------------------------//
127
128    protected InboundTransformer getTransformer() {
129        if (inboundTransformer == null) {
130            String transformer = session.getConnection().getConfiguredTransformer();
131            if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) {
132                inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
133            } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) {
134                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
135            } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) {
136                inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE);
137            } else {
138                LOG.warn("Unknown transformer type {} using native one instead", transformer);
139                inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
140            }
141        }
142        return inboundTransformer;
143    }
144
145    @Override
146    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
147        if (!isClosed()) {
148            EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length);
149
150            InboundTransformer transformer = getTransformer();
151            ActiveMQMessage message = null;
152
153            while (transformer != null) {
154                try {
155                    message = (ActiveMQMessage) transformer.transform(em);
156                    break;
157                } catch (Exception e) {
158                    LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName());
159                    LOG.trace("Transformation error:", e);
160
161                    transformer = transformer.getFallbackTransformer();
162                }
163            }
164
165            if (message == null) {
166                throw new IOException("Failed to transform incoming delivery, skipping.");
167            }
168
169            current = null;
170
171            if (isAnonymous()) {
172                Destination toDestination = message.getJMSDestination();
173                if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
174                    Rejected rejected = new Rejected();
175                    ErrorCondition condition = new ErrorCondition();
176                    condition.setCondition(Symbol.valueOf("failed"));
177                    condition.setDescription("Missing to field for message sent to an anonymous producer");
178                    rejected.setError(condition);
179                    delivery.disposition(rejected);
180                    return;
181                }
182            } else {
183                message.setJMSDestination(getDestination());
184            }
185
186            message.setProducerId(getProducerId());
187
188            // Always override the AMQP client's MessageId with our own.  Preserve
189            // the original in the TextView property for later Ack.
190            MessageId messageId = new MessageId(getProducerId(), messageIdGenerator.getNextSequenceId());
191
192            MessageId amqpMessageId = message.getMessageId();
193            if (amqpMessageId != null) {
194                if (amqpMessageId.getTextView() != null) {
195                    messageId.setTextView(amqpMessageId.getTextView());
196                } else {
197                    messageId.setTextView(amqpMessageId.toString());
198                }
199            }
200
201            message.setMessageId(messageId);
202
203            LOG.trace("Inbound Message:{} from Producer:{}",
204                      message.getMessageId(), getProducerId() + ":" + messageId.getProducerSequenceId());
205
206            final DeliveryState remoteState = delivery.getRemoteState();
207            if (remoteState != null && remoteState instanceof TransactionalState) {
208                TransactionalState s = (TransactionalState) remoteState;
209                long txid = toLong(s.getTxnId());
210                message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid));
211            }
212
213            message.onSend();
214            if (!delivery.remotelySettled()) {
215                sendToActiveMQ(message, new ResponseHandler() {
216
217                    @Override
218                    public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
219                        if (response.isException()) {
220                            ExceptionResponse er = (ExceptionResponse) response;
221                            Rejected rejected = new Rejected();
222                            ErrorCondition condition = new ErrorCondition();
223                            condition.setCondition(Symbol.valueOf("failed"));
224                            condition.setDescription(er.getException().getMessage());
225                            rejected.setError(condition);
226                            delivery.disposition(rejected);
227                        } else {
228                            if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
229                                LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
230                                getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
231                            }
232
233                            if (remoteState != null && remoteState instanceof TransactionalState) {
234                                TransactionalState txAccepted = new TransactionalState();
235                                txAccepted.setOutcome(Accepted.getInstance());
236                                txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId());
237
238                                delivery.disposition(txAccepted);
239                            } else {
240                                delivery.disposition(Accepted.getInstance());
241                            }
242                        }
243
244                        delivery.settle();
245                        session.pumpProtonToSocket();
246                    }
247                });
248            } else {
249                if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
250                    LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
251                    getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
252                    session.pumpProtonToSocket();
253                }
254
255                delivery.settle();
256                sendToActiveMQ(message);
257            }
258        }
259    }
260}