Chapter 6. Connecting applications to AMQ Online
You can connect your application to AMQ Online using one of the following client examples.
To connect to the messaging service from outside the OpenShift cluster, TLS must be used with SNI set to specify the fully qualified host name for the address space. The port used is 443.
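As a rough illustration of the TLS and SNI requirement above, the following Python sketch uses only the standard library to open a verified TLS socket to the messaging route. The function names, the timeout value, and the optional ca_file parameter are assumptions made for this sketch; an AMQP client library would normally perform this step for you.

```python
import socket
import ssl


def make_tls_context(ca_file=None):
    """Return a client-side TLS context that verifies the server certificate.

    ca_file is an optional PEM file with the CA to trust (hypothetical
    parameter for this sketch); when omitted, the system CAs are used.
    """
    # create_default_context turns on certificate and host name verification.
    return ssl.create_default_context(cafile=ca_file)


def open_messaging_socket(hostname, port=443, ca_file=None, timeout=10.0):
    """Open a TLS connection with SNI set to the route's host name."""
    context = make_tls_context(ca_file)
    raw = socket.create_connection((hostname, port), timeout=timeout)
    try:
        # server_hostname sets the SNI extension, which the OpenShift router
        # uses to pick the address space endpoint.
        return context.wrap_socket(raw, server_hostname=hostname)
    except Exception:
        raw.close()
        raise
```

Calling open_messaging_socket("messaging-route-hostname") with your own route host name would return a connected TLS socket, or raise an exception if the certificate cannot be verified.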
The supported messaging protocols depend on the type of address space used. For more information about address space types, see Address space.
6.1. Retrieving the self-signed CA certificate
If you opted for the selfsigned certificate provider type in your AddressSpace endpoint configuration, the messaging client application needs the generated CA that signed the AddressSpace server certificate in order to connect. You can retrieve the certificate from the AddressSpace using the following procedure.
Using a self-signed certificate in production environments is not recommended.
Procedure
Log in as a messaging tenant:
oc login -u developer
Retrieve the CA certificate from the AddressSpace. This gives you a file containing the CA certificate in PEM format.
oc get addressspace myspace -n namespace -o jsonpath='{.status.caCert}{"\n"}' | base64 --decode > ca.crt
If a PKCS12 or JKS format trust store is required, use the following commands to generate one:
For PKCS12:
keytool -import -trustcacerts -alias root -file ca.crt -storetype pkcs12 -keystore ca.pkcs12 -storepass password -noprompt
For JKS:
keytool -import -trustcacerts -alias root -file ca.crt -storetype jks -keystore ca.jks -storepass password -noprompt
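The retrieval step in the procedure above can also be scripted. The following is a minimal sketch, assuming you have captured the output of oc get addressspace myspace -n namespace -o json as a string; the function name extract_ca_cert is hypothetical, and the only field it relies on is status.caCert, the same field used by the jsonpath expression above.

```python
import base64
import json


def extract_ca_cert(addressspace_json):
    """Return the PEM-encoded CA certificate from an AddressSpace JSON document.

    addressspace_json is assumed to be the text printed by
    `oc get addressspace <name> -n <namespace> -o json`.
    """
    document = json.loads(addressspace_json)
    # status.caCert holds the certificate as a base64 string.
    encoded = document["status"]["caCert"]
    return base64.b64decode(encoded)
```

Writing the returned bytes to ca.crt produces the same file as the oc and base64 pipeline shown in the procedure.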
6.2. Client examples
6.2.1. AMQ Online Python example
You can use the following AMQ Online Python example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.
from __future__ import print_function, unicode_literals
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container

class HelloWorld(MessagingHandler):
    def __init__(self, server, address):
        super(HelloWorld, self).__init__()
        self.server = server
        self.address = address

    def on_start(self, event):
        conn = event.container.connect(self.server)
        event.container.create_receiver(conn, self.address)
        event.container.create_sender(conn, self.address)

    def on_sendable(self, event):
        event.sender.send(Message(body="Hello World!"))
        event.sender.close()

    def on_message(self, event):
        print(event.message.body)
        event.connection.close()

Container(HelloWorld("amqps://_messaging-route-hostname_:443", "myqueue")).run()
6.2.1.1. Known issue with creating a subscriber on a hierarchical topic
A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic).
The workaround for this issue involves setting the capability "topic" in the source.
Procedure
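- In the simple_recv.py file, modify the from proton.reactor import Container line to also import ReceiverOption, import symbol from proton, and add the following class:
from proton import symbol
from proton.reactor import Container, ReceiverOption

class CapabilityOptions(ReceiverOption):
    def apply(self, receiver):
        # Mark the link source as a topic so the broker does not treat it as a queue.
        receiver.source.capabilities.put_object(symbol("topic"))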
- Modify the create_receiver call in on_start to add options=CapabilityOptions():
def on_start(self, event):
    # conn is the connection opened earlier with event.container.connect(...)
    event.container.create_receiver(conn, self.address, options=CapabilityOptions())
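To make the difference behind this known issue concrete, the following sketch contrasts the two delivery behaviors in plain Python, without any broker. It is a simplified model only: the function names are hypothetical, and the round-robin distribution stands in for whatever distribution a real broker applies to competing consumers.

```python
from itertools import cycle


def deliver_as_queue(messages, consumers):
    """Competing consumers: each message goes to exactly one consumer."""
    received = {name: [] for name in consumers}
    # Round-robin is an assumption of this model, not broker behavior.
    for message, name in zip(messages, cycle(consumers)):
        received[name].append(message)
    return received


def deliver_as_topic(messages, subscribers):
    """Topic subscribers: every subscriber receives a copy of each message."""
    return {name: list(messages) for name in subscribers}
```

With two receivers and four messages, the queue model splits the messages between the receivers, while the topic model gives all four messages to each receiver. The latter is the behavior that setting the "topic" capability is meant to restore.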
6.2.2. AMQ Online JMS example
You can use the following AMQ Online JMS example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.
package org.apache.qpid.jms.example;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        try {
            // The configuration for the Qpid InitialContextFactory has been supplied in
            // a jndi.properties file in the classpath, which results in it being picked
            // up automatically by the InitialContext constructor.
            Context context = new InitialContext();

            ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
            Destination queue = (Destination) context.lookup("myQueueLookup");

            Connection connection = factory.createConnection(System.getProperty("USER"), System.getProperty("PASSWORD"));
            connection.setExceptionListener(new MyExceptionListener());
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            MessageProducer messageProducer = session.createProducer(queue);
            MessageConsumer messageConsumer = session.createConsumer(queue);

            TextMessage message = session.createTextMessage("Hello world!");
            messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
            TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L);

            if (receivedMessage != null) {
                System.out.println(receivedMessage.getText());
            } else {
                System.out.println("No message received within the given timeout!");
            }

            connection.close();
        } catch (Exception exp) {
            System.out.println("Caught exception, exiting.");
            exp.printStackTrace(System.out);
            System.exit(1);
        }
    }

    private static class MyExceptionListener implements ExceptionListener {
        @Override
        public void onException(JMSException exception) {
            System.out.println("Connection ExceptionListener fired, exiting.");
            exception.printStackTrace(System.out);
            System.exit(1);
        }
    }
}
The example reads its connection settings from the following jndi.properties file:
connectionfactory.myFactoryLookup = amqps://messaging-route-hostname:443?transport.trustAll=true&transport.verifyHost=false
queue.myQueueLookup = myqueue
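The jndi.properties file above turns off certificate and host name checks with transport.trustAll=true and transport.verifyHost=false. If you created a JKS trust store from the CA certificate, a connection URI that points at the trust store may be preferable. The following Python sketch only assembles such a URI string; the function name is hypothetical, and it assumes the Qpid JMS transport.trustStoreLocation and transport.trustStorePassword options, so check them against the documentation for your client version.

```python
from urllib.parse import urlencode


def build_jms_uri(hostname, trust_store, trust_store_password, port=443):
    """Assemble an amqps:// URI that points Qpid JMS at a trust store.

    The transport.* option names are assumed from the Qpid JMS client
    configuration; they are not taken from the example above.
    """
    options = {
        "transport.trustStoreLocation": trust_store,
        "transport.trustStorePassword": trust_store_password,
    }
    # Keep "/" readable so file system paths stay intact in the URI.
    return "amqps://{}:{}?{}".format(hostname, port, urlencode(options, safe="/"))
```

The returned string can be used as the value of connectionfactory.myFactoryLookup in jndi.properties.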
6.2.3. AMQ Online JavaScript example
You can use the following AMQ Online JavaScript example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.
var container = require('rhea');

container.on('connection_open', function (context) {
    context.connection.open_receiver('myqueue');
    context.connection.open_sender('myqueue');
});

container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});

container.on('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
    context.sender.detach();
});

container.connect({username: 'username', password: 'password', port:443, host:'messaging-route-hostname', transport:'tls', rejectUnauthorized:false});
6.2.3.1. AMQ Online JavaScript example using WebSockets
var container = require('rhea');
var WebSocket = require('ws');

container.on('connection_open', function (context) {
    context.connection.open_receiver('myqueue');
    context.connection.open_sender('myqueue');
});

container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});

container.on('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
    context.sender.detach();
});

var ws = container.websocket_connect(WebSocket);
container.connect({username: 'username', password: 'password', connection_details: ws("wss://messaging-route-hostname:443", ["binary"], {rejectUnauthorized: false})});
6.2.4. AMQ Online C++ example
The C++ client has equivalent simple_recv and simple_send examples with the same options as Python. However, the C++ library does not process the URL in the same way. In particular, it does not treat the amqps:// scheme as a request to use TLS, so the example must be modified to configure TLS explicitly, as follows:
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/ssl.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/url.hpp>

#include <iostream>

#include "fake_cpp11.hpp"

class hello_world : public proton::messaging_handler {
  private:
    proton::url url;

  public:
    hello_world(const std::string& u) : url(u) {}

    void on_container_start(proton::container& c) OVERRIDE {
        proton::connection_options co;
        co.ssl_client_options(proton::ssl_client_options());
        c.client_connection_options(co);
        c.connect(url);
    }

    void on_connection_open(proton::connection& c) OVERRIDE {
        c.open_receiver(url.path());
        c.open_sender(url.path());
    }

    void on_sendable(proton::sender &s) OVERRIDE {
        proton::message m("Hello World!");
        s.send(m);
        s.close();
    }

    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
        std::cout << m.body() << std::endl;
        d.connection().close();
    }
};

int main(int argc, char **argv) {
    try {
        std::string url = argc > 1 ? argv[1] : "messaging-route-hostname:443/myqueue";

        hello_world hw(url);
        proton::default_container(hw).run();

        return 0;
    } catch (const std::exception& e) {
        std::cerr << e.what() << std::endl;
    }

    return 1;
}
6.2.4.1. Known issue with creating a subscriber on a hierarchical topic
A known issue exists where creating a subscriber on a hierarchical topic in AMQ Online causes the broker to instead create it as a competing consumer (handling the address like a queue rather than a topic).
The workaround involves setting the capability "topic" in the source.
Procedure
- In the topic_receive.cpp file, edit the code so that it is similar to what is shown in this example:
void on_container_start(proton::container& cont) override {
    proton::connection conn = cont.connect(conn_url_);
    proton::receiver_options opts {};
    proton::source_options sopts {};

    sopts.capabilities(std::vector<proton::symbol> { "topic" });
    opts.source(sopts);

    conn.open_receiver(address_, opts);
}
6.2.5. AMQ Online .NET example
You can use the following AMQ Online .NET example to connect your application to AMQ Online. This example assumes you have created an address of type queue named myqueue.
using System;
using Amqp;

namespace Test
{
    public class Program
    {
        public static void Main(string[] args)
        {
            String url = (args.Length > 0) ? args[0] : "amqps://messaging-route-hostname:443";
            String address = (args.Length > 1) ? args[1] : "myqueue";

            // Skips server certificate validation; suitable for testing only.
            Connection.DisableServerCertValidation = true;
            Connection connection = new Connection(new Address(url));
            Session session = new Session(connection);

            SenderLink sender = new SenderLink(session, "test-sender", address);
            Message messageSent = new Message("Test Message");
            sender.Send(messageSent);

            ReceiverLink receiver = new ReceiverLink(session, "test-receiver", address);
            Message messageReceived = receiver.Receive(TimeSpan.FromSeconds(2));

            // Receive returns null if no message arrives within the timeout.
            if (messageReceived != null)
            {
                Console.WriteLine(messageReceived.Body);
                receiver.Accept(messageReceived);
            }
            else
            {
                Console.WriteLine("No message received within the given timeout!");
            }

            sender.Close();
            receiver.Close();
            session.Close();
            connection.Close();
        }
    }
}