001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp.protocol; 018 019import static org.apache.activemq.transport.amqp.AmqpSupport.toLong; 020 021import java.io.IOException; 022import java.util.LinkedList; 023 024import org.apache.activemq.command.ActiveMQDestination; 025import org.apache.activemq.command.ActiveMQMessage; 026import org.apache.activemq.command.ConsumerControl; 027import org.apache.activemq.command.ConsumerId; 028import org.apache.activemq.command.ConsumerInfo; 029import org.apache.activemq.command.ExceptionResponse; 030import org.apache.activemq.command.LocalTransactionId; 031import org.apache.activemq.command.MessageAck; 032import org.apache.activemq.command.MessageDispatch; 033import org.apache.activemq.command.MessagePull; 034import org.apache.activemq.command.RemoveInfo; 035import org.apache.activemq.command.RemoveSubscriptionInfo; 036import org.apache.activemq.command.Response; 037import org.apache.activemq.transport.amqp.AmqpProtocolConverter; 038import org.apache.activemq.transport.amqp.ResponseHandler; 039import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor; 040import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer; 041import org.apache.activemq.transport.amqp.message.EncodedMessage; 042import org.apache.activemq.transport.amqp.message.OutboundTransformer; 043import org.apache.qpid.proton.amqp.messaging.Accepted; 044import org.apache.qpid.proton.amqp.messaging.Modified; 045import org.apache.qpid.proton.amqp.messaging.Outcome; 046import org.apache.qpid.proton.amqp.messaging.Rejected; 047import org.apache.qpid.proton.amqp.messaging.Released; 048import org.apache.qpid.proton.amqp.transaction.TransactionalState; 049import org.apache.qpid.proton.amqp.transport.AmqpError; 050import org.apache.qpid.proton.amqp.transport.DeliveryState; 051import org.apache.qpid.proton.amqp.transport.ErrorCondition; 052import org.apache.qpid.proton.amqp.transport.SenderSettleMode; 053import org.apache.qpid.proton.engine.Delivery; 054import org.apache.qpid.proton.engine.Sender; 055import org.fusesource.hawtbuf.Buffer; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059/** 060 * An AmqpSender wraps the AMQP Sender end of a link from the remote peer 061 * which holds the corresponding Receiver which receives messages transfered 062 * across the link from the Broker. 063 * 064 * An AmqpSender is in turn a message consumer subscribed to some destination 065 * on the broker. As messages are dispatched to this sender that are sent on 066 * to the remote Receiver end of the lin. 067 */ 068public class AmqpSender extends AmqpAbstractLink<Sender> { 069 070 private static final Logger LOG = LoggerFactory.getLogger(AmqpSender.class); 071 072 private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {}; 073 074 private final OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE); 075 private final AmqpTransferTagGenerator tagCache = new AmqpTransferTagGenerator(); 076 private final LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>(); 077 private final LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>(); 078 private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT"; 079 080 private final ConsumerInfo consumerInfo; 081 private final boolean presettle; 082 083 private int currentCredit; 084 private boolean draining; 085 private long lastDeliveredSequenceId; 086 087 private Buffer currentBuffer; 088 private Delivery currentDelivery; 089 090 /** 091 * Creates a new AmqpSender instance that manages the given Sender 092 * 093 * @param session 094 * the AmqpSession object that is the parent of this instance. 095 * @param endpoint 096 * the AMQP Sender instance that this class manages. 097 * @param consumerInfo 098 * the ConsumerInfo instance that holds configuration for this sender. 099 */ 100 public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) { 101 super(session, endpoint); 102 103 this.currentCredit = endpoint.getRemoteCredit(); 104 this.consumerInfo = consumerInfo; 105 this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; 106 } 107 108 @Override 109 public void open() { 110 if (!isClosed()) { 111 session.registerSender(getConsumerId(), this); 112 } 113 114 super.open(); 115 } 116 117 @Override 118 public void detach() { 119 if (!isClosed() && isOpened()) { 120 RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); 121 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 122 sendToActiveMQ(removeCommand, null); 123 124 session.unregisterSender(getConsumerId()); 125 } 126 127 super.detach(); 128 } 129 130 @Override 131 public void close() { 132 if (!isClosed() && isOpened()) { 133 RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); 134 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 135 sendToActiveMQ(removeCommand, null); 136 137 if (consumerInfo.isDurable()) { 138 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 139 rsi.setConnectionId(session.getConnection().getConnectionId()); 140 rsi.setSubscriptionName(getEndpoint().getName()); 141 rsi.setClientId(session.getConnection().getClientId()); 142 143 sendToActiveMQ(rsi, null); 144 } 145 146 session.unregisterSender(getConsumerId()); 147 } 148 149 super.close(); 150 } 151 152 @Override 153 public void flow() throws Exception { 154 int updatedCredit = getEndpoint().getCredit(); 155 156 LOG.trace("Flow: drain={} credit={}, remoteCredit={}", 157 getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit()); 158 159 if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) { 160 currentCredit = updatedCredit >= 0 ? updatedCredit : 0; 161 draining = true; 162 163 // Revert to a pull consumer. 164 ConsumerControl control = new ConsumerControl(); 165 control.setConsumerId(getConsumerId()); 166 control.setDestination(getDestination()); 167 control.setPrefetch(0); 168 sendToActiveMQ(control, null); 169 170 // Now request dispatch of the drain amount, we request immediate 171 // timeout and an completion message regardless so that we can know 172 // when we should marked the link as drained. 173 MessagePull pullRequest = new MessagePull(); 174 pullRequest.setConsumerId(getConsumerId()); 175 pullRequest.setDestination(getDestination()); 176 pullRequest.setTimeout(-1); 177 pullRequest.setAlwaysSignalDone(true); 178 pullRequest.setQuantity(currentCredit); 179 sendToActiveMQ(pullRequest, null); 180 } else if (updatedCredit != currentCredit) { 181 currentCredit = updatedCredit >= 0 ? updatedCredit : 0; 182 ConsumerControl control = new ConsumerControl(); 183 control.setConsumerId(getConsumerId()); 184 control.setDestination(getDestination()); 185 control.setPrefetch(currentCredit); 186 sendToActiveMQ(control, null); 187 } 188 } 189 190 @Override 191 public void delivery(Delivery delivery) throws Exception { 192 MessageDispatch md = (MessageDispatch) delivery.getContext(); 193 DeliveryState state = delivery.getRemoteState(); 194 195 if (state instanceof TransactionalState) { 196 TransactionalState txState = (TransactionalState) state; 197 LOG.trace("onDelivery: TX delivery state = {}", state); 198 if (txState.getOutcome() != null) { 199 Outcome outcome = txState.getOutcome(); 200 if (outcome instanceof Accepted) { 201 if (!delivery.remotelySettled()) { 202 TransactionalState txAccepted = new TransactionalState(); 203 txAccepted.setOutcome(Accepted.getInstance()); 204 txAccepted.setTxnId(((TransactionalState) state).getTxnId()); 205 206 delivery.disposition(txAccepted); 207 } 208 settle(delivery, MessageAck.DELIVERED_ACK_TYPE); 209 } 210 } 211 } else { 212 if (state instanceof Accepted) { 213 LOG.trace("onDelivery: accepted state = {}", state); 214 if (!delivery.remotelySettled()) { 215 delivery.disposition(new Accepted()); 216 } 217 settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE); 218 } else if (state instanceof Rejected) { 219 // re-deliver /w incremented delivery counter. 220 md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); 221 LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter()); 222 settle(delivery, -1); 223 } else if (state instanceof Released) { 224 LOG.trace("onDelivery: Released state = {}", state); 225 // re-deliver && don't increment the counter. 226 settle(delivery, -1); 227 } else if (state instanceof Modified) { 228 Modified modified = (Modified) state; 229 if (modified.getDeliveryFailed()) { 230 // increment delivery counter.. 231 md.setRedeliveryCounter(md.getRedeliveryCounter() + 1); 232 } 233 LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter()); 234 byte ackType = -1; 235 Boolean undeliverableHere = modified.getUndeliverableHere(); 236 if (undeliverableHere != null && undeliverableHere) { 237 // receiver does not want the message.. 238 // perhaps we should DLQ it? 239 ackType = MessageAck.POSION_ACK_TYPE; 240 } 241 settle(delivery, ackType); 242 } 243 } 244 245 pumpOutbound(); 246 } 247 248 @Override 249 public void commit() throws Exception { 250 if (!dispatchedInTx.isEmpty()) { 251 for (MessageDispatch md : dispatchedInTx) { 252 MessageAck pendingTxAck = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1); 253 pendingTxAck.setFirstMessageId(md.getMessage().getMessageId()); 254 pendingTxAck.setTransactionId(md.getMessage().getTransactionId()); 255 256 LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck); 257 258 sendToActiveMQ(pendingTxAck, new ResponseHandler() { 259 @Override 260 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 261 if (response.isException()) { 262 if (response.isException()) { 263 Throwable exception = ((ExceptionResponse) response).getException(); 264 exception.printStackTrace(); 265 getEndpoint().close(); 266 } 267 } 268 session.pumpProtonToSocket(); 269 } 270 }); 271 } 272 273 dispatchedInTx.clear(); 274 } 275 } 276 277 @Override 278 public void rollback() throws Exception { 279 synchronized (outbound) { 280 281 LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size()); 282 283 for (MessageDispatch dispatch : dispatchedInTx) { 284 dispatch.setRedeliveryCounter(dispatch.getRedeliveryCounter() + 1); 285 dispatch.getMessage().setTransactionId(null); 286 outbound.addFirst(dispatch); 287 } 288 289 dispatchedInTx.clear(); 290 } 291 } 292 293 /** 294 * Event point for incoming message from ActiveMQ on this Sender's 295 * corresponding subscription. 296 * 297 * @param dispatch 298 * the MessageDispatch to process and send across the link. 299 * 300 * @throws Exception if an error occurs while encoding the message for send. 301 */ 302 public void onMessageDispatch(MessageDispatch dispatch) throws Exception { 303 if (!isClosed()) { 304 // Lock to prevent stepping on TX redelivery 305 synchronized (outbound) { 306 outbound.addLast(dispatch); 307 } 308 pumpOutbound(); 309 session.pumpProtonToSocket(); 310 } 311 } 312 313 /** 314 * Called when the Broker sends a ConsumerControl command to the Consumer that 315 * this sender creates to obtain messages to dispatch via the sender for this 316 * end of the open link. 317 * 318 * @param control 319 * The ConsumerControl command to process. 320 */ 321 public void onConsumerControl(ConsumerControl control) { 322 if (control.isClose()) { 323 close(new ErrorCondition(AmqpError.INTERNAL_ERROR, "Receiver forcably closed")); 324 session.pumpProtonToSocket(); 325 } 326 } 327 328 @Override 329 public String toString() { 330 return "AmqpSender {" + getConsumerId() + "}"; 331 } 332 333 //----- Property getters and setters -------------------------------------// 334 335 public ConsumerId getConsumerId() { 336 return consumerInfo.getConsumerId(); 337 } 338 339 @Override 340 public ActiveMQDestination getDestination() { 341 return consumerInfo.getDestination(); 342 } 343 344 @Override 345 public void setDestination(ActiveMQDestination destination) { 346 consumerInfo.setDestination(destination); 347 } 348 349 //----- Internal Implementation ------------------------------------------// 350 351 public void pumpOutbound() throws Exception { 352 while (!isClosed()) { 353 while (currentBuffer != null) { 354 int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length); 355 if (sent > 0) { 356 currentBuffer.moveHead(sent); 357 if (currentBuffer.length == 0) { 358 if (presettle) { 359 settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE); 360 } else { 361 getEndpoint().advance(); 362 } 363 currentBuffer = null; 364 currentDelivery = null; 365 } 366 } else { 367 return; 368 } 369 } 370 371 if (outbound.isEmpty()) { 372 return; 373 } 374 375 final MessageDispatch md = outbound.removeFirst(); 376 try { 377 378 ActiveMQMessage temp = null; 379 if (md.getMessage() != null) { 380 381 // Topics can dispatch the same Message to more than one consumer 382 // so we must copy to prevent concurrent read / write to the same 383 // message object. 384 if (md.getDestination().isTopic()) { 385 synchronized (md.getMessage()) { 386 temp = (ActiveMQMessage) md.getMessage().copy(); 387 } 388 } else { 389 temp = (ActiveMQMessage) md.getMessage(); 390 } 391 392 if (!temp.getProperties().containsKey(MESSAGE_FORMAT_KEY)) { 393 temp.setProperty(MESSAGE_FORMAT_KEY, 0); 394 } 395 } 396 397 final ActiveMQMessage jms = temp; 398 if (jms == null) { 399 LOG.trace("Sender:[{}] browse done.", getEndpoint().getName()); 400 // It's the end of browse signal in response to a MessagePull 401 getEndpoint().drained(); 402 draining = false; 403 currentCredit = 0; 404 } else { 405 jms.setRedeliveryCounter(md.getRedeliveryCounter()); 406 jms.setReadOnlyBody(true); 407 final EncodedMessage amqp = outboundTransformer.transform(jms); 408 if (amqp != null && amqp.getLength() > 0) { 409 currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength()); 410 if (presettle) { 411 currentDelivery = getEndpoint().delivery(EMPTY_BYTE_ARRAY, 0, 0); 412 } else { 413 final byte[] tag = tagCache.getNextTag(); 414 currentDelivery = getEndpoint().delivery(tag, 0, tag.length); 415 } 416 currentDelivery.setContext(md); 417 } else { 418 // TODO: message could not be generated what now? 419 } 420 } 421 } catch (Exception e) { 422 LOG.warn("Error detected while flushing outbound messages: {}", e.getMessage()); 423 } 424 } 425 } 426 427 private void settle(final Delivery delivery, final int ackType) throws Exception { 428 byte[] tag = delivery.getTag(); 429 if (tag != null && tag.length > 0 && delivery.remotelySettled()) { 430 tagCache.returnTag(tag); 431 } 432 433 if (ackType == -1) { 434 // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ 435 delivery.settle(); 436 onMessageDispatch((MessageDispatch) delivery.getContext()); 437 } else { 438 MessageDispatch md = (MessageDispatch) delivery.getContext(); 439 lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId(); 440 MessageAck ack = new MessageAck(); 441 ack.setConsumerId(getConsumerId()); 442 ack.setFirstMessageId(md.getMessage().getMessageId()); 443 ack.setLastMessageId(md.getMessage().getMessageId()); 444 ack.setMessageCount(1); 445 ack.setAckType((byte) ackType); 446 ack.setDestination(md.getDestination()); 447 448 DeliveryState remoteState = delivery.getRemoteState(); 449 if (remoteState != null && remoteState instanceof TransactionalState) { 450 TransactionalState s = (TransactionalState) remoteState; 451 long txid = toLong(s.getTxnId()); 452 LocalTransactionId localTxId = new LocalTransactionId(session.getConnection().getConnectionId(), txid); 453 ack.setTransactionId(localTxId); 454 455 // Store the message sent in this TX we might need to 456 // re-send on rollback 457 md.getMessage().setTransactionId(localTxId); 458 dispatchedInTx.addFirst(md); 459 } 460 461 LOG.trace("Sending Ack to ActiveMQ: {}", ack); 462 463 sendToActiveMQ(ack, new ResponseHandler() { 464 @Override 465 public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { 466 if (response.isException()) { 467 if (response.isException()) { 468 Throwable exception = ((ExceptionResponse) response).getException(); 469 exception.printStackTrace(); 470 getEndpoint().close(); 471 } 472 } else { 473 delivery.settle(); 474 } 475 session.pumpProtonToSocket(); 476 } 477 }); 478 } 479 } 480}