Using Networks of Brokers
Networking multiple brokers for better performance
Red Hat
Copyright © 2013 Red Hat, Inc. and/or its affiliates.
Abstract
Chapter 1. Introduction
Abstract
Overview
Network of brokers
Dynamic networks
Chapter 2. Network Connectors
Abstract
Overview
Active consumers
Subscriptions
Propagation of subscriptions
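Subscriptions propagate through the network by means of advisory messages, which are sent on topics whose names start with ActiveMQ.Advisory. For propagation to work, make sure that the advisorySupport attribute on the broker element is not set to false.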
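As a minimal sketch, the advisorySupport attribute can be stated explicitly on the broker element. Advisory support is enabled by default, so this attribute is normally unnecessary; the broker name is a placeholder.

```xml
<!-- Sketch only: brokerA is a placeholder name. -->
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="brokerA"
        advisorySupport="true">
  <!-- advisorySupport defaults to true; it is spelled out here only
       to show where the setting lives. Setting it to false would stop
       subscriptions from propagating across network connectors. -->
  ...
</broker>
```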
Network connector
A network connector is defined using the networkConnector element, which is a child of the networkConnectors element.
Single connector
Figure 2.1. Single Connector
Example 2.1. Single connector configuration
<beans ...>
  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="brokerA" brokerId="A" ... >
    ...
    <networkConnectors>
      <networkConnector name="linkToBrokerB"
                        uri="static:(tcp://localhost:61002)"
                        networkTTL="3" />
    </networkConnectors>
    ...
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61001"/>
    </transportConnectors>
  </broker>
</beans>
The networkConnector element in the preceding example sets the following basic attributes:
name
- Identifies this network connector instance uniquely (for example, when monitoring the broker through JMX). If you define more than one networkConnector element on a broker, you must set the name to ensure that the connector name is unique within the scope of the broker.
uri
- The discovery agent URI that returns which brokers to connect to. In other words, broker A connects to every transport URI returned by the discovery agent. In the preceding example, the static discovery agent URI returns a single transport URI, tcp://localhost:61002, which refers to a port opened by one of the transport connectors on broker B.
networkTTL
- The network time-to-live (TTL) attribute specifies the maximum number of hops that a message can make through the broker network. It is almost always necessary to set this attribute, because the default value of 1 would only enable a message to make a single hop to a neighboring broker.
Connectors in each direction
Figure 2.2. Connectors in Each Direction
Example 2.2. Two way connector
<beans ...>
  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="brokerB" brokerId="B"... >
    ...
    <networkConnectors>
      <networkConnector name="linkToBrokerA"
                        uri="static:(tcp://localhost:61001)"
                        networkTTL="3" />
    </networkConnectors>
    ...
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61002" />
    </transportConnectors>
  </broker>
</beans>
Duplex connector
Figure 2.3. Duplex Connector
To create a duplex connector, set the duplex attribute to true. For example, to make the network connector on broker A a duplex connector, you can configure it as follows:
Example 2.3. Duplex connector configuration
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3"
                    duplex="true" />
</networkConnectors>
Multiple connectors
Figure 2.4. Multiple Connectors
To define multiple connectors between the same pair of brokers, add a networkConnector element for each connector and specify a unique name for each connector, as follows:
<networkConnectors>
  <networkConnector name="link01ToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3" />
  <networkConnector name="link02ToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3" />
  <networkConnector name="link03ToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3" />
</networkConnectors>
- Spreading the load amongst multiple connections.
- Defining separate configuration for topics and queues. That is, you can configure one network connector to transmit queue subscriptions only and another network connector to transmit topic subscriptions only.
Conduit subscriptions
Figure 2.5. Conduit Subscriptions
Consumers C1 and C2 both subscribe to the topic, t, which gives rise to the subscriptions, C1:t and C2:t, in broker B. Both of these subscriptions propagate automatically from broker B to broker A. Because broker A has conduit subscriptions enabled, its network connector consolidates the duplicate subscriptions, C1:t and C2:t, into a single subscription, B:t. Now, if a message on topic t is sent to broker A, broker A sends a single copy of the message to broker B, to honor the conduit subscription, B:t. Broker B then sends a copy of the message to each consumer, to honor the topic subscriptions, C1:t and C2:t.
If conduit subscriptions were disabled, two separate subscriptions, B:C1:t and B:C2:t, would be registered in broker A. Now, if a message on topic t is sent to broker A, broker A would send two copies of the message to broker B, to honor the topic subscriptions, B:C1:t and B:C2:t. Broker B would then send two copies of the message to each consumer, to honor the topic subscriptions, C1:t and C2:t. In other words, each consumer would receive the topic message twice.
To disable conduit subscriptions, set the conduitSubscriptions attribute to false on the networkConnector element. See Section 9.1, “Balancing Consumer Load” for more details.
Chapter 3. Dynamic and Static Propagation
Abstract
Overview
Dynamic propagation
Figure 3.1. Dynamic Propagation of Queue Messages
- As shown in part (a), initially, there are no consumers attached to the network. A producer, P, connects to broker A and starts sending messages to a particular queue, TEST.FOO. Because there are no consumers attached to the network, all of the messages accumulate in broker A. The messages do not propagate any further at this time.
- As shown in part (b), a consumer, C, now connects to the network at broker E and subscribes to the same queue, TEST.FOO, to which the producer is sending messages.
- The consumer's subscription, s, propagates through the broker network, following the reverse arrow direction, until it reaches broker A.
- After broker A receives the subscription, s, it knows that it can send the messages accumulated in the queue, TEST.FOO, to the consumer, C. Based on the information in the subscription, s, broker A sends messages along the path ABCE to reach consumer C.
Static propagation
To enable static propagation, edit the networkConnector element: insert staticallyIncludedDestinations as a child element and then list the queues and topics you want to propagate using the queue and topic child elements. For example, to specify that messages in the queue, TEST.FOO, are statically propagated from A to B, you would define the network connector in broker A's configuration as follows:
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3">
    <staticallyIncludedDestinations>
      <queue physicalName="TEST.FOO"/>
    </staticallyIncludedDestinations>
  </networkConnector>
</networkConnectors>
The physicalName attribute in the preceding example could also be set to a wildcard pattern such as TEST.*. See Chapter 4, Destination Filtering.
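As a further sketch, the same connector could statically propagate a topic alongside the queue, using the topic child element mentioned above. The topic name TEST.BAR is hypothetical and chosen only for illustration.

```xml
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3">
    <staticallyIncludedDestinations>
      <!-- Queue from the preceding example -->
      <queue physicalName="TEST.FOO"/>
      <!-- Hypothetical topic, added for illustration only -->
      <topic physicalName="TEST.BAR"/>
    </staticallyIncludedDestinations>
  </networkConnector>
</networkConnectors>
```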
Consider a network in which messages in the queue, TEST.FOO, are configured to propagate statically on all of the network connectors, (A,B), (B,C), (C,D), and (C,E).
Figure 3.2. Static Propagation of Queue Messages
- Initially, there are no consumers attached to the network. A producer, P, connects to broker A and sends 10 messages to the queue, TEST.FOO.
- Because the network connector, (A,B), has enabled static propagation for the queue, TEST.FOO, the 10 messages on broker A are forwarded to broker B.
- Likewise, because the network connector, (B,C), has enabled static propagation for the queue, TEST.FOO, the 10 messages on broker B are forwarded to broker C.
- Finally, because the network connectors, (C,D) and (C,E), have enabled static propagation for the queue, TEST.FOO, the 10 messages on broker C are alternately sent to broker D and broker E. In other words, the brokers, D and E, receive every second message. Hence, at the end of the static propagation, there are 5 messages on broker D and 5 messages on broker E.
Duplex mode and static propagation
TEST.FOO
.
Figure 3.3. Duplex Mode and Static Propagation
TEST.FOO
. The static message propagation then proceeds as follows:
- Because the duplex connector, {B,C}, has enabled static propagation for the queue, TEST.FOO, the 10 messages on broker B are forwarded to broker C.
- Because the duplex connectors, {C,D} and {C,E}, have enabled static propagation for the queue, TEST.FOO, the 10 messages on broker C are alternately sent to broker D and broker E. At the end of the static propagation, there are 5 messages on broker D and 5 messages on broker E.
TEST.FOO
. Because static propagation is enabled on all of the connectors, broker C sends messages alternately to B, D, and E. At the end of the static propagation, there are 3 messages on broker B, 3 messages on broker D, and 3 messages on broker E.
Self-avoiding paths
Figure 3.4. Self-Avoiding Paths
- The producer, P, connects to broker A and sends 100 messages to the queue, TEST.FOO.
- The 100 messages on broker A are alternately sent to broker B and broker C. The 50 messages sent to broker B are immediately forwarded to broker C, but at this point the messages stop moving and remain on broker C. The self-avoiding path strategy dictates that messages cannot return to a broker they have already visited.
- Similarly, the 50 messages sent from broker A to broker C are immediately forwarded to broker B, but do not travel any further than that.
brokerId and self-avoiding paths
The broker network uses the broker ID (set by the broker element's brokerId attribute) to figure out self-avoiding paths. By default, the broker ID value is generated dynamically and assigned a new value each time a broker starts up. If your network topology relies on self-avoiding paths, however, this default behavior is not appropriate. If a broker is stopped and restarted, it would rejoin the network with a different broker ID, which confuses the self-avoiding path algorithm and can lead to stuck messages.
To avoid this problem, set the brokerId attribute explicitly on the broker element, as shown in the following example:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="brokerA" brokerId="A"... >
...
</broker>
Chapter 4. Destination Filtering
Abstract
Overview
- specifying which destinations' messages can pass through a connector
- excluding messages for specific destinations from passing through a connector
Destination wildcards
For example, an application might use the prefix PRICE.STOCK to denote all of the destinations that handle stock quotes. The application might then further segment the destination names so that stock quotes from the New York Stock Exchange are prefixed with PRICE.STOCK.NYSE and stock quotes from NASDAQ use the prefix PRICE.STOCK.NASDAQ. Wildcards are a natural way to create filters for specific types of destinations.
Table 4.1. Destination Name Wildcards
Wildcard | Description |
---|---|
. | Separates segments in a path name. |
* | Matches any single segment in a path name. |
> | Matches any number of segments in a path name. |
Table 4.2. Example Destination Wildcards
Destination wildcard | What it matches |
---|---|
PRICE.> | Any price for any product on any exchange. |
PRICE.STOCK.> | Any price for a stock on any exchange. |
PRICE.STOCK.NASDAQ.* | Any stock price on NASDAQ. |
PRICE.STOCK.*.IBM | Any IBM stock price on any exchange. |
Filtering destinations by inclusion
To filter destinations by inclusion, add a dynamicallyIncludedDestinations child to the network connector's networkConnector element. The included destinations are specified using queue and topic children. Example 4.1, “Network Connector Using Inclusive Filtering” shows configuration for a network connector that only passes messages destined for queues with names that match TRADE.STOCK.> and topics with names that match PRICE.STOCK.>.
Example 4.1. Network Connector Using Inclusive Filtering
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3">
    <dynamicallyIncludedDestinations>
      <queue physicalName="TRADE.STOCK.>"/>
      <topic physicalName="PRICE.STOCK.>"/>
    </dynamicallyIncludedDestinations>
  </networkConnector>
</networkConnectors>
When you add dynamicallyIncludedDestinations to a network connector's configuration, the network connector will only pass messages for the specified destinations.
Filtering destinations by exclusion
To filter destinations by exclusion, add an excludedDestinations child to the network connector's networkConnector element. The excluded destinations are specified using queue and topic children. Example 4.2, “Network Connector Using Exclusive Filtering” shows configuration for a network connector that blocks messages destined for queues with names that match TRADE.STOCK.NYSE.* and topics with names that match PRICE.STOCK.NYSE.*.
Example 4.2. Network Connector Using Exclusive Filtering
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3">
    <excludedDestinations>
      <queue physicalName="TRADE.STOCK.NYSE.*"/>
      <topic physicalName="PRICE.STOCK.NYSE.*"/>
    </excludedDestinations>
  </networkConnector>
</networkConnectors>
Combining inclusive and exclusive filters
Example 4.3. Combining Exclusive and Inclusive Filters
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3">
    <dynamicallyIncludedDestinations>
      <queue physicalName="TRADE.STOCK.>"/>
      <topic physicalName="PRICE.STOCK.>"/>
    </dynamicallyIncludedDestinations>
    <excludedDestinations>
      <queue physicalName="TRADE.STOCK.NYSE.*"/>
      <topic physicalName="PRICE.STOCK.NYSE.*"/>
    </excludedDestinations>
  </networkConnector>
</networkConnectors>
Chapter 5. Using JMS Message Selectors
Abstract
Overview
Scenarios that do not work
C1 selects messages for which the region header is equal to us. C2 selects messages for which the region header is equal to emea.
Figure 5.1. JMS Message Selectors and Conduit Subscriptions
The consumer subscriptions, s1 and s2, automatically propagate to broker A. Because these subscriptions are both on the same queue, broker A combines the subscriptions into a single conduit subscription, cs, which does not include any selector details. When the producer P starts sending messages to the queue, broker A forwards the messages alternately to broker B and broker C without checking whether the messages satisfy the relevant selectors.
In the worst case, all of the messages for region emea end up on broker B and all of the messages for region us end up on broker C. Chances are that the result would be somewhere in the middle. However, that means that at least some messages will sit at a broker where they will never be consumed.
Resolving the problem
To resolve the problem, disable conduit subscriptions by setting the networkConnector element's conduitSubscriptions attribute to false. Example 5.1, “Disabling Conduit Subscriptions” shows configuration for a network connector with conduit subscriptions disabled.
Example 5.1. Disabling Conduit Subscriptions
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3"
                    conduitSubscriptions="false" />
</networkConnectors>
- ensure that your network topology is such that messages won't be sent to brokers without appropriate consumers
- ensure that the orphaned messages will not create issues in your application
Chapter 6. Network Topologies
Abstract
Overview
Concentrator topology
Figure 6.1. Concentrator Topology
P1
to Pn
). The next layer consists of a smaller number of brokers, where each broker in the first layer connects to all of the brokers in the second layer. With this topology, each broker in the second layer can receive messages from any of the producers.
Hub and spokes topology
Figure 6.2. Hub and Spoke Topology
Tree topology
Figure 6.3. Tree Topology
R could represent the hub in the basement of the IT department's building and A could represent a router on the ground floor of another building. If you want to extend the LAN to the first and second floor of building A, you are unlikely to run dedicated cables back to the IT hub for each of these floors. It is more likely that you will simply plug a second tier of routers, A1 and A2, into the existing router, A, on the ground floor. In this way, you effectively add another layer to the tree topology.
Mesh topology
Figure 6.4. Mesh Topology
Complete graph
The complete graph on n vertices is the graph with n vertices that has edges joining every pair of vertices. This graph is denoted by the symbol, Kn. For example, Figure 6.5, “The Complete Graph, K5” shows the graph, K5.
Figure 6.5. The Complete Graph, K5
When the brokers are connected as a complete graph, set networkTTL=1 in the network connector elements, in order for the broker network to function correctly.
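As a rough sketch of the networkTTL=1 setting, the following fragment shows how broker A might be configured in a three-broker complete graph (K3), where every broker is directly linked to every other. The connector names and the port numbers 61002 and 61003 are assumptions made for illustration.

```xml
<!-- Broker A in a hypothetical three-broker complete graph.
     Every other broker is one hop away, so networkTTL is 1. -->
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="1" />
  <networkConnector name="linkToBrokerC"
                    uri="static:(tcp://localhost:61003)"
                    networkTTL="1" />
</networkConnectors>
```

Brokers B and C would carry the equivalent pair of connectors pointing at their own two neighbors.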
Chapter 7. Optimizing Routes
Abstract
- decreaseNetworkConsumerPriority: decreases the priority of a network connector based on the number of hops from the message's origin, so that messages are routed along the shortest route
- suppressDuplicateQueueSubscriptions: suppresses duplicate subscriptions from intermediary brokers, so that alternative paths are reduced
7.1. Choosing the Shortest Route
Overview
Connector configuration
To make the brokers prefer the shortest route, set the networkConnector element's decreaseNetworkConsumerPriority attribute to true.
Example 7.1. Network Connector for Choosing the Shortest Route
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3"
                    decreaseNetworkConsumerPriority="true" />
</networkConnectors>
When decreaseNetworkConsumerPriority is set to true, the route priority is determined as follows:
- Local consumers (attached directly to the broker) have a priority of 0.
- Network subscriptions have an initial priority of -5.
- The priority of a network subscription is reduced by 1 for every network hop that it traverses.
If you do not enable decreaseNetworkConsumerPriority on all of the connectors in your network, the brokers will not be able to accurately determine the shortest route. Some network connectors will not have the proper starting priority and will not reduce their priority as required.
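The following sketch shows what consistent configuration might look like on a broker that has more than one network connector: decreaseNetworkConsumerPriority is enabled on every connector, not only on one of them. The second connector, its name, and port 61003 are hypothetical.

```xml
<networkConnectors>
  <!-- Both connectors enable the option, so subscription priorities
       are decremented consistently on every outgoing link. -->
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3"
                    decreaseNetworkConsumerPriority="true" />
  <networkConnector name="linkToBrokerC"
                    uri="static:(tcp://localhost:61003)"
                    networkTTL="3"
                    decreaseNetworkConsumerPriority="true" />
</networkConnectors>
```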
Route priority and broker load
Example
decreaseNetworkConsumerPriority
in a broker network.
Figure 7.1. Shortest Route in a Mesh Network
In this network, there are three alternative routes from the producer, P, to the consumer, C1: PBAFEC1 (three broker hops), PBEC1 (one broker hop), and PBCDEC1 (three broker hops). When decreaseNetworkConsumerPriority is enabled, the route PBEC1 has the highest priority, so messages from P to C1 are sent along this route unless connector BE's prefetch buffer is full. If connector BE's prefetch buffer is full, messages are sent along route PBAFEC1 and route PBCDEC1 on an alternating basis.
7.2. Suppressing Duplicate Routes
Abstract
Overview
Connector configuration
To suppress duplicate routes, set the networkConnector element's suppressDuplicateQueueSubscriptions attribute to true on all of the network connectors in your network. Example 7.2, “Network Connector that Suppresses Duplicate Routes” shows a network connector that is configured to suppress duplicate routes.
Example 7.2. Network Connector that Suppresses Duplicate Routes
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    suppressDuplicateQueueSubscriptions="true"/>
</networkConnectors>
Broker ID and duplicate routes
Set the broker element's brokerId attribute explicitly for each broker in the network. Example 7.3, “Setting a Broker's ID” shows configuration that sets a broker's ID.
Example 7.3. Setting a Broker's ID
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="brokerA" brokerId="A"... >
...
</broker>
Example
In this network, the networkTTL is 2, so two alternative routes are possible:
- the short route: PABC1
- the long route: PACBC1
Figure 7.2. Duplicate Subscriptions in a Network
If you set decreaseNetworkConsumerPriority to true, the short route is preferred and messages are propagated along the route PABC1. However, under heavy load conditions, the short route, PABC1, can become overloaded, and in this case the broker, A, will fall back to the long route, PACBC1. The problem with this scenario is that when the consumer, C1, shuts down, it can lead to messages getting stuck on broker C.
Setting the suppressDuplicateQueueSubscriptions attribute to true will suppress the intermediary subscriptions that are generated between A and B. Because this subscription is suppressed, the only route left is PABC1. Routing becomes fully deterministic.
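Putting the pieces of this chapter together, a broker in such a network might be configured roughly as follows. This is a sketch rather than a reference configuration: the second connector, its name, and port 61003 are assumptions, while the attribute names and the TTL of 2 come from the text above.

```xml
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="brokerA" brokerId="A">
  <!-- A fixed brokerId keeps the broker's identity stable across restarts. -->
  <networkConnectors>
    <networkConnector name="linkToBrokerB"
                      uri="static:(tcp://localhost:61002)"
                      networkTTL="2"
                      decreaseNetworkConsumerPriority="true"
                      suppressDuplicateQueueSubscriptions="true" />
    <!-- Hypothetical second link; port 61003 is assumed. -->
    <networkConnector name="linkToBrokerC"
                      uri="static:(tcp://localhost:61003)"
                      networkTTL="2"
                      decreaseNetworkConsumerPriority="true"
                      suppressDuplicateQueueSubscriptions="true" />
  </networkConnectors>
  ...
</broker>
```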
Chapter 8. Discovering Brokers
Abstract
- discovery agents: components that advertise the brokers available to other members of a messaging application
- discovery URI: a URI that looks up all of the discoverable brokers and presents them as a list of actual URIs for use by the client or network connector
8.1. Discovery Agents
Abstract
A discovery agent is enabled on a broker by setting the discoveryUri attribute on the transportConnector element as shown in Example 8.1, “Enabling a Discovery Agent on a Broker”.
Example 8.1. Enabling a Discovery Agent on a Broker
<transportConnectors>
  <transportConnector name="openwire"
                      uri="tcp://localhost:61716"
                      discoveryUri="multicast://default" />
</transportConnectors>
In this example, the discoveryUri attribute on the transportConnector element is initialized to multicast://default.
8.1.1. Fuse Fabric Discovery Agent
Abstract
Overview
URI
Example 8.2. Fuse Fabric Discovery Agent URI Format
fabric://GID
Configuring a broker
Configuring a client
Example 8.3. Client Connection URL using Fuse Fabric Discovery
discovery://(fabric://nwBrokers)
A client using this URL will discover the brokers in the nwBrokers broker group and generate a list of brokers to which it can connect.
8.1.2. Static Discovery Agent
Abstract
Overview
Using the agent
Example 8.4. Static Discovery Agent URI Format
static://(URI1,URI2,URI3,...)
Example
Example 8.5. Discovery URI using the Static Discovery Agent
discovery://(static://(tcp://localhost:61716,tcp://localhost:61816))
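The static agent is also what the network connector examples in this guide rely on. As a sketch, a single network connector could list both of the broker addresses from the preceding example; the connector name is hypothetical, and the static:(...) form follows the style of the earlier network connector examples.

```xml
<networkConnectors>
  <!-- One connector, two statically listed brokers. The broker
       attempts to connect to each URI in the list. -->
  <networkConnector name="linkToStaticBrokers"
                    uri="static:(tcp://localhost:61716,tcp://localhost:61816)"
                    networkTTL="3" />
</networkConnectors>
```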
8.1.3. Multicast Discovery Agent
Abstract
Overview
URI
Example 8.6. Multicast Discovery Agent URI Format
multicast://GroupID
Configuring a broker
To enable a multicast discovery agent on a broker, set the transportConnector element's discoveryUri attribute to a multicast discovery agent URI as shown in Example 8.7, “Enabling a Multicast Discovery Agent on a Broker”.
Example 8.7. Enabling a Multicast Discovery Agent on a Broker
<transportConnectors>
  <transportConnector name="openwire"
                      uri="tcp://localhost:61716"
                      discoveryUri="multicast://default" />
</transportConnectors>
default
.
Configuring a client
Example 8.8. Client Connection URL using Multicast Discovery
discovery://(multicast://default)
A client using this URL will discover the brokers in the default multicast group and generate a list of brokers to which it can connect.
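A network connector can use the same multicast agent to find its peers instead of listing them statically. The fragment below is a minimal sketch; the connector name is hypothetical, and the group name default matches the examples above.

```xml
<networkConnectors>
  <!-- Connects to every broker that advertises itself in the
       "default" multicast group. -->
  <networkConnector name="linkToDiscoveredBrokers"
                    uri="multicast://default"
                    networkTTL="3" />
</networkConnectors>
```

For this to find anything, the peer brokers would need a transport connector whose discoveryUri is set to the same group.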
8.1.4. Zeroconf Discovery Agent
Abstract
Overview
URI
Example 8.9. Zeroconf Discovery Agent URI Format
zeroconf://GroupID
Configuring a broker
To enable a Zeroconf discovery agent on a broker, set the transportConnector element's discoveryUri attribute to a Zeroconf discovery agent URI as shown in Example 8.10, “Enabling a Zeroconf Discovery Agent on a Broker”.
Example 8.10. Enabling a Zeroconf Discovery Agent on a Broker
<transportConnectors>
  <transportConnector name="openwire"
                      uri="tcp://localhost:61716"
                      discoveryUri="zeroconf://NEGroup" />
</transportConnectors>
NEGroup
.
Configuring a client
Example 8.11. Client Connection URL using Zeroconf Discovery
discovery://(zeroconf://NEGroup)
A client using this URL will discover the brokers in the NEGroup group and generate a list of brokers to which it can connect.
8.2. Dynamic Discovery Protocol
Abstract
Overview
URI syntax
Example 8.12. Dynamic Discovery URI
discovery://(DiscoveryAgentUri)?Options
The discovery options, ?Options, are specified in the form of a query list. The discovery options are described in Table 8.1, “Dynamic Discovery Protocol Options”. You can also inject transport options as described in the section called “Setting options on the discovered transports”.
Transport options
Table 8.1. Dynamic Discovery Protocol Options
Sample URI
Example 8.13. Discovery Protocol URI
discovery://(multicast://default)?initialReconnectDelay=100
Setting options on the discovered transports
For example, the following URI sets the connectionTimeout option to 10 seconds.
Example 8.14. Injecting Transport Options into a Discovered Transport
discovery://(multicast://default)?connectionTimeout=10000
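One place a discovery URI might be used is the brokerURL property of a client's connection factory. The Spring bean below is a sketch under that assumption; the bean ID is hypothetical, and the URI is the one from Example 8.13.

```xml
<!-- Hypothetical client-side bean: the discovery URI goes wherever
     an ordinary broker URL would go. -->
<bean id="discoveryConnectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL"
            value="discovery://(multicast://default)?initialReconnectDelay=100"/>
</bean>
```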
8.3. Fanout Protocol
Abstract
Overview
URI syntax
Example 8.15. Fanout URI Syntax
fanout://(DiscoveryAgentUri)?Options
The fanout options, ?Options, are specified in the form of a query list. The options are described in Table 8.2, “Fanout Protocol Options”. You can also inject transport options as described in the section called “Setting options on the discovered transports”.
Transport options
Table 8.2. Fanout Protocol Options
Option Name | Default | Description |
---|---|---|
initialReconnectDelay | 10 | Specifies, in milliseconds, how long the transport will wait before the first reconnect attempt. |
maxReconnectDelay | 30000 | Specifies, in milliseconds, the maximum amount of time to wait between reconnect attempts. |
useExponentialBackOff | true | Specifies if an exponential back-off is used between reconnect attempts. |
backOffMultiplier | 2 | Specifies the exponent used in the exponential back-off algorithm. |
maxReconnectAttempts | 0 | Specifies the maximum number of reconnect attempts before an error is sent back to the client. 0 specifies unlimited attempts. |
fanOutQueues | false | Specifies whether queue messages are replicated to every connected broker. For more information see the section called “Applying fanout to queue messages”. |
minAckCount | 2 | Specifies the minimum number of brokers to which the client must connect before it sends out messages. For more information see the section called “Minimum number of brokers”. |
Sample URI
Example 8.16. Fanout Protocol URI
fanout://(multicast://default)?initialReconnectDelay=100
Applying fanout to queue messages
To apply fanout to queue messages, set the fanOutQueues option to true. This configures the protocol so that it also replicates queue messages.
Minimum number of brokers
You can change the minimum number of brokers by setting the minAckCount option.
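As an illustration of combining these options, the sketch below sets both fanOutQueues and minAckCount on a fanout URL held in a Spring bean. The bean ID and the value 3 are arbitrary choices for the example. Note that inside an XML attribute the ampersand separating the options has to be written as &amp;amp;.

```xml
<!-- Hypothetical client-side bean. minAckCount=3 is an arbitrary
     example value; the default is 2. -->
<bean id="fanoutConnectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL"
            value="fanout://(multicast://default)?fanOutQueues=true&amp;minAckCount=3"/>
</bean>
```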
Using fanout with a broker network
For example, if you connected a consumer for topic hello.jason to broker A and connected a producer to broker B to send messages to topic hello.jason, the consumer would get one copy of the messages. If, on the other hand, the producer connects to the network using the fanout protocol, the producer will connect to every broker in the network simultaneously and start sending messages. Each of the four brokers will receive a copy of every message and deliver its copy to the consumer. So, for each message, the consumer will get four copies.
Chapter 9. Load Balancing
Abstract
9.1. Balancing Consumer Load
Abstract
Overview
Default load behavior
Figure 9.1. Message Flow when Conduit Subscriptions Enabled
In this network, consumer C1 is attached to Broker A, two further consumers are attached to Broker B, and all of the consumers subscribe to the TEST.FOO queue. Producer, P, connects to Broker A and sends 12 messages to the TEST.FOO queue. By default, conduit subscriptions are enabled, so Broker A sees only a single subscription from Broker B and a single subscription from consumer C1. Broker A therefore sends messages alternately to C1 and B. Assuming that C1 and B process messages at the same speed, A sends a total of 6 messages to C1 and 6 messages to B.
Disabling conduit subscriptions
To disable conduit subscriptions, set the networkConnector element's conduitSubscriptions attribute to false. Example 9.1, “Disabling Conduit Subscriptions” shows configuration for a network connector with conduit subscriptions disabled.
Example 9.1. Disabling Conduit Subscriptions
<networkConnectors>
  <networkConnector name="linkToBrokerB"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3"
                    conduitSubscriptions="false" />
</networkConnectors>
Balanced load behavior
Figure 9.2. Message Flow when Conduit Subscriptions Disabled
In this network, consumer C1 is attached to Broker A, two further consumers are attached to Broker B, and all of the consumers subscribe to the TEST.FOO queue. Producer, P, connects to Broker A and sends 12 messages to the TEST.FOO queue. With conduit subscriptions disabled, Broker A sees both of the subscriptions on Broker B and a single subscription from consumer C1. Broker A sends messages alternately to each of the subscriptions. Assuming that all of the consumers process messages at equal speeds, C1 receives 4 messages and Broker B receives 8 messages.
Separate connectors for topics and queues
Because the conduitSubscriptions attribute applies simultaneously to queues and topics, you cannot configure queues and topics differently using a single network connector. You can, however, configure them differently by using multiple network connectors: one for queues and another for topics.
The queuesOnly network connector, which has conduit subscriptions disabled, is equipped with a filter that transmits only queue messages. The topicsOnly network connector, which has conduit subscriptions enabled, is equipped with a filter that transmits only topic messages.
Example 9.2. Separate Configuration of Topics and Queues
<networkConnectors>
  <networkConnector name="queuesOnly"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3"
                    conduitSubscriptions="false">
    <dynamicallyIncludedDestinations>
      <queue physicalName=">"/>
    </dynamicallyIncludedDestinations>
  </networkConnector>
  <networkConnector name="topicsOnly"
                    uri="static:(tcp://localhost:61002)"
                    networkTTL="3">
    <dynamicallyIncludedDestinations>
      <topic physicalName=">"/>
    </dynamicallyIncludedDestinations>
  </networkConnector>
</networkConnectors>
9.2. Managing Producer Load
Overview
Concentrator topology
Figure 9.3. Load Balancing with the Concentrator Topology
- The first layer of brokers, A, B, and C, accepts connections from message producers and specializes in receiving incoming messages.
- The second layer of brokers, X and Y, accepts connections from message consumers and specializes in sending messages to the consumers.
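To make the two layers concrete, here is a sketch of how a first-layer broker such as A might link to the second-layer brokers X and Y. The connector names, the ports 61101 and 61102, and the networkTTL value are all assumptions; brokers B and C would carry equivalent connectors.

```xml
<!-- Broker A (first layer): one connector to each second-layer broker. -->
<networkConnectors>
  <networkConnector name="linkToBrokerX"
                    uri="static:(tcp://localhost:61101)"
                    networkTTL="1" />
  <networkConnector name="linkToBrokerY"
                    uri="static:(tcp://localhost:61102)"
                    networkTTL="1" />
</networkConnectors>
```

A TTL of 1 is used here on the assumption that messages only need to make the single hop from the first layer to the second.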
Client configuration
Chapter 10. JMS-to-JMS Bridge
Abstract
10.1. Bridge Architecture
Overview
Figure 10.1. Architecture of the JMS-to-JMS Bridge
Wire protocols
ActiveMQ client libraries
Third-party client libraries
JMS API
Router rules
- Apache Camel JMS-to-JMS bridge—a general-purpose routing engine, which includes support for processing messages using enterprise integration patterns, and over 100 integration components (including FTP, HTTP, and Web services).
- Native ActiveMQ JMS-to-JMS bridge (deprecated)—special-purpose routing engine, which is capable of routing JMS messages between arbitrary JMS providers. This implementation includes automatic proxy support for ReplyTo messages.
10.2. Apache Camel JMS-to-JMS Bridge
10.2.1. Embedded Camel Bridge Configuration
Overview
Spring configuration
InstallDir/etc/activemq.xml
. The bridge configuration is described in detail in the following sections.
Example 10.1. Embedded Apache Camel JMS-to-JMS Bridge
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" ... >
  ...
  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="${broker-name}" dataDirectory="${data}" start="false">
    ...
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000"/>
      <!-- Create a VM endpoint to enable embedded connections -->
      <transportConnector uri="vm://local" />
    </transportConnectors>
  </broker>
  ...
  <!-- Configure the ActiveMQ JMS connection factory -->
  <!-- Configure the Third-Party JMS connection factory -->
  <!-- Define the Apache Camel Routes -->
</beans>
10.2.2. Configuring ActiveMQ JMS Connections
The Camel ActiveMQ component
Spring XML example
InstallDir/etc/activemq.xml
.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" ... >
  ...
  <broker xmlns="http://activemq.apache.org/schema/core"
          brokerName="${broker-name}" dataDirectory="${data}" start="false">
    ...
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:0?maximumConnections=1000"/>
      <transportConnector uri="vm://local" />
    </transportConnectors>
  </broker>
  ...
  <!-- Configure the ActiveMQ broker connection -->
  <bean id="amqConnectionFactory"
        class="org.apache.activemq.ActiveMQConnectionFactory"
        depends-on="${broker-name}">
    <property name="brokerURL" value="vm://local?create=false"/>
  </bean>
  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="amqConnectionFactory"/>
    <property name="concurrentConsumers" value="10"/>
  </bean>
  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
  </bean>
  ...
</beans>
- The bean with ID, activemq, and of type, ActiveMQComponent, defines the Camel ActiveMQ component instance. This bean overrides the default ActiveMQ component instance and implicitly associates the bean ID value, activemq, with the URI scheme of the same name. The activemq URI scheme can then be used to define endpoints of this component in a Camel route.
- The bean with ID, jmsConfig, is used to configure the ActiveMQ component (and supports many additional options).
- The bean with ID, amqConnectionFactory, is a JMS connection factory that is used to create connections to the ActiveMQ broker. Note the following attribute settings:
  - The brokerURL attribute specifies the transport protocol for connecting to the broker. In this case, the protocol is vm://local, which uses the Java Virtual Machine to route messages directly to and from the embedded broker.
  - The depends-on attribute is a standard Spring attribute that ensures the ActiveMQ connection factory is created after the co-located ActiveMQ broker. This is important, because otherwise the vm: transport would try to instantiate a default broker to connect to. For extra security, the create=false flag is set on the URL to prevent this from happening.
Defining an endpoint with the activemq scheme
activemq
) is implicitly adopted as the URI scheme for defining ActiveMQ endpoints in Camel routes. For example, to define an endpoint that connects to the QueueA queue in the ActiveMQ broker, use the following URI:
activemq:queue:QueueA
TopicA
topic in the ActiveMQ broker, use the following URI:
activemq:topic:TopicA
Other types of ActiveMQ connection factory
ActiveMQConnectionFactory
- For ordinary JMS connections (includes support for JMS authentication).
ActiveMQSslConnectionFactory
- For configuring JMS connections over SSL/TLS (encrypted transport).
ActiveMQXAConnectionFactory
- For integrating the ActiveMQ client with an XA transaction manager.
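As an illustration of swapping in one of these factories, the following sketch configures ActiveMQSslConnectionFactory in place of the ordinary factory. The bean IDs, the ssl://localhost:61617 broker URL, the trust store path, and the password are all placeholder assumptions; substitute the values for your own broker.

```xml
<beans xmlns="http://www.springframework.org/schema/beans">

  <!-- Hypothetical SSL/TLS connection factory; URL, path, and password are placeholders -->
  <bean id="amqSslConnectionFactory"
        class="org.apache.activemq.ActiveMQSslConnectionFactory">
    <property name="brokerURL" value="ssl://localhost:61617"/>
    <property name="trustStore" value="/path/to/client.ts"/>
    <property name="trustStorePassword" value="changeit"/>
  </bean>

  <!-- The JMS configuration references the SSL factory instead of the plain one -->
  <bean id="jmsSslConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="amqSslConnectionFactory"/>
  </bean>

  <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsSslConfig"/>
  </bean>

</beans>
```

The broker must expose a matching ssl transport connector for this client configuration to connect.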
References
- The ActiveMQ component chapter from the EIP Component Reference.
- The community documentation for the ActiveMQ Component.
10.2.3. Configuring Third-Party JMS Connections
The Camel JMS component
Alternative approaches
Reference a connection factory bean
javax.jms.ConnectionFactory
instance directly as a Spring bean. You can then inject this third-party connection factory bean into the configuration of the Camel JMS component.
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" ... > ... <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${broker-name}" dataDirectory="${data}" start="false"> ... </broker> ... <!-- Configure IBM WebSphere MQ connection factory --> <bean id="weblogicConnectionFactory" class="com.ibm.mq.jms.MQConnectionFactory"> <property name="transportType" value="1"/> <property name="hostName" value="localhost"/> <property name="port" value="1414"/> <property name="queueManager" value="QM_TEST"/> </bean> <bean id="weblogicConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="weblogicConnectionFactory"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="weblogic" class="org.apache.camel.component.jms.JmsComponent"> <property name="configuration" ref="weblogicConfig"/> </bean> ... </beans>
Look up a connection factory in JNDI
destinationResolver
attribute of Camel's JmsComponent
class to reference a Spring JndiDestinationResolver
instance.
<beans ... > ... <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${broker-name}" dataDirectory="${data}" start="false"> ... </broker> ... <!-- Configure a Spring JNDI template instance --> <bean id="jmsJndiTemplate" class="org.springframework.jndi.JndiTemplate"> <property name="environment"> <props> <prop key="java.naming.factory.initial">com.sun.jndi.ldap.LdapCtxFactory</prop> <prop key="java.naming.provider.url">ldap://server.company.com/o=company_us,c=us</prop> </props> </property> </bean> <bean id="jmsConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean"> <property name="jndiTemplate" ref="jmsJndiTemplate"/> <property name="jndiName" value="jms/weblogic-test"/> </bean> <bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver"> <property name="jndiTemplate" ref="jmsJndiTemplate"/> </bean> <bean id="weblogicConfig" class="org.apache.camel.component.jms.JmsConfiguration"> <property name="connectionFactory" ref="jmsConnectionFactory"/> <property name="destinationResolver" ref="jndiDestinationResolver"/> <property name="concurrentConsumers" value="10"/> </bean> <bean id="weblogic" class="org.apache.camel.component.jms.JmsComponent"> <property name="configuration" ref="weblogicConfig"/> </bean> ... </beans>
References
- The JMS component chapter from the EIP Component Reference.
- The community documentation for the JMS Component.
10.2.4. Defining Apache Camel Routes
Overview
JMS endpoint syntax
JmsUriScheme:queue:QueueName[?Options]
JmsUriScheme:topic:TopicName[?Options]
activemq
or weblogic
.
Route syntax
camelContext
element. Each route definition appears inside a route
element, starting with a from
element (which defines a consumer endpoint for receiving messages) and ending with a to
element (which defines a producer endpoint for sending messages).
TEST.FOO
queue on the ActiveMQ broker and passing them straight on to the TEST.FOO
queue on the WebLogic messaging system, you can use the following route definition:
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="activemq:queue:TEST.FOO"/> <to uri="weblogic:queue:TEST.FOO"/> </route> </camelContext>
Sample routes
<beans ... > ... <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${broker-name}" dataDirectory="${data}" start="false"> ... </broker> ... <camelContext xmlns="http://camel.apache.org/schema/spring"> <!-- Route outgoing QueueA queue --> <route> <from uri="activemq:queue:QueueA?mapJmsMessage=false"/> <to uri="weblogic:queue:QueueA"/> </route> <!-- Route outgoing TopicA topic --> <route> <from uri="activemq:topic:TopicA?mapJmsMessage=false"/> <to uri="weblogic:topic:TopicA"/> </route> <!-- Route incoming QueueX queue --> <route> <from uri="weblogic:queue:QueueX?mapJmsMessage=false"/> <to uri="activemq:queue:QueueX"/> </route> <!-- Route incoming TopicX topic --> <route> <from uri="weblogic:topic:TopicX?mapJmsMessage=false"/> <to uri="activemq:topic:TopicX"/> </route> </camelContext> </beans>
mapJmsMessage option
mapJmsMessage
set to false
. This prevents the JMS message from being parsed into the standard Java data format, thus ensuring that the message is passed straight through without processing, which gives optimum performance for a pass-through route.
true
).
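By contrast, a route that needs to inspect or transform the message body can leave mapJmsMessage at its default value. The following is a minimal sketch, assuming the activemq and weblogic components are configured as described earlier; the convertBodyTo step is only one example of processing that depends on the mapped body.

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <!-- mapJmsMessage is not set, so the default (true) applies and the body is mapped to a Java type -->
    <from uri="weblogic:queue:QueueX"/>
    <!-- Example processing step that works on the mapped body -->
    <convertBodyTo type="java.lang.String"/>
    <to uri="activemq:queue:QueueX"/>
  </route>
</camelContext>
```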
Camel schema location
http://camel.apache.org/schema/spring/camel-spring.xsd
http://camel.apache.org/schema/spring/camel-spring-Version.xsd
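A schema location is typically declared through the xsi:schemaLocation attribute on the root beans element. The following sketch assumes the unversioned Camel schema URL; the Spring beans schema entry is included only to make the example complete.

```xml
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring
         http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camelContext xmlns="http://camel.apache.org/schema/spring">
    <!-- route definitions go here -->
  </camelContext>

</beans>
```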
References
- Implementing Enterprise Integration Patterns
- Routing Expression and Predicate Languages
- EIP Component Reference
10.3. Native ActiveMQ JMS-to-JMS Bridge
10.3.1. Embedded Native Configuration
Overview
Spring configuration
jmsBridgeConnectors
element, which can contain any number of jmsQueueConnector
elements and jmsTopicConnector
elements. The detailed configuration of the bridge is described in the following sections.
Example 10.2. Embedded native JMS-to-JMS Bridge
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" ... > ... <broker xmlns="http://activemq.apache.org/schema/core" id="localbroker" brokerName="localBroker" persistent="false"> <jmsBridgeConnectors> <jmsQueueConnector> ... </jmsQueueConnector> ... <jmsTopicConnector> ... </jmsTopicConnector> ... </jmsBridgeConnectors> <transportConnectors> <transportConnector uri="tcp://localhost:61234" /> </transportConnectors> </broker> ... </beans>
Router rules
Figure 10.2. Routing Rules in the JMS-to-JMS Bridge
Rule types
- Inbound queue-to-queue mapping—defines a rule for pulling messages off a queue in the third-party JMS provider and forwarding the messages to a queue (possibly with a different name) in the ActiveMQ broker.
- Outbound queue-to-queue mapping—defines a rule for pulling messages off a queue in the ActiveMQ broker and forwarding the messages to a queue (possibly with a different name) in the third-party JMS provider.
- Inbound topic-to-topic mapping—defines a rule for receiving messages from a topic in the third-party JMS provider and forwarding the messages to a topic (possibly with a different name) in the ActiveMQ broker.
- Outbound topic-to-topic mapping—defines a rule for receiving messages from a topic in the ActiveMQ broker and forwarding the messages to a topic (possibly with a different name) in the third-party JMS provider.
10.3.2. Connecting to the ActiveMQ Broker
Bootstrapping an embedded bridge
Non-embedded deployments
jmsQueueConnector
supports various attributes (localQueueConnection
, localQueueConnectionFactory
, and so on), which you can use to configure the ActiveMQ broker connection explicitly. Likewise, the jmsTopicConnector
element supports attributes for configuring an ActiveMQ broker connection explicitly. This type of deployment lies beyond the scope of the current guide. We recommend that you use an embedded deployment of the native JMS-to-JMS bridge.
10.3.3. Connecting to the Third-Party JMS Provider
Overview
Reference a connection factory bean
javax.jms.QueueConnectionFactory
instance directly as a Spring bean. You can then reference this bean from the native JMS-to-JMS bridge by setting the outboundQueueConnectionFactory
attribute of the jmsQueueConnector
element.
<beans ... > ... <broker xmlns="http://activemq.apache.org/schema/core" ... > <jmsBridgeConnectors> <jmsQueueConnector outboundQueueConnectionFactory="#remoteFactory"> ... </jmsQueueConnector> </jmsBridgeConnectors> ... </broker> ... <!-- Configure IBM WebSphere MQ queue connection factory --> <bean id="remoteFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"> <property name="transportType" value="1"/> <property name="hostName" value="localhost"/> <property name="port" value="1414"/> <property name="queueManager" value="QM_TEST"/> </bean> ... </beans>
jmsTopicConnector
element by setting the outboundTopicConnectionFactory
attribute to reference a javax.jms.TopicConnectionFactory
instance.
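The topic case can be sketched as follows. The bean ID remoteTopicFactory and the connection settings are illustrative assumptions that mirror the queue example; com.ibm.mq.jms.MQTopicConnectionFactory is the WebSphere MQ topic counterpart of the queue connection factory.

```xml
<beans xmlns="http://www.springframework.org/schema/beans">

  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localBroker">
    <jmsBridgeConnectors>
      <jmsTopicConnector outboundTopicConnectionFactory="#remoteTopicFactory">
        <!-- topic bridge rules go here -->
      </jmsTopicConnector>
    </jmsBridgeConnectors>
  </broker>

  <!-- Hypothetical WebSphere MQ topic connection factory; host, port, and queue manager are placeholders -->
  <bean id="remoteTopicFactory" class="com.ibm.mq.jms.MQTopicConnectionFactory">
    <property name="transportType" value="1"/>
    <property name="hostName" value="localhost"/>
    <property name="port" value="1414"/>
    <property name="queueManager" value="QM_TEST"/>
  </bean>

</beans>
```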
Look up a connection factory in JNDI
javax.jms.QueueConnectionFactory
instance in a JNDI directory (assuming that some administrative tool has already instantiated and registered the connection factory in JNDI).
<beans ... > ... <broker xmlns="http://activemq.apache.org/schema/core" ... > <jmsBridgeConnectors> <jmsQueueConnector jndiOutboundTemplate="#remoteJndi" outboundQueueConnectionFactoryName="cn=MQQueueCF"> ... </jmsQueueConnector> </jmsBridgeConnectors> ... </broker> ... <!-- Configure a Spring JNDI template instance --> <bean id="remoteJndi" class="org.springframework.jndi.JndiTemplate"> <property name="environment"> <props> <prop key="java.naming.factory.initial"> com.sun.jndi.ldap.LdapCtxFactory </prop> <prop key="java.naming.provider.url"> ldap://server.company.com/o=company_us,c=us </prop> </props> </property> </bean> ... </beans>
jndiOutboundTemplate
attribute references an org.springframework.jndi.JndiTemplate
bean instance, which is a Spring wrapper class that configures a JNDI directory. In this example, the JNDI directory is LDAP based, so the JndiTemplate
bean is configured with the URL for connecting to the LDAP server. The outboundQueueConnectionFactoryName
attribute specifies a query on the LDAP server, which should return a javax.jms.QueueConnectionFactory
instance.
10.3.4. Configuring Queue Bridges
Overview
inboundQueueBridges
element and the outboundQueueBridges
element. Using these elements, you can specify which queues to bridge between the third-party JMS provider and the ActiveMQ broker.
Inbound queue bridges
inboundQueueBridges
element and the inboundQueueBridge
child elements are used to route queue messages from the third-party JMS provider to the ActiveMQ broker. In this case, inbound means heading into the ActiveMQ broker.
<inboundQueueBridges> <inboundQueueBridge inboundQueueName="QueueA" /> </inboundQueueBridges>
QueueA
queue on the third-party JMS provider and pushes the messages on to the QueueA
queue on the ActiveMQ broker.
<inboundQueueBridges> <inboundQueueBridge inboundQueueName="QueueA" localQueueName="org.activemq.example.QueueA" /> </inboundQueueBridges>
inboundQueueName
specifies the name of the queue on the third-party JMS provider and localQueueName
specifies the name of the queue on the ActiveMQ broker.
Outbound queue bridges
outboundQueueBridges
element and the outboundQueueBridge
child elements are used to route queue messages from the ActiveMQ broker to the third-party JMS provider. In this case, outbound means heading away from the ActiveMQ broker.
<outboundQueueBridges> <outboundQueueBridge outboundQueueName="QueueX" /> </outboundQueueBridges>
QueueX
queue on the ActiveMQ broker and pushes the messages on to the QueueX
queue on the third-party JMS provider.
<outboundQueueBridges> <outboundQueueBridge outboundQueueName="QueueX" localQueueName="org.activemq.example.QueueX" /> </outboundQueueBridges>
outboundQueueName
specifies the name of the queue on the third-party JMS provider and localQueueName
specifies the name of the queue on the ActiveMQ broker.
Sample queue bridges
QueueA
, QueueB
, and QueueC
, and three outbound queues, QueueX
, QueueY
, and QueueZ
.
Example 10.3. Sample Queue Bridges
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" ... > ... <broker xmlns="http://activemq.apache.org/schema/core" ... > <jmsBridgeConnectors> <jmsQueueConnector outboundQueueConnectionFactory="#remoteFactory"> <inboundQueueBridges> <inboundQueueBridge inboundQueueName="QueueA" /> <inboundQueueBridge inboundQueueName="QueueB" /> <inboundQueueBridge inboundQueueName="QueueC" /> </inboundQueueBridges> <outboundQueueBridges> <outboundQueueBridge outboundQueueName="QueueX" /> <outboundQueueBridge outboundQueueName="QueueY" /> <outboundQueueBridge outboundQueueName="QueueZ" /> </outboundQueueBridges> </jmsQueueConnector> </jmsBridgeConnectors> <transportConnectors> <transportConnector uri="tcp://localhost:61234" /> </transportConnectors> </broker> ... </beans>
10.3.5. Configuring Topic Bridges
Overview
inboundTopicBridges
element and the outboundTopicBridges
element. Using these elements, you can specify which topics to bridge between the third-party JMS provider and the ActiveMQ broker.
Inbound topic bridges
inboundTopicBridges
element and the inboundTopicBridge
child elements are used to route topic messages from the third-party JMS provider to the ActiveMQ broker. In this case, inbound means heading into the ActiveMQ broker.
<inboundTopicBridges> <inboundTopicBridge inboundTopicName="TopicA" /> </inboundTopicBridges>
TopicA
topic on the third-party JMS provider and pushes the messages on to the TopicA
topic on the ActiveMQ broker.
<inboundTopicBridges> <inboundTopicBridge inboundTopicName="TopicA" localTopicName="org.activemq.example.TopicA" /> </inboundTopicBridges>
inboundTopicName
specifies the name of the topic on the third-party JMS provider and localTopicName
specifies the name of the topic on the ActiveMQ broker.
Outbound topic bridges
outboundTopicBridges
element and the outboundTopicBridge
child elements are used to route topic messages from the ActiveMQ broker to the third-party JMS provider. In this case, outbound means heading away from the ActiveMQ broker.
<outboundTopicBridges> <outboundTopicBridge outboundTopicName="TopicX" /> </outboundTopicBridges>
TopicX
topic on the ActiveMQ broker and pushes the messages on to the TopicX
topic on the third-party JMS provider.
<outboundTopicBridges> <outboundTopicBridge outboundTopicName="TopicX" localTopicName="org.activemq.example.TopicX" /> </outboundTopicBridges>
outboundTopicName
specifies the name of the topic on the third-party JMS provider and localTopicName
specifies the name of the topic on the ActiveMQ broker.
Sample topic bridges
TopicA
, TopicB
, and TopicC
, and three outbound topics, TopicX
, TopicY
, and TopicZ
.
Example 10.4. Sample Topic Bridges
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" ... > ... <broker xmlns="http://activemq.apache.org/schema/core" ... > <jmsBridgeConnectors> <jmsTopicConnector outboundTopicConnectionFactory="#remoteFactory"> <inboundTopicBridges> <inboundTopicBridge inboundTopicName="TopicA" /> <inboundTopicBridge inboundTopicName="TopicB" /> <inboundTopicBridge inboundTopicName="TopicC" /> </inboundTopicBridges> <outboundTopicBridges> <outboundTopicBridge outboundTopicName="TopicX" /> <outboundTopicBridge outboundTopicName="TopicY" /> <outboundTopicBridge outboundTopicName="TopicZ" /> </outboundTopicBridges> </jmsTopicConnector> </jmsBridgeConnectors> <transportConnectors> <transportConnector uri="tcp://localhost:61234" /> </transportConnectors> </broker> ... </beans>
10.3.6. Deploying a Bridge
Overview
Deploy the Third-Party Client Libraries
wrap:
URL prefix.
foo-jms-client.jar
located in the /tmp
directory, you could deploy it into the OSGi container as follows:
JBossA-MQ:karaf@root> osgi:install -s wrap:file:///tmp/foo-jms-client.jar
wrap:
prefix.
Deploy the Bridge
InstallDir/etc/activemq.xml
file, and insert the jmsBridgeConnectors
element as a child of the broker
element, as follows:
<beans ... > <!-- Allows us to use system properties and fabric as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="properties"> <bean class="org.fusesource.mq.fabric.ConfigurationProperties"/> </property> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${broker-name}" dataDirectory="${data}" start="false"> ... <jmsBridgeConnectors> <jmsQueueConnector> ... </jmsQueueConnector> ... <jmsTopicConnector> ... </jmsTopicConnector> ... </jmsBridgeConnectors> ... </broker> </beans>
10.3.7. Sample Bridge Configuration
Overview
ActiveMQ-to-WebSphere MQ bridge
InstallDir/etc/activemq.xml
file.
Example 10.5. ActiveMQ-to-WebSphere MQ Configuration
<beans ... > <!-- Allows us to use system properties and fabric as variables in this configuration file --> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="properties"> <bean class="org.fusesource.mq.fabric.ConfigurationProperties"/> </property> </bean> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="${broker-name}" dataDirectory="${data}" start="false"> ... <jmsBridgeConnectors> <jmsQueueConnector outboundQueueConnectionFactory="#remoteFactory"> <inboundQueueBridges> <inboundQueueBridge inboundQueueName="QueueA" /> </inboundQueueBridges> <outboundQueueBridges> <outboundQueueBridge outboundQueueName="QueueX" /> </outboundQueueBridges> </jmsQueueConnector> </jmsBridgeConnectors> ... </broker> ... <!-- Configure IBM WebSphere MQ queue connection factory --> <bean id="remoteFactory" class="com.ibm.mq.jms.MQQueueConnectionFactory"> <property name="transportType" value="1"/> <property name="hostName" value="localhost"/> <property name="port" value="1414"/> <property name="queueManager" value="QM_TEST"/> </bean> ... </beans>
QueueA
queue and a QueueX
queue on the WebSphere MQ server. ActiveMQ will create the corresponding queues dynamically—there is no need to create them in advance.
Deploying the WebSphere MQ client libraries
$MQ_INSTALL_DIR/java/lib/OSGI
com.ibm.mq.osgi.directip_7.0.1.3.jar
com.ibm.msg.client.osgi.commonservices.j2se_7.0.1.3.jar
com.ibm.msg.client.osgi.jms.prereq_7.0.1.3.jar
com.ibm.msg.client.osgi.jms_7.0.1.3.jar
com.ibm.msg.client.osgi.nls_7.0.1.3.jar
com.ibm.msg.client.osgi.wmq.nls_7.0.1.3.jar
com.ibm.msg.client.osgi.wmq.prereq_7.0.1.3.jar
com.ibm.msg.client.osgi.wmq_7.0.1.3.jar
install
commands:
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.mq.osgi.directip_7.0.1.3.jar
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.msg.client.osgi.commonservices.j2se_7.0.1.3.jar
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.msg.client.osgi.jms.prereq_7.0.1.3.jar
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.msg.client.osgi.jms_7.0.1.3.jar
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.msg.client.osgi.nls_7.0.1.3.jar
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.msg.client.osgi.wmq.nls_7.0.1.3.jar
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.msg.client.osgi.wmq.prereq_7.0.1.3.jar
JBossA-MQ:karaf@root> osgi:install -s file:/tmp/mqclient/com.ibm.msg.client.osgi.wmq_7.0.1.3.jar
10.3.8. Handling ReplyTo Destinations
Overview
ReplyTo destinations
JMSReplyTo
header, which specifies the Destination (Queue or Topic) on which the sender expects to receive a reply.
Automatic proxification
JMSReplyTo
header, a corresponding rule must be put in place to ensure that the reply message is propagated back through the bridge. In general, the most effective approach is for the bridge to create the required rule dynamically, whenever a JMSReplyTo
header is encountered.
M1
, where the ReplyTo destination is a queue named ReplyA
.
Figure 10.3. Automatic Proxification in the native JMS-to-JMS Bridge
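ReplyTo handling is controlled per bridge rule through the doHandleReplyTo attribute, which defaults to true. The following sketch assumes that setting the attribute to false stops the bridge from handling ReplyTo destinations for that rule; the queue names and the remoteFactory bean reference are illustrative.

```xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localBroker">
  <jmsBridgeConnectors>
    <jmsQueueConnector outboundQueueConnectionFactory="#remoteFactory">
      <inboundQueueBridges>
        <!-- Default behaviour: ReplyTo destinations are handled by the bridge -->
        <inboundQueueBridge inboundQueueName="QueueA"/>
        <!-- ReplyTo handling switched off for this rule only -->
        <inboundQueueBridge inboundQueueName="QueueB" doHandleReplyTo="false"/>
      </inboundQueueBridges>
    </jmsQueueConnector>
  </jmsBridgeConnectors>
</broker>
```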
10.3.9. Implementing Message Convertors
Overview
- Inbound message convertor—converts third-party JMS messages to ActiveMQ messages.
- Outbound message convertor—converts ActiveMQ messages to third-party JMS messages.
JmsMesageConvertor interface
JmsMesageConvertor
interface (note the spelling of the type name, with a single "s" in "Mesage"), which can be used as the basis for implementing either an inbound message convertor or an outbound message convertor.
Example 10.6. JmsMesageConvertor interface
package org.apache.activemq.network.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;

/**
 * Converts Message from one JMS to another
 */
public interface JmsMesageConvertor {

    Message convert(Message message) throws JMSException;

    Message convert(Message message, Destination replyTo) throws JMSException;

    void setConnection(Connection connection);
}
Message converter methods
JmsMesageConvertor
interface exposes the following methods:
Message convert(Message message)
- This variant of the convert method is called if the doHandleReplyTo option is set to false or if the ReplyTo destination on the message is null. In this case, the convert method should simply reformat the message content as required.
Message convert(Message message, Destination replyTo)
- This variant of the convert method is called if the ReplyTo destination on the message is non-null. In this case, in addition to reformatting the message, you have the ability to change the replyTo destination, so that the reply to this message is redirected to a different destination. The replyTo argument contains the original destination of the message. If you want to change the ReplyTo destination, you can do so by calling the message.setJMSReplyTo() method, passing in the changed destination. The reason you might want to change the ReplyTo destination is in order to take control of proxification (see Section 10.3.8, “Handling ReplyTo Destinations”). Proxification is necessary, because the ActiveMQ broker is not able to make a direct connection back to the third-party JMS provider.
void setConnection(Connection connection)
- Provides a reference to the javax.jms.Connection object to which messages will be forwarded. In other words, if this message convertor is used as an inbound message convertor, this connection is the connection to the ActiveMQ broker. If this message convertor is used as an outbound message convertor, this connection is the connection to the third-party JMS provider.
Sample implementation
Example 10.7. Sample Message Convertor
package org.apache.activemq.network.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;

/**
 * Converts Message from one JMS to another
 *
 * @org.apache.xbean.XBean
 */
public class SimpleJmsMessageConvertor implements JmsMesageConvertor {

    // Pass the message through unchanged
    public Message convert(Message message) throws JMSException {
        return message;
    }

    // Pass the message through, setting the supplied ReplyTo destination on it
    public Message convert(Message message, Destination replyTo) throws JMSException {
        Message msg = convert(message);
        if (replyTo != null) {
            msg.setJMSReplyTo(replyTo);
        } else {
            msg.setJMSReplyTo(null);
        }
        return msg;
    }

    public void setConnection(Connection connection) {
        // do nothing
    }
}
Configuring the message convertor
inboundMessageConvertor
and outboundMessageConvertor
attributes.
SimpleJmsMessageConvertor
implementation both as an inbound message convertor and as an outbound message convertor, you could configure a JMS queue connector as follows:
<jmsBridgeConnectors> <jmsQueueConnector outboundQueueConnectionFactory="#remoteFactory" inboundMessageConvertor="org.apache.activemq.network.jms.SimpleJmsMessageConvertor" outboundMessageConvertor="org.apache.activemq.network.jms.SimpleJmsMessageConvertor"> <inboundQueueBridges> <inboundQueueBridge inboundQueueName="QueueA" /> </inboundQueueBridges> <outboundQueueBridges> <outboundQueueBridge outboundQueueName="QueueX" /> </outboundQueueBridges> </jmsQueueConnector> </jmsBridgeConnectors>
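Because the convertor attributes are described as references to bean instances, an alternative is to declare each convertor as a Spring bean and reference it by ID. The following sketch assumes that the # reference form used for outboundQueueConnectionFactory also applies to the convertor attributes; the bean IDs inboundConvertor and outboundConvertor are hypothetical.

```xml
<beans xmlns="http://www.springframework.org/schema/beans">

  <!-- One convertor instance per direction, because setConnection() receives a different connection for each -->
  <bean id="inboundConvertor"
        class="org.apache.activemq.network.jms.SimpleJmsMessageConvertor"/>
  <bean id="outboundConvertor"
        class="org.apache.activemq.network.jms.SimpleJmsMessageConvertor"/>

  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localBroker">
    <jmsBridgeConnectors>
      <jmsQueueConnector outboundQueueConnectionFactory="#remoteFactory"
                         inboundMessageConvertor="#inboundConvertor"
                         outboundMessageConvertor="#outboundConvertor">
        <inboundQueueBridges>
          <inboundQueueBridge inboundQueueName="QueueA"/>
        </inboundQueueBridges>
        <outboundQueueBridges>
          <outboundQueueBridge outboundQueueName="QueueX"/>
        </outboundQueueBridges>
      </jmsQueueConnector>
    </jmsBridgeConnectors>
  </broker>

</beans>
```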
10.3.10. Configuration Reference
Overview
jmsQueueConnector attributes
jmsQueueConnector
element:
id
- Optional bean ID of xs:ID type, which could be used to reference this bean.
inboundMessageConvertor
- References a bean instance of org.apache.activemq.network.jms.JmsMesageConvertor type, which transforms inbound messages from the third-party JMS provider into a format that is suitable for the ActiveMQ broker.
jndiLocalTemplate
- References a bean instance of org.springframework.jndi.JndiTemplate type, which provides access to a JNDI directory instance. Used in combination with the localConnectionFactoryName attribute to locate a JMS QueueConnectionFactory instance in a JNDI directory, where this connection factory instance is then used to connect to the local JMS provider (that is, the ActiveMQ broker).
jndiOutboundTemplate
- References a bean instance of org.springframework.jndi.JndiTemplate type, which provides access to a JNDI directory instance. Used in combination with the outboundQueueConnectionFactoryName attribute to locate a JMS QueueConnectionFactory instance in a JNDI directory, where this connection factory instance is then used to connect to the third-party JMS provider.
localClientId
- Sets the ID of the local connection (useful for logging and JMX monitoring).
localConnectionFactoryName
- Specifies the JNDI name of a QueueConnectionFactory instance. Used in combination with the jndiLocalTemplate attribute to connect to the local JMS provider (that is, the ActiveMQ broker).
localPassword
- Specifies the password part of the credentials used to log on to the local JMS provider (ActiveMQ broker). Used in combination with the localUsername attribute.
localQueueConnection
- References a bean of javax.jms.QueueConnection type, which is used to connect to the local JMS provider (ActiveMQ broker).
localQueueConnectionFactory
- References a bean of javax.jms.QueueConnectionFactory type, which is used to connect to the local JMS provider (ActiveMQ broker).
localUsername
- Specifies the username part of the credentials used to log on to the local JMS provider (ActiveMQ broker). Used in combination with the localPassword attribute.
name
- Assigns a name to this jmsQueueConnector element (useful for logging and JMX monitoring).
outboundClientId
- Sets the ID of the third-party connection (useful for logging and JMX monitoring).
outboundMessageConvertor
- References a bean instance of org.apache.activemq.network.jms.JmsMesageConvertor type, which transforms outbound messages from the ActiveMQ broker into a format that is suitable for the third-party JMS provider.
outboundPassword
- Specifies the password part of the credentials used to log on to the third-party JMS provider. Used in combination with the outboundUsername attribute.
outboundQueueConnection
- References a bean of javax.jms.QueueConnection type, which is used to connect to the third-party JMS provider.
outboundQueueConnectionFactory
- References a bean of javax.jms.QueueConnectionFactory type, which is used to connect to the third-party JMS provider.
outboundQueueConnectionFactoryName
- Specifies the JNDI name of a QueueConnectionFactory instance. Used in combination with the jndiOutboundTemplate attribute to connect to the third-party JMS provider.
outboundUsername
- Specifies the username part of the credentials used to log on to the third-party JMS provider. Used in combination with the outboundPassword attribute.
preferJndiDestinationLookup
- Specifies whether the connector should first try to find a destination in JNDI before using JMS semantics to create a Destination. By default, the connector will first use JMS semantics and then fall back to JNDI look-up. Setting this attribute to true reverses that order.
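Several of these attributes are commonly set together. The following sketch combines a connector name with credentials for the third-party provider; the connector name, the user name, the password, and the remoteFactory bean reference are placeholder assumptions.

```xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localBroker">
  <jmsBridgeConnectors>
    <!-- name aids logging and JMX monitoring; the outbound credentials log on to the third-party provider -->
    <jmsQueueConnector name="remoteQueueBridge"
                       outboundQueueConnectionFactory="#remoteFactory"
                       outboundUsername="bridgeuser"
                       outboundPassword="bridgepassword">
      <inboundQueueBridges>
        <inboundQueueBridge inboundQueueName="QueueA"/>
      </inboundQueueBridges>
    </jmsQueueConnector>
  </jmsBridgeConnectors>
</broker>
```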
jmsTopicConnector attributes
jmsTopicConnector
element:
id
- Optional bean ID of xs:ID type, which could be used to reference this bean.
inboundMessageConvertor
- References a bean instance of org.apache.activemq.network.jms.JmsMesageConvertor type, which transforms inbound messages from the third-party JMS provider into a format that is suitable for the ActiveMQ broker.
jndiLocalTemplate
- References a bean instance of org.springframework.jndi.JndiTemplate type, which provides access to a JNDI directory instance. Used in combination with the localConnectionFactoryName attribute to locate a JMS TopicConnectionFactory instance in a JNDI directory, where this connection factory instance is then used to connect to the local JMS provider (that is, the ActiveMQ broker).
jndiOutboundTemplate
- References a bean instance of org.springframework.jndi.JndiTemplate type, which provides access to a JNDI directory instance. Used in combination with the outboundTopicConnectionFactoryName attribute to locate a JMS TopicConnectionFactory instance in a JNDI directory, where this connection factory instance is then used to connect to the third-party JMS provider.
localClientId
- Sets the ID of the local connection (useful for logging and JMX monitoring).
localConnectionFactoryName
- Specifies the JNDI name of a TopicConnectionFactory instance. Used in combination with the jndiLocalTemplate attribute to connect to the local JMS provider (that is, the ActiveMQ broker).
localPassword
- Specifies the password part of the credentials used to log on to the local JMS provider (ActiveMQ broker). Used in combination with the localUsername attribute.
localTopicConnection
- References a bean of javax.jms.TopicConnection type, which is used to connect to the local JMS provider (ActiveMQ broker).
localTopicConnectionFactory
- References a bean of javax.jms.TopicConnectionFactory type, which is used to connect to the local JMS provider (ActiveMQ broker).
localUsername
- Specifies the username part of the credentials used to log on to the local JMS provider (ActiveMQ broker). Used in combination with the localPassword attribute.
name
- Assigns a name to this jmsTopicConnector element (useful for logging and JMX monitoring).
outboundClientId
- Sets the ID of the third-party connection (useful for logging and JMX monitoring).
outboundMessageConvertor
- References a bean instance of org.apache.activemq.network.jms.JmsMesageConvertor type, which transforms outbound messages from the ActiveMQ broker into a format that is suitable for the third-party JMS provider.
outboundPassword
- Specifies the password part of the credentials used to log on to the third-party JMS provider. Used in combination with the outboundUsername attribute.
outboundTopicConnection
- References a bean of javax.jms.TopicConnection type, which is used to connect to the third-party JMS provider.
outboundTopicConnectionFactory
- References a bean of javax.jms.TopicConnectionFactory type, which is used to connect to the third-party JMS provider.
outboundTopicConnectionFactoryName
- Specifies the JNDI name of a TopicConnectionFactory instance. Used in combination with the jndiOutboundTemplate attribute to connect to the third-party JMS provider.
outboundUsername
- Specifies the username part of the credentials used to log on to the third-party JMS provider. Used in combination with the outboundPassword attribute.
preferJndiDestinationLookup
- Specifies whether the connector should first try to find a destination in JNDI before using JMS semantics to create a Destination. By default, the connector will first use JMS semantics and then fall back to JNDI look-up. Setting this attribute to true reverses that order.
inboundQueueBridge attributes
inboundQueueBridge
element:
doHandleReplyTo
- A boolean attribute that specifies whether ReplyTo messages should be handled or not. Default is
true
. id
- Optional bean ID of
xs:ID
type, which could be used to reference this bean. inboundQueueName
- Specifies the name of the queue in the third-party JMS provider, from which messages are consumed.
localQueueName
- Specifies the local queue name (in the ActiveMQ broker), into which messages are pushed. If not specified, defaults to the same value as
inboundQueueName
. selector
- Optionally, specifies a JMS selector string.
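As an illustration of these attributes, the following sketch configures one inbound queue bridge. The connector name, the foreignFactory bean ID, the queue names, and the region message property used in the selector are hypothetical. The enclosing jmsQueueConnector element and its outboundQueueConnectionFactory attribute are assumed to be the queue counterparts of the topic connector described earlier.

```xml
<!-- Sketch only: names, bean ID, and the "region" property are hypothetical. -->
<jmsQueueConnector name="queueBridgeIn"
                   outboundQueueConnectionFactory="#foreignFactory">
  <inboundQueueBridges>
    <!-- Consume from FOREIGN.ORDERS on the third-party provider and
         push the selected messages into LOCAL.ORDERS on the ActiveMQ broker. -->
    <inboundQueueBridge inboundQueueName="FOREIGN.ORDERS"
                        localQueueName="LOCAL.ORDERS"
                        selector="region = 'EMEA'"
                        doHandleReplyTo="false"/>
  </inboundQueueBridges>
</jmsQueueConnector>
```

Setting doHandleReplyTo to false here is only an example of overriding the default; whether that is appropriate depends on whether consumers of the local queue need to reply to the original sender.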
outboundQueueBridge attributes
outboundQueueBridge
element:
doHandleReplyTo
- A boolean attribute that specifies whether ReplyTo messages should be handled or not. Default is
true
. id
- Optional bean ID of
xs:ID
type, which could be used to reference this bean. outboundQueueName
- Specifies the name of the queue in the third-party JMS provider, into which messages are pushed.
localQueueName
- Specifies the local queue name (in the ActiveMQ broker), from which messages are consumed. If not specified, defaults to the same value as
outboundQueueName
. selector
- Optionally, specifies a JMS selector string.
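The defaulting rule for localQueueName can be seen in the following sketch, which defines two outbound queue bridges. The connector name, the foreignFactory bean ID, and the queue names are hypothetical, and the enclosing jmsQueueConnector element is assumed to be configured as a child of jmsBridgeConnectors.

```xml
<!-- Sketch only: the connector name, bean ID, and queue names are hypothetical. -->
<jmsQueueConnector name="queueBridgeOut"
                   outboundQueueConnectionFactory="#foreignFactory">
  <outboundQueueBridges>
    <!-- localQueueName omitted: messages are consumed from the local queue
         AUDIT.LOG and pushed to the third-party queue of the same name. -->
    <outboundQueueBridge outboundQueueName="AUDIT.LOG"/>
    <!-- Local and third-party queue names differ, so both are given. -->
    <outboundQueueBridge outboundQueueName="FOREIGN.INVOICES"
                         localQueueName="LOCAL.INVOICES"/>
  </outboundQueueBridges>
</jmsQueueConnector>
```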
inboundTopicBridge attributes
inboundTopicBridge
element:
consumerName
- If this attribute is set, the bridge creates a durable consumer for this topic.
doHandleReplyTo
- A boolean attribute that specifies whether ReplyTo messages should be handled or not. Default is
true
. id
- Optional bean ID of
xs:ID
type, which could be used to reference this bean. inboundTopicName
- Specifies the name of the topic in the third-party JMS provider, from which messages are consumed.
localTopicName
- Specifies the local topic name (in the ActiveMQ broker), into which messages are pushed. If not specified, defaults to the same value as
inboundTopicName
. selector
- Optionally, specifies a JMS selector string.
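A durable inbound topic bridge might be sketched as follows. The connector name, bean IDs, client ID, consumer name, and topic names are hypothetical. The sketch also sets outboundClientId on the connector, on the assumption that the third-party provider requires a client ID on the connection before it accepts a durable subscription, as JMS providers generally do.

```xml
<!-- Sketch only: names, bean IDs, and the client ID are hypothetical. -->
<jmsTopicConnector name="durableTopicBridge"
                   outboundTopicConnectionFactory="#foreignFactory"
                   localTopicConnectionFactory="#localFactory"
                   outboundClientId="bridge-client-1">
  <inboundTopicBridges>
    <!-- consumerName is set, so the bridge creates a durable consumer
         on FOREIGN.PRICES and republishes to the local topic LOCAL.PRICES. -->
    <inboundTopicBridge inboundTopicName="FOREIGN.PRICES"
                        localTopicName="LOCAL.PRICES"
                        consumerName="priceFeedBridge"/>
  </inboundTopicBridges>
</jmsTopicConnector>
```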
outboundTopicBridge attributes
outboundTopicBridge
element:
consumerName
- If this attribute is set, the bridge creates a durable consumer for this topic.
doHandleReplyTo
- A boolean attribute that specifies whether ReplyTo messages should be handled or not. Default is
true
. id
- Optional bean ID of
xs:ID
type, which could be used to reference this bean. outboundTopicName
- Specifies the name of the topic in the third-party JMS provider, into which messages are pushed.
localTopicName
- Specifies the local topic name (in the ActiveMQ broker), from which messages are consumed. If not specified, defaults to the same value as
outboundTopicName
. selector
- Optionally, specifies a JMS selector string.
Index
A
- active consumer, Active consumers
B
- broker
- brokerId, Broker ID and duplicate routes
- brokerId, Broker ID and duplicate routes
C
- concentrator topology, Concentrator topology
- conduit subscription
- disabling, Resolving the problem, Disabling conduit subscriptions
- impact on queues, Default load behavior
- conduitSubscriptions, Resolving the problem, Disabling conduit subscriptions
D
- decreaseNetworkConsumerPriority, Connector configuration
- destination filtering, Separate connectors for topics and queues
- by exclusion, Filtering destinations by exclusion
- by inclusion, Filtering destinations by inclusion
- destinations
- wildcards, Destination wildcards
- discovery agent
- Fuse Fabric, Fuse Fabric Discovery Agent
- multicast, Multicast Discovery Agent
- static, Static Discovery Agent
- zeroconf, Zeroconf Discovery Agent
- discovery protocol
- backOffMultiplier, Transport options
- initialReconnectDelay, Transport options
- maxReconnectAttempts, Transport options
- maxReconnectDelay, Transport options
- URI, URI syntax
- useExponentialBackOff, Transport options
- discovery URI, URI syntax
- discovery://, URI syntax
- discoveryUri, Configuring a broker, Configuring a broker
- dynamicallyIncludedDestinations, Filtering destinations by inclusion
E
- excludedDestinations, Filtering destinations by exclusion
F
- fabric://, URI
- fanout protocol
- backOffMultiplier, Transport options
- fanOutQueues, Transport options
- initialReconnectDelay, Transport options
- maxReconnectAttempts, Transport options
- maxReconnectDelay, Transport options
- minAckCount, Transport options
- URI, URI syntax
- useExponentialBackOff, Transport options
- fanout URI, URI syntax
- fanout://, URI syntax
- Fuse Fabric discovery agent
- URI, URI
M
- multicast discovery agent
- broker configuration, Configuring a broker
- URI, URI
- multicast://, URI
N
- network connectors
- networkConnector, Separate connectors for topics and queues
- conduitSubscriptions, Resolving the problem, Disabling conduit subscriptions
- decreaseNetworkConsumerPriority, Connector configuration
- dynamicallyIncludedDestinations, Filtering destinations by inclusion
- excludedDestinations, Filtering destinations by exclusion
- name, Single connector
- networkTTL, Single connector
- suppressDuplicateQueueSubscriptions, Connector configuration
- uri, Single connector
S
- shortest route, Overview
- static discovery agent
- URI, Using the agent
- static://, Using the agent
- suppressDuplicateQueueSubscriptions, Connector configuration
T
- transportConnector
- discoveryUri, Configuring a broker, Configuring a broker
W
- wildcards
- destinations, Destination wildcards
Z
- zeroconf discovery agent
- broker configuration, Configuring a broker
- URI, URI
- zeroconf://, URI
Legal Notice
Trademark Disclaimer
Third Party Acknowledgements
- JLine (http://jline.sourceforge.net) jline:jline:jar:1.0
License: BSD (LICENSE.txt)
Copyright (c) 2002-2006, Marc Prud'hommeaux mwp1@cornell.edu
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
- Neither the name of JLine nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- Stax2 API (http://woodstox.codehaus.org/StAX2) org.codehaus.woodstox:stax2-api:jar:3.1.1
License: The BSD License (http://www.opensource.org/licenses/bsd-license.php)
Copyright (c) <YEAR>, <OWNER> All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- jibx-run - JiBX runtime (http://www.jibx.org/main-reactor/jibx-run) org.jibx:jibx-run:bundle:1.2.3
License: BSD (http://jibx.sourceforge.net/jibx-license.html)
Copyright (c) 2003-2010, Dennis M. Sosnoski.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
- Neither the name of JiBX nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- JavaAssist (http://www.jboss.org/javassist) org.jboss.javassist:com.springsource.javassist:jar:3.9.0.GA:compile
License: MPL (http://www.mozilla.org/MPL/MPL-1.1.html)
- HAPI-OSGI-Base Module (http://hl7api.sourceforge.net/hapi-osgi-base/) ca.uhn.hapi:hapi-osgi-base:bundle:1.2
License: Mozilla Public License 1.1 (http://www.mozilla.org/MPL/MPL-1.1.txt)