Red Hat Training

A Red Hat training course is available for Red Hat JBoss Enterprise Application Platform

Chapter 29. Clusters Overview

JBoss EAP messaging clusters allow groups of JBoss EAP messaging servers to be grouped together in order to share message processing load. Each active node in the cluster is an active JBoss EAP messaging server which manages its own messages and handles its own connections.

The cluster is formed by each node declaring cluster connections to other nodes in the JBoss EAP configuration file. When a node forms a cluster connection to another node, it internally creates a core bridge connection between itself and the other node. This is done transparently behind the scenes; you do not have to declare an explicit bridge for each node. These cluster connections allow messages to flow between the nodes of the cluster to balance the load.

An important part of clustering is server discovery where servers can broadcast their connection details so clients or other servers can connect to them with minimum configuration.

This section also discusses client-side load balancing, to balance client connections across the nodes of the cluster, and message redistribution, where JBoss EAP messaging will redistribute messages between nodes to avoid starvation.

Warning

Once a cluster node has been configured, it is common to simply copy that configuration to other nodes to produce a symmetric cluster.

In fact, each node in the cluster must share the same configuration for the following elements in order to avoid unexpected errors:

  • cluster-connection
  • broadcast-group
  • discovery-group
  • address-settings, including queues and topics

However, care must be taken when copying the JBoss EAP messaging files. Do not copy the messaging data, the bindings, journal, and large-messages directories from one node to another. When a cluster node is started for the first time and initializes its journal files, it persists a special identifier to the journal directory. The identifier must be unique among nodes for the cluster to form properly.

29.1. Server Discovery

Server discovery is a mechanism by which servers can propagate their connection details to:

  • Messaging clients

    A messaging client wants to be able to connect to the servers of the cluster without having specific knowledge of which servers in the cluster are up at any one time.

  • Other servers.

    Servers in a cluster want to be able to create cluster connections to each other without having prior knowledge of all the other servers in the cluster.

This information, or cluster topology, is sent around normal JBoss EAP messaging connections to clients and to other servers over cluster connections. However, you need a way to establish the initial first connection. This can be done using dynamic discovery techniques like UDP and JGroups, or by providing a static list of initial connectors.

29.1.1. Broadcast Groups

A broadcast group is the means by which a server broadcasts connectors over the network. A connector defines a way in which a client, or other server, can make connections to the server.

The broadcast group takes a set of connectors and broadcasts them on the network. Depending on which broadcasting technique you configure the cluster, it uses either UDP or JGroups to broadcast connector pairs information.

Broadcast groups are defined in the messaging-activemq subsystem of the server configuration. There can be many broadcast groups per JBoss EAP messaging server.

Configure a Broadcast Group Using UDP

Below is an example configuration of a messaging server that defines a UDP broadcast group. Note that this configuration relies on a messaging-group socket binding.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <broadcast-group name="my-broadcast-group" connectors="http-connector" socket-binding="messaging-group"/>
    ...
  </server>
</subsystem>
...
<socket-binding-group name="standard-sockets" default-interface="public" port-offset="${jboss.socket.binding.port-offset:0}">
  ...
  <socket-binding name="messaging-group" interface="private" port="5432" multicast-address="231.7.7.7" multicast-port="9876"/>
  ...
</socket-binding-group>

This configuration can be achieved using the following management CLI commands:

  1. Add the messaging-group socket binding.

    /socket-binding-group=standard-sockets/socket-binding=messaging-group:add(interface=private,port=5432,multicast-address=231.7.7.7,multicast-port=9876)
  2. Add the broadcast group.

    /subsystem=messaging-activemq/server=default/broadcast-group=my-broadcast-group:add(socket-binding=messaging-group,broadcast-period=2000,connectors=[http-connector])
Configure a Broadcast Group Using JGroups

Below is an example configuration of a messaging server that defines broadcast group that uses the default JGroups broadcast group, which uses UDP. Note that to be able to use JGroups to broadcast, you must set a jgroups-channel.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <broadcast-group name="my-broadcast-group" connectors="http-connector" jgroups-channel="activemq-cluster"/>
    ...
  </server>
</subsystem>

This can be configured using the following management CLI command:

/subsystem=messaging-activemq/server=default/broadcast-group=my-broadcast-group:add(connectors=[http-connector],jgroups-channel=activemq-cluster)
Broadcast Group Attributes

The below table lists the configurable attributes for a broadcast group.

AttributeDescription

broadcast-period

The period in milliseconds between consecutive broadcasts.

connectors

The names of connectors that will be broadcast.

jgroups-channel

The name used by a JGroups channel to join a cluster.

jgroups-stack

The name of a stack defined in the jgroups subsystem that is used to form a cluster.

socket-binding

The broadcast group socket binding.

29.1.2. Discovery Groups

While the broadcast group defines how connector information is broadcasted from a server, a discovery group defines how connector information is received from a broadcast endpoint, for example, a UDP multicast address or JGroup channel.

A discovery group maintains a list of connectors, one for each broadcast by a different server. As it receives broadcasts on the broadcast endpoint from a particular server, it updates its entry in the list for that server. If it has not received a broadcast from a particular server for a length of time it will remove that server’s entry from its list.

Discovery groups are used in two places in JBoss EAP messaging:

  • By cluster connections so they know how to obtain an initial connection to download the topology.
  • By messaging clients so they know how to obtain an initial connection to download the topology.

Although a discovery group will always accept broadcasts, its current list of available live and backup servers is only ever used when an initial connection is made. From then on, server discovery is done over the normal JBoss EAP messaging connections.

Note

Each discovery group must be configured with a broadcast endpoint (UDP or JGroups) that matches its broadcast group counterpart. For example, if the broadcast group is configured using UDP, the discovery group must also use UDP and the same multicast address.

29.1.2.1. Configure Discovery Groups on the Server

Discovery groups are defined in the messaging-activemq subsystem of the server configuration. There can be many discovery groups per JBoss EAP messaging server.

Configure a Discovery Group Using UDP

Below is an example configuration of a messaging server that defines a UDP discovery group. Note that this configuration relies on a messaging-group socket binding.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <discovery-group name="my-discovery-group" refresh-timeout="10000" socket-binding="messaging-group"/>
    ...
  </server>
</subsystem>
...
<socket-binding-group name="standard-sockets" default-interface="public" port-offset="${jboss.socket.binding.port-offset:0}">
  ...
  <socket-binding name="messaging-group" interface="private" port="5432" multicast-address="231.7.7.7" multicast-port="9876"/>
  ...
</socket-binding-group>

This configuration can be achieved using the following management CLI commands:

  1. Add the messaging-group socket binding.

    /socket-binding-group=standard-sockets/socket-binding=messaging-group:add(interface=private,port=5432,multicast-address=231.7.7.7,multicast-port=9876)
  2. Add the discovery group.

    /subsystem=messaging-activemq/server=default/discovery-group=my-discovery-group:add(socket-binding=messaging-group,refresh-timeout=10000)
Configure a Discovery Group Using JGroups

Below is an example configuration of a messaging server that defines a JGroups discovery group.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <discovery-group name="my-discovery-group" refresh-timeout="10000" jgroups-channel="activemq-cluster"/>
    ...
  </server>
</subsystem>

This can be configured using the following management CLI command:

/subsystem=messaging-activemq/server=default/discovery-group=my-discovery-group:add(refresh-timeout=10000,jgroups-channel=activemq-cluster)
Discovery Group Attributes

The below table lists the configurable attributes for a discovery group.

AttributeDescription

initial-wait-timeout

Period, in milliseconds, to wait for an initial broadcast to give us at least one node in the cluster.

jgroups-channel

The name used by a JGroups channel to join a cluster.

jgroups-stack

The name of a stack defined in the jgroups subsystem that is used to form a cluster.

refresh-timeout

Period the discovery group waits after receiving the last broadcast from a particular server before removing that server’s connector pair entry from its list.

socket-binding

The discovery group socket binding.

Warning

The JGroups attributes and UDP-specific attributes described above are exclusive of each other. Only one set can be specified in a discovery group configuration.

29.1.2.2. Configure Discovery Groups on the Client Side

You can use JMS or the core API to configure a JBoss EAP messaging client to discover a list of servers to which it can connect.

Configure Client Discovery using JMS

Clients using JMS can look up the relevant ConnectionFactory with JNDI. The entries attribute of a connection-factory or a pooled-connection-factory specifies the JNDI name under which the factory will be exposed. Below is an example of a ConnectionFactory configured for a remote client to lookup with JNDI:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <connection-factory name="RemoteConnectionFactory" entries="java:jboss/exported/jms/RemoteConnectionFactory" connectors="http-connector"/>
    ...
  </server>
</subsystem>
Note

It is important to remember that only JNDI names bound in the java:jboss/exported namespace are available to remote clients. If a connection-factory has an entry bound in the java:jboss/exported namespace a remote client would look up the connection-factory using the text after java:jboss/exported. For example, the RemoteConnectionFactory is bound by default to java:jboss/exported/jms/RemoteConnectionFactory which means a remote client would look-up this connection-factory using jms/RemoteConnectionFactory. A pooled-connection-factory should not have any entry bound in the java:jboss/exported namespace because a pooled-connection-factory is not suitable for remote clients.

Since JMS 2.0, a default JMS connection factory is accessible to Java EE applications under the JNDI name java:comp/DefaultJMSConnectionFactory. The JBoss EAP messaging-activemq subsystem defines a pooled-connection-factory that is used to provide this default connection factory. Any parameter change on this pooled-connection-factory will be taken into account by any Java EE application looking the default JMS provider under the JNDI name java:comp/DefaultJMSConnectionFactory. Below is the default pooled connection factory as defined in the *-full and *-full-ha profiles.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <pooled-connection-factory name="activemq-ra" transaction="xa" entries="java:/JmsXA java:jboss/DefaultJMSConnectionFactory" connectors="in-vm"/>
    ...
  </server>
</subsystem>
Configure Client Discovery using the Core API

If you are using the core API to directly instantiate ClientSessionFactory instances, then you can specify the discovery group parameters directly when creating the session factory. For example:

final String groupAddress = "231.7.7.7";
final int groupPort = 9876;

ServerLocator factory =
  ActiveMQClient.createServerLocatorWithHA(new DiscoveryGroupConfiguration(
      groupAddress,
      groupPort,
      new UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, -1)));
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session1 = factory.createSession();
ClientSession session2 = factory.createSession();

You can use the setDiscoveryRefreshTimeout() setter method on the DiscoveryGroupConfiguration to set the refresh-timeout value, which defaults to 10000 milliseconds.

You can also use the setDiscoveryInitialWaitTimeout() setter method on the DiscoveryGroupConfiguration to set the initial-wait-timeout value, which determines how long the session factory will wait before creating the first session. The default value is 10000 milliseconds.

29.1.3. Static Discovery

In situations where you can not or do not want to use UDP on your network, you can configure a connection with an initial list of one or more servers.

This does not mean that you have to know where all your servers are going to be hosted. You can configure these servers to connect to a reliable server, and have their connection details propagated by way of that server.

Configuring a Cluster Connection

For cluster connections there, is no additional configuration needed, you just need to make sure that any connectors are defined in the usual manner. These are then referenced by the cluster connection configuration.

Configuring a Client Connection

A static list of possible servers can also be used by a client.

Configuring Client Discovery Using JMS

The recommended way to use static discovery with JMS is to configure a connection-factory with multiple connectors (each pointing to a unique node in the cluster) and have the client look up the ConnectionFactory using JNDI. Below is a snippet of configuration showing just such a connection-factory:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <connection-factory name="MyConnectionFactory" entries="..." connectors="http-connector http-node1 http-node2"/>
    ...
  </server>
</subsystem>

In the above example, http-connector is an HTTP connector (<http-connector>) pointing to the local server, http-node1 is an HTTP connector pointing to server node1, and so on. See the Connectors and Acceptors section for configuring connectors in the server configuration.

Configuring Client Discovery Using the Core API

If you are using the core API, create a unique TransportConfiguration for each server in the cluster and pass them into the method responsible for creating the ServerLocator, as in the below example code.

HashMap<String, Object> map = new HashMap<String, Object>();
map.put("host", "myhost");
map.put("port", "8080");

HashMap<String, Object> map2 = new HashMap<String, Object>();
map2.put("host", "myhost2");
map2.put("port", "8080");

TransportConfiguration server1 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
TransportConfiguration server2 = new TransportConfiguration(NettyConnectorFactory.class.getName(), map2);

ServerLocator locator = ActiveMQClient.createServerLocatorWithHA(server1, server2);
ClientSessionFactory factory = locator.createSessionFactory();
ClientSession session = factory.createSession();

29.2. Server-side Message Load Balancing

If a cluster connection is defined between nodes of a cluster, then JBoss EAP messaging will load balance messages arriving at a particular node from a client.

A messaging cluster connection can be configured to load balance messages in a round robin fashion, irrespective of whether there are any matching consumers on other nodes. It can also be configured to distribute to other nodes only if matching consumers exist. See the Message Redistribution section for more information.

Configuring the Cluster Connection

A cluster connection group servers into clusters so that messages can be load balanced between the nodes of the cluster. A cluster connection is defined in the JBoss EAP server configuration using the cluster-connection element.

Warning

Red Hat supports using only one cluster-connection within the messaging-activemq subsystem.

Below is the default cluster-connection as defined in the *-full and *-full-ha profiles. See Cluster Connection Attributes for the complete list of attributes.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <cluster-connection name="my-cluster" discovery-group="dg-group1" connector-name="http-connector" address="jms"/>
    ...
  </server>
</subsystem>

In the case shown above the cluster connection will load balance messages sent to addresses that start with "jms". This cluster connection will, in effect, apply to all JMS queues and topics since they map to core queues that start with the substring "jms".

Note

When a packet is sent using a cluster connection and is at a blocked state and waiting for acknowledgements, the call-timeout attribute specifies how long it will wait for the reply before throwing an exception. The default value is 30000. In certain cases, for example, if the remote JMS broker is disconnected from network and the transaction is incomplete, the thread could remain stuck until connection is re-established. To avoid this situation, it is recommended to use the call-failover-timeout attribute along with the call-timeout attribute. The call-failover-timeout attribute is used when a call is made during a failover attempt. The default value is -1, which means no timeout. For more information on Client Failover, see Automatic Client Failover.

Note

Alternatively, if you would like the cluster connection to use a static list of servers for discovery then you can use the static-connectors attribute. For example:

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <cluster-connection name="my-cluster" static-connectors="server0-connector server1-connector" .../>
    ...
  </server>
</subsystem>

In this example, there are two servers defined where we know that at least one will be available. There may be many more servers in the cluster, but these will be discovered using one of these connectors once an initial connection has been made.

Configuring a Cluster Connection for Duplicate Detection

The cluster connection internally uses a core bridge to move messages between nodes of the cluster. To configure a cluster connection for duplicate message detection, set the use-duplicate-detection attribute to true, which is the default value.

/subsystem=messaging-activemq/server=default/cluster-connection=my-cluster:write-attribute(name=use-duplicate-detection,value=true)

Cluster User Credentials

When creating connections between nodes of a cluster to form a cluster connection, JBoss EAP messaging uses a cluster user and password.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <cluster password="${jboss.messaging.cluster.password:CHANGE ME!!}"/>
    ...
  </server>
</subsystem>
Warning

The default value for cluster-user is ACTIVEMQ.CLUSTER.ADMIN.USER and the default value for cluster-password is CHANGE ME!!. It is imperative that these values are changed from their default, or remote clients will be able to make connections to the server using the default values. If they are not changed from the default, JBoss EAP messaging will detect this and display a warning upon every startup.

29.3. Client-side Load balancing

With JBoss EAP messaging client-side load balancing, subsequent sessions created using a single session factory can be connected to different nodes of the cluster. This allows sessions to spread smoothly across the nodes of a cluster and not be clumped on any particular node.

The recommended way to declare a load balancing policy to be used by the client factory is to set the connection-load-balancing-policy-class-name attribute of the <connection-factory> resource. JBoss EAP messaging provides the following out-of-the-box load balancing policies, and you can also implement your own.

Round robin

With this policy, the first node is chosen randomly then each subsequent node is chosen sequentially in the same order.

For example, nodes might be chosen in the order B, C, D, A, B, C, D, A, B or D, A, B, C, D, A, B, C.

Use org.apache.activemq.artemis.api.core.client.loadbalance.RoundRobinConnectionLoadBalancingPolicy as the connection-load-balancing-policy-class-name .

Random

With this policy, each node is chosen randomly.

Use org.apache.activemq.artemis.api.core.client.loadbalance.RandomConnectionLoadBalancingPolicy as the connection-load-balancing-policy-class-name .

Random Sticky

With this policy, the first node is chosen randomly and then reused for subsequent connections.

Use org.apache.activemq.artemis.api.core.client.loadbalance.RandomStickyConnectionLoadBalancingPolicy as the connection-load-balancing-policy-class-name .

First Element

With this policy, the first, or 0th, node is always returned.

Use org.apache.activemq.artemis.api.core.client.loadbalance.FirstElementConnectionLoadBalancingPolicy as the connection-load-balancing-policy-class-name .

You can also implement your own policy by implementing the interface org.apache.activemq.artemis.api.core.client.loadbalance.ConnectionLoadBalancingPolicy

29.4. Message Redistribution

With message redistribution, JBoss EAP messaging can be configured to automatically redistribute messages from queues which have no consumers back to other nodes in the cluster which do have matching consumers. To enable this functionality, cluster connection’s message-load-balancing-type must be set to ON_DEMAND, which is the default value. You can set this using the following management CLI command.

/subsystem=messaging-activemq/server=default/cluster-connection=my-cluster:write-attribute(name=message-load-balancing-type,value=ON_DEMAND)

Message redistribution can be configured to kick in immediately after the last consumer on a queue is closed, or to wait a configurable delay after the last consumer on a queue is closed before redistributing. This is configured using the redistribution-delay attribute.

You use the redistribution-delay attribute to set how many milliseconds to wait after the last consumer is closed on a queue before redistributing messages from that queue to other nodes of the cluster that have matching consumers. A value of -1, which is the default value, means that messages will never be redistributed. A value of 0 means that messages will be immediately redistributed.

The address-setting in the default JBoss EAP configuration sets a redistribution-delay value of 1000, meaning that it will wait 1000 milliseconds before redistributing messages.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <address-setting name="#" redistribution-delay="1000" message-counter-history-day-limit="10" page-size-bytes="2097152" max-size-bytes="10485760" expiry-address="jms.queue.ExpiryQueue" dead-letter-address="jms.queue.DLQ"/>
    ...
  </server>
</subsystem>

It often makes sense to introduce a delay before redistributing as it is a common case that a consumer closes but another one quickly is created on the same queue. In this case, you may not want to redistribute immediately since the new consumer will arrive shortly.

Below is an example of an address-setting that sets a redistribution-delay of 0 for any queue or topic that is bound to an address that starts with "jms.". In this case, messages will be redistributed immediately.

<subsystem xmlns="urn:jboss:domain:messaging-activemq:1.0">
  <server name="default">
    ...
    <address-setting name="jms.#" redistribution-delay="0"/>
    ...
  </server>
</subsystem>

This address setting can be added using the following management CLI command.

/subsystem=messaging-activemq/server=default/address-setting=jms.#:add(redistribution-delay=1000)

29.5. Clustered Message Grouping

Important

This feature is provided as Technology Preview only. It is not supported for use in a production environment, and it may be subject to significant future changes. See Technology Preview Features Support Scope on the Red Hat Customer Portal for information about the support scope for Technology Preview features.

Clustered grouping follows a different approach relative to normal message grouping. In a cluster, message groups with specific group ids can arrive on any of the nodes. It is important for a node to determine which group ids are bound to which consumer on which node. Each node is responsible for routing message groups correctly to the node which has the consumer processing those group ids, irrespective of where the message groups arrive by default. Once messages with a given group id are sent to a specific consumer connected to the given node in the cluster, then those messages are never sent to another node even if the consumer is disconnected.

This situation is addressed by a grouping handler. Each node has a grouping handler and this grouping handler (along with other handlers) is responsible for routing the message groups to the correct node. There are two types of grouping handlers: LOCAL and REMOTE.

The local handler is responsible for deciding the route that a message group should take. The remote handlers communicate with the local handler and work accordingly. Each cluster should choose a specific node to have a local grouping handler and all the other nodes should have remote handlers.

Warning

If message grouping is used in a cluster, it will break if a node configured as a remote grouping handler fails. Setting up a backup for the remote grouping handler will not correct this.

The node that initially receives a message group takes the routing decision based on regular cluster routing conditions (round-robin queue availability). The node proposes this decision to the respective grouping handler which then routes the messages to the proposed queue if it accepts the proposal.

If the grouping handler rejects the proposal, it proposes some other route and the routing takes place accordingly. The other nodes follow suite and forward the message groups to the chosen queue. After a message arrives on a queue, it is pinned to a customer on that queue.

You can configure grouping handlers using the management CLI. The following command adds a LOCAL grouping handler with the address news.europe.#.

/subsystem=messaging-activemq/server=default/grouping-handler=my-group-handler:add(grouping-handler-address="news.europe.#",type=LOCAL)

This will require a server reload.

reload

The below table lists the configurable attributes for a grouping-handler.

AttributeDescription

group-timeout

With a REMOTE handler, this value specifies how often the REMOTE will notify the LOCAL that the route was used. With a LOCAL handler, if a route is not used for the time specified, it is removed, and a new path would need to be established. The value is in milliseconds.

grouping-handler-address

A reference to a cluster connection and the address it uses.

reaper-period

How often the reaper will be run to check for timed out group bindings (only valid for LOCAL handlers).

timeout

How long to wait for a handling decision to be made; an exception will be thrown during the send if this timeout is reached, ensuring that strict ordering is kept.

type

Whether the handler is the single local handler for the cluster, which makes handling decisions, or a remote handler which converses with the local handler. Possible values are LOCAL or REMOTE.

29.5.1. Best Practices for Clustered Message Grouping

Some best practices for clustered grouping are as follows:

  • If you create and close consumers regularly, make sure that your consumers are distributed evenly across the different nodes. Once a queue is pinned, messages are automatically transferred to that queue regardless of removing customers from it.
  • If you wish to remove a queue that has a message group bound to it, make sure the queue is deleted by the session that is sending the messages. Doing this will ensure that other nodes will not try to route messages to this queue after it is removed.
  • As a failover mechanism, always replicate the node that has the local grouping handler.