6.6.3. Last Value Queue Example

This example demonstrates how to create and use a Last Value Queue. The language bindings and programming details differ between languages, but the principles are the same.
We will create a messaging queue that provides regular stock price updates. Message consumers are interested in the current stock price, and do not wish or need to receive messages with historical information. A last value queue is perfect for this application: newly arriving messages can update and replace older ones.
We will call our queue "stock-ticker". Our stock-ticker queue will use "stock-symbol" as the last value queue key. The value of this key in the message header will identify a message as a new message to the queue, or an update to a message already in the queue.
First we import the Qpid Messaging client library:
Python
import sys
from qpid.messaging import *
Now we create a Connection to the broker running on the standard AMQP port, 5672, on the local machine:
Python
connection = Connection("localhost:5672")
connection.open()
And now we use this connection to create a session:
Python
session = connection.session()
Now we create a sender and declare a last value queue at the same time. We will create a queue called "stock-ticker", and use "stock-symbol" as the last value queue key. Messages sent to this queue will identify themselves as an update to a previous message by specifying the same "stock-symbol" in their headers.
The following statement is a single line of code. It may break across lines in display, but it should be entered as a single line.
Python
stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
Sidenote: We could also create the queue using the qpid-config command line tool:
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
Now let's create and send some messages to the queue. We use the "stock-symbol" key in the header to identify which stock a message describes. Our last value queue uses this header key to match our message with messages already in the queue.
Python
msg1 = Message("10")
msg1.properties = {'stock-symbol':'RHT'}

msg2 = Message("10")  
msg2.properties = {'stock-symbol':'JAVA'}

msg3 = Message("10")
msg3.properties = {'stock-symbol':'MSFT'}

msg4 = Message("12")
msg4.properties = {'stock-symbol':'RHT'}
After sending these messages to our last value queue a new consumer should see three messages in the queue, one for each stock symbol, with msg4 updating msg1. To contrast the behavior of the last value queue with a standard FIFO queue, we'll send our messages to a control queue, called control-queue at the same time:
Python
controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")
Now we send our messages to the two queues:
Python
stockSender.send(msg1)
controlSender.send(msg1)

stockSender.send(msg2)
controlSender.send(msg2)

stockSender.send(msg3)
controlSender.send(msg3)

stockSender.send(msg4)
controlSender.send(msg4)
Our messages are now in the queues. We create two receivers to now examine the content of the queues:
Python
stockBrowser = session.receiver("stock-ticker; {mode:browse}")
controlBrowser = session.receiver("control-queue; {mode:browse}")
These are browsing receivers, so they do not acquire messages and remove them from the queue. To clear the queues, remove the browse property from the receiver declarations, like so: session.receiver("stock-ticker"), and run the demo again. With the receivers browsing, you will be able to see more distinctly the effect of a Last Value Queue over time by running the demo several times in succession without clearing the queues.
We will use the prefetch capability of the receivers to browse messages on the queue, and to allow us to count how many messages are in the queue using the available() method. We do this by setting the receivers' prefetch capacity to a value higher than the default of 0:
Python
stockBrowser.capacity = 20
controlBrowser.capacity = 20
Once the prefetch capacity of the receiver is set to 20, up to 20 available messages are retrieved asynchronously from the queue. Because the operation is asynchronous we need to wait for it to complete. We will put our application to sleep for 10 seconds before examining the prefetched messages:
Python
sleep 10
We need to import sleep from the time library:
Python
from time import sleep
Note that we do this in order to examine the available() property of the receiver with certainty that this represents the number of messages in the queue. When operating asynchronously available() reports the number of messages available locally. After a ten second delay, we can be reasonably certain that this is the total number of messages in the queue. In an actual asynchronous operation you would not want to block execution of your application. Instead you would use a pattern like this:
Python
while True:
  try:
    msg = stockBrowser.fetch(timeout = 10)
    print msg.properties["stock-symbol"] + ":" + msg.content
  except Empty:
    break
When our application finishes its sleep cycle, we will examine the number of messages in the queue, and print them out:
Python
print "Last Value Queue has " + str(stockBrowser.available()) + " messages"

print "\nLast Value Queue messages:"
      
for x in range(stockBrowser.available()):
  try:
    msg = stockBrowser.fetch(timeout = 1)
    print msg.properties["stock-symbol"] + ":" + msg.content
  except MessagingError, m:
    pass
      
print "Control Queue has " + str(controlBrowser.available()) + " messages"
      
print "\nControl Queue messages:"
for x in range(controlBrowser.available()):
  try:
    msg = controlBrowser.fetch(timeout = 1)
    print msg.properties["stock-symbol"] + ":" + msg.content
  except MessagingError, m:
    pass
And finally we acknowledge our session and close the connection:
Python
session.acknowledge()
connection.close()
We are now ready to run our test. Here's the complete program listing:
Python
import sys
from qpid.messaging import *
from time import sleep

connection = Connection("localhost:5672")
try:
  connection.open()
  session = connection.session()

  stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
  controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")

  stockBrowser = session.receiver("stock-ticker;{mode:browse}")
  controlBrowser = session.receiver("control-queue;{mode:browse}")
  controlBrowser = session.receiver("control-queue")

  msg1 = Message("10")
  msg1.properties = {'stock-symbol':'RHT'}

  msg2 = Message("10")
  msg2.properties = {'stock-symbol':'JAVA'}

  msg3 = Message("10")
  msg3.properties = {'stock-symbol':'MSFT'}

  msg4 = Message("12")
  msg4.properties = {'stock-symbol':'RHT'}

  stockSender.send(msg1)
  controlSender.send(msg1)

  stockSender.send(msg2)
  controlSender.send(msg2)

  stockSender.send(msg3)
  controlSender.send(msg3)

  stockSender.send(msg4)
  controlSender.send(msg4)

  stockBrowser.capacity = 20
  controlBrowser.capacity = 20

  sleep(10)

  print "\nLast Value Queue has " + str(stockBrowser.available()) + " messages"
      
  print "Last Value Queue messages:"
      
  for x in range(stockBrowser.available()):
    try:
      msg = stockBrowser.fetch(timeout = 1)
      print msg.properties["stock-symbol"] + ":" + msg.content
    except MessagingError, m:
      pass
      
  print "\nControl Queue has " + str(controlBrowser.available()) + " messages"
      
  print "Control Queue messages:"
      
    for x in range(controlBrowser.available()):
    try:
      msg = controlBrowser.fetch(timeout = 1)
      print msg.properties["stock-symbol"] + ":" + msg.content
    except MessagingError, m:
      pass

  session.acknowledge()
      
except MessagingError,m:
  print m
finally:
  connection.close()