3.2. JMS AMQP 1.0 Client API

JMS AMQP 1.0 Client API is based on the Apache Qpid JMS AMQP 1.0 Client API.
Note
This is an initial version of the documentation for the JMS client. Regular updates and enhancements to the documentation can be expected after the GA release of Fuse 6.2.0.

3.2.1. Getting Started with AMQP

Getting started with AMQP

To run a simple demonstration of AMQP in JBoss A-MQ, you need to set up the following parts of the application:
  • Configure the broker to use AMQP—to enable AMQP in the broker, add an AMQP endpoint to the broker's configuration. This implicitly activates the broker's AMQP integration, ensuring that incoming messages are converted from AMQP message format to JMS message format, as required.
  • Implement the AMQP clients—the AMQP clients are based on the Apache Qpid JMS client libraries.

3.2.2. Configuring the Broker for AMQP

Overview

Configuring the broker to use AMQP is relatively straightforward in JBoss A-MQ, because the required AMQP packages are pre-installed in the container. There are essentially two main points you need to pay attention to:
  • Make sure that you have appropriate user entries in the etc/users.properties file, so that the AMQP clients will be able to log on to the broker.
  • Add an AMQP endpoint to the broker (by inserting a transportConnector element into the broker's XML configuration).

Steps to configure the broker

Perform the following steps to configure the broker with an AMQP endpoint:
  1. This example assumes that you are working with a fresh install of a standalone JBoss A-MQ broker, InstallDir.
  2. Define a JAAS user for the AMQP clients, so that the AMQP clients can authenticate themselves to the broker using JAAS security (security is enabled by default in the broker). Edit the InstallDir/etc/users.properties file and add a new user entry, as follows:
    #
    # This file contains the valid users who can log into JBoss A-MQ.
    # Each line has to be of the format:
    #
    # USER=PASSWORD,ROLE1,ROLE2,...
    #
    # All users and roles entered in this file are available after JBoss A-MQ startup
    # and modifiable via the JAAS command group. These users reside in a JAAS domain
    # with the name "karaf"..
    #
    # You must have at least one users to be able to access JBoss A-MQ resources
    
    #admin=admin,admin
    guest=guest
    At this point, you can add entries for any other secure users you want. In particular, it is advisable to have at least one user with the admin role, so that you can log into the secure container remotely (remembering to choose a secure password for the admin user).
    Note
    To avoid authentication issues, include the guest user in the list of authorizationEntries for jaasAuthenticationPlugin in activemq.xml.
  3. Add an AMQP endpoint to the broker configuration. Edit the broker configuration file, InstallDir/etc/activemq.xml. As shown in the following XML fragment, add the transportConnector element named amqp as a child of the transportConnectors element in the broker configuration:
    <beans ...>
        ...
        <broker ...>
            ...
            <transportConnectors>
                <transportConnector name="amqp" uri="amqp://127.0.0.1:5672"/>
                <transportConnector name="openwire" uri="tcp://${bindAddress}:${bindPort}"/>
            </transportConnectors>
        </broker>
    
    </beans>
  4. To start the broker, open a new command prompt, change directory to InstallDir/bin, and enter the following command:
    ./amq
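
Once the broker has started, it can be useful to confirm that the AMQP endpoint accepts TCP connections before running any client. The following is a minimal sketch that uses only the JDK. The class name AmqpPortCheck and the timeout value are illustrative assumptions, and the host and port are taken from the amqp transportConnector URI shown above. It only checks that the port is open and does not perform an AMQP handshake.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of JBoss A-MQ or Qpid JMS.
final class AmqpPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port match the amqp transportConnector configured above.
        boolean up = isListening("127.0.0.1", 5672, 2000);
        System.out.println(up ? "AMQP port is open" : "AMQP port is not reachable");
    }
}
```

If the check fails, the broker log and the transportConnector URI are the first things to review.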

Message conversion

The AMQP endpoint in the broker implicitly converts incoming AMQP format messages into JMS format messages (which is the format in which messages are stored in the broker). The endpoint configuration shown here uses the default options for this conversion.

Reference

For full details of how to configure an AMQP endpoint in the broker, see the "Advanced Message Queueing Protocol (AMQP)" chapter from the Connection Reference. This also includes details of how to customize the message conversion from AMQP format to JMS format.

3.2.3. AMQP Example Clients

Overview

This section explains how to implement two basic AMQP clients: an AMQP sender client, which sends messages to a queue on the broker; and an AMQP receiver client, which pulls messages off the queue on the broker. The clients themselves use generic JMS code to access the messaging system. The key details of the AMQP configuration are retrieved using JNDI properties.

Prerequisites

Before building the example clients, you must install and configure the Apache Maven build tool, as described in Section 1.2, “Preparing to use Maven”.
Ensure that the A-MQ broker is running.
The Qpid client and the example packages are downloaded from the Qpid JMS repository and built.

Steps to implement and run the AMQP clients

Perform the following steps to implement and run an AMQP producer client and an AMQP consumer client:
  1. At any convenient location, create a directory named example to hold the example code, and then download and extract the qpid-jms source code into it:
    mkdir example
  2. The extracted files should have the following directory structure for the examples project:
    ├── apache-qpid-jms
    │   ├── pom.xml
    │   └── src
    ├── LICENSE
    ├── NOTICE
    ├── pom.xml
    ├── qpid-jms-client
    │   ├── pom.xml
    │   └── src
    ├── qpid-jms-discovery
    │   ├── pom.xml
    │   └── src
    ├── qpid-jms-docs
    │   ├── Configuration.md
    │   ├── pom.xml
    │   └── README.txt
    ├── qpid-jms-examples
    │   ├── pom.xml
    │   ├── README.txt
    │   └── src
    ├── qpid-jms-interop-tests
    │   ├── pom.xml
    │   ├── qpid-jms-activemq-tests
    │   └── README.md
    ├── README.md
    └── target
        └── maven-shared-archive-resources
    
  3. On the console, run the command mvn clean package dependency:copy-dependencies -DincludeScope=runtime -DskipTests
    After building the code (and downloading any packages required by Maven), if the build is successful, you should see output like the following in the console window:
    [INFO] ------------------------------------------------------------------------
    [INFO] Reactor Summary:
    [INFO] 
    [INFO] QpidJMS ............................................ SUCCESS [05:36 min]
    [INFO] QpidJMS Client ..................................... SUCCESS [01:04 min]
    [INFO] QpidJMS Discovery Library .......................... SUCCESS [ 33.068 s]
    [INFO] QpidJMS Broker Interop Tests ....................... SUCCESS [  0.024 s]
    [INFO] QpidJMS ActiveMQ Interop Tests ..................... SUCCESS [ 18.120 s]
    [INFO] QpidJMS Examples ................................... SUCCESS [  0.144 s]
    [INFO] QpidJMS Docs ....................................... SUCCESS [  0.017 s]
    [INFO] Apache Qpid JMS .................................... SUCCESS [ 22.253 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 08:33 min
    [INFO] Finished at: 2015-06-15T21:24:01+05:30
    [INFO] Final Memory: 35M/200M
    [INFO] ------------------------------------------------------------------------
    
  4. Run the following java command:
    java -cp "target/classes/:target/dependency/*" org.apache.qpid.jms.example.HelloWorld
    This command runs the HelloWorld example, which sends a message to the queue named queue and then receives it. You should see output like the following in the console window:
    Hello world!
    

A Simple Messaging Program in Java JMS

The following program shows how to send and receive a message using the Qpid JMS client. JMS programs typically use JNDI to obtain connection factory and destination objects which the application needs. In this way the configuration is kept separate from the application code itself.
In this example, we create a JNDI context using a properties file, use the context to look up a connection factory, create and start a connection, create a session, and look up a destination from the JNDI context. Then we create a producer and a consumer, send a message with the producer, and receive it with the consumer. This code should be straightforward for anyone familiar with Java JMS.
Note
The example uses a Queue named "queue". You need to create this before running the example, depending on the broker/peer you are using.

Example 3.1. "Hello world!" in Java

    package org.apache.qpid.jms.example;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.naming.Context;
    import javax.naming.InitialContext;

    public class HelloWorld {

        private static final String USER = "guest";
        private static final String PASSWORD = "guest";

        public static void main(String[] args) throws Exception {

            try {

                // The configuration for the Qpid InitialContextFactory has been supplied in
                // a jndi.properties file in the classpath, which results in it being picked
                // up automatically by the InitialContext constructor.
                Context context = new InitialContext(); // (1)

                ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup"); // (2)
                Destination queue = (Destination) context.lookup("myQueueLookup");

                Connection connection = factory.createConnection(USER, PASSWORD); // (3)
                connection.setExceptionListener(new MyExceptionListener());
                connection.start();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // (4)

                MessageProducer messageProducer = session.createProducer(queue); // (5)
                MessageConsumer messageConsumer = session.createConsumer(queue); // (6)

                // Send one non-persistent message, then wait up to 2 seconds to receive it.
                TextMessage message = session.createTextMessage("Hello world!");
                messageProducer.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
                TextMessage receivedMessage = (TextMessage) messageConsumer.receive(2000L); // (7)

                if (receivedMessage != null) {
                    System.out.println(receivedMessage.getText());
                } else {
                    System.out.println("No message received within the given timeout!");
                }

                connection.close(); // (8)
            } catch (Exception exp) {
                System.out.println("Caught exception, exiting.");
                exp.printStackTrace(System.out);
                System.exit(1);
            }
        }

        // Invoked by the client if the connection fails asynchronously.
        private static class MyExceptionListener implements ExceptionListener {
            @Override
            public void onException(JMSException exception) {
                System.out.println("Connection ExceptionListener fired, exiting.");
                exception.printStackTrace(System.out);
                System.exit(1);
            }
        }
    }


(1) Creates the JNDI initial context.
(2) Looks up the JMS connection factory defined in the JNDI properties.
(3) Creates a JMS connection, authenticating with the given user name and password.
(4) Creates a session. This session is not transactional (the first argument is false), and messages are automatically acknowledged.
(5) Creates a producer that sends messages to the queue.
(6) Creates a consumer that reads messages from the queue.
(7) Reads the next available message, waiting up to 2000 milliseconds for one to arrive.
(8) Closes the connection, all sessions managed by the connection, and all producers and consumers managed by each session.
The contents of the jndi.properties file are shown below.

Example 3.2. JNDI Properties File for "Hello world!" example

              
    java.naming.factory.initial = org.apache.qpid.jms.jndi.JmsInitialContextFactory

    # (1)
    connectionfactory.myFactoryLookup = amqp://localhost:5672

    # (2)
    queue.myQueueLookup = queue
    topic.myTopicLookup = topic

(1) Defines a connection factory from which connections can be created. The syntax of the connection URI is given in the section called “Apache Qpid JMS Client Configuration”.
(2) Defines a destination for which MessageProducers and/or MessageConsumers can be created to send and receive messages. The value for the destination in the properties file is an address string. In the JMS implementation MessageProducers are analogous to senders in the Qpid Message API, and MessageConsumers are analogous to receivers.

Apache Qpid JMS Client Configuration

Apache Qpid JMS 0.5.0 provides various configuration options for the client. This section covers how to configure and create a JNDI InitialContext, the syntax of the related configuration, and the URI options that can be set when defining a ConnectionFactory.

Configuring a JNDI InitialContext

A JNDI InitialContext is used to look up JMS objects such as ConnectionFactory, and is obtained from an InitialContextFactory. The Qpid JMS client provides an implementation of InitialContextFactory in the class org.apache.qpid.jms.jndi.JmsInitialContextFactory. You can configure the JNDI InitialContext in three ways:
  • Using a jndi.properties file on the Java classpath.
    To configure the JNDI InitialContext using a properties file, include the file jndi.properties on the classpath and set the java.naming.factory.initial property to the value org.apache.qpid.jms.jndi.JmsInitialContextFactory. The Qpid InitialContextFactory implementation is then discovered when the InitialContext object is instantiated:
    javax.naming.Context ctx = new javax.naming.InitialContext();
    The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties, either directly within the jndi.properties file, or in a separate file which is referenced in jndi.properties using the java.naming.provider.url property.
  • Using system properties.
    To configure the JNDI InitialContext using system properties, set the java.naming.factory.initial system property to the value org.apache.qpid.jms.jndi.JmsInitialContextFactory. The Qpid InitialContextFactory implementation is then discovered when the InitialContext object is instantiated:
    javax.naming.Context ctx = new javax.naming.InitialContext();
    The ConnectionFactory, Queue and Topic objects contained in the context are configured using a properties file, whose location is passed using the java.naming.provider.url system property.
  • Programmatically using an environment Hashtable.
    The InitialContext can be configured by passing an environment during creation:
    Hashtable<Object, Object> env = new Hashtable<Object, Object>();
    env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
    javax.naming.Context context = new javax.naming.InitialContext(env);
    The ConnectionFactory, Queue and Topic objects contained in the context are configured using properties, either directly within the environment Hashtable or in a separate file that is referenced using the java.naming.provider.url property within the environment Hashtable.
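
As a sketch of the programmatic option, the environment Hashtable can carry both the factory class name and the object definitions. The class name JndiEnvironmentExample is hypothetical, and the lookup names and broker URI are the illustrative ones used elsewhere in this section. Creating the InitialContext only succeeds when the Qpid JMS client library is on the classpath, so the example catches NamingException and reports it rather than assuming the library is present.

```java
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical example class; the property keys follow the syntax described in this section.
final class JndiEnvironmentExample {

    // Builds an environment that defines one ConnectionFactory and one Queue.
    static Hashtable<Object, Object> buildEnvironment(String brokerUri, String queueName) {
        Hashtable<Object, Object> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        env.put("connectionfactory.myFactoryLookup", brokerUri);
        env.put("queue.myQueueLookup", queueName);
        return env;
    }

    public static void main(String[] args) {
        Hashtable<Object, Object> env = buildEnvironment("amqp://localhost:5672", "queue");
        try {
            Context context = new InitialContext(env);
            System.out.println("Factory: " + context.lookup("myFactoryLookup"));
            context.close();
        } catch (NamingException e) {
            // Typically means the Qpid JMS client library is not on the classpath.
            System.out.println("Could not create the context: " + e);
        }
    }
}
```

Keeping the definitions in the Hashtable avoids a separate properties file, at the cost of moving configuration into application code.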

Syntax of the Properties file

The property syntax used in the properties file or environment Hashtable is as follows:
  • For a ConnectionFactory, use connectionfactory.lookupName = URI, for example, connectionfactory.myFactoryLookup = amqp://localhost:5672
  • For a Queue, use queue.lookupName = queueName, for example, queue.myQueueLookup = queueA
  • For a Topic, use topic.lookupName = topicName, for example, topic.myTopicLookup = topicA
These objects can then be looked up from a Context as follows:
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
Queue queue = (Queue) context.lookup("myQueueLookup");
Topic topic = (Topic) context.lookup("myTopicLookup");
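
To make the naming convention concrete, the following sketch loads properties in this syntax with java.util.Properties and groups the entries by prefix. It does not use the Qpid client. The class name JndiPropertySyntax and the grouping logic are illustrative assumptions and do not represent how the client parses the properties internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper that illustrates the "<prefix>.<lookupName> = value" convention.
final class JndiPropertySyntax {

    // Returns lookupName -> value for every key of the form "<prefix>.<lookupName>".
    static Map<String, String> entriesFor(Properties props, String prefix) {
        Map<String, String> result = new TreeMap<>();
        String start = prefix + ".";
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(start)) {
                result.put(key.substring(start.length()), props.getProperty(key));
            }
        }
        return result;
    }

    // Parses properties text held in memory.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "connectionfactory.myFactoryLookup = amqp://localhost:5672\n"
              + "queue.myQueueLookup = queueA\n"
              + "topic.myTopicLookup = topicA\n");
        System.out.println(entriesFor(props, "connectionfactory"));
        System.out.println(entriesFor(props, "queue"));
        System.out.println(entriesFor(props, "topic"));
    }
}
```

The part of each key after the prefix is the name that is later passed to context.lookup.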

Connection URI

The basic format of the client's Connection URI is as follows:
amqp://hostname:port[?option=value[&option2=value...]]
The client can be configured with a number of different settings using the URI while defining the ConnectionFactory. These settings are detailed in the following sections.
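
The URI format above can be assembled from a host, a port, and a map of options. The following is a minimal sketch under stated assumptions: the class name ConnectionUriBuilder is hypothetical, the option names are examples taken from this chapter, and no URL encoding is applied, so option values that contain reserved characters would need extra handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper; not part of the Qpid JMS client API.
final class ConnectionUriBuilder {

    // Assembles amqp://hostname:port[?option=value[&option2=value...]].
    static String build(String host, int port, Map<String, String> options) {
        StringBuilder uri = new StringBuilder("amqp://").append(host).append(':').append(port);
        if (!options.isEmpty()) {
            StringJoiner query = new StringJoiner("&", "?", "");
            options.forEach((name, value) -> query.add(name + "=" + value));
            uri.append(query);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("jms.clientID", "foo");
        options.put("transport.connectTimeout", "30000");
        System.out.println(build("localhost", 5672, options));
    }
}
```

The resulting string can be used as the value of a connectionfactory.lookupName property.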

JMS Configuration options

These options apply to JMS objects such as Connection, Session, MessageConsumer, and MessageProducer:
  • jms.username: User name value used to authenticate the connection.
  • jms.password: Password value used to authenticate the connection.
  • jms.clientID: The ClientID value that is applied to the connection.
  • jms.forceAsyncSend: Configures whether all Messages sent from a MessageProducer are sent asynchronously, or only those Messages that qualify, such as Messages inside a transaction or non-persistent Messages.
  • jms.alwaysSyncSend: Overrides all asynchronous send conditions and always sends every Message from a MessageProducer synchronously.
  • jms.sendAcksAsync: Causes all Message acknowledgments to be sent asynchronously.
  • jms.localMessagePriority: If enabled, prefetched messages are reordered locally based on their Message priority value. Default is false.
  • jms.localMessageExpiry: Controls whether MessageConsumer instances locally filter expired Messages or deliver them. Default is true, meaning that expired messages are filtered.
  • jms.validatePropertyNames: Whether message property names are validated as valid Java identifiers. Default is true.
  • jms.queuePrefix: Optional prefix value added to the name of any Queue created from a JMS Session.
  • jms.topicPrefix: Optional prefix value added to the name of any Topic created from a JMS Session.
  • jms.closeTimeout: Timeout value that controls how long the client waits on Connection close before returning. By default, the client waits 15 seconds for a normal close completion event.
  • jms.connectTimeout: Timeout value that controls how long the client waits on Connection establishment before returning with an error. By default, the client waits 15 seconds for a connection to be established before failing.
  • jms.clientIDPrefix: Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory. The default prefix is 'ID:'.
  • jms.connectionIDPrefix: Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object, so a configurable prefix can make it easier to follow a connection through the logs. The default prefix is 'ID:'.
The jms.prefetchPolicy options control how many messages the remote peer can send to the client to be held in a prefetch buffer for each consumer instance:
  • jms.prefetchPolicy.queuePrefetch defaults to 1000
  • jms.prefetchPolicy.topicPrefetch defaults to 1000
  • jms.prefetchPolicy.queueBrowserPrefetch defaults to 1000
  • jms.prefetchPolicy.durableTopicPrefetch defaults to 1000
  • jms.prefetchPolicy.all used to set all prefetch values at once.
The RedeliveryPolicy controls how redelivered messages are handled on the client. The jms.redeliveryPolicy.maxRedeliveries option controls when an incoming message is rejected based on the number of times it has been redelivered. The default value is -1, which disables the check. A value of zero indicates that no message redeliveries are accepted, a value of five allows a message to be redelivered five times, and so on.
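
The maxRedeliveries semantics can be expressed as a small predicate. The following sketch only illustrates the rule as described above; the class and method names are hypothetical, and the client's actual implementation may differ.

```java
// Hypothetical illustration of the maxRedeliveries rule; not the client's own code.
final class RedeliveryCheck {

    // redeliveryCount is the number of times the message has already been redelivered
    // (0 for a first delivery). A negative maxRedeliveries disables the check.
    static boolean shouldReject(int redeliveryCount, int maxRedeliveries) {
        return maxRedeliveries >= 0 && redeliveryCount > maxRedeliveries;
    }

    public static void main(String[] args) {
        System.out.println(shouldReject(3, -1)); // false: check disabled
        System.out.println(shouldReject(0, 0));  // false: first delivery is accepted
        System.out.println(shouldReject(1, 0));  // true: no redeliveries accepted
        System.out.println(shouldReject(5, 5));  // false: fifth redelivery still allowed
        System.out.println(shouldReject(6, 5));  // true: sixth redelivery rejected
    }
}
```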

TCP Transport Configuration options

When connecting to a remote peer using plain TCP, these options configure the behavior of the underlying socket. These options are appended to the connection URI along with the other configuration options, for example:
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
The TCP Transport options and their default values are listed below:
  • transport.sendBufferSize: default is 64k.
  • transport.receiveBufferSize: default is 64k.
  • transport.trafficClass: default is 10.
  • transport.connectTimeout: default is 60 seconds.
  • transport.soTimeout: default is -1.
  • transport.soLinger: default is -1.
  • transport.tcpKeepAlive: default is false.
  • transport.tcpNoDelay: default is true.

SSL Transport Configuration options

The SSL Transport extends the TCP Transport and is enabled using the amqps URI scheme. Because of this, all the TCP Transport options are also valid on an SSL Transport URI.
A simple SSL based client URI is shown below:
amqps://localhost:5673
The SSL Transport options are listed below:
  • transport.keyStoreLocation: Default is to read from the system property javax.net.ssl.keyStore.
  • transport.keyStorePassword: Default is to read from the system property javax.net.ssl.keyStorePassword.
  • transport.trustStoreLocation: Default is to read from the system property javax.net.ssl.trustStore.
  • transport.trustStorePassword: Default is to read from the system property javax.net.ssl.trustStorePassword.
  • transport.storeType: The type of trust store being used. Default is "JKS".
  • transport.contextProtocol: The protocol argument used when getting an SSLContext. Default is "TLS".
  • transport.enabledCipherSuites: The cipher suites to enable, comma separated. By default, the context default ciphers are used. Any disabled ciphers are removed.
  • transport.disabledCipherSuites: The cipher suites to disable, comma separated. Ciphers listed here are removed from the enabled ciphers. No default.
  • transport.enabledProtocols: The protocols to enable, comma separated. By default, the context default protocols are used. Any disabled protocols are removed.
  • transport.disabledProtocols: The protocols to disable, comma separated. Protocols listed here are removed from the enabled protocols. Default is "SSLv2Hello,SSLv3".
  • transport.trustAll: Whether to trust the provided server certificate implicitly, regardless of any configured trust store. Default is false.
  • transport.verifyHost: Whether to verify that the hostname being connected to matches the provided server certificate. Default is true.
  • transport.keyAlias: The alias to use when selecting a keypair from the keystore to send a client certificate to the server. No default.
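
The interaction between the enabled and disabled lists can be sketched as a simple set difference: start from the explicitly enabled protocols, or the context defaults when none are given, and then remove every disabled entry. The class ProtocolFilter below is a hypothetical illustration of that rule using only the JDK; it is an assumption about how the options combine based on the descriptions above, not the client's own code. The same reasoning applies to cipher suites.

```java
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.net.ssl.SSLContext;

// Hypothetical illustration of how enabled and disabled protocol lists combine.
final class ProtocolFilter {

    // enabled may be null, meaning "use the context defaults".
    static List<String> effectiveProtocols(String[] contextDefaults, String[] enabled, String[] disabled) {
        List<String> result = new ArrayList<>(Arrays.asList(enabled != null ? enabled : contextDefaults));
        result.removeAll(Arrays.asList(disabled));
        return result;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Protocols that the JVM's default SSLContext would enable.
        String[] defaults = SSLContext.getDefault().getDefaultSSLParameters().getProtocols();
        // Default value of transport.disabledProtocols as listed above.
        String[] disabled = {"SSLv2Hello", "SSLv3"};
        System.out.println(effectiveProtocols(defaults, null, disabled));
    }
}
```

The printed list depends on the JVM in use, so no specific output is shown here.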

Failover Configuration options

If failover is enabled, the client can reconnect to a different broker automatically when the connection to the current broker is lost. The failover URI always starts with the failover prefix, followed by a list of broker URIs in parentheses. The jms.* options are applied to the overall failover URI, outside the parentheses, and affect the JMS Connection object for its lifetime.
The URI for failover is shown as follows:
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
The individual broker details within the parentheses can use the transport. or amqp. options defined earlier, with these being applied as each host is connected to:
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
Failover configuration options are listed below:
  • failover.initialReconnectDelay: The amount of time the client waits before the first attempt to reconnect to a remote peer. The default value is zero.
  • failover.reconnectDelay: Controls the delay between successive reconnection attempts. Default is 10 milliseconds. If the backoff option is not enabled, this value remains constant.
  • failover.maxReconnectDelay: The maximum time that the client waits before attempting a reconnect. This value is used when the backoff feature is enabled, to ensure that the delay does not grow too long. Default is 30 seconds.
  • failover.useReconnectBackOff: Controls whether the time between reconnection attempts should grow based on a configured multiplier. Default is true.
  • failover.reconnectBackOffMultiplier: The multiplier used to grow the reconnection delay value. Default is 2.0d.
  • failover.maxReconnectAttempts: The number of reconnection attempts allowed before reporting the connection as failed to the client. Default is -1, meaning no limit.
  • failover.startupMaxReconnectAttempts: For a client that has never connected to a remote peer, this option controls the number of attempts made to connect before reporting the connection as failed. Default is the value of maxReconnectAttempts.
  • failover.warnAfterReconnectAttempts: Controls how often the client logs a message indicating that failover reconnection is being attempted. Default is to log every 10 connection attempts.
  • transport.enabledProtocols: The protocols to enable, comma separated. By default, the context default protocols are used. Any disabled protocols are removed.
The failover URI also supports defining nested options as a means of specifying AMQP and transport option values applicable to all the individual nested broker URIs, which can be useful to avoid repetition. This is accomplished using the same transport. and amqp. URI options outlined earlier for a non-failover broker URI, but prefixed with failover.nested.
For example, to apply the same value for the amqp.vhost option to every broker connected to, you might have a URI like:

failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
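
To see how failover.reconnectDelay, failover.useReconnectBackOff, failover.reconnectBackOffMultiplier, and failover.maxReconnectDelay interact, the following sketch computes a sequence of delays using the default values listed above. It is an illustration of the described behavior under the assumption that each delay is the previous delay multiplied by the multiplier and capped at the maximum; the class ReconnectBackoff is hypothetical, it ignores failover.initialReconnectDelay, and the client's actual algorithm may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the reconnect delay growth; not the client's own code.
final class ReconnectBackoff {

    // Returns the delay in milliseconds used before each of the given number of attempts.
    static List<Long> delays(long reconnectDelay, long maxReconnectDelay,
                             boolean useBackOff, double multiplier, int attempts) {
        List<Long> result = new ArrayList<>();
        long delay = reconnectDelay;
        for (int i = 0; i < attempts; i++) {
            result.add(delay);
            if (useBackOff) {
                // Grow the delay, but never beyond the configured maximum.
                delay = Math.min((long) (delay * multiplier), maxReconnectDelay);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Defaults listed above: 10 ms delay, 30 s cap, backoff enabled, multiplier 2.0.
        System.out.println(delays(10, 30_000, true, 2.0, 14));
    }
}
```

With these values the delay doubles from 10 milliseconds until it reaches the 30 second cap, after which it stays constant.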

AMQP Configuration options

The AMQP configuration options control the behavior of certain AMQP protocol functionality:
  • amqp.idleTimeout: The idle timeout in milliseconds, after which the connection fails if the peer sends no AMQP frames. Default is 60000.
  • amqp.vhost: The vhost to connect to. Used to populate the SASL and Open hostname fields. Default is the main hostname from the Connection URI.
  • amqp.saslLayer: Controls whether connections should use a SASL layer or not. Default is true.
  • amqp.saslMechanisms: Which SASL mechanisms the client should allow selection of, if offered by the server and usable with the configured credentials. Comma separated if specifying more than one mechanism. Default is to allow selection from all of the client's supported mechanisms, which are currently EXTERNAL, CRAM-MD5, PLAIN, and ANONYMOUS.
  • amqp.maxFrameSize: The max-frame-size value in bytes that is advertised to the peer. Default is 1048576.

Discovery Configuration options

The client has an optional Discovery module, which provides a customized failover layer where the broker URIs to connect to are not given in the initial URI, but discovered as the client operates via associated discovery agents. There are currently two discovery agent implementations, a file watcher that loads URIs from a file, and a multicast listener that works with ActiveMQ 5 brokers which have been configured to broadcast their broker addresses for listening clients.
The general set of failover-related options when using discovery is the same as detailed earlier, with the main prefix changed from failover. to discovery., and with the nested options prefix, which is used to supply URI options common to all the discovered broker URIs, changed from failover.nested. to discovery.discovered. For example, without the agent URI details, a general discovery URI might look like:
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
To use the file watcher discovery agent, use an agent URI of the form:
discovery:(file:///path/to/monitored-file?updateInterval=60000)
The URI options for the file watcher discovery agent are listed below:
  • updateInterval: Controls the interval in milliseconds at which the file is inspected for changes. The default value is 30000.
To use the multicast discovery agent with an ActiveMQ 5 broker, use an agent URI of the form:
discovery:(multicast://default?group=default)
Note
The use of default as the host in the multicast agent URI above is a special value (that is substituted by the agent with the default "239.255.2.3:6155"). You may change this to specify the actual IP and port in use with your multicast configuration.
The URI options for the multicast discovery agent are listed below:
  • group: Controls which multicast group the agent listens on for messages. The default value is "default".

JMS Client Logging

JMS client logging is handled using the Simple Logging Facade for Java (SLF4J). As the name implies, SLF4J is a facade that delegates to other logging systems such as log4j or JDK 1.4 logging. For more information on how to configure SLF4J for specific logging systems, consult the SLF4J documentation.
The client uses logger names within the org.apache.qpid.jms hierarchy, which you can use to configure a logging implementation.
For debugging you can enable additional protocol trace logging from the Qpid Proton AMQP 1.0 library. There are two options to enable the logging:
  • Set the environment variable PN_TRACE_FRM to true, which causes Proton to emit frame logging to stdout.
  • Add the option amqp.traceFrames=true to the connection URI, which causes the client to add a protocol tracer to Proton, and configure the org.apache.qpid.jms.provider.amqp.FRAMES logger to TRACE level to include the output in the logs.
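
As one possible way to raise the FRAMES logger to TRACE level, the following sketch assumes that SLF4J is bound to JDK logging (the slf4j-jdk14 binding), in which case SLF4J TRACE corresponds to the java.util.logging level FINEST. The class name FrameLoggingSetup is hypothetical, and with a different binding such as log4j the logger would be configured through that system's own configuration instead.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical setup class; assumes SLF4J delegates to java.util.logging.
final class FrameLoggingSetup {

    // Keep a strong reference so the configured logger is not garbage collected.
    static final Logger FRAMES = Logger.getLogger("org.apache.qpid.jms.provider.amqp.FRAMES");

    static void enableFrameLogging() {
        // With the slf4j-jdk14 binding, SLF4J TRACE is logged at JUL level FINEST.
        FRAMES.setLevel(Level.FINEST);
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.FINEST); // handlers default to INFO and would drop the output
        FRAMES.addHandler(handler);
        FRAMES.setUseParentHandlers(false); // avoid duplicate lines from the root handler
    }

    public static void main(String[] args) {
        enableFrameLogging();
        System.out.println(FRAMES.getName() + " level: " + FRAMES.getLevel());
    }
}
```

This only prepares the logger. Frame output appears once a connection is created with amqp.traceFrames=true in its URI.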