Red Hat Training

A Red Hat training course is available for Red Hat AMQ

Chapter 5. Using the API

This chapter explains how to use the AMQ C++ API to perform common messaging tasks.

5.1. Basic Operation

5.1.1. Handling Messaging Events

AMQ C++ is an asynchronous event-driven API. To define how the application handles events, the user implements callback methods on the messaging_handler class. These methods are then called as network activity or timers trigger new events.

Example: Handling Messaging Events

struct example_handler : public proton::messaging_handler {
    void on_container_start(proton::container& cont) override {
        std::cout << "The container has started\n";
    }

    void on_sendable(proton::sender& snd) override {
        std::cout << "A message can be sent\n";
    }

    void on_message(proton::delivery& dlv, proton::message& msg) override {
        std::cout << "A message is received\n";
    }
};

These are only a few common-case events. The full set is documented in the API reference.

5.1.2. Creating a Container

The container is the top-level API object. It is the entry point for creating connections, and it is responsible for running the main event loop. It is often constructed with a global event handler.

Example: Creating a Container

int main() {
    example_handler handler {};
    proton::container cont {handler};
    cont.run();
}

Setting the Container Identity

Each container instance has a unique identity called the container ID. When AMQ C++ makes a connection, it sends the container ID to the remote peer. To set the container ID, pass it to the proton::container constructor.

Example: Setting the Container Identity

proton::container cont {handler, "job-processor-3"};

If the user does not set the ID, the library will generate a UUID when the container is constucted.

5.2. Network Connections

5.2.1. Connection URLs

Connection URLs encode the information used to establish new connections.

Connection URL Syntax

scheme://host[:port]

  • Scheme - The connection transport, either amqp for unencrypted TCP or amqps for TCP with SSL/TLS encryption.
  • Host - The remote network host. The value can be a hostname or a numeric IP address. IPv6 addresses must be enclosed in square brackets.
  • Port - The remote network port. This value is optional. The default value is 5672 for the amqp scheme and 5671 for the amqps scheme.

Connection URL Examples

amqps://example.com
amqps://example.net:56720
amqp://127.0.0.1
amqp://[::1]:2000

5.2.2. Creating Outgoing Connections

To connect to a remote server, call the container::connect() method with a connection URL. This is typically done inside the messaging_handler::on_container_start() method.

Example: Creating Outgoing Connections

class example_handler: public proton::messaging_handler {
    void on_container_start(proton::container& cont) override {
        cont.connect("amqp://example.com");
    }

    void on_connection_open(proton::connection& conn) override {
        std::cout << "The connection to is open\n";
    }
};

See the Section 5.5, “Security” section for information about creating secure connections.

5.2.3. Configuring Reconnect

Reconnect allows a client to recover from lost connections. It is used to ensure that the components in a distributed system reestablish communication after temporary network or component failures.

AMQ C++ disables reconnect by default. To enable it, set the reconnect connection option to an instance of the reconnect_options class.

Example: Enabling Reconnect

proton::connection_options opts {};
proton::reconnect_options ropts {};

opts.reconnect(ropts);

container.connect("amqp://example.com", opts);

With reconnect enabled, if a connection is lost or a connection attempt fails, the client will try again after a brief delay. The delay increases exponentially for each new attempt.

To control the delays between connection attempts, set the delay, delay_multiplier, and max_delay options. All durations are specified in milliseconds.

To limit the number of reconnect attempts, set the max_attempts option. Setting it to 0 removes any limit.

Example: Configuring Reconnect

proton::connection_options opts {};
proton::reconnect_options ropts {};

ropts.delay(proton::duration(10));
ropts.delay_multiplier(2.0);
ropts.max_delay(proton::duration::FOREVER);
ropts.max_attempts(0);

opts.reconnect(ropts);

container.connect("amqp://example.com", opts);

5.2.4. Configuring Failover

AMQ C++ allows you to configure multiple connection endpoints. If connecting to one fails, the client attempts to connect to the next in the list. If the list is exhausted, the process starts over.

To specify alternate connection endpoints, set the failover_urls reconnect option to a list of connection URLs.

Example: Configuring Failover

std::vector<std::string> failover_urls = {
    "amqp://backup1.example.com",
    "amqp://backup2.example.com"
};

proton::connection_options opts {};
proton::reconnect_options ropts {};

opts.reconnect(ropts);
ropts.failover_urls(failover_urls);

container.connect("amqp://primary.example.com", opts);

5.3. Message Delivery

5.3.1. Sending Messages

To send a message, override the on_sendable event handler and call the sender::send() method. The on_sendable method is called when the proton::sender has enough credit to send at least one message.

Example: Sending Messages

struct send_handler : public proton::messaging_handler {
    void on_container_start(proton::container& cont) override {
        proton::connection conn = cont.connect("amqp://example.com");
        conn.open_sender("jobs");
    }

    void on_sendable(proton::sender& snd) override {
        proton::message msg {"job-1"};
        snd.send(msg);
    }
};

5.3.2. Tracking Sent Messages

To track sent messages, override the on_tracker_* methods.

Example: Tracking Sent Messages

void on_sendable(proton::sender& snd) override {
    proton::message msg {"job-1"};
    snd.send(msg);
}

void on_tracker_accept(proton::tracker& trk) override {
    std::cout << "Delivery accepted\n";
}

void on_tracker_reject(proton::tracker& trk) override {
    std::cout << "Delivery rejected\n";
}

5.3.3. Receiving Messages

To receive messages, create a receiver and override the on_message method.

Example: Receiving Messages

struct receive_handler : public proton::messaging_handler {
    void on_container_start(proton::container& cont) override {
        proton::connection conn = cont.connect("amqp://example.com");
        conn.open_receiver("jobs");
    }

    void on_message(proton::delivery& dlv, proton::message& msg) override {
        std::cout << "Received message '" << msg.body() << "'\n";
    }
};

5.3.4. Acknowledging Received Messages

To explicitly accept or reject a delivery, use the delivery::accept() or delivery::reject() methods in the on_message method.

Example: Acknowledging Received Messages

void on_message(proton::delivery& dlv, proton::message& msg) override {
    try {
        process_message(msg);
        dlv.accept();
    } catch (std::exception& e) {
        dlv.reject();
    }
}

5.4. Error Handling

Errors in AMQ C++ can be handled in two different ways.

  • Catching exceptions
  • Overriding virtual functions to handle AMQP protocol or connection errors

Catching Exceptions

Catching exceptions is the most basic, but least granular, way to handle errors. If an error is not handled using an override in a handler routine, an exception will be thrown and can be caught and handled. An exception thrown in this way will be thrown by the container’s run method.

All of the exceptions that can be thrown by AMQ C++ are descended from proton::error, which in turn is a subclass of std::runtime_error (which is a subclass of std::exception).

The code example below illustrates how a block could be written to catch any exception thrown from AMQ C++.

Example: API-Specific Exception Handling

try {
    // Something that might throw an exception
} catch (proton::error& e) {
    // Handle Proton-specific problems here
} catch (std::exception& e) {
    // Handle more general problems here
}

If you require no API-specific exception handling, you only need to catch std::exception since proton::error descends from it.

Example: General Exception Handling

int main() {
    try {
        // Something that might throw an exception
    } catch (std::exception& e) {
        std::cerr << "Caught exception: " << e.what() << std::endl;
    }
}

Note

Because all exceptions in a C++ program descend from std::exception, you can write a code block to wrap your main method and display information about any std::exception errors.

Handling Connection and Protocol Errors

You can handle protocol-level errors by overriding the following messaging_handler methods:

  • on_transport_error(proton::transport&)
  • on_connection_error(proton::connection&)
  • on_session_error(proton::session&)
  • on_receiver_error(proton::receiver&)
  • on_sender_error(proton::sender&)

These event handling routines are called whenever there is an error condition with the specific object that is in the event. After calling the error handler, the appropriate close handler will also be called.

If not overridden the default error handler will be called with an indication of the error condition that occurred.

There is also a default error handler:

  • on_error(proton::error_condition&)

If one of the more specific error handlers is not overridden, this will be called.

Note

As the close handlers will be called in the event of any error, only error itself need be handled within the error handler. Resource clean up can be managed by close handlers. If there is no error handling that is specific to a particular object it is typical to use the general on_error handler and not have a more specific handler.

5.5. Security

5.5.1. Securing Connections with SSL/TLS

AMQ C++ uses SSL/TLS to encrypt communication between clients and servers.

To connect to a remote server with SSL/TLS, use a connection URL with the amqps scheme.

Example: Enabling SSL/TLS

container.connect("amqps://example.com");

5.5.2. Connecting with a User and Password

AMQ C++ can authenticate connections with a user and password.

To specify the credentials used for authentication, set the user and password options on the connect method.

Example: Connecting with a User and Password

proton::connection_options opts {};
opts.user("alice");
opts.password("secret");

container.connect("amqps://example.com", opts);

5.5.3. Configuring SASL Authentication

AMQ C++ uses the SASL protocol to perform authentication. SASL can use a number of different authentication mechanisms. When two network peers connect, they exchange their allowed mechanisms, and the strongest mechanism allowed by both is selected.

Note

The client uses Cyrus SASL to perform authentication. Cyrus SASL uses plug-ins to support specific SASL mechanisms. Before you can use a particular SASL mechanism, the relevant plug-in must be installed. For example, you need the cyrus-sasl-plain plug-in in order to use SASL PLAIN authentication.

To see a list of Cyrus SASL plug-ins in Red Hat Enterprise Linux, use the yum search cyrus-sasl command. To install a Cyrus SASL plug-in, use the yum install PLUG-IN command.

By default, AMQ C++ allows all of the mechanisms supported by the local SASL library configuration. To restrict the allowed mechanisms and thereby control what mechanisms can be negotiated, use the sasl_allowed_mechs connection option. It takes a string containing a space-separated list of mechanism names.

Example: Configuring SASL Authentication

proton::connection_options opts {};
opts.sasl_allowed_mechs("ANONYMOUS");

container.connect("amqps://example.com", opts);

This example forces the connection to authenticate using the ANONYMOUS mechanism even if the server we connect to offers other options. Valid mechanisms include ANONYMOUS, PLAIN, SCRAM-SHA-256, SCRAM-SHA-1, GSSAPI, and EXTERNAL.

AMQ C++ enables SASL by default. To disable it, set the sasl_enabled connection option to false.

Example: Disabling SASL

proton::connection_options opts {};
opts.sasl_enabled(false);

container.connect("amqps://example.com", opts);

5.5.4. Authenticating Using Kerberos

Kerberos is a network protocol for centrally managed authentication based on the exchange of encrypted tickets. See Using Kerberos for more information.

  1. Configure Kerberos in your operating system. See Configuring Kerberos to set up Kerberos on Red Hat Enterprise Linux.
  2. Enable the GSSAPI SASL mechanism in your client application.

    proton::connection_options opts {};
    opts.sasl_allowed_mechs("GSSAPI");
    
    container.connect("amqps://example.com", opts);
  3. Use the kinit command to authenticate your user credentials and store the resulting Kerberos ticket.

    $ kinit USER@REALM
  4. Run the client program.

5.6. More Information

For more information, see the API reference.