001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; 020 021import java.io.IOException; 022 023import javax.jms.Destination; 024 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.ActiveMQMessage; 027import org.apache.activemq.command.ExceptionResponse; 028import org.apache.activemq.command.LocalTransactionId; 029import org.apache.activemq.command.MessageId; 030import org.apache.activemq.command.ProducerId; 031import org.apache.activemq.command.ProducerInfo; 032import org.apache.activemq.command.RemoveInfo; 033import org.apache.activemq.command.Response; 034import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 035import org.apache.activemq.transport.amqp.ResponseHandler; 036import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer; 037import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer; 038import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; 039import org.apache.activemq.transport.amqp.message.EncodedMessage; 040import org.apache.activemq.transport.amqp.message.InboundTransformer; 041import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; 042import org.apache.activemq.util.LongSequenceGenerator; 043import org.apache.qpid.proton.amqp.Symbol; 044import org.apache.qpid.proton.amqp.messaging.Accepted; 045import org.apache.qpid.proton.amqp.messaging.Rejected; 046import org.apache.qpid.proton.amqp.transaction.TransactionalState; 047import org.apache.qpid.proton.amqp.transport.DeliveryState; 048import org.apache.qpid.proton.amqp.transport.ErrorCondition; 049import org.apache.qpid.proton.engine.Delivery; 050import org.apache.qpid.proton.engine.Receiver; 051import org.fusesource.hawtbuf.Buffer; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * An AmqpReceiver wraps the AMQP Receiver end of a link from the remote peer 057 * which holds the corresponding Sender which transfers message accross the 058 * link. The AmqpReceiver handles all incoming deliveries by converting them 059 * or wrapping them into an ActiveMQ message object and forwarding that message 060 * on to the appropriate ActiveMQ Destination. 061 */ 062public class AmqpReceiver extends AmqpAbstractReceiver { 063 064 private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class); 065 066 private final ProducerInfo producerInfo; 067 private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); 068 069 private InboundTransformer inboundTransformer; 070 071 /** 072 * Create a new instance of an AmqpReceiver 073 * 074 * @param session 075 * the Session that is the parent of this AmqpReceiver instance. 076 * @param endpoint 077 * the AMQP receiver endpoint that the class manages. 078 * @param producerInfo 079 * the ProducerInfo instance that contains this sender's configuration. 080 */ 081 public AmqpReceiver(AmqpSession session, Receiver endpoint, ProducerInfo producerInfo) { 082 super(session, endpoint); 083 084 this.producerInfo = producerInfo; 085 } 086 087 @Override 088 public void close() { 089 if (!isClosed() && isOpened()) { 090 sendToActiveMQ(new RemoveInfo(getProducerId())); 091 } 092 093 super.close(); 094 } 095 096 //----- Configuration accessors ------------------------------------------// 097 098 /** 099 * @return the ActiveMQ ProducerId used to register this Receiver on the Broker. 100 */ 101 public ProducerId getProducerId() { 102 return producerInfo.getProducerId(); 103 } 104 105 @Override 106 public ActiveMQDestination getDestination() { 107 return producerInfo.getDestination(); 108 } 109 110 @Override 111 public void setDestination(ActiveMQDestination destination) { 112 producerInfo.setDestination(destination); 113 } 114 115 /** 116 * If the Sender that initiated this Receiver endpoint did not define an address 117 * then it is using anonymous mode and message are to be routed to the address 118 * that is defined in the AMQP message 'To' field. 119 * 120 * @return true if this Receiver should operate in anonymous mode. 121 */ 122 public boolean isAnonymous() { 123 return producerInfo.getDestination() == null; 124 } 125 126 //----- Internal Implementation ------------------------------------------// 127 128 protected InboundTransformer getTransformer() { 129 if (inboundTransformer == null) { 130 String transformer = session.getConnection().getConfiguredTransformer(); 131 if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_JMS)) { 132 inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE); 133 } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_NATIVE)) { 134 inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); 135 } else if (transformer.equalsIgnoreCase(InboundTransformer.TRANSFORMER_RAW)) { 136 inboundTransformer = new AMQPRawInboundTransformer(ActiveMQJMSVendor.INSTANCE); 137 } else { 138 LOG.warn("Unknown transformer type {} using native one instead", transformer); 139 inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE); 140 } 141 } 142 return inboundTransformer; 143 } 144 145 @Override 146 protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception { 147 if (!isClosed()) { 148 EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), deliveryBytes.data, deliveryBytes.offset, deliveryBytes.length); 149 150 InboundTransformer transformer = getTransformer(); 151 ActiveMQMessage message = null; 152 153 while (transformer != null) { 154 try { 155 message = (ActiveMQMessage) transformer.transform(em); 156 break; 157 } catch (Exception e) { 158 LOG.debug("Transform of message using [{}] transformer, failed", getTransformer().getTransformerName()); 159 LOG.trace("Transformation error:", e); 160 161 transformer = transformer.getFallbackTransformer(); 162 } 163 } 164 165 if (message == null) { 166 throw new IOException("Failed to transform incoming delivery, skipping."); 167 } 168 169 current = null; 170 171 if (isAnonymous()) { 172 Destination toDestination = message.getJMSDestination(); 173 if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) { 174 Rejected rejected = new Rejected(); 175 ErrorCondition condition = new ErrorCondition(); 176 condition.setCondition(Symbol.valueOf("failed")); 177 condition.setDescription("Missing to field for message sent to an anonymous producer"); 178 rejected.setError(condition); 179 delivery.disposition(rejected); 180 return; 181 } 182 } else { 183 message.setJMSDestination(getDestination()); 184 } 185 186 message.setProducerId(getProducerId()); 187 188 // Always override the AMQP client's MessageId with our own. Preserve 189 // the original in the TextView property for later Ack. 190 MessageId messageId = new MessageId(getProducerId(), messageIdGenerator.getNextSequenceId()); 191 192 MessageId amqpMessageId = message.getMessageId(); 193 if (amqpMessageId != null) { 194 if (amqpMessageId.getTextView() != null) { 195 messageId.setTextView(amqpMessageId.getTextView()); 196 } else { 197 messageId.setTextView(amqpMessageId.toString()); 198 } 199 } 200 201 message.setMessageId(messageId); 202 203 LOG.trace("Inbound Message:{} from Producer:{}", 204 message.getMessageId(), getProducerId() + ":" + messageId.getProducerSequenceId()); 205 206 final DeliveryState remoteState = delivery.getRemoteState(); 207 if (remoteState != null && remoteState instanceof TransactionalState) { 208 TransactionalState s = (TransactionalState) remoteState; 209 long txid = toLong(s.getTxnId()); 210 message.setTransactionId(new LocalTransactionId(session.getConnection().getConnectionId(), txid)); 211 } 212 213 message.onSend(); 214 if (!delivery.remotelySettled()) { 215 sendToActiveMQ(message, new ResponseHandler() { 216 217 @Override 218 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 219 if (response.isException()) { 220 ExceptionResponse er = (ExceptionResponse) response; 221 Rejected rejected = new Rejected(); 222 ErrorCondition condition = new ErrorCondition(); 223 condition.setCondition(Symbol.valueOf("failed")); 224 condition.setDescription(er.getException().getMessage()); 225 rejected.setError(condition); 226 delivery.disposition(rejected); 227 } else { 228 if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) { 229 LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId()); 230 getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); 231 } 232 233 if (remoteState != null && remoteState instanceof TransactionalState) { 234 TransactionalState txAccepted = new TransactionalState(); 235 txAccepted.setOutcome(Accepted.getInstance()); 236 txAccepted.setTxnId(((TransactionalState) remoteState).getTxnId()); 237 238 delivery.disposition(txAccepted); 239 } else { 240 delivery.disposition(Accepted.getInstance()); 241 } 242 } 243 244 delivery.settle(); 245 session.pumpProtonToSocket(); 246 } 247 }); 248 } else { 249 if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) { 250 LOG.debug("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId()); 251 getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit()); 252 session.pumpProtonToSocket(); 253 } 254 255 delivery.settle(); 256 sendToActiveMQ(message); 257 } 258 } 259 } 260}