-
Language:
English
-
Language:
English
Chapter 5. Using the API
This chapter explains how to use the AMQ C++ API to perform common messaging tasks.
For more information, see the AMQ C++ API reference and AMQ C++ example suite.
5.1. Basic operation
5.1.1. Handling messaging events
AMQ C++ is an asynchronous event-driven API. To define how the application handles events, the user implements callback methods on the messaging_handler
class. These methods are then called as network activity or timers trigger new events.
Example: Handling messaging events
struct example_handler : public proton::messaging_handler { void on_container_start(proton::container& cont) override { std::cout << "The container has started\n"; } void on_sendable(proton::sender& snd) override { std::cout << "A message can be sent\n"; } void on_message(proton::delivery& dlv, proton::message& msg) override { std::cout << "A message is received\n"; } };
These are only a few common-case events. The full set is documented in the API reference.
5.1.2. Creating a container
The container is the top-level API object. It is the entry point for creating connections, and it is responsible for running the main event loop. It is often constructed with a global event handler.
Example: Creating a container
int main() {
example_handler handler {};
proton::container cont {handler};
cont.run();
}
Setting the container identity
Each container instance has a unique identity called the container ID. When AMQ C++ makes a connection, it sends the container ID to the remote peer. To set the container ID, pass it to the proton::container
constructor.
Example: Setting the container identity
proton::container cont {handler, "job-processor-3"};
If the user does not set the ID, the library will generate a UUID when the container is constucted.
5.2. Network connections
5.2.1. Connection URLs
Connection URLs encode the information used to establish new connections.
Connection URL syntax
scheme://host[:port]
-
Scheme - The connection transport, either
amqp
for unencrypted TCP oramqps
for TCP with SSL/TLS encryption. - Host - The remote network host. The value can be a hostname or a numeric IP address. IPv6 addresses must be enclosed in square brackets.
-
Port - The remote network port. This value is optional. The default value is 5672 for the
amqp
scheme and 5671 for theamqps
scheme.
Connection URL examples
amqps://example.com amqps://example.net:56720 amqp://127.0.0.1 amqp://[::1]:2000
5.2.2. Creating outgoing connections
To connect to a remote server, call the container::connect()
method with a connection URL. This is typically done inside the messaging_handler::on_container_start()
method.
Example: Creating outgoing connections
class example_handler : public proton::messaging_handler {
void on_container_start(proton::container& cont) override {
cont.connect("amqp://example.com");
}
void on_connection_open(proton::connection& conn) override {
std::cout << "The connection is open\n";
}
};
See the Section 5.6, “Security” section for information about creating secure connections.
5.2.3. Listening for incoming connections
AMQ C++ can accept inbound network connections, allowing you to build messaging servers. To start listening for connections, use the proton::container::listen()
method with a URL containing the local host address and port to listen on.
class example_handler : public proton::messaging_handler {
void on_container_start(proton::container& cont) override {
cont.listen("0.0.0.0");
}
void on_connection_open(proton::connection& conn) override {
std::cout << "New incoming connection\n";
}
};
The special IP address 0.0.0.0
listens on all available IPv4 interfaces. To listen on all IPv6 interfaces, use [::0]
.
For more information, see the server receive.cpp example.
5.2.4. Configuring reconnect
Reconnect allows a client to recover from lost connections. It is used to ensure that the components in a distributed system reestablish communication after temporary network or component failures.
AMQ C++ disables reconnect by default. To enable it, set the reconnect
connection option to an instance of the reconnect_options
class.
Example: Enabling reconnect
proton::connection_options opts {}; proton::reconnect_options ropts {}; opts.reconnect(ropts); container.connect("amqp://example.com", opts);
With reconnect enabled, if a connection is lost or a connection attempt fails, the client will try again after a brief delay. The delay increases exponentially for each new attempt.
To control the delays between connection attempts, set the delay
, delay_multiplier
, and max_delay
options. All durations are specified in milliseconds.
To limit the number of reconnect attempts, set the max_attempts
option. Setting it to 0 removes any limit.
Example: Configuring reconnect
proton::connection_options opts {}; proton::reconnect_options ropts {}; ropts.delay(proton::duration(10)); ropts.delay_multiplier(2.0); ropts.max_delay(proton::duration::FOREVER); ropts.max_attempts(0); opts.reconnect(ropts); container.connect("amqp://example.com", opts);
5.2.5. Configuring failover
AMQ C++ allows you to configure multiple connection endpoints. If connecting to one fails, the client attempts to connect to the next in the list. If the list is exhausted, the process starts over.
To specify alternate connection endpoints, set the failover_urls
reconnect option to a list of connection URLs.
Example: Configuring failover
std::vector<std::string> failover_urls = { "amqp://backup1.example.com", "amqp://backup2.example.com" }; proton::connection_options opts {}; proton::reconnect_options ropts {}; opts.reconnect(ropts); ropts.failover_urls(failover_urls); container.connect("amqp://primary.example.com", opts);
5.3. Senders and receivers
The client uses sender and receiver links to represent channels for delivering messages. Senders and receivers are unidirectional, with a source end for the message origin, and a target end for the message destination.
Source and targets often point to queues or topics on a message broker. Sources are also used to represent subscriptions.
5.3.1. Creating queues and topics on demand
Some message servers support on-demand creation of queues and topics. When a sender or receiver is attached, the server uses the sender target address or the receiver source address to create a queue or topic with a name matching the address.
The message server typically defaults to creating either a queue (for one-to-one message delivery) or a topic (for one-to-many message delivery). The client can indicate which it prefers by setting the queue
or topic
capability on the source or target.
To select queue or topic semantics, follow these steps:
- Configure your message server for automatic creation of queues and topics. This is often the default configuration.
-
Set either the
queue
ortopic
capability on your sender target or receiver source, as in the examples below.
Example: Sending to a queue created on demand
void on_container_start(proton::container& cont) override { proton::connection conn = cont.connect("amqp://example.com"); proton::sender_options opts {}; proton::target_options topts {}; topts.capabilities(std::vector<proton::symbol> { "queue" }); opts.target(topts); conn.open_sender("jobs", opts); }
Example: Receiving from a topic created on demand
void on_container_start(proton::container& cont) override { proton::connection conn = cont.connect("amqp://example.com"); proton::receiver_options opts {}; proton::source_options sopts {}; sopts.capabilities(std::vector<proton::symbol> { "topic" }); opts.source(sopts); conn.open_receiver("notifications", opts); }
For more details, see the following examples:
5.3.2. Creating durable subscriptions
A durable subscription is a piece of state on the remote server representing a message receiver. Ordinarily, message receivers are discarded when a client closes. However, because durable subscriptions are persistent, clients can detach from them and then re-attach later. Any messages received while detached are available when the client re-attaches.
Durable subscriptions are uniquely identified by combining the client container ID and receiver name to form a subscription ID. These must have stable values so that the subscription can be recovered.
To create a durable subscription, follow these steps:
Set the connection container ID to a stable value, such as
client-1
:proton::container cont {handler, "client-1"};
Create a receiver with a stable name, such as
sub-1
, and configure the receiver source for durability by setting thedurability_mode
andexpiry_policy
options:void on_container_start(proton::container& cont) override { proton::connection conn = cont.connect("amqp://example.com"); proton::receiver_options opts {}; proton::source_options sopts {}; opts.name("sub-1"); sopts.durability_mode(proton::source::UNSETTLED_STATE); sopts.expiry_policy(proton::source::NEVER); opts.source(sopts); conn.open_receiver("notifications", opts); }
To detach from a subscription, use the proton::receiver::detach()
method. To terminate the subscription, use the proton::receiver::close()
method.
For more information, see the durable-subscribe.cpp example.
5.3.3. Creating shared subscriptions
A shared subscription is a piece of state on the remote server representing one or more message receivers. Because it is shared, multiple clients can consume from the same stream of messages.
The client configures a shared subscription by setting the shared
capability on the receiver source.
Shared subscriptions are uniquely identified by combining the client container ID and receiver name to form a subscription ID. These must have stable values so that multiple client processes can locate the same subscription. If the global
capability is set in addition to shared
, the receiver name alone is used to identify the subscription.
To create a durable subscription, follow these steps:
Set the connection container ID to a stable value, such as
client-1
:proton::container cont {handler, "client-1"};
Create a receiver with a stable name, such as
sub-1
, and configure the receiver source for sharing by setting theshared
capability:void on_container_start(proton::container& cont) override { proton::connection conn = cont.connect("amqp://example.com"); proton::receiver_options opts {}; proton::source_options sopts {}; opts.name("sub-1"); sopts.capabilities(std::vector<proton::symbol> { "shared" }); opts.source(sopts); conn.open_receiver("notifications", opts); }
To detach from a subscription, use the proton::receiver::detach()
method. To terminate the subscription, use the proton::receiver::close()
method.
For more information, see the shared-subscribe.cpp example.
5.4. Message delivery
5.4.1. Sending messages
To send a message, override the on_sendable
event handler and call the sender::send()
method. The sendable
event fires when the proton::sender
has enough credit to send at least one message.
Example: Sending messages
struct example_handler : public proton::messaging_handler { void on_container_start(proton::container& cont) override { proton::connection conn = cont.connect("amqp://example.com"); conn.open_sender("jobs"); } void on_sendable(proton::sender& snd) override { proton::message msg {"job-1"}; snd.send(msg); } };
5.4.2. Tracking sent messages
When a message is sent, the sender can keep a reference to the tracker
object representing the transfer. After the message is delivered, the receiver accepts or rejects it. The sender is notified of the outcome for each tracked delivery.
To monitor the outcome of a sent message, override the on_tracker_accept
and on_tracker_reject
event handlers and map the delivery state update to the tracker returned from send()
.
Example: Tracking sent messages
void on_sendable(proton::sender& snd) override { proton::message msg {"job-1"}; proton::tracker trk = snd.send(msg); } void on_tracker_accept(proton::tracker& trk) override { std::cout << "Delivery for " << trk << " is accepted\n"; } void on_tracker_reject(proton::tracker& trk) override { std::cout << "Delivery for " << trk << " is rejected\n"; }
5.4.3. Receiving messages
To receive messages, create a receiver and override the on_message
event handler.
Example: Receiving messages
struct example_handler : public proton::messaging_handler { void on_container_start(proton::container& cont) override { proton::connection conn = cont.connect("amqp://example.com"); conn.open_receiver("jobs"); } void on_message(proton::delivery& dlv, proton::message& msg) override { std::cout << "Received message '" << msg.body() << "'\n"; } };
5.4.4. Acknowledging received messages
To explicitly accept or reject a delivery, use the delivery::accept()
or delivery::reject()
methods in the on_message
event handler.
Example: Acknowledging received messages
void on_message(proton::delivery& dlv, proton::message& msg) override { try { process_message(msg); dlv.accept(); } catch (std::exception& e) { dlv.reject(); } }
By default, if you do not explicity acknowledge a delivery, then the library accepts it after on_message
returns. To disable this behavior, set the auto_accept
receiver option to false.
5.5. Error handling
Errors in AMQ C++ can be handled in two different ways:
- Catching exceptions
- Overriding virtual functions to handle AMQP protocol or connection errors
Catching exceptions
Catching exceptions is the most basic, but least granular, way to handle errors. If an error is not handled using an override in a handler routine, an exception will be thrown and can be caught and handled. An exception thrown in this way will be thrown by the container’s run
method.
All of the exceptions that can be thrown by AMQ C++ are descended from proton::error
, which in turn is a subclass of std::runtime_error
(which is a subclass of std::exception
).
The code example below illustrates how a block could be written to catch any exception thrown from AMQ C++.
Example: API-Specific exception handling
try { // Something that might throw an exception } catch (proton::error& e) { // Handle Proton-specific problems here } catch (std::exception& e) { // Handle more general problems here }
If you require no API-specific exception handling, you only need to catch std::exception
since proton::error
descends from it.
Example: General exception handling
int main() { try { // Something that might throw an exception } catch (std::exception& e) { std::cerr << "Caught exception: " << e.what() << std::endl; } }
Because all exceptions in a C++ program descend from std::exception
, you can write a code block to wrap your main
method and display information about any std::exception
errors.
Handling connection and protocol errors
You can handle protocol-level errors by overriding the following messaging_handler
methods:
-
on_transport_error(proton::transport&)
-
on_connection_error(proton::connection&)
-
on_session_error(proton::session&)
-
on_receiver_error(proton::receiver&)
-
on_sender_error(proton::sender&)
These event handling routines are called whenever there is an error condition with the specific object that is in the event. After calling the error handler, the appropriate close handler will also be called.
If not overridden the default error handler will be called with an indication of the error condition that occurred.
There is also a default error handler:
-
on_error(proton::error_condition&)
If one of the more specific error handlers is not overridden, this will be called.
As the close handlers will be called in the event of any error, only error itself need be handled within the error handler. Resource clean up can be managed by close handlers. If there is no error handling that is specific to a particular object it is typical to use the general on_error
handler and not have a more specific handler.
5.6. Security
5.6.1. Securing connections with SSL/TLS
AMQ C++ uses SSL/TLS to encrypt communication between clients and servers.
To connect to a remote server with SSL/TLS, set the ssl_client_options
connection option and use a connection URL with the amqps
scheme. The ssl_client_options
constructor takes the filename, directory, or database ID of a CA certificate.
Example: Enabling SSL/TLS
proton::ssl_client_options sopts {"/etc/pki/ca-trust"}; proton::connection_options opts {}; opts.ssl_client_options(sopts); container.connect("amqps://example.com", opts);
5.6.2. Connecting with a user and password
AMQ C++ can authenticate connections with a user and password.
To specify the credentials used for authentication, set the user
and password
options on the connect
method.
Example: Connecting with a user and password
proton::connection_options opts {}; opts.user("alice"); opts.password("secret"); container.connect("amqps://example.com", opts);
5.6.3. Configuring SASL authentication
AMQ C++ uses the SASL protocol to perform authentication. SASL can use a number of different authentication mechanisms. When two network peers connect, they exchange their allowed mechanisms, and the strongest mechanism allowed by both is selected.
The client uses Cyrus SASL to perform authentication. Cyrus SASL uses plug-ins to support specific SASL mechanisms. Before you can use a particular SASL mechanism, the relevant plug-in must be installed. For example, you need the cyrus-sasl-plain
plug-in in order to use SASL PLAIN authentication.
To see a list of Cyrus SASL plug-ins in Red Hat Enterprise Linux, use the yum search cyrus-sasl
command. To install a Cyrus SASL plug-in, use the yum install PLUG-IN
command.
By default, AMQ C++ allows all of the mechanisms supported by the local SASL library configuration. To restrict the allowed mechanisms and thereby control what mechanisms can be negotiated, use the sasl_allowed_mechs
connection option. It takes a string containing a space-separated list of mechanism names.
Example: Configuring SASL authentication
proton::connection_options opts {};
opts.sasl_allowed_mechs("ANONYMOUS");
container.connect("amqps://example.com", opts);
This example forces the connection to authenticate using the ANONYMOUS
mechanism even if the server we connect to offers other options. Valid mechanisms include ANONYMOUS
, PLAIN
, SCRAM-SHA-256
, SCRAM-SHA-1
, GSSAPI
, and EXTERNAL
.
AMQ C++ enables SASL by default. To disable it, set the sasl_enabled
connection option to false.
Example: Disabling SASL
proton::connection_options opts {};
opts.sasl_enabled(false);
container.connect("amqps://example.com", opts);
5.6.4. Authenticating using Kerberos
Kerberos is a network protocol for centrally managed authentication based on the exchange of encrypted tickets. See Using Kerberos for more information.
- Configure Kerberos in your operating system. See Configuring Kerberos to set up Kerberos on Red Hat Enterprise Linux.
Enable the
GSSAPI
SASL mechanism in your client application.proton::connection_options opts {}; opts.sasl_allowed_mechs("GSSAPI"); container.connect("amqps://example.com", opts);
Use the
kinit
command to authenticate your user credentials and store the resulting Kerberos ticket.$ kinit USER@REALM
- Run the client program.
5.7. Timers
AMQ C++ has the ability to execute code after a delay. You can use this to implement time-based behaviors in your application, such as periodically scheduled work or timeouts.
5.7.1. Scheduling deferred work
To defer work for a fixed amount of time, use the schedule
method to set the delay and register a function defining the work.
Example: Sending a message after a delay
void on_sender_open(proton::sender& snd) override {
proton::duration interval {5 * proton::duration::SECOND};
snd.work_queue().schedule(interval, [=] { send(snd); });
}
void send(proton::sender snd) {
if (snd.credit() > 0) {
proton::message msg {"hello"};
snd.send(msg);
}
}
This example uses the schedule
method on the work queue of the sender in order to establish it as the execution context for the work.
5.8. Logging
5.8.1. Enabling protocol logging
The client can log AMQP protocol frames to the console. This data is often critical when diagnosing problems.
To enable protocol logging, set the PN_TRACE_FRM
environment variable to 1
:
Example: Enabling protocol logging
$ export PN_TRACE_FRM=1
$ <your-client-program>
To disable protocol logging, unset the PN_TRACE_FRM
environment variable.