Chapter 5. Using the API
This chapter explains how to use the AMQ Python API to perform common messaging tasks.
5.1. Basic operation
5.1.1. Handling messaging events
AMQ Python is an asynchronous event-driven API. To define how an application handles events, the user implements callback methods on the
MessagingHandler class. These methods are then called as network activity or timers trigger new events.
Example: Handling messaging events
class ExampleHandler(MessagingHandler): def on_start(self, event): print("The container event loop has started") def on_sendable(self, event): print("A message can be sent") def on_message(self, event): print("A message is received")
These are only a few common-case events. The full set is documented in the API reference.
event argument has attributes for accessing the object the event is regarding. Attributes with no relevance to a particular event are null.
Example: Accessing event objects
event.container event.connection event.session event.sender event.receiver event.delivery event.message
5.1.2. Creating a container
The container is the top-level API object. It is the entry point for creating connections, and it is responsible for running the main event loop. It is often constructed with a global event handler.
Example: Creating a container
handler = ExampleHandler() container = Container(handler) container.run()
Setting the container identity
Each container instance has a unique identity called the container ID. When AMQ Python makes a connection, it sends the container ID to the remote peer. To set the container ID, pass it to the
Example: Setting the container identity
container = Container(handler) container.container_id = "job-processor-3"
If the user does not set the ID, the library will generate a UUID when the container is constucted.
5.2. Network connections
5.2.1. Connection URLs
Connection URLs encode the information used to establish new connections.
Connection URL syntax
Scheme - The connection transport, either
amqpfor unencrypted TCP or
amqpsfor TCP with SSL/TLS encryption.
- Host - The remote network host. The value can be a hostname or a numeric IP address. IPv6 addresses must be enclosed in square brackets.
Port - The remote network port. This value is optional. The default value is 5672 for the
amqpscheme and 5671 for the
Connection URL examples
amqps://example.com amqps://example.net:56720 amqp://127.0.0.1 amqp://[::1]:2000
5.2.2. Creating outgoing connections
To connect to a remote server, call the
Container.connect() method with a connection URL. This is typically done inside the
Example: Creating outgoing connections
class ExampleHandler(MessagingHandler): def on_start(self, event): event.container.connect("amqp://example.com") def on_connection_opened(self, event): print("Connection", event.connection, "is open")
See the Section 5.5, “Security” section for information about creating secure connections.
5.2.3. Listening for incoming connections
AMQ Python can accept inbound network connections, allowing you to build messaging servers. To start listening for connections, use the
Container.listen() method with a URL containing the local host address and port to listen on.
Example: Listening for incoming connections
class ExampleHandler(MessagingHandler): def on_start(self, event): event.container.listen("0.0.0.0") def on_connection_opened(self, event): print("New incoming connection", event.connection)
The special IP address
0.0.0.0 listens on all available IPv4 interfaces. To listen on all IPv6 interfaces, use
For more information, see the server receive.py example.
5.2.4. Configuring reconnect
Reconnect allows a client to recover from lost connections. It is used to ensure that the components in a distributed system reestablish communication after temporary network or component failures.
AMQ Python enables reconnect by default. If a connection is lost or a connection attempt fails, the client will try again after a brief delay. The delay increases exponentially for each new attempt, up to a default maximum of 10 seconds.
To disable reconnect, set the
reconnect connection option to
Example: Disabling reconnect
To control the delays between connection attempts, define a class implementing the
next() methods and set the
reconnect connection option to an instance of that class.
Example: Configuring reconnect
class ExampleReconnect(object): def __init__(self): self.delay = 0 def reset(self): self.delay = 0 def next(self): if self.delay == 0: self.delay = 0.1 else: self.delay = min(10, 2 * self.delay) return self.delay container.connect("amqp://example.com", reconnect=ExampleReconnect())
next method returns the next delay in seconds. The
reset method is called once before the reconnect process begins.
5.2.5. Configuring failover
AMQ Python allows you to configure multiple connection endpoints. If connecting to one fails, the client attempts to connect to the next in the list. If the list is exhausted, the process starts over.
To specify multiple connection endpoints, set the
urls connection option to a list of connection URLs.
Example: Configuring failover
urls = ["amqp://alpha.example.com", "amqp://beta.example.com"] container.connect(urls=urls)
It is an error to use the
urls options at the same time.
5.3. Senders and receivers
The client uses sender and receiver links to represent channels for delivering messages. Senders and receivers are unidirectional, with a source end for the message origin, and a target end for the message destination.
Source and targets often point to queues or topics on a message broker. Sources are also used to represent subscriptions.
5.3.1. Creating queues and topics on demand
Some message servers support on-demand creation of queues and topics. When a sender or receiver is attached, the server uses the sender target address or the receiver source address to create a queue or topic with a name matching the address.
The message server typically defaults to creating either a queue (for one-to-one message delivery) or a topic (for one-to-many message delivery). The client can indicate which it prefers by setting the
topic capability on the source or target.
To select queue or topic semantics, follow these steps:
- Configure your message server for automatic creation of queues and topics. This is often the default configuration.
Set either the
topiccapability on your sender target or receiver source, as in the examples below.
Example: Sending to a queue created on demand
class CapabilityOptions(SenderOption): def apply(self, sender): sender.target.capabilities.put_object(symbol("queue")) class ExampleHandler(MessagingHandler): def on_start(self, event): conn = event.container.connect("amqp://example.com") event.container.create_sender(conn, "jobs", options=CapabilityOptions())
Example: Receiving from a topic created on demand
class CapabilityOptions(ReceiverOption): def apply(self, receiver): receiver.source.capabilities.put_object(symbol("topic")) class ExampleHandler(MessagingHandler): def on_start(self, event): conn = event.container.connect("amqp://example.com") event.container.create_receiver(conn, "notifications", options=CapabilityOptions())
For more information, see the following examples:
5.3.2. Creating durable subscriptions
A durable subscription is a piece of state on the remote server representing a message receiver. Ordinarily, message receivers are discarded when a client closes. However, because durable subscriptions are persistent, clients can detach from them and then re-attach later. Any messages received while detached are available when the client re-attaches.
Durable subscriptions are uniquely identified by combining the client container ID and receiver name to form a subscription ID. These must have stable values so that the subscription can be recovered.
To create a durable subscription, follow these steps:
Set the connection container ID to a stable value, such as
container = Container(handler) container.container_id = "client-1"
Configure the receiver source for durability by setting the
class SubscriptionOptions(ReceiverOption): def apply(self, receiver): receiver.source.durability = Terminus.DELIVERIES receiver.source.expiry_policy = Terminus.EXPIRE_NEVER
Create a receiver with a stable name, such as
sub-1, and apply the source properties:
event.container.create_receiver(conn, "notifications", name="sub-1", options=SubscriptionOptions())
To detach from a subscription, use the
Receiver.detach() method. To terminate the subscription, use the
For more information, see the durable-subscribe.py example.
5.4. Message delivery
5.4.1. Sending messages
To send a message, override the
on_sendable event handler and call the
Sender.send() method. The
sendable event fires when the
Sender has enough credit to send at least one message.
Example: Sending messages
class ExampleHandler(MessagingHandler): def on_start(self, event): conn = event.container.connect("amqp://example.com") sender = event.container.create_sender(conn, "jobs") def on_sendable(self, event): message = Message("job-content") event.sender.send(message)
For more information, see the send.py example.
5.4.2. Tracking sent messages
When a message is sent, the sender can keep a reference to the
delivery object representing the transfer. After the message is delivered, the receiver accepts or rejects it. The sender is notified of the outcome for each delivery.
To monitor the outcome of a sent message, override the
on_rejected event handlers and map the delivery state update to the delivery returned from
Example: Tracking sent messages
def on_sendable(self, event): message = Message(self.message_body) delivery = event.sender.send(message) def on_accepted(self, event): print("Delivery", event.delivery, "is accepted") def on_rejected(self, event): print("Delivery", event.delivery, "is rejected")
5.4.3. Receiving messages
To receive a message, create a receiver and override the
on_message event handler.
Example: Receiving messages
class ExampleHandler(MessagingHandler): def on_start(self, event): conn = event.container.connect("amqp://example.com") receiver = event.container.create_receiver(conn, "jobs") def on_message(self, event): print("Received message", event.message, "from", event.receiver)
For more information, see the receive.py example.
5.4.4. Acknowledging received messages
To explicitly accept or reject a delivery, use the
Delivery.update() method with the
REJECTED state in the
on_message event handler.
Example: Acknowledging received messages
def on_message(self, event): try: process_message(event.message) event.delivery.update(ACCEPTED) except: event.delivery.update(REJECTED)
By default, if you do not explicity acknowledge a delivery, then the library accepts it after
on_message returns. To disable this behavior, set the
auto_accept receiver option to false.
5.5.1. Securing connections with SSL/TLS
AMQ Python uses SSL/TLS to encrypt communication between clients and servers.
To connect to a remote server with SSL/TLS, use a connection URL with the
Example: Enabling SSL/TLS
5.5.2. Connecting with a user and password
AMQ Python can authenticate connections with a user and password.
To specify the credentials used for authentication, set the
password options on the
Example: Connecting with a user and password
container.connect("amqps://example.com", user="alice", password="secret")
5.5.3. Configuring SASL authentication
AMQ Python uses the SASL protocol to perform authentication. SASL can use a number of different authentication mechanisms. When two network peers connect, they exchange their allowed mechanisms, and the strongest mechanism allowed by both is selected.
The client uses Cyrus SASL to perform authentication. Cyrus SASL uses plug-ins to support specific SASL mechanisms. Before you can use a particular SASL mechanism, the relevant plug-in must be installed. For example, you need the
cyrus-sasl-plain plug-in in order to use SASL PLAIN authentication.
To see a list of Cyrus SASL plug-ins in Red Hat Enterprise Linux, use the
yum search cyrus-sasl command. To install a Cyrus SASL plug-in, use the
yum install PLUG-IN command.
By default, AMQ Python allows all of the mechanisms supported by the local SASL library configuration. To restrict the allowed mechanisms and thereby control what mechanisms can be negotiated, use the
allowed_mechs connection option. It takes a string containing a space-separated list of mechanism names.
Example: Configuring SASL authentication
This example forces the connection to authenticate using the
ANONYMOUS mechanism even if the server we connect to offers other options. Valid mechanisms include
AMQ Python enables SASL by default. To disable it, set the
sasl_enabled connection option to false.
Example: Disabling SASL
5.5.4. Authenticating using Kerberos
Kerberos is a network protocol for centrally managed authentication based on the exchange of encrypted tickets. See Using Kerberos for more information.
- Configure Kerberos in your operating system. See Configuring Kerberos to set up Kerberos on Red Hat Enterprise Linux.
GSSAPISASL mechanism in your client application.
kinitcommand to authenticate your user credentials and store the resulting Kerberos ticket.
$ kinit <user>@<realm>
- Run the client program.
5.6.1. Enabling protocol logging
The client can log AMQP protocol frames to the console. This data is often critical when diagnosing problems.
To enable protocol logging, set the
PN_TRACE_FRM environment variable to
Example: Enabling protocol logging
$ export PN_TRACE_FRM=1 $ <your-client-program>
To disable protocol logging, unset the
PN_TRACE_FRM environment variable.
The client offers distributed tracing based on the Jaeger implementation of the OpenTracing standard. Use the following steps to enable tracing in your application:
Install the tracing dependencies.
Red Hat Enterprise Linux 7
$ sudo yum install https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm $ sudo yum install python2-pip $ pip install --user --upgrade setuptools $ pip install --user opentracing jaeger-client
Red Hat Enterprise Linux 8
$ sudo dnf install python3-pip $ pip3 install --user opentracing jaeger-client
Register the global tracer in your program.
Example: Global tracer configuration
from proton.tracing import init_tracer tracer = init_tracer("<service-name>")
For more information about Jaeger configuration, see Jaeger Sampling.
When testing or debugging, you may want to force Jaeger to trace a particular operation. See the Jaeger Python client documentation for more information.
To view the traces your application captures, use the Jaeger Getting Started to run the Jaeger infrastructure and console.