Chapter 6. Connecting applications to AMQ Online

You can connect your application to AMQ Online using one of the following client examples.

To connect to the messaging service from outside the OpenShift cluster, TLS must be used with SNI set to specify the fully qualified host name for the address space. The port used is 443.

The messaging protocols supported depends on the type of address space used. For more information about address space types, see Address space.

6.1. Retrieving the self-signed CA certificate

If you opted for the selfsigned certificate provider type in your AddressSpace endpoint configuration, the generated CA that signed the AddressSpace server certificate is required when connecting to the messaging client application. You can retrieve the certificate from the AddressSpace using the following procedure.

Warning

Using a self-signed certificate in production environments is not recommended.

Procedure

  1. Log in as a messaging tenant:

    oc login -u developer
  2. Retrieve the CA certificate from the AddressSpace.

    This will give you a file containing the CA certificate, in PEM format.

    oc get addressspace myspace -n namespace -o jsonpath='{.status.caCert}{"\n"}' | base64 --decode > ca.crt
  3. If a PKCS12 or JKS format trust store is required, use the following commands to generate one:

    For PKS:

    keytool -import -trustcacerts -alias root -file ca.crt -storetype pkcs12 -keystore ca.pkcs12 -storepass password -noprompt

    For JKS:

    keytool -import -trustcacerts -alias root -file ca.crt -storetype jks -keystore ca.jsk -storepass password -noprompt

6.2. Client examples

6.2.1. AMQ Online Python example

You can use the following AMQ Online Python example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.

from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class HelloWorld(MessagingHandler):
    def __init__(self, server, address):
        super(HelloWorld, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        conn = event.container.connect(self.server)
        event.container.create_receiver(conn, self.address)
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):
        event.sender.send(Message(body="Hello World!"))
        event.sender.close()

    def on_message(self, event):
        print(event.message.body)
        event.connection.close()

Container(HelloWorld("amqps://_messaging-route-hostname_:443", "myqueue")).run()

6.2.1.1. Known issue with creating a subscriber on a hierarchical topic

A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic).

The workaround for this issue involves setting the capability "topic" in the source.

Procedure

  1. In the simple_recv.py file, modify the from proton.reactor import Container to add the ReceiverOption:
class CapabilityOptions(ReceiverOption):
    def apply(self, receiver):
        receiver.source.capabilities.put_object(symbol("topic"))
  1. Modify the following line to add options=CapabilityOptions():
def on_start(self, event):
        event.container.create_receiver(conn, self.address, options=CapabilityOptions())

6.2.2. AMQ Online JMS example

You can use the following AMQ Online JMS example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.

package org.apache.qpid.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        try {
            // The configuration for the Qpid InitialContextFactory has been supplied in
            // a jndi.properties file in the classpath, which results in it being picked
            // up automatically by the InitialContext constructor.
            Context context = new InitialContext();

            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
            Destination queue = (Destination) context.lookup("myQueueLookup");

            Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer messageProducer = session.createProducer(queue);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);

            if (receivedMessage != null) {
                System.out.println(receivedMessage.getText());
            } else {
                System.out.println("No message received within the given timeout!");
            }

            connection.close();
        } catch (Exception exp) {
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}

with jndi.properties:

connectionfactory.myFactoryLookup = amqps://messaging-route-hostname:443?transport.trustAll=true&transport.verifyHost=false
queue.myQueueLookup = myqueue

6.2.3. AMQ Online JavaScript example

You can use the following AMQ Online JavaScript example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.

var container = require('rhea');
container.on('connection_open', function (context) {
    context.connection.open_receiver('myqueue');
    context.connection.open_sender('myqueue');
});
container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
container.on('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
    context.sender.detach();
});
container.connect({username: 'username', password: 'password', port:443, host:'messaging-route-hostname', transport:'tls', rejectUnauthorized:false});

6.2.3.1. AMQ Online JavaScript example using WebSockets

var container = require('rhea');
var WebSocket = require('ws');

container.on('connection_open', function (context) {
    context.connection.open_receiver('myqueue');
    context.connection.open_sender('myqueue');
});
container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
container.on('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
    context.sender.detach();
});

var ws = container.websocket_connect(WebSocket);
container.connect({username: 'username', password: 'password', connection_details: ws("wss://messaging-route-hostname:443", ["binary"], {rejectUnauthorized: false})});

6.2.4. AMQ Online C++ example

The C++ client has equivalent simple_recv and simple_send examples with the same options as Python. However, the C++ library does not perform the same level of processing on the URL; in particular it will not accept amqps:// to imply using TLS, so the example needs to be modified as follows:

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/ssl.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>

#include <iostream>

#include "fake_cpp11.hpp"

class hello_world : public proton::messaging_handler {
  private:
    proton::url url;

  public:
    hello_world(const std::string& u) : url(u) {}

    void on_container_start(proton::container& c) OVERRIDE {
        proton::connection_options co;
        co.ssl_client_options(proton::ssl_client_options());
        c.client_connection_options(co);
        c.connect(url);
    }

    void on_connection_open(proton::connection& c) OVERRIDE {
        c.open_receiver(url.path());
        c.open_sender(url.path());
    }

    void on_sendable(proton::sender &s) OVERRIDE {
        proton::message m("Hello World!");
        s.send(m);
        s.close();
    }

    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
        std::cout << m.body() << std::endl;
        d.connection().close();
    }
};

int main(int argc, char **argv) {
    try {
        std::string url = argc > 1 ? argv[1] : "messaging-route-hostname:443/myqueue";

        hello_world hw(url);
        proton::default_container(hw).run();

        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }

    return 1;
}

6.2.4.1. Known issue with creating a subscriber on a hierarchical topic

A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic).

The workaround involves setting the capability "topic" in the source.

Procedure

  • In the topic_receive.cpp file, edit the code so that it is similar to what is shown in this example:
void on_container_start(proton::container& cont) override {
        proton::connection conn = cont.connect(conn_url_);
        proton::receiver_options opts {};
        proton::source_options sopts {};

        sopts.capabilities(std::vector<proton::symbol> { "topic" });
        opts.source(sopts);

        conn.open_receiver(address_, opts);
    }

6.2.5. AMQ Online .NET example

You can use the following AMQ Online .NET example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.

using System;
using Amqp;

namespace Test
{
    public class Program
    {
        public static void Main(string[] args)
        {
            String url = (args.Length > 0) ? args[0] : "amqps://messaging-route-hostname:443";
            String address = (args.Length > 1) ? args[1] : "myqueue";

            Connection.DisableServerCertValidation = true;
            Connection connection = new Connection(new Address(url));
            Session session = new Session(connection);
            SenderLink sender = new SenderLink(session, "test-sender", address);

            Message messageSent = new Message("Test Message");
            sender.Send(messageSent);

            ReceiverLink receiver = new ReceiverLink(session, "test-receiver", address);
            Message messageReceived = receiver.Receive(TimeSpan.FromSeconds(2));
            Console.WriteLine(messageReceived.Body);
            receiver.Accept(messageReceived);

            sender.Close();
            receiver.Close();
            session.Close();
            connection.Close();
        }
    }
}