5.2. Browsing and Consuming Messages
5.2.1. Message Acquisition and Acceptance
The included drain program can be used in either browse or acquisition mode.
drain can be found in:
/usr/share/doc/python-qpid-0.14/examples/api/drain /usr/share/qpidc/examples/messaging/drain.cpp
qpid-config command:
qpid-config add queue browse-acquire-demo
browse-acquire-demo queue when you run qpid-config queues.
browse-acquire-demo using spout. Spout is included in the same packages as drain, and can be found in the same directories. Run spout to send a message to the queue:
./spout browse-acquire-demo "Hello World"
browse-acquire-demo queue. Let's use drain to browse it first of all:
./drain -c 0 "browse-acquire-demo; {mode:browse}"drain a second time, and you'll see the message again. Running the drain program twice simulates two different browsing consumers accessing the queue. The message is read and remains available for other consuming applications when it is browsed.
browse-acquire-demo queue using qpid-config:
qpid-config del queue browse-acquire-demo
qpid-config responds with an error because a message remains in the queue.
./drain -c 0 "browse-acquire-demo"
qpid-config:
qpid-config del queue browse-acquire-demo
drain demo is the fact that browsers see a message only once. Because each time drain is run it creates a different browser, it sees the message in the queue each time. The same browser, however, sees the message only once, no matter how many times it looks.
- Python
import sys from qpid.messaging import * def msgfetch(rx): try: msg = rx.fetch(timeout=1) except MessagingError, m: msg = m return msg connection = Connection("localhost:5672") connection.open() try: session = connection.session() tx = session.sender("browse-acquire-demo;{create:always}") rxbrowse1 = session.receiver("browse-acquire-demo;{mode:browse}") rxbrowse2 = session.receiver("browse-acquire-demo;{mode:browse}") rxbrowse3 = session.receiver("browse-acquire-demo;{mode:browse}") rxacquire = session.receiver("browse-acquire-demo") tx.send("Hello World") print "\nBrowser 1 saw message:" print msgfetch(rxbrowse1) print "Browser 1 then saw message:" print msgfetch(rxbrowse1) print "\nBrowser 2 saw message:" print msgfetch(rxbrowse2) print "Browser 2 then saw message:" print msgfetch(rxbrowse2) print "\nAcquired message:" print msgfetch(rxacquire) print "\nBrowser 3 saw message:" print msgfetch(rxbrowse3) except MessagingError, m: print m finally: connection.close()
drain to examine the queue:
./drain -c 0 browse-acquire-demo
When our receiver acquired the message from the queue, the broker set the message to acquired. When a message is acquired, the broker treats the message as if it has been delivered, but it does not delete it from the queue. One of a number of things happen from here: the consumer who acquired the message acknowledges the message, releases the message, or rejects the message, or the consumer might disconnect through a network failure.
acquired, and message consumers browsing or fetching from the queue will not see the message. When our application disconnects without acknowledging receipt, the broker switches the message out of acquired state and sets a header redelivered=True. The message is then made available to other consumers, such as the drain browser that we ran after our application closed.
redelivered=true'. This alerts the other nodes that this message may have already been acted on, and they can perform checks to see if that is so. This narrows the window for exceptions even further, when the applications are designed to take advantage of these features.
connection.close() line:
- Python
connection.open() try: session=connection.session() rxacquire2 = session.receiver("browse-acquire-demo") print "\nAcquirer 2 saw message:" print msgfetch(rxacquire2) except MessagingError, m: print m finally: session.acknowledge() connection.close()
redelivered to inform us that another consumer acquired this message previously. We have now acquired this message, and it will again disappear for other consumers browsing or fetching from this queue. This time, however, we call session.acknowledge() before closing the connection. This method acknowledges receipt of the message (it acknowledges all messages as-yet unacknowledged for the session). Since we have acknowledged receipt of the message, the message is acquired, and it is removed from the queue.
drain now, you will see that there are no messages in the queue.
A consumer can explicitly release a message. When this happens, the message is returned to the queue for redelivery. The effect is the same as if the consumer lost its connection to the broker.
acknowledge() method with the message and Disposition(RELEASED) as parameters:
session.acknowledge(msg, Disposition(RELEASED))
release() method.
Note that this two-phase acquisition and acceptance behavior is the behavior over a reliable link (technically an at-least-once link), which is the default link for receiver connections to the broker. If you explicitly connect your receiver to a queue using an unreliable link, or directly connect to an exchange, then received messages are immediately acquired with no need to acknowledge them.
To delete the queue we used for this demo, you can either restart the broker (all non-durable queues are deleted when the broker is restarted), or you can use qpid-config:
qpid-config del queue browse-acquire-demo
--force switch to override this check and delete a queue with messages in it, or you can use drain to empty the queue, and then reissue the command on the now-empty queue.

Where did the comment section go?
Red Hat's documentation publication system recently went through an upgrade to enable speedier, more mobile-friendly content. We decided to re-evaluate our commenting platform to ensure that it meets your expectations and serves as an optimal feedback mechanism. During this redesign, we invite your input on providing feedback on Red Hat documentation via the discussion platform.