4.4.3. The Topic Subscriber

topic_subscriber.py sets up its own queues, binding them to the topic exchange with binding keys that identify interesting messages. It sets up queues for news, weather, usa, and europe, then binds these to the topic exchange using binding keys that contain wildcards. For instance, the news queue is bound using the binding key #.news, and the usa queue is bound using the binding key usa.#. If a message is published to the amq.topic exchange using the routing key usa.news, it matches both binding keys, and is delivered to both the usa and news queues.
Because the program reads from four queues, the code that prints the contents of a queue is placed in a function so that it can be reused:
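from Queue import Empty          # Raised by queue.get() when the timeout expires

def dump_queue(queue):

  content = ""                   # Content of the last message read
  final = "That's all, folks!"   # In a message body, signals the last message
  message = 0

  while content != final:
    try:
      message = queue.get(timeout=10)   # Wait up to 10 seconds for a message
      content = message.body
      print content
    except Empty:
      print "No more messages!"
      return                            # Stop instead of waiting again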
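The read-until-sentinel pattern in dump_queue can be tried without a broker. The following is a minimal sketch under stated assumptions: it uses Python's standard-library queue in place of a Qpid incoming queue, plain strings in place of message objects, and a hypothetical helper name, drain_until_final. It is meant only to show the loop's two exit conditions, not the tutorial's actual code.

```python
try:
    from queue import Queue, Empty      # Python 3
except ImportError:
    from Queue import Queue, Empty      # Python 2

FINAL = "That's all, folks!"            # same sentinel text as dump_queue


def drain_until_final(queue, timeout=0.1):
    """Return every body read, up to and including the sentinel.

    Hypothetical helper: it stops early if the queue stays empty for
    `timeout` seconds, mirroring the "No more messages!" branch.
    """
    bodies = []
    while True:
        try:
            body = queue.get(timeout=timeout)
        except Empty:
            break                       # nothing arrived in time; give up
        bodies.append(body)
        if body == FINAL:
            break                       # sentinel seen; the queue is finished
    return bodies


q = Queue()
for body in ["usa.news item", "europe.news item", FINAL, "never read"]:
    q.put(body)

received = drain_until_final(q)
print(received)
```

Anything placed on the queue after the sentinel is left unread, which is why the publisher must send the final message last.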
You can also write a function to subscribe to a queue:
def subscribe_queue(server_queue_name, local_queue_name):

  print "Subscribing local queue '" + local_queue_name + "' to '" + server_queue_name + "'"

  queue = session.incoming(local_queue_name)

  session.message_subscribe(queue=server_queue_name, destination=local_queue_name)

  return queue
Because we are using private server-side queues, we need to use unique names for these queues in the main body of the program. We do this using the session name:
# declare queues on the server

news = "news-" + session.name
weather = "weather-" + session.name
usa = "usa-" + session.name
europe = "europe-" + session.name

session.queue_declare(queue=news, exclusive=True)
session.queue_declare(queue=weather, exclusive=True)
session.queue_declare(queue=usa, exclusive=True)
session.queue_declare(queue=europe, exclusive=True)
Now the queues can be bound using wildcard matching. The message producer uses routing keys that contain multiple words separated by the . delimiter: usa.news, usa.weather, europe.news, and europe.weather. Binding keys can include wildcard characters: a # matches zero or more words, and a * matches exactly one word. In this example we use binding keys like #.news (all news items) and usa.# (all items in the USA) to match these routing keys:
# Routing keys may be "usa.news", "usa.weather", "europe.news", or "europe.weather".

# The '#' symbol matches zero or more components of a multipart name,
# e.g. "#.news" matches "europe.news" or "usa.news".

session.exchange_bind(exchange="amq.topic", queue=news, binding_key="#.news")
session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="#.weather")
session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="usa.#")
session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="europe.#")
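To make the wildcard rules concrete, here is a small matcher written for illustration only. It assumes the AMQP topic-exchange convention that * matches exactly one word and # matches zero or more words. The function name topic_matches and the bindings dictionary are hypothetical and are not part of the Qpid API; the broker performs this matching itself.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under topic rules:
    '*' matches exactly one word, '#' matches zero or more words."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            return w == len(words)      # pattern used up: words must be too
        if pattern[p] == "#":
            # '#' either absorbs no word (move past it) or one more word
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False                # pattern needs a word, none left
        return pattern[p] in ("*", words[w]) and match(p + 1, w + 1)

    return match(0, 0)


# Hypothetical table mirroring the four bindings above
bindings = {
    "news": "#.news",
    "weather": "#.weather",
    "usa": "usa.#",
    "europe": "europe.#",
}

delivered = sorted(name for name, key in bindings.items()
                   if topic_matches(key, "usa.news"))
print(delivered)
```

With these bindings, a message published with the routing key usa.news is selected for the news and usa queues and for no others.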
When the topic publisher is finished, it sends a final message using the control routing key. The topic subscriber needs to identify the last message published to each queue, so it binds the control routing key to all four queues. AMQP guarantees that the order of messages posted to a given queue is maintained, so when we get the final message, we know we are finished with that queue. Here is the code in topic_subscriber.py that binds the control routing key to each queue:
# Bind each queue to the 'control' binding key so we know when to stop

session.exchange_bind(exchange="amq.topic", queue=news, binding_key="control")
session.exchange_bind(exchange="amq.topic", queue=weather, binding_key="control")
session.exchange_bind(exchange="amq.topic", queue=usa, binding_key="control")
session.exchange_bind(exchange="amq.topic", queue=europe, binding_key="control")
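The effect of the extra control bindings can be illustrated with an in-memory model. This is a sketch under assumptions, not broker code: each queue is a deque, the wildcard bindings are replaced by explicit sets of accepted routing keys, and the publish function and message bodies are made up for the example.

```python
from collections import deque

FINAL = "That's all, folks!"

# Stand-ins for the bindings: each queue lists the routing keys it accepts.
# A real topic exchange evaluates the wildcard binding keys instead.
accepted = {
    "news":    {"usa.news", "europe.news", "control"},
    "weather": {"usa.weather", "europe.weather", "control"},
    "usa":     {"usa.news", "usa.weather", "control"},
    "europe":  {"europe.news", "europe.weather", "control"},
}
queues = {name: deque() for name in accepted}


def publish(routing_key, body):
    """Append body to every queue whose bindings accept routing_key."""
    for name, keys in accepted.items():
        if routing_key in keys:
            queues[name].append(body)


for key in ["usa.news", "usa.weather", "europe.news", "europe.weather"]:
    publish(key, "report for " + key)
publish("control", FINAL)               # sent last, so it ends every queue

for name in sorted(queues):
    print(name + ": " + ", ".join(queues[name]))
```

Because each queue keeps its messages in arrival order and the control message is published last, the final message is the last entry in all four queues.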
Finally, the topic subscriber creates local queues, subscribes them to its private queues on the server, and dumps the content of each queue to show what messages have arrived:
# Subscribe local queues to server queues

local_news = "local_news"
local_weather = "local_weather"
local_usa = "local_usa" 
local_europe = "local_europe"

local_news_queue = subscribe_queue(news, local_news)
local_weather_queue = subscribe_queue(weather, local_weather)
local_usa_queue = subscribe_queue(usa, local_usa)
local_europe_queue = subscribe_queue(europe, local_europe)
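# Call dump_queue to print messages from each queue

print "Messages on 'news' queue:"
dump_queue(local_news_queue)

print "Messages on 'weather' queue:"
dump_queue(local_weather_queue)

print "Messages on 'usa' queue:"
dump_queue(local_usa_queue)

print "Messages on 'europe' queue:"
dump_queue(local_europe_queue)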