Chapter 11. Reliability

In general, in a broker-based architecture, reliability is strictly related to the "store and forward" mechanism offered by each broker. Thanks to persistent journals, a broker can offer fault tolerance and so avoid message loss; of course, this does not hold when messages are stored only in volatile memory.

The situation is completely different with AMQ Interconnect, because a router neither takes ownership of messages nor stores them in persistent storage. In this case, reliability is offered by path redundancy, which makes it possible to reach a destination over different paths through the router network. Under normal conditions, the best path is always the one with the lowest cost; when one or more routers go down, the remaining routers recompute the topology and work out new paths so that each destination stays reachable wherever an alternate path exists. It follows that reliability is strictly related to the network topology chosen for the solution.

Because a solution based on AMQ Interconnect can include brokers as well as routers, reliability is improved by the persistent storage on those brokers, which adds not only fault tolerance but temporal decoupling as well. Without the "store and forward" feature offered by brokers, temporal decoupling is not possible: with only routers and direct peers (senders and receivers), the receiver must be online at the same time as the sender in order to receive messages.

11.1. Path Redundancy

Offering path redundancy means designing the network topology so that, even when one or more routers or the connections between them go down, each destination is still reachable by following alternate paths through the routers that remain part of the network.

Consider the following simple scenario:

  • a network with three routers "Router.A", "Router.B" and "Router.C".
  • the "Router.A" is connected to both "Router.B" and "Router.C".
  • the "Router.C" is connected to the "Router.B".
  • all three routers listen for client connections.
  • a sender client connects to the "Router.A" in order to send messages to a receiver client.
  • a receiver client connects to the "Router.B" initially in order to receive messages from the sender peer.

Figure 11.1. Path Redundancy Enabled Topology

The "Router.A" configuration is similar to the following.

router {
    mode: interior
    id: Router.A
}

listener {
    host: 0.0.0.0
    port: 6000
    authenticatePeer: no
}

connector {
    name: INTER_ROUTER_B
    addr: 127.0.0.1
    port: 5001
    role: inter-router
}

connector {
    name: INTER_ROUTER_C
    addr: 127.0.0.1
    port: 5002
    role: inter-router
}

There is only one listener, which accepts client connections, and two connector entities for connecting to the other two routers.

The "Router.B" configuration is the following.

router {
    mode: interior
    id: Router.B
}

listener {
    addr: 0.0.0.0
    port: 5001
    authenticatePeer: no
    role: inter-router
}

listener {
    host: 0.0.0.0
    port: 6001
    authenticatePeer: no
}

It has two listener entities in order to listen for connections from clients and from other routers in the network (in this case from the "Router.A" and "Router.C").

Finally, the "Router.C" configuration is quite similar.

router {
    mode: interior
    id: Router.C
}

listener {
    addr: 0.0.0.0
    port: 5002
    authenticatePeer: no
    role: inter-router
}

listener {
    host: 0.0.0.0
    port: 6002
    authenticatePeer: no
}

connector {
    name: INTER_ROUTER_B
    addr: 127.0.0.1
    port: 5001
    role: inter-router
}

It has two listener entities in order to listen for connections from clients and from other routers in the network (in this case from the "Router.A"), and it has a connector for connecting to the "Router.B".

Consider a sender client connected to "Router.A" and attached to the my_queue address, which sends some messages (10 in this example), and a receiver client connected to the "Router.B" and attached to the same address.

Once started, the receiver waits for messages with no output on the console.

$ sudo python simple_recv.py -a localhost:6001/my_queue -m 10

Once the sender is started, all the messages flow through "Router.A" and "Router.B" and reach the receiver; at this point the messages are all confirmed on the sender side.

$ sudo python simple_send.py -a localhost:6000/my_queue -m 10
all messages confirmed

At the same time, the receiver shows the messages received through the "Router.B".

{u'sequence': 1L}
{u'sequence': 2L}
{u'sequence': 3L}
{u'sequence': 4L}
{u'sequence': 5L}
{u'sequence': 6L}
{u'sequence': 7L}
{u'sequence': 8L}
{u'sequence': 9L}
{u'sequence': 10L}

The path redundancy is provided by the other available path through the "Router.A", "Router.C" and then "Router.B". It means that if the connection between "Router.A" and "Router.B" goes down, the alternative path is used to reach the receiver.
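
The following Python sketch illustrates the idea of choosing the lowest-cost path and falling back to an alternate one. It is not the router's own route computation: the function name lowest_cost_path, the use of Dijkstra's algorithm, and the cost of 1 on every link are assumptions made only for illustration.

```python
import heapq

def lowest_cost_path(links, source, target):
    """Return (cost, path) for the cheapest route, or None if unreachable."""
    # Build an undirected adjacency map from (router, router, cost) triples.
    graph = {}
    for a, b, cost in links:
        graph.setdefault(a, []).append((b, cost))
        graph.setdefault(b, []).append((a, cost))

    # Dijkstra's algorithm with a priority queue of (cost so far, node, path).
    queue = [(0, source, [source])]
    visited = set()
    while queue:
        cost, node, path = heapq.heappop(queue)
        if node == target:
            return cost, path
        if node in visited:
            continue
        visited.add(node)
        for neighbour, link_cost in graph.get(node, []):
            if neighbour not in visited:
                heapq.heappush(queue, (cost + link_cost, neighbour, path + [neighbour]))
    return None

# Topology from the scenario above; a cost of 1 per link is an assumption.
links = [("Router.A", "Router.B", 1),
         ("Router.A", "Router.C", 1),
         ("Router.C", "Router.B", 1)]

print(lowest_cost_path(links, "Router.A", "Router.B"))

# Simulate the loss of the direct connection between Router.A and Router.B.
degraded = [l for l in links if {l[0], l[1]} != {"Router.A", "Router.B"}]
print(lowest_cost_path(degraded, "Router.A", "Router.B"))
```

With all three links up, the sketch picks the direct path; with the direct link removed, it returns the path through "Router.C" at a higher cost.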

Now, consider a fault on the "Router.B". The receiver is no longer reachable on that path, but it can connect to the "Router.C" in order to continue receiving messages. The sender does not know what has happened and can continue to send messages to the "Router.A" in order to reach the receiver.

Figure 11.2. Path Redundancy after Router Failure

The receiver is still reachable and gets messages from the sender, as displayed in the console output.

$ sudo python simple_recv.py -a localhost:6002/my_queue -m 10
{u'sequence': 1L}
{u'sequence': 2L}
{u'sequence': 3L}
{u'sequence': 4L}
{u'sequence': 5L}
{u'sequence': 6L}
{u'sequence': 7L}
{u'sequence': 8L}
{u'sequence': 9L}
{u'sequence': 10L}
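
The receiver's move from "Router.B" to "Router.C" can be sketched as picking the first reachable endpoint from an ordered list. The helper first_reachable, the is_up callback and the router_state dictionary are hypothetical names used only for this illustration; a real client would replace is_up with an actual connection attempt.

```python
def first_reachable(endpoints, is_up):
    """Return the first endpoint for which is_up(endpoint) is true, else None."""
    for endpoint in endpoints:
        if is_up(endpoint):
            return endpoint
    return None

# Client listener ports from the configurations above: Router.B, then Router.C.
receiver_endpoints = ["localhost:6001", "localhost:6002"]

# Hypothetical health state: Router.B has failed, Router.C is still running.
router_state = {"localhost:6001": False, "localhost:6002": True}

chosen = first_reachable(receiver_endpoints, lambda e: router_state.get(e, False))
print(chosen + "/my_queue")
```

The resulting address is the one passed to simple_recv.py in the console output above.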

11.2. Path Redundancy and Temporal Decoupling

In order to have temporal decoupling in a solution based on AMQ Interconnect, one or more brokers must be added for their "store and forward" feature. With the right topology, it is possible to have a solution which offers reliability through both path redundancy and persistent storage of messages.

Consider the following simple scenario:

  • a network with three routers "Router.A", "Router.B" and "Router.C" and finally a broker.
  • the "Router.A" is connected to both "Router.B" and "Router.C".
  • initially only the "Router.B" is connected to the broker.
  • all three routers listen for client connections.
  • a sender client connects to the "Router.A" in order to send messages to a queue in the broker.
  • a receiver client connects to the "Router.A" in order to get messages from the queue in the broker.

Figure 11.3. Path Redundancy and Temporal Decoupling Enabled Topology

The receiver client can be offline when the sender starts to send messages because they will be stored persistently in the queue; when it comes back online, the receiver can get the messages from the queue without message loss.
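
The difference between a router-only path and a path that ends in a broker queue can be pictured with a deliberately simplified Python model. The classes DirectRoute and BrokerQueue are hypothetical and do not reproduce AMQP credit, settlement or persistence; they only show why a store is needed for temporal decoupling.

```python
from collections import deque

class DirectRoute:
    """Router-only path: a message is delivered only if a receiver is attached."""
    def __init__(self):
        self.receiver = None

    def send(self, message):
        if self.receiver is None:
            return False          # no receiver attached, nothing is delivered
        self.receiver.append(message)
        return True

class BrokerQueue:
    """Broker queue: messages are stored until a receiver asks for them."""
    def __init__(self):
        self.stored = deque()

    def send(self, message):
        self.stored.append(message)
        return True               # accepted and stored by the broker

    def receive_all(self):
        messages = list(self.stored)
        self.stored.clear()
        return messages

# The sender runs while the receiver is offline.
direct = DirectRoute()
queue = BrokerQueue()
for n in range(1, 11):
    direct.send({"sequence": n})   # returns False: no receiver is attached
    queue.send({"sequence": n})    # stored in the queue

# The receiver comes online later and finds only what the queue stored.
print(len(queue.receive_all()))
```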

The "Router.A" configuration is similar to the following.

router {
    mode: interior
    id: Router.A
}

listener {
    host: 0.0.0.0
    port: 6000
    authenticatePeer: no
}

connector {
    name: INTER_ROUTER_B
    addr: 127.0.0.1
    port: 5001
    role: inter-router
}

connector {
    name: INTER_ROUTER_C
    addr: 127.0.0.1
    port: 5002
    role: inter-router
}

address {
    prefix: my_queue
    waypoint: yes
}

It has a listener for accepting incoming connections from clients and two connector entities in order to connect to the other routers. The queue named my_queue on the broker is exposed by a waypoint.

The "Router.B" configuration is the following.

router {
    mode: interior
    id: Router.B
}

listener {
    addr: 0.0.0.0
    port: 5001
    authenticatePeer: no
    role: inter-router
}

listener {
    host: 0.0.0.0
    port: 6001
    authenticatePeer: no
}

connector {
    name: BROKER
    addr: 127.0.0.1
    port: 5672
    role: route-container
}

address {
    prefix: my_queue
    waypoint: yes
}

autoLink {
    addr: my_queue
    connection: BROKER
    direction: in
}

autoLink {
    addr: my_queue
    connection: BROKER
    direction: out
}

It can accept incoming connections from clients and from other routers (in this case the "Router.A") and connects to the broker. The queue named my_queue on the broker is exposed by a waypoint with the related auto-links in both directions in order to send and receive messages to/from the queue itself.

Finally, the simple "Router.C" configuration.

router {
    mode: interior
    id: Router.C
}

listener {
    addr: 0.0.0.0
    port: 5002
    authenticatePeer: no
    role: inter-router
}

listener {
    host: 0.0.0.0
    port: 6002
    authenticatePeer: no
}

It can accept incoming connections from clients and from other routers (in this case the "Router.A"). Initially there is no connection between this router and the broker.

First of all, thanks to the broker and its "store and forward" feature, the sender can connect to the "Router.A" and start to send messages even if the receiver is not online at that moment. Using the Python sample from the Qpid Proton library, the console output is similar to the following.

$ sudo python simple_send.py -a localhost:6000/my_queue -m 10
all messages confirmed

All messages are confirmed because they reached the queue inside the broker through "Router.A" and "Router.B"; this can be verified using the qdstat tool.

$ sudo qdstat -b localhost:6001 -a
Router Addresses
  class   addr                   phs  distrib    in-proc  local  remote  cntnr  in  out  thru  to-proc  from-proc
  =================================================================================================================
  local   $_management_internal       closest    1        0      0       0      0   0    0     0        0
  local   $displayname                closest    1        0      0       0      0   0    0     0        0
  mobile  $management            0    closest    1        0      0       0      1   0    0     1        0
  local   $management                 closest    1        0      0       0      0   0    0     0        0
  router  Router.A                    closest    0        0      1       0      0   0    6     0        6
  router  Router.C                    closest    0        0      1       0      0   0    4     0        4
  mobile  my_queue               1    balanced   0        0      0       0      0   0    0     0        0
  mobile  my_queue               0    balanced   0        1      0       0      0   10   0     0        0
  local   qdhello                     flood      1        1      0       0      0   0    0     97       117
  local   qdrouter                    flood      1        0      0       0      0   0    0     7        0
  topo    qdrouter                    flood      1        0      2       0      0   0    8     13       9
  local   qdrouter.ma                 multicast  1        0      0       0      0   0    0     2        0
  topo    qdrouter.ma                 multicast  1        0      2       0      0   0    0     0        1
  local   temp.7f2u0zv9_U6QC5e        closest    0        1      0       0      0   0    0     0        0

On the "Router.B", the my_queue address shows 10 messages as output (from the router to the broker).

Once started, the receiver connected to the "Router.A" gets all the available messages from the queue.

$ sudo python simple_recv.py -a localhost:6000/my_queue -m 10
{u'sequence': 1L}
{u'sequence': 2L}
{u'sequence': 3L}
{u'sequence': 4L}
{u'sequence': 5L}
{u'sequence': 6L}
{u'sequence': 7L}
{u'sequence': 8L}
{u'sequence': 9L}
{u'sequence': 10L}

Running the qdstat tool on the "Router.B" again gives output similar to the following.

$ sudo qdstat -b localhost:6001 -a
Router Addresses
  class   addr                   phs  distrib    in-proc  local  remote  cntnr  in  out  thru  to-proc  from-proc
  =================================================================================================================
  local   $_management_internal       closest    1        0      0       0      0   0    0     0        0
  local   $displayname                closest    1        0      0       0      0   0    0     0        0
  mobile  $management            0    closest    1        0      0       0      2   0    0     2        0
  local   $management                 closest    1        0      0       0      0   0    0     0        0
  router  Router.A                    closest    0        0      1       0      0   0    6     0        6
  router  Router.C                    closest    0        0      1       0      0   0    4     0        4
  mobile  my_queue               1    balanced   0        0      0       0      10  0    10    0        0
  mobile  my_queue               0    balanced   0        1      0       0      0   10   0     0        0
  local   qdhello                     flood      1        1      0       0      0   0    0     156      182
  local   qdrouter                    flood      1        0      0       0      0   0    0     7        0
  topo    qdrouter                    flood      1        0      2       0      0   0    10    18       11
  local   qdrouter.ma                 multicast  1        0      0       0      0   0    0     2        0
  topo    qdrouter.ma                 multicast  1        0      2       0      0   0    0     2        1
  local   temp.Xov_ZUcyti3jjXY        closest    0        1      0       0      0   0    0     0        0

On the "Router.B", the my_queue address now shows 10 messages as input (from the broker to the router).
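
When these checks are repeated often, the counters can be extracted from the qdstat text with a small script. The sketch below assumes the column layout shown in the listings above (13 whitespace-separated fields for a mobile address with a phase); other versions of the tool may print different columns, and parse_address_rows is a hypothetical helper name.

```python
def parse_address_rows(qdstat_output, address):
    """Return {phase: {"in": n, "out": n, "thru": n}} for a mobile address."""
    rows = {}
    for line in qdstat_output.splitlines():
        fields = line.split()
        # Mobile rows with a phase have 13 columns:
        # class addr phs distrib in-proc local remote cntnr in out thru to-proc from-proc
        if len(fields) != 13 or fields[0] != "mobile" or fields[1] != address:
            continue
        phase = int(fields[2])
        rows[phase] = {"in": int(fields[8]),
                       "out": int(fields[9]),
                       "thru": int(fields[10])}
    return rows

# The two my_queue rows copied from the second qdstat listing above.
sample = """\
  mobile  my_queue               1    balanced   0        0      0       0      10  0    10    0        0
  mobile  my_queue               0    balanced   0        1      0       0      0   10   0     0        0
"""
stats = parse_address_rows(sample, "my_queue")
print(stats[0]["out"], stats[1]["in"])
```

Phase 0 carries the messages going out to the broker and phase 1 the messages coming in from it, which matches the two listings above.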

Now, consider a fault on the "Router.B"; in this case the broker is no longer reachable through it, but it is possible to set up path redundancy through the "Router.C".

Figure 11.4. Path Redundancy and Temporal Decoupling after Router Failure

Using the qdmanage tool, it is possible to configure the waypoint on my_queue address, the related auto-links in both directions and finally the connector instance in order to enable the connection to the broker.

$ sudo qdmanage -b localhost:6002 create --stdin
[
{ "type":"connector", "name":"BROKER", "port":5672, "role":"route-container" },
{ "type":"address", "prefix":"my_queue", "waypoint":"yes" },
{ "type":"autoLink", "addr":"my_queue", "connection":"BROKER", "direction":"in" },
{ "type":"autoLink", "addr":"my_queue", "connection":"BROKER", "direction":"out" }
]
[
  {
    "verifyHostname": true,
    "stripAnnotations": "both",
    "name": "BROKER",
    "allowRedirect": true,
    "idleTimeoutSeconds": 16,
    "maxFrameSize": 65536,
    "host": "127.0.0.1",
    "cost": 1,
    "role": "route-container",
    "maxSessions": 32768,
    "type": "org.apache.qpid.dispatch.connector",
    "port": "5672",
    "identity": "connector/127.0.0.1:5672:BROKER",
    "addr": "127.0.0.1"
  },
  {
    "name": null,
    "prefix": "my_queue",
    "ingressPhase": 0,
    "waypoint": false,
    "distribution": "balanced",
    "type": "org.apache.qpid.dispatch.router.config.address",
    "identity": "7",
    "egressPhase": 0
  },
  {
    "addr": "my_queue",
    "name": null,
    "linkRef": null,
    "type": "org.apache.qpid.dispatch.router.config.autoLink",
    "operStatus": "inactive",
    "connection": "BROKER",
    "direction": "in",
    "phase": 1,
    "lastError": null,
    "externalAddr": null,
    "identity": "8",
    "containerId": null
  },
  {
    "addr": "my_queue",
    "name": null,
    "linkRef": null,
    "type": "org.apache.qpid.dispatch.router.config.autoLink",
    "operStatus": "inactive",
    "connection": "BROKER",
    "direction": "out",
    "phase": 0,
    "lastError": null,
    "externalAddr": null,
    "identity": "9",
    "containerId": null
  }
]

The "Router.C" configuration changes in the same way as "Router.B". It can accept incoming connections from clients and from other routers (in this case the "Router.A") and connects to the broker. The queue named my_queue on the broker is exposed by a waypoint with the related auto-links in both directions in order to send and receive messages to/from the queue itself.
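
If the same change has to be applied to several routers or queues, the entity list passed to qdmanage can be generated rather than typed by hand. The following Python sketch builds the same four entities used in the command above; the function name broker_waypoint_entities and its parameters are hypothetical.

```python
import json

def broker_waypoint_entities(queue, connector_name="BROKER", port=5672):
    """Build the entity list for one broker-backed waypoint address."""
    entities = [
        {"type": "connector", "name": connector_name, "port": port,
         "role": "route-container"},
        {"type": "address", "prefix": queue, "waypoint": "yes"},
    ]
    # One auto-link per direction between the router and the broker queue.
    for direction in ("in", "out"):
        entities.append({"type": "autoLink", "addr": queue,
                         "connection": connector_name, "direction": direction})
    return entities

print(json.dumps(broker_waypoint_entities("my_queue"), indent=2))
```

The printed JSON has the same content as the list supplied on standard input to the qdmanage create command shown earlier.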

At this point, the sender can connect to the "Router.A" for sending messages to the queue in the broker thanks to the "Router.C".

$ sudo python simple_send.py -a localhost:6000/my_queue -m 10
all messages confirmed

All messages are confirmed because they reached the queue inside the broker through "Router.A" and "Router.C"; this can be verified using the qdstat tool.

$ sudo qdstat -b localhost:6002 -a
Router Addresses
  class   addr                   phs  distrib    in-proc  local  remote  cntnr  in  out  thru  to-proc  from-proc
  =================================================================================================================
  local   $_management_internal       closest    1        0      0       0      0   0    0     1        1
  local   $displayname                closest    1        0      0       0      0   0    0     0        0
  mobile  $management            0    closest    1        0      0       0      5   0    0     5        0
  local   $management                 closest    1        0      0       0      0   0    0     0        0
  router  Router.A                    closest    0        0      1       0      0   0    5     0        5
  mobile  my_queue               0    balanced   0        1      0       0      0   10   0     0        0
  mobile  my_queue               1    balanced   0        0      0       0      0   0    0     0        0
  local   qdhello                     flood      1        1      0       0      0   0    0     665      647
  local   qdrouter                    flood      1        0      0       0      0   0    0     8        0
  topo    qdrouter                    flood      1        0      1       0      0   0    31    52       32
  local   qdrouter.ma                 multicast  1        0      0       0      0   0    0     1        0
  topo    qdrouter.ma                 multicast  1        0      1       0      0   0    1     2        1
  local   temp.k6UMaS4P0JmtSlL        closest    0        1      0       0      0   0    0     0        0

On the "Router.C", the my_queue address shows 10 messages as output (from the router to the broker).

Once started, the receiver connected to the "Router.A" gets all the available messages from the queue.

$ sudo python simple_recv.py -a localhost:6000/my_queue -m 10
{u'sequence': 1L}
{u'sequence': 2L}
{u'sequence': 3L}
{u'sequence': 4L}
{u'sequence': 5L}
{u'sequence': 6L}
{u'sequence': 7L}
{u'sequence': 8L}
{u'sequence': 9L}
{u'sequence': 10L}

Running the qdstat tool on the "Router.C" again gives output similar to the following.

$ sudo qdstat -b localhost:6002 -a
Router Addresses
  class   addr                   phs  distrib    in-proc  local  remote  cntnr  in  out  thru  to-proc  from-proc
  =================================================================================================================
  local   $_management_internal       closest    1        0      0       0      0   0    0     1        1
  local   $displayname                closest    1        0      0       0      0   0    0     0        0
  mobile  $management            0    closest    1        0      0       0      6   0    0     6        0
  local   $management                 closest    1        0      0       0      0   0    0     0        0
  router  Router.A                    closest    0        0      1       0      0   0    5     0        5
  mobile  my_queue               0    balanced   0        1      0       0      0   10   0     0        0
  mobile  my_queue               1    balanced   0        0      0       0      10  0    10    0        0
  local   qdhello                     flood      1        1      0       0      0   0    0     746      726
  local   qdrouter                    flood      1        0      0       0      0   0    0     8        0
  topo    qdrouter                    flood      1        0      1       0      0   0    34    55       35
  local   qdrouter.ma                 multicast  1        0      0       0      0   0    0     1        0
  topo    qdrouter.ma                 multicast  1        0      1       0      0   0    1     4        1
  local   temp.Hso3moy3l+Sn+Fy        closest    0        1      0       0      0   0    0     0        0

On the "Router.C", the my_queue address now shows 10 messages as input (from the broker to the router).

11.3. Sharded Queue

Every broker has limits on queue size. One possible way to overcome this is "sharding" queues: a single queue is divided into several "shards" (chunks), each on a different broker. Such a solution therefore needs more than one broker instance, with one shard hosted on each. Of course, a sender connected to one of these brokers can send messages only to the shard hosted on that broker. Likewise, a receiver connected to a broker can get messages only from the shard hosted on that broker and cannot see the messages available in the shards hosted on the other brokers, even though they are all parts of the same queue.

Note

Although the text speaks of shards, they are real queues, all with the same name but on different brokers. The "shard" concept is an abstraction: in the end, a shard is a real queue stored on a broker.

The big problem in this scenario, when it is designed only with brokers, is that a receiver can be stuck on an empty shard without reading any messages while the shards on the other brokers have messages to deliver. It is a real problem because the receiver is interested in receiving messages from the whole queue and does not care whether it is sharded or not. Because of this problem, the receiver sees the queue as empty even though, due to the sharding, messages are available on the other shards.

The above problem can be solved by adding an AMQ Interconnect instance to the network in front of the brokers and leveraging its waypoint feature with the related auto-links.

Consider the following simple scenario:

  • a network with one router "Router.A" and two brokers.
  • the "Router.A" listens for client connections and it is connected to both brokers.
  • the brokers host shards for a queue; each broker has one shard.
  • a sender client connects to the "Router.A" in order to send messages to the queue.
  • a receiver client connects to the "Router.A" in order to get messages from the queue.

Figure 11.5. Sharded Queue Enabled Topology

With such a solution, and by connecting to the "Router.A", sender and receiver do not know anything about sharding; they want to send and receive messages to/from the whole queue, which is the only thing they are aware of. They are both connected to the router and see only one address (related to the queue).

The "Router.A" configuration is similar to the following.

router {
    mode: standalone
    id: Router.A
}

listener {
    host: 0.0.0.0
    port: 6000
    authenticatePeer: no
}

connector {
    name: BROKER1
    addr: 127.0.0.1
    port: 5672
    role: route-container
}

connector {
    name: BROKER2
    addr: 127.0.0.1
    port: 5673
    role: route-container
}

address {
    prefix: my_queue
    waypoint: yes
}

autoLink {
    addr: my_queue
    connection: BROKER1
    direction: in
}

autoLink {
    addr: my_queue
    connection: BROKER1
    direction: out
}

autoLink {
    addr: my_queue
    connection: BROKER2
    direction: in
}

autoLink {
    addr: my_queue
    connection: BROKER2
    direction: out
}

The router has a listener for incoming connections from clients and two connector instances in order to connect to both brokers. The whole queue is named my_queue and is hosted as shards on both brokers; the router is configured with a waypoint for that address. Finally, there are auto-links in both directions for that queue on each broker.

Using the Python sample from the Qpid Proton library, the sender can connect to the "Router.A" and start to send messages to the queue; the console output is similar to the following.

$ sudo python simple_send.py -a localhost:6000/my_queue -m 10
all messages confirmed

All messages are confirmed because they reached the queue and, thanks to the default balanced distribution on the address, the messages are delivered to both shards on the brokers (5 messages per shard). The qdstat tool on the router makes the distribution clear.

$ sudo qdstat -b localhost:6000 -l
Router Links
  type      dir  conn id  id  peer  class   addr                  phs  cap  undel  unsettled  deliveries  admin    oper
  =======================================================================================================================
  endpoint  in   1        6         mobile  my_queue              1    250  0      0          0           enabled  up
  endpoint  out  1        7         mobile  my_queue              0    250  0      0          5           enabled  up
  endpoint  in   2        8         mobile  my_queue              1    250  0      0          0           enabled  up
  endpoint  out  2        9         mobile  my_queue              0    250  0      0          5           enabled  up
  endpoint  in   8        19        mobile  $management           0    250  0      0          1           enabled  up
  endpoint  out  8        20        local   temp.qCGHruCa4UIvYrS       250  0      0          0           enabled  up

The out links (from router to brokers) for the my_queue address (id values 7 and 9) each have 5 deliveries. This shows the messages distributed across the brokers and the related shards for the queue; it is confirmed by the different connections the links are tied to (conn id values 1 and 2).

Once started, the receiver connected to the "Router.A" gets all the available messages from the queue.
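
The even split seen in the link counters can be approximated with a small Python sketch. Plain round-robin is used here as an assumption; the router's balanced distribution is not described in this chapter and may take other factors into account. The function name distribute is hypothetical, and the shard labels reuse the connector names from the configuration.

```python
from itertools import cycle

def distribute(messages, shards):
    """Assign messages to shards in turn and return {shard: [messages]}."""
    assignment = {shard: [] for shard in shards}
    for message, shard in zip(messages, cycle(shards)):
        assignment[shard].append(message)
    return assignment

messages = [{"sequence": n} for n in range(1, 11)]
result = distribute(messages, ["BROKER1", "BROKER2"])
for shard, held in result.items():
    print(shard, len(held))
```

With 10 messages and two shards, each shard ends up holding 5 messages, as in the deliveries column above.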

$ sudo python simple_recv.py -a localhost:6000/my_queue -m 10
{u'sequence': 1L}
{u'sequence': 2L}
{u'sequence': 3L}
{u'sequence': 4L}
{u'sequence': 5L}
{u'sequence': 6L}
{u'sequence': 7L}
{u'sequence': 8L}
{u'sequence': 9L}
{u'sequence': 10L}

As with the sender, the messages are received through both brokers and the related shards; this can be verified using the qdstat tool.

$ sudo qdstat -b localhost:6000 -l
Router Links
  type      dir  conn id  id  peer  class   addr                  phs  cap  undel  unsettled  deliveries  admin    oper
  =======================================================================================================================
  endpoint  in   1        6         mobile  my_queue              1    250  0      0          5           enabled  up
  endpoint  out  1        7         mobile  my_queue              0    250  0      0          5           enabled  up
  endpoint  in   2        8         mobile  my_queue              1    250  0      0          5           enabled  up
  endpoint  out  2        9         mobile  my_queue              0    250  0      0          5           enabled  up
  endpoint  in   10       22        mobile  $management           0    250  0      0          1           enabled  up
  endpoint  out  10       23        local   temp.HT+f3ZilGP5o3wo       250  0      0          0           enabled  up

The in links (from brokers to router) for the my_queue address (id values 6 and 8) each have 5 deliveries. This shows the messages distributed across the brokers and the related shards for the queue; it is confirmed by the different connections the links are tied to (conn id values 1 and 2).

One disadvantage of sharded queues is that, even though performance is very good, the receiver might receive messages "out of order".
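
The reordering can be illustrated with a short Python sketch. It assumes that the messages were spread alternately over two shards and that the receiver is fed in small batches from each shard in turn; the batch size of 3 and the helper drain_in_batches are hypothetical, and the real order depends on timing and link credit.

```python
from collections import deque

# Two shards holding the messages as an alternating distribution would leave them.
shard_1 = deque(n for n in range(1, 11) if n % 2 == 1)   # 1, 3, 5, 7, 9
shard_2 = deque(n for n in range(1, 11) if n % 2 == 0)   # 2, 4, 6, 8, 10

def drain_in_batches(shards, batch_size):
    """Read up to batch_size messages from each shard in turn until all are empty."""
    received = []
    while any(shards):
        for shard in shards:
            for _ in range(min(batch_size, len(shard))):
                received.append(shard.popleft())
    return received

order = drain_in_batches([shard_1, shard_2], batch_size=3)
print(order)
print(order == sorted(order))
```

All 10 messages arrive, but not in the sequence in which they were sent, so applications that depend on strict ordering need to account for this.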