Messaging Programming Reference
A Guide to Programming with Red Hat Enterprise Messaging
Abstract
Chapter 1. Introduction
1.1. Red Hat Enterprise MRG Messaging
1.2. Apache Qpid
1.3. AMQP - Advanced Message Queuing Protocol
1.4. Differences between AMQP 0-10 and AMQP 1.0
Broker Architecture
Broker Management
Symmetry
1.5. AMQP 1.0 support in MRG-M 3
1.5.1. Support for the C++ qpid::messaging API
qpid::messaging API to speak AMQP 1.0 in a clear and natural way that avoids tying its use to any particular broker.
1.5.2. Reply-To Addresses and Temporary Queues
#' character by inserting a UUID. This works well for 0-10 where the name is chosen by clients and must be unique. This transformation of the name is done when constructing an Address from a single address string (rather than from its constituent parts). The modified name can then be accessed via Address::getName().
getAddress() - has been added to both Sender and Receiver.
reply-to on any request messages they send. (This new approach will work for both 0-10 and 1.0).
1.5.3. Connections, Session and Links
protocol' connection property. The recognized values are 'amqp1.0' and 'amqp0-10'. AMQP 0-10 is still the default and the 1.0 support is only available if the required module (the Apache Proton library) is loaded.
sasl_mechanisms connection option can be set to NONE.
1.5.4. Addresses
*' or a '#') it is sent as a legacy-amqp-topic-binding, if not it is sent as a legacy-amqp-direct-binding.
#', the dynamic flag is set on the corresponding source or target and the dynamic-node-properties are populated based on the node properties. Note that when the dynamic flag is set the address should not be specified.
1.5.5. On-demand Create Workaround for Legacy Applications
create' behavior similar to that supported over 0-10. That is, it will create a node with the name specified by the client if it does not already exist. This is provided to help transition applications that rely on create policy. However, this is non-standard behavior, and new applications should not rely on this.
#' as the name, or through the create policy - the node properties are sent as dynamic-node-properties on the source or target. These can be specified in a nested map within the node. Additionally, any durable and type properties in the node map are sent. There is also a translation from the 0-10 style x-declare in the node. All fields specified in the node are included as if listed in properties.
1.5.6. Link-scoped x-declare and x-subscribe
x-declare and x-subscribe are not supported.
1.5.7. Node- and Link-scoped x-bindings
x-bindings property is not supported for AMQP 1.0 in nodes or links.
1.5.8. Delete Policy
1.5.9. Node Lifetime Policies
amqp:delete-on-close:list, amqp:delete-on-no-links:list, amqp:delete-on-no-messages:list, amqp:delete-on-no-links-or-messages:list.
delete-on-close, delete-if-unused, delete-if-empty or delete-if-unused-and-empty.
"my-queue;{create:always, node: {properties: {lifetime-policy: delete-if-empty}}}"1.5.11. Accessing AMQP Message Properties and Headers
message-id, correlation-id, user-id, subject, reply-to and content-type fields in the properties section of a 1.0 message can all be set or retrieved via accessors of the same name on the Message instance. The same is true of the durable, priority and ttl fields in the header section.
delivery-count field within the header section. There is no direct accessor for this field. However if the value is greater than 1, then the Message::getRedelivered() method returns true. If Message::setRedelivered() is called with a value of true, then the delivery count is set to 1, else it is set to 0.
application-properties section of a received 1.0 message is available via the properties map of the Message class. The properties map is used to populate the application-properties section when sending a message.
Message class.
x-amqp-<field-name>. The keys in use are: x-amqp-first-acquirer and x-amqp-delivery-count for the header section, and x-amqp-to, x-amqp-absolute-expiry-time, x-amqp-creation-time, x-amqp-group-id, x-amqp-qroup-sequence and x-amqp-reply-to-group-id for the properties section.
x-amqp-delivery-annotations and x-amqp-message-annotations respectively.
1.5.12. AMQP Support in qpidd
qpidd, the amqp module must be loaded. This allows the broker to recognize the 1.0 protocol header alongside the 0-10 one.
1.5.13. Simple Authentication and Security Layer (SASL) Support
1.5.14. Queues and Exchanges
qpid-config tool:
# qpid-config list incoming
# qpid-config list outgoing
dynamic-node-properties are used to determine the characteristics of the node created. The properties are the same as the QMF create method properties: the 0-10 defined options durable, auto-delete, alternate-exchange, exchange-type and any qpidd specific options, such as qpid.max-count.
supported-dist-modes property determines whether a queue or exchange is desired (the create method uses the 'type' property). If 'move' is specified a queue is created, if 'copy' is specified an exchange is created. If this property is not set, then a queue is assumed.
1.5.15. Filters
legacy-amqp-direct-bindinglegacy-amqp-topic-bindinglegacy-amqp-headers-bindingselector-filterxquery-filter
filter property in the link properties specified in the address. The value of this filter property should be a list of maps, with each map specifying a filter through key-value pairs for name, descriptor (can be specified as numeric or symbolic) and a value. For example:
my-xml-exchange; {link:{filter:{value:"declare variable $colour external; colour='red'",name:x,descriptor:"apache.org:xquery-filter:string"}}}Table 1.1. Filter support by Node type
| direct | topic | fanout | headers | xml | queue | |
|---|---|---|---|---|---|---|
legacy-amqp-direct-binding
|
Yes
|
Yes
|
No
|
No
|
Yes
|
Yes
|
legacy-amqp-topic-binding
|
No
|
Yes
|
No
|
No
|
No
|
Yes
|
legacy-amqp-headers-binding
|
No
|
No
|
No
|
Yes
|
No
|
No
|
xquery-filter
|
No
|
No
|
No
|
No
|
Yes
|
No
|
selector-filter
|
Yes
|
Yes
|
Yes
|
Yes
|
Yes
|
Yes
|
1.5.16. Message Conversion Between AMQP 0-10 and AMQP 1.0
message-id, correlation-id, userid, content-type and content-encoding map between the properties section in 1.0 and the message-properties in an 0-10 header. Note, however, that a 0-10 message-id must be a UUID. This field is skipped when translating a 1.0 message to 0-10 if it does not contain a valid UUID.
priority field in the header section of a 1.0 message maps to the priority field in the delivery-properties of an 0-10 message. The durable header in a 1.0 message is equivalent to the delivery-mode in the delivery-properties of an 0-10 message, with a value of true in the former being equivalent to a value of 2 in the latter and a value of false in the former equivalent to 1 in the latter.
reply-to is the routing-key. If the exchange is set then the reply-to address for 1.0 is composed from the exchange and any routing key (separated by a forward slash).
reply-to address is a queue if no type is specified. To ensure that a 0-10 routing-key for an exchange is correctly converted to a 1.0 reply-to, specify the node type in the 0-10 address, for instance 'amq.direct/rk; {node:{type:topic}}', or set the type on the Address instance.
subject field in the properties of the 1.0 message is set to the value of the routing-key from the message-properties of the 0-10 message. In the reverse direction, the subject field of the properties section of the 1.0 message populates the routing-key in the message-properties of the 0-10 message. Note that the routing-key truncates at 255 characters.
to' field of the properties section when converting to 1.0, but the reverse translation is not done (as the destination for messages sent out by the broker is controlled by the subscription in 0-10).
application-properties section of a 1.0 message is converted to the application-headers field in the message-properties of an 0-10 message and vice versa.
reply-to from 1.0 to 0-10, if the address contains a forward slash it is assumed to be of the form exchange/routing key. If it does not contain a forward slash, it is assumed to be a simple node name. If that name matches an existing queue, then the resulting 0-10 reply-to will have the exchange empty and the routing key populated with the queue name. If the name does not match an existing queue, but the name matches an exchange, then the reply-to has the exchange populated with the node name and the routing key left empty. If the node refers to neither a known queue nor exchange then the resulting reply-to will be empty.
1.5.17. Capabilities
shared' capability allows subscriptions from an exchange to be shared by multiple receivers. Where this is specified the subscription queue created takes the name of the link (and does not include the container id).
durable' capability is added if the queue or exchange referred to by the source or target is durable. The 'queue' capability is added if the source or target references a queue. The 'topic' capability is added if the source or target references an exchange. If the source or target references a queue or a direct exchange the 'legacy-amqp-direct-binding' is added. If it references a queue or a topic exchange, 'legacy-amqp-topic-binding' is added.
create-on-demand' capability is an extension that allows legacy applications to use a 'create' policy in the messaging client. If set in the client and the named node does not exist, the node is created using the dynamic-node-properties, in the same way as when the dynamic flag is set.
1.5.18. Capability Matching and Assert
assert option is not exactly equivalent to the 0-10 based mechanism. Over AMQP 1.0, the client sets the capabilities it desires, the broker sets the capabilities it can offer and when the assert option is on, the client ensures that all the capabilities it requested are supported.
durable is set in the node properties, then a capability of 'durable' is requested (meaning the node will not lose messages if its volatile memory is lost).
type is set, then that will also be passed as a requested capability. For example: 'queue' means the node supports queue-like characteristics (stores messages until consumers claim them and allocates messages between competing consumers), 'topic' means the node supports classic pub-sub characteristics.
1.5.19. Configuring Subscription Queues using Topics
# qpid-config add topic my-topic --argument exchange=amq.topic\ --argument qpid.max_count=500 --argument qpid.policy_type=self-destruct
my-topic/my-key' over 1.0 now, it will result in a subscription queue being created with a limit of 500 messages, that deletes itself (thus ending the subscription) if that limit is exceeded and is bound to 'amq.topic' with the key 'my-key'.
1.6. qpid::messaging Message::get/setContentObject()
Message::getContentObject() and Message::setContentObject() to access the semantic content of structured AMQP 1.0 messages. These methods allow the body of the message to be accessed or manipulated as a Variant. Using these methods produces the most widely applicable code as they work for both protocol versions and work with map-, list-, text- or binary- messages.
bool Formatter::isMapMsg(qpid::messaging::Message& msg) {
return(msg.getContentObject().getType() == qpid::types::VAR_MAP);
}
bool Formatter::isListMsg(qpid::messaging::Message& msg) {
return(msg.getContentObject().getType() == qpid::types::VAR_LIST);
}
qpid::types::Variant::Map Formatter::getMsgAsMap(qpid::messaging::Message& msg) {
qpid::types::Variant::Map intMap;
intMap = msg.getContentObject().asMap();
return(intMap);
}
qpid::types::Variant::List Formatter::getMsgAsList(qpid::messaging::Message& msg) {
qpid::types::Variant::List intList;
intList = msg.getContentObject().asList();
return(intList);
}Message::getContent() and Message::setContent() continue to refer to the raw bytes of the content. The encode() and decode() methods in the API continue to decode map- and list- messages in the AMQP 0-10 format.
Chapter 2. AMQP Model Overview
2.1. The Producer - Consumer Model
2.2. Consumer-driven messaging
2.3. Message Producer (Sender)
2.4. Message
2.5. Message Broker
2.6. Routing Key
x-ampq-0.10-routing-key property. However, this is managed by the Qpid Messaging API, and you do not need to manually access or set this property. The exception to this is if you are exchanging messages with another AMQP system. In that case you should understand how the Qpid Messaging API manages this property based on message and sender subject.
2.7. Message Subject
2.8. Message Properties
key:value pairs that can be set for a message. Some predefined properties are used by the message broker to determine how to treat messages while they are in transit; these message properties can be set to ensure quality of service and guaranteed delivery. Other user-defined message properties can be set for application-specific functionality.
2.9. Connection
2.10. Session
2.11. Exchange
2.12. Binding
2.13. Topic
qpid-config add topic my-topic --argument exchange=amq.topic\ --argument qpid.max_count=500 --argument qpid.policy_type=self-destruct
my-topic/my-key' over 1.0 now, it will result in a subscription queue being created with a limit of 500 messages, that deletes itself (thus ending the subscription) if that limit is exceeded and is bound to 'amq.topic' with the key 'my-key'.
2.14. Domain
sasl_mechanisms, username, password.
qpid-config, for example:
qpid-config add domain my-domain --argument url=some.hostname.com:5672
qpid-config add incoming incoming-name --argument domain=my-domain --argument source=queue1 --argument target=queue2
queue1 in the process identified by my-domain and directed into queue2 on the qpidd instance against which the command is run.
2.15. Message Queue
--queue-purge-interval. While this is not a qpid-config option, it is worth understanding that message TTL can be configured, and when the purge attempt is successful the messages are subsequently removed.
2.17. Message Consumer (Receiver)
Chapter 3. Getting Started
3.1. Getting Started with Python
3.1.1. Python Messaging Development
python interpreter to execute the file.
3.1.2. Python Client Libraries
python-qpid- Apache Qpid Python client library.
python-qpid-qmf- Queue Management Framework (QMF) Python client library.
python-saslwrapper- Python bindings for the saslwrapper library.
3.1.3. Install Python Client Libraries (Red Hat Enterprise Linux 6)
yum command.
- Red Hat Enterprise Linux Server 6
- Red Hat Enterprise Linux Workstation 6
- Red Hat Enterprise Linux Client 6
yum install python-qpid python-qpid-qmf python-saslwrapper
3.2. Getting Started with .NET
3.2.1. .NET Messaging Development
3.2.2. Windows SDK
3.2.3. Windows SDK Contents
\bin- Compiled binary (.dll and .exe) files, and the associated debug program database (.pdb) files.
- Boost library files.
- Microsoft Visual Studio runtime library files.
\docs- Apache Qpid C++ API Reference
\dotnet_examples- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in C#
\examples- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in unmanaged C++
\include- A directory tree of .h files
\lib- The linker .lib files that correspond to files in /bin
3.2.4. How To Download and Install the Windows SDK
3.2.4.1. Obtain the Windows SDK
Procedure 3.1. How To Obtain the Windows SDK For Your Environment
- Log in to the Red Hat Customer Portal.
- Click the
A-Ztab to sort the product list alphabetically, and then selectRed Hat Enterprise MRG Messagingto display the downloads screen. - Select the desired product version from the menu.
- Select the desired architecture from the menu.
- Locate the correct Windows SDK binary for your environment, and then click to start the download.
3.2.4.2. Install the Windows SDK
Previous Step in How To Download and Install the Windows SDK
- Unzip the downloaded Windows SDK to your filesystem.
- Copy all
qpid*andboost*files from the/bin/Release/directory into your enviroment's/bin/Release/directory.
3.3. Getting Started with C++
3.3.1. C++ Messaging Development
3.3.2. C++ on Linux
3.3.2.1. C++ Client Libraries
qpid-cpp-client- Apache Qpid C++ client library.
qpid-cpp-client-ssl- SSL support for clients.
qpid-cpp-client-rdma- RDMA Protocol support (including Infiniband) for Qpid clients.
qpid-cpp-client-devel- Header files and tools for developing Qpid C++ clients.
qpidd-cpp-client-devel-docs- AMQP client development documentation.
3.3.2.2. Install C++ Client Libraries (Red Hat Enterprise Linux 6)
yum command.
Red Hat MRG Messaging v.2 (for RHEL-6 Server) channel.
yum install qpid-cpp-client qpid-cpp-client-rdma qpid-cpp-client-ssl qpid-cpp-client-devel
3.3.2.3. Install C++ Client Libraries for MRG 3
yum command.
Red Hat MRG Messaging v.3 (for RHEL-6 Server) channel.
yum install qpid-cpp-client qpid-cpp-client-rdma qpid-cpp-client-ssl qpid-cpp-client-devel
3.3.3. C++ on Windows
3.3.3.1. Windows SDK
3.3.3.2. Windows SDK Contents
\bin- Compiled binary (.dll and .exe) files, and the associated debug program database (.pdb) files.
- Boost library files.
- Microsoft Visual Studio runtime library files.
\docs- Apache Qpid C++ API Reference
\dotnet_examples- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in C#
\examples- A Visual Studio solution file and associated project files to demonstrate using the WinSDK in unmanaged C++
\include- A directory tree of .h files
\lib- The linker .lib files that correspond to files in /bin
3.3.3.3. How To Download and Install the Windows SDK
3.3.3.3.1. Obtain the Windows SDK
Procedure 3.2. How To Obtain the Windows SDK For Your Environment
- Log in to the Red Hat Customer Portal.
- Click the
A-Ztab to sort the product list alphabetically, and then selectRed Hat Enterprise MRG Messagingto display the downloads screen. - Select the desired product version from the menu.
- Select the desired architecture from the menu.
- Locate the correct Windows SDK binary for your environment, and then click to start the download.
3.3.3.3.2. Install the Windows SDK
Previous Step in How To Download and Install the Windows SDK
- Unzip the downloaded Windows SDK to your filesystem.
- Copy all
qpid*andboost*files from the/bin/Release/directory into your enviroment's/bin/Release/directory.
3.4. Getting Started with Java
3.4.1. Java Client Libraries
qpid-java-client- The Java implementation of the Qpid client
qpid-java-common- Common files for the Qpid Java client
qpid-java-example- Programming examples
3.4.2. Install Java Client Libraries (Red Hat Enterprise Linux 6)
- Subscribe your system to the
Additional Services Channels for Red Hat Enterprise Linux 6 / MRG Messaging v.2 (for RHEL-6 Server)channel. - Run the following yum command with root privileges:
yum install qpid-java-client qpid-java-common qpid-java-example
3.5. Getting Started with Ruby
3.5.1. Ruby Messaging Development
3.5.2. Ruby Client Libraries
ruby-qpid-qmf- Ruby QMF bindings
ruby-saslwrapper- Ruby bindings for the saslwrapper library
3.5.3. Install Ruby Client Libraries (Red Hat Enterprise Linux 6)
ruby-qpid-qmf package is in the main channel; the ruby-saslwrapper package is in the Optional child channel.
- Subscribe your system to one of the following channels:
Red Hat Enterprise Linux Server 6Red Hat Enterprise Linux Client 6Red Hat Enterprise Linux Workstation 6
- With root privileges run the command:
yum install ruby-qpid-qmf
- Subscribe your system one of the following channels:
Red Hat Enterprise Linux Optional Server v 6Red Hat Enterprise Linux Optional Client 6Red Hat Enterprise Linux Optional Workstation 6
- With root privileges run the command:
yum install ruby-saslwrapper
3.6. Hello World
3.6.1. Red Hat Enterprise Messaging "Hello World"
- Python
import sys from qpid.messaging import * connection = Connection("localhost:5672") try: connection.open() session = connection.session() sender = session.sender("amq.topic") receiver = session.receiver("amq.topic") message = Message("Hello World!") sender.send(message) fetchedmessage = receiver.fetch(timeout=1) print fetchedmessage.content session.acknowledge() except MessagingError,m: print m connection.close()- C#/.NET
using System; using Org.Apache.Qpid.Messaging; namespace Org.Apache.Qpid.Messaging { class Program { static void Main(string[] args) { String broker = args.Length > 0 ? args[0] : "localhost:5672"; String address = args.Length > 1 ? args[1] : "amq.topic"; Connection connection = null; try { connection = new Connection(broker); connection.Open(); Session session = connection.CreateSession(); Receiver receiver = session.CreateReceiver(address); Sender sender = session.CreateSender(address); sender.Send(new Message("Hello world!")); Message message = new Message(); message = receiver.Fetch(DurationConstants.SECOND * 1); Console.WriteLine("{0}", message.GetContentObject()); session.Acknowledge(); connection.Close(); } catch (Exception e) { Console.WriteLine("Exception {0}.", e); if (connection != null) connection.Close(); } } } }- C++
#include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <iostream> using namespace qpid::messaging; int main(int argc, char** argv) { std::string broker = argc > 1 ? argv[1] : "localhost:5672"; std::string address = argc > 2 ? argv[2] : "amq.topic"; Connection connection(broker); try { connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(address); Sender sender = session.createSender(address); sender.send(Message("Hello world!")); Message message = receiver.fetch(Duration::SECOND * 1); std::cout << message.getContentObject() << std::endl; session.acknowledge(); connection.close(); return 0; } catch(const std::exception& error) { std::cerr << error.what() << std::endl; connection.close(); return 1; } }
3.6.2. Java JMS "Hello World" Program Listing
qpid-java-examples package.
- Java
package org.apache.qpid.example.jmsexample.hello; import javax.jms.*; import javax.naming.Context; import javax.naming.InitialContext; import java.util.Properties; public class Hello { public Hello() { } public static void main(String[] args) { Hello producer = new Hello(); producer.runTest(); } private void runTest() { try { Properties properties = new Properties(); properties.load(this.getClass().getResourceAsStream("hello.properties")); Context context = new InitialContext(properties); ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("qpidConnectionfactory"); Connection connection = connectionFactory.createConnection(); connection.start(); Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE); Destination destination = (Destination) context.lookup("topicExchange"); MessageProducer messageProducer = session.createProducer(destination); MessageConsumer messageConsumer = session.createConsumer(destination); TextMessage message = session.createTextMessage("Hello world!"); messageProducer.send(message); message = (TextMessage)messageConsumer.receive(); System.out.println(message.getText()); connection.close(); context.close(); } catch (Exception exp) { exp.printStackTrace(); } } }
hello.properties:
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory # connectionfactory.[jndiname] = [ConnectionURL] connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' # destination.[jndiname] = [address_string] destination.topicExchange = amq.topic
3.6.3. "Hello World" Walk-through
- Python
from qpid.messaging import *
- C++
#include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> using namespace qpid::messaging;
- C#/.NET
using Org.Apache.Qpid.Messaging; namespace Org.Apache.Qpid.Messaging {
Connection object. The Connection object constructor takes the url of the broker as its parameter:
- Python
connection = Connection("localhost:5672")- C++
Connection connection(broker);
- C#/.NET
Connection connection = null; connection = new Connection(broker);
username/password@serverurl:port. If you try this with a remote server, remember to open the firewall on the message broker to allow incoming connections for the broker port.
- C++
Connection connection(broker, "{protocol:amqp1.0}");- C#/.NET
connection = new Connection(broker, "{protocol:amqp1.0}");
open method, which opens a configured connection.
- Python
try: connection.open()
- C++
try { connection.open();- C#/.NET
connection.Open();
Connection object has a createSession method (session in Python) that returns a Session object, so we get a session from the connection that we created previously:
- Python
session = connection.session()
- C++
Session session = connection.createSession();
- C#/.NET
Session session = connection.CreateSession();
Session object has sender and receiver methods, which take a target or source address as a parameter, and return a Sender and a Receiver object, respectively. These are the objects that we need to send and receive messages, so we will create them by calling the respective methods of our session. We will use the amq.topic exchange for this demo. This is a pre-configured exchange on the broker, so we don't need to create it, and we can rely on its presence:
- Python
sender = session.sender("amq.topic") receiver = session.receiver("amq.topic")- C++
Receiver receiver = session.createReceiver(address); Sender sender = session.createSender(address);
- C#/.NET
Receiver receiver = session.CreateReceiver(address); Sender sender = session.CreateSender(address);
amq.topic exchange on the broker. Because our routing target is an exchange, it will be routed further from there by the broker.
amq.topic exchange, and our receiver will receive it in a queue.
Message object takes as a parameter to its constructor a string that becomes the message.content:
- Python
message = Message("Hello World!")
Message object constructor sets the correct content-type when you set the message.content through the constructor. However, if you set it after creating the Message object by assigning a value to the message.content property, then you also need to set the message.content_type property appropriately.
send method of our sender to send the message to the broker:
- Python
sender.send(message)
- C++
sender.send(Message("Hello world!"));- C#/.NET
sender.Send(new Message("Hello world!"));
amq.topic exchange on the message broker.
amq.topic exchange for us. The message is now waiting in that queue.
fetch method of our receiver:
- Python
fetchedmessage = receiver.fetch(timeout=1)
- C++
Message message = receiver.fetch(Duration::SECOND * 1);
- C#/.NET
Message message = new Message(); message = receiver.Fetch(DurationConstants.SECOND * 1);
timeout parameter tells fetch how long to wait for a message. If we do not set a timeout the receiver will wait indefinitely for a message to appear on the queue. If we set the timeout to 0, the receiver will check the queue and return immediately if nothing is there. We set it to timeout in 1 second to ensure ample time for our message to be routed and appear in the queue.
Fetch returns a Message object, so we will print its content property:
- Python
print fetchedmessage.content
- C++
std::cout << message.getContent() << std::endl;
- C#/.NET
Console.WriteLine("{0}", message.GetContent());
- Python
session.acknowledge()
- C++
session.acknowledge();
- C#/.NET
session.Acknowledge();
- Python
except MessagingError,m: print m connection.close()- C++
} catch(const std::exception& error) { std::cerr << error.what() << std::endl; connection.close(); return 1; }- C#/.NET
} catch (Exception e) { Console.WriteLine("Exception {0}.", e); if (connection != null) connection.Close(); }
helloworld.py, and then run it using the command python helloworld.py. If the message broker is running on your local machine, you should see the words: "Hello World!" printed on your programlisting.
Chapter 4. Beyond "Hello World"
4.1. Subscriptions
amq.topic exchange. In the background this creates a queue and subscribes it to the amq.topic exchange. Our Hello World program sender publishes to the amq.topic exchange. The amq.topic exchange is a good one to use for the demo. A topic exchange allows queues to be subscribed (to bind to the exchange) with a binding key that acts as a filter on the subject of messages sent to the exchange. Since we bind to the exchange with no binding key, we signal that we're interested in all messages coming through the exchange.
amq.topic exchange, the message is delivered to the subscription queue for our receiver. Our receiver then calls fetch() to retrieve the message from its subscription queue.
amq.topic exchange and after we send the message, register our receiver with the exchange.
- Python
sender = session.sender("amq.topic") receiver = session.receiver("amq.topic") message = Message("Hello World!") sender.send(message)- C++
Session session = connection.createSession(); Receiver receiver = session.createReceiver(address); Sender sender = session.createSender(address); sender.send(Message("Hello world!"));- C#/.NET
Session session = connection.CreateSession(); Receiver receiver = session.CreateReceiver(address); Sender sender = session.CreateSender(address); sender.Send(new Message("Hello world!"));
- Python
sender = session.sender("amq.topic") message = Message("Hello World!") sender.send(message) receiver = session.receiver("amq.topic")- C++
Session session = connection.createSession(); Sender sender = session.createSender(address); sender.send(Message("Hello world!")); Receiver receiver = session.createReceiver(address);- C#/.NET
Session session = connection.CreateSession(); Sender sender = session.CreateSender(address); sender.Send(new Message("Hello world!")); Receiver receiver = session.CreateReceiver(address);
amq.topic exchange. The exchange then delivered the message to all the subscribed queues... which was none. When our receiver subscribes to the exchange it's too late to receive the message. In the original version of the program the receiver subscribes to the exchange before the message is sent, so it receives a copy of the message in its subscription queue.
qpid-config command. Restart the broker to clear all the queues (all non-durable queues are destroyed when the broker restarts). Then run the command:
qpid-config queues
raw_input method to grab some keyboard input.
- Python
sender = session.sender("amq.topic") receiver = session.receiver("amq.topic") print "Press Enter to continue" x= raw_input() message = Message("Hello World!") sender.send(message)
qpid-config queues to examine the queues on the broker.
qpid-config queues
amq.topic exchange for us,to allow our receiver to receive messages from the exchange. You'll also see a number of other queues with the same ID number at the end of them. These are the queues that the qpid-config utility uses to query the message broker and receive the queue list you run the command. If you run the command again, you'll see that our receiver queue remains the same, and the other queues have a new ID - each time you run a qpid-config command it creates it own queues to receive a response from the server. You won't be able to see that those queues aren't there when you're not running qpid-config, because you need to run qpid-config to see the queues, but you can take my word for it.
- Version 2.2 and below
- To see the queue-exchange bindings, run:
qpid-config queues -b
The-bswitch displays bindings. You'll see that the two dynamically created queues are bound to theamq.topicexchange. - Version 2.3 and above
- To see the queue-exchange bindings, run:
qpid-config queues -r
The-rswitch displays bindings. You'll see that the two dynamically created queues are bound to theamq.topicexchange.
connection.close() ends the session, and the two exclusive queues on the broker are deleted. You can run qpid-config queues again to verify that.
amq.topic exchange. This queue is private (randomly named and exclusive), and deleted when the consumer disconnects, so it is not suitable for publishing. In order to make messages available to consumers who may or may not be connected to the exchange when the message is sent, a message-producing application needs to create a publicly-accessible queue (publishing). Consuming applications can then subscribe to this published queue and receive messages in a decoupled fashion.
4.2. Publishing
4.3. AMQP Exchange Types
- Direct
- A Direct Exchange allows a consumer to bind a queue to it with a key. When a message is received by a direct-type exchange, the message is routed to any queues whose binding key matches the subject of the message. The Direct Exchange also supports exclusive bindings, which allow a queue to monopolize messages sent to an exchange, and implement a simple direct-to-queue model.
- Topic
- A Topic Exchange allows a consumer to bind a queue to it with a key that specifies wildcard matching. The wildcard is then matched against the subject of messages sent to the exchange. This allows you to implement message filtering patterns using a topic exchange and various queues with different binding keys.
4.4. Pre-configured Exchanges
- Default exchange
- A nameless direct exchange. All queues are bound to this exchange by default, allowing them to be accessed by queue name.
amq.direct- The pre-configured named direct exchange.
amq.fanout- The pre-configured fanout exchange.
amq.match- The pre-configured headers exchange.
amq.topic- The pre-configured topic exchange.
4.5. Exchange Subscription Patterns
- copy of messages
- move of messages
- exclusive binding
A copy of messages is where each consumer gets their own copy of every message.
Note
A move of messages is where multiple consumers connect to the same queue and take messages from the queue in a round-robin fashion.
Note
The third pattern, exclusive binding, is where a consumer mandates that only the consumer may have access to messages routed to an endpoint.
Note
4.6. The Default Exchange
4.6.1. Default Exchange
4.6.2. Publish to a Queue using the Default Exchange
qpid-config:
qpid-config add queue quick-publish
{create: always} then the queue will be created if it does not already exist. In addition to always, the create command can also take the arguments sender and receiver, to indicate that the queue should be created only when a sender connects to the address, or only when a receiver connects to the address.
- Python
sender = session.sender("quick-publish; {create: always}")- C++
Sender sender = session.createSender("quick-publish; {create: always}")
4.6.3. Subscribe to the Default Exchange
quick-publish", using the Python API:
- C++
Receiver receiver = session.createReceiver('quick-publish');- Python
receiver = session.receiver('quick-publish')
quick-publish queue.
- C++
Receiver receiver = session.createReceiver('quick-publish; {mode: browse}');- Python
receiver = session.receiver('quick-publish; {mode: browse}')
create parameter:
- C++
Receiver receiver = session.createReceiver("my-own-copies-please; {create: always, node: {type: 'queue'}}");- Python
receiver = session.receiver("my-own-copies-please; {create: always, node: {type: 'queue'}}")
my-own-copies-please" already exists, then your receiver will connect to that queue. If the queue does not exist, then it will be created (all of this assumes sufficient privileges, of course).
my-own-copies-please" exists, your receiver will silently connect to that in preference to creating a queue. This is not what you intended, and will have unpredictable results. To avoid this, you can use the assert parameter, like this:
- C++
try { Receiver receiver = session.createReceiver("my-own-copies-please; {create: always, assert: always, node: {type: 'queue'}}"); } catch(const std::exception& error) { std::cerr << error.what() << std::endl; }- Python
try: receiver = session.receiver("my-own-copies-please; {create: always, assert: always, node: {type: 'queue'}}") except MessagingError m: print m
my-own-copies-please" already exists and is an exchange, the receiver constructor will raise an exception: "expected queue, got topic".
4.7. Direct Exchange
4.7.1. Direct Exchange

Figure 4.1. Direct Exchange
4.7.2. Create a Direct Exchange using qpid-config
qpid-config add exchange direct exchange name creates a new direct exchange.
qpid-config command creates a new direct exchange called engineering:
qpid-config add exchange direct engineering
4.7.3. Create a Direct Exchange from an application
engineering:
- Python
sender = session.sender('engineering;{create: always, node:{type:topic, x-declare:{type:direct}}}')
engineering already exists, the sender will not try to create a new one, but will connect to the existing one. You need to be careful, however, because if a queue with the name engineering already exists, then your sender will silently connect to that queue.
engineering, you can use assert, as in this example:
- Python
try: sender = session.sender('engineering;{create: always, node:{type:topic, x-declare:{type:direct}}, assert: always}') except MessagingError, m: print m
assert: always, node: {type: topic}; if engineering exists and is a queue, rather than an exchange, the sender constructor will raise an exception: "expected topic, got queue".
assert to verify that it is an exchange and not a queue, you cannot verify what type of exchange it is.
4.7.4. Publish to a Direct Exchange
The first is to create a sender that routes messages directly to the endpoint that you wish to publish to. Remember that a Direct Exchange requires an exact match, so you are sending to a specific destination. At the same time, bear in mind that multiple queues can bind to the exchange to receive messages routed to the same destination. So it is a specific endpoint that may have multiple consumers.
qpid-config add exchange direct finance
- Python
sender = session.sender('finance;{create:always, node: {type: topic, x-declare: {type: direct}}}')
reports endpoint on the finance exchange.
- Python
sender = session.sender('finance/reports') sender.send('Message to all consumers bound to finance with key reports')
sender will go to queues that have bound to the finance direct exchange using the key reports; with one caveat.
The second option is to create a sender that routes messages to the exchange, and use the message subject to control the routing to the specific endpoint. This way you can dynamically decide where messages will go, for example based on the names of keys that are provided at run-time, perhaps in the body of other messages.
- Python
sender = session.sender('finance; {assert: always, node: {type: topic}}') msg = Message('Message to all consumers bound to finance with key reports') msg.subject = 'reports' sender.send(msg)
subject. You can target different endpoints on that exchange by changing the subject before sending the message. For example, to send copies of the same message to finance/reports and finance/records:
- Python
sender = session.sender('finance; {assert: always, node: {type: topic}}') msg = Message('Message for reports and records') msg.subject = 'reports' sender.send(msg) msg.subject = 'records' sender.send(msg)
{assert: always, node: {type: topic}} is used to ensure that we don't inadvertently connect to a queue with the name finance bound to the default exchange. Queues and exchanges have separate namespaces, but remember that the default exchange is nameless.
As you can observe in the second case, setting the subject influences where the message is routed. If you use the first method — the sender with the subject in its address — you must be careful not to set the message subject inadvertently. The sender will write the correct subject into the message when you send it if the message subject is blank, but it will not overwrite any message subject that you provide. The first method — the sender with a subject in its address — provides a "default destination" for all messages it sends that do not have a message subject set. You can target other endpoints on the exchange by explicitly setting a subject before sending the message - in which case they go to the exchange for further routing based on your custom subject. Just be aware that setting the message subject determines its routing.
4.7.5. Subscribe to a Direct Exchange
This is the most straight-forward method to implement. Create a receiver using an address comprised of the exchange name and the routing key. For example, create a receiver on direct exchange "finance" using the "reports" key of interest:
- C++
Receiver receiver = session.createReceiver("finance/reports")- Python
receiver = session.receiver('finance/reports')
Subscription using a shared queue may be created by naming the subscription queue and defining it non-exclusive. For example:
- C++
Receiver receiver = session.createReceiver("finance/quick-publish;{link:{name:my-subscription, x-declare:{exclusive:False}}}");- Python
receiver = session.receiver('finance/quick-publish;{link:{name:my-subscription, x-declare:{exclusive:False}}}')
x-bindings. For example:
- C++
Receiver receiver = session.createReceiver("my-subscription;{create: always, node:{x-bindings: [{exchange: 'finance', key: 'quick-publish'}]}}");- Python
receiver = session.receiver('my-subscription;{create: always, node:{x-bindings: [{exchange: 'finance', key: 'quick-publish'}]}}')
my-subscription" and bound it to the direct exchange "finance" with the key "quick-publish".
Both Link-scoped x-declare and Node-scoped x-bindings clauses are not supported in AMQP 1.0, hence we request the capability of a shared subscription:
- C++
Receiver receiver = session.createReceiver("finance/quick-publish;{node: {capabilities:[shared]}, link: {name: 'my-subscription'}}");
4.7.6. Exclusive Bindings for Direct Exchanges
qpid.exclusive-binding is used to declare an exclusive binding.
drain -f "amq.direct; {create:always, link: {name:one, x-bindings:[{key:unique, arguments: {qpid.exclusive-binding:True}}]}}"4.8. Fanout Exchange
4.8.1. The pre-configured Fanout Exchange
amq.fanout.
4.8.2. Fanout Exchange

Figure 4.2. Fanout Exchange
# as their binding key.
4.8.3. Create a Fanout Exchange using qpid-config
qpid-config:
qpid-config add exchange fanout my-fanout-exchange
durable (persistent between restarts of the broker), use the --durable option:
qpid-config add exchange fanout my-fanout-exchange --durable
qpid-config exchanges command lists the exchanges on the broker.
4.8.4. Create a Fanout Exchange from an application
create: alwaysnode: {type: topic, x-declare: {exchange: exchange-name, type: fanout}}
myfanout.
- Python
tx = ssn.sender("myfanout; {create: always, node: {type: topic, x-declare: {exchange: myfanout, type: fanout}}}")
4.8.5. Publish to Multiple Queues using the Fanout Exchange
- Python
import sys from qpid.messaging import * con = Connection("localhost:5672") con.open() try: ssn = con.session() tx = ssn.sender("amq.fanout") tx.send("Hello to all consumers bound to the amq.fanout exchange") finally: con.close()
4.8.6. Subscribe to a Fanout Exchange
- Subscribe to the exchange using an ephemeral subscription. This creates and binds a temporary private queue that is destroyed when your application disconnects. This approach makes sense when you do not need to share responsibility for the messages between multiple consumers, and you do not care about messages that are sent when your application is not running or is disconnected.
- Subscribe to a queue that is bound to the exchange. This allows messages to be buffered in the queue when your application is disconnected, and allows several consumers to share responsibility for the messages in the queue.
To implement the private, ephemeral subscription, create a receiver using the name of the fanout exchange as the receiver's address. For example:
- Python
rx = receiver("amq.fanout")
To implement a shareable subscription that persists across consumer application restarts, create a queue, and subscribe to that queue.
qpid-config:
qpid-config add queue shared-q qpid-config bind amq.fanout shared-q
--durable option.
qpid-config command to view the exchange bindings after issuing these commands. On MRG Messaging 2.2 and below use the command qpid-config exchanges -b. On MRG Messaging 2.3 and above use the command qpid-config exchanges -r.
- Python
rx = receiver("shared-q")
qpid-config:
- Python
rx = receiver("shared-q;{create: always, link: {x-bindings: [{exchange: 'amq.fanout', queue: 'shared-q'}]}}")
- C++
Receiver receiver = session.createReceiver("amq.fanout;{node: {capabilities:[shared]}, link: {name: 'shared-q'}}");
4.9. Topic Exchange
4.9.1. The pre-configured Topic Exchange
amq.topic.
4.9.2. Topic Exchange

Figure 4.3. Topic Exchange
In the binding key, # matches any number of period-separated terms, and * matches a single term.
#.news will match messages with subjects such as usa.news and germany.europe.news, while a binding key of *.news will match messages with the subject usa.news, but not germany.europe.news.
4.9.3. Create a Topic Exchange using qpid-config
qpid-config command creates a topic exchange called news:
qpid-config add exchange topic news
4.9.4. Create a Topic Exchange from an application
news:
- Python
txtopic = ssn.sender("news; {create: always, node: {type: topic}}")
4.9.5. Publish to a Topic Exchange
news topic exchange with routing keys that allow geography-based subscriptions by consumers:
- Python
import sys from qpid.messaging import * conn = Connection("localhost:5672") conn.open() try: ssn = conn.session() txnews = ssn.sender("news; {create: always, node: {type: topic}}") msg = Message("News about Europe") msg.subject = "europe.news" txnews.send(msg) msg = Message("News about the US") msg.subject = "usa.news" txnews.send(msg) finally: conn.close()
4.9.6. Subscribe to a Topic Exchange
qpid-config to create a queue named news and bind it to the amq.topic exchange with a wildcard that matches everything.news, where everything is any number of period-separated terms:
qpid-config add queue news qpid-config bind amq.topic news "#.news"
news queue for all messages whose routing key ends with .news:
- Python
rxnews = ssn.receiver("news")
- Python
rxnews = ssn.receiver("news;{create: always, node: {type:queue}, link:{x-bindings:[{exchange: 'amq.topic', queue: 'news', key: '#.news'}]}}")
- C++
Receiver rxnews = ssn.createReceiver("amq.topic/#.news;{node:{capabilities:[shared]}, link:{name: 'news'}}");
- Python
rxnews = ssn.receiver("amq.topic/#.news");
# symbol will match any number of period-separated terms. The # will match exactly one term.
4.10. Headers Exchange
4.10.1. The pre-configured Headers Exchange
amq.match.
4.10.2. Headers Exchange
4.10.3. Create a Headers Exchange using qpid-config
qpid-config command creates a headers exchange called property-match:
qpid-config add exchange headers property-match
4.10.4. Create a Headers Exchange from an application
headers-match:
- Python
txheaders = ssn.sender("headers-match;{create: always, node: {type: topic, x-declare: {exchange: headers-match, type: headers}}}")
4.10.5. Publish to a Headers Exchange
properties. For example:
- Python
import sys from qpid.messaging import * conn = Connection("localhost:5672") conn.open() try: ssn = conn.session() txheaders = ssn.sender("amq.match") msg = Message("Headers Exchange message") msg.properties['header1'] = 'value1' txheaders.send(msg) finally: ssn.close()
4.10.6. Subscribe to a Headers Exchange
- Changes
- Updated April 2013.
- Updated July 2013.
match-q, and subscribes it to the amq.match exchange using a binding key that matches messages that have a header key header1 with a value of value1:
- Python
rxheaders = ssn.receiver("match-q;{create: always, node: {type: queue}, link:{x-bindings:[{key: 'binding-name', exchange: 'amq.match', queue: 'match-q', arguments:{'x-match': 'any', 'header1': 'value1'}}]}}")
- C++
Receiver rxheaders = ssn.createReceiver("amq.match; {link: {name:match-q, filter:{value:{'x-match': 'any', 'header1': 'value1'}, name: headers, descriptor:'apache.org:legacy-amqp-headers-binding:map'}}}");
x-match argument can take the values any, which matches messages with any of the key value pairs in the binding, or all, which matches messages that have all the key value pairs from the binding key in their header.
x-binding, and so a filter is used.
x-binding. Note the x-bindings argument key. This argument creates a named handle for the binding, which is visible when running qpid-config exchanges -r. Without a handle, a binding cannot be deleted by name. A null key is valid, but in addition to not being able to be deleted by name, when a binding is created with a null handle, any further attempt to create a binding with a null handle on that exchange will be update the existing binding rather than create a new one.
4.11. XML Exchange
4.11.1. Custom Exchange Types
4.11.2. The pre-configured XML Exchange Type
4.11.3. Create an XML Exchange
qpid-config command creates an XML exchange called myxml:
qpid-config add exchange xml myxml
- Python
tx = ssn.sender("myxml; {create: always, node: {type: topic, x-declare: {exchange: myxml, type: xml}}}")
4.11.4. Subscribe to the XML Exchange
myxml by creating a queue xmlq and binding it to the exchange with an XQuery.
- Python
rxXML = ssn.receiver("myxmlq; {create:always, link: { x-bindings: [{exchange:myxml, key:weather, arguments:{xquery:'./weather'} }]}}")
- C++
Receiver rxXML = ssn.createReceiver("myxml/weather; {link: {name:myxmlq, filter:{name:myfilter, descriptor:'apache.org:query-filter:string', value:'./weather'}}}");
./weather will match any messages whose body content has the root XML element <weather>.
key argument for x-bindings. This ensures that the binding has a unique name, allowing it to be deleted and updated by name, and ensuring that it is not accidentally updated, as might be the case if it were anonymous in the namespace of the exchange.
- Python
#!/usr/bin/python import sys from qpid.messaging import * conn = Connection("localhost:5672") conn.open() try: ssn = conn.session() tx = ssn.sender("myxml/weather; {create: always, node: {type: topic, x-declare: {exchange: myxml, type: xml}}}") xquerystr = 'let $w := ./weather ' xquerystr += "return $w/station = 'Raleigh-Durham International Airport (KRDU)' " xquerystr += 'and $w/temperature_f > 50 ' xquerystr += 'and $w/temperature_f - $w/dewpoint > 5 ' xquerystr += 'and $w/wind_speed_mph > 7 ' xquerystr += 'and $w/wind_speed_mph < 20' rxaddr = 'myxmlq; {create: always, ' rxaddr += 'link: {x-bindings: [{exchange: myxml, ' rxaddr += 'key: weather, ' rxaddr += 'arguments: {xquery: "' + xquerystr + '"' rxaddr += '}}]}}' rx = ssn.receiver(rxaddr) msgstr = '<weather>' msgstr += '<station>Raleigh-Durham International Airport (KRDU)</station>' msgstr += '<wind_speed_mph>16</wind_speed_mph>' msgstr += '<temperature_f>70</temperature_f>' msgstr += '<dewpoint>35</dewpoint>' msgstr += '</weather>' msg = Message(msgstr) tx.send(msg) rxmsg = rx.fetch(timeout=1) print rxmsg ssn.acknowledge() finally: conn.close()
Chapter 5. Message Delivery and Acceptance
5.1. The Lifecycle of a Message
5.1.1. Message Delivery Overview

Figure 5.1. Fanout Exchange
message.subject, which acts as the routing key (2), and then send the message to the broker (3).
5.1.2. Message Generation
Message object is used to generate a message.
- Python
import sys from qpid.messaging import * ... msg = Message('This is the message content') msg.content = 'Message content can be assigned like this' msg.properties['header-key'] = 'value' tx = ssn.sender('amq.topic') # msg.subject set by sender for routing purposes tx.send(msg) msg.subject = 'Messaging Routing Key can also be manually set' # beware that this will interfere with sender-object-based routing
5.1.3. Message Send over Reliable Link
- The sender passes the message to the broker.
- The broker responds with an acknowledgement that it takes responsibility for delivery of the message.
- The sender deletes its local copy of the message.
5.1.4. Message Send over Unreliable Link
- The sender passes the message to the broker.
- The sender deletes the local copy of the message.
5.1.5. Message Distribution on the Broker
5.1.6. Message Receive over Reliable Link
- The broker passes the message to the receiver.
- The receiver acknowledges responsibility for the message. In this case the broker deletes the server-side copy of the message.
- The receiver rejects the message. In this case the broker routes the message to an
alternate-exchangeif one is defined for the queue, or else discards the message. - The receiver releases the message. In this case the broker returns the message to the queue with a message header
redelivered:true. - The receiver disconnects without acknowledging or rejecting the message. In this case the broker returns the message to the queue with a message header
redelivered:true.
5.1.7. Message Receive over Unreliable Link
- The broker passes the message to the receiver.
- The broker deletes the server-side copy of the message.
5.2. Browsing and Consuming Messages
5.2.1. Message Acquisition and Acceptance
The included drain program can be used in either browse or acquisition mode.
drain can be found in:
/usr/share/doc/python-qpid-0.14/examples/api/drain /usr/share/qpidc/examples/messaging/drain.cpp
qpid-config command:
qpid-config add queue browse-acquire-demo
browse-acquire-demo queue when you run qpid-config queues.
browse-acquire-demo using spout. Spout is included in the same packages as drain, and can be found in the same directories. Run spout to send a message to the queue:
./spout browse-acquire-demo "Hello World"
browse-acquire-demo queue. Let's use drain to browse it first of all:
./drain -c 0 "browse-acquire-demo; {mode:browse}"drain a second time, and you'll see the message again. Running the drain program twice simulates two different browsing consumers accessing the queue. The message is read and remains available for other consuming applications when it is browsed.
browse-acquire-demo queue using qpid-config:
qpid-config del queue browse-acquire-demo
qpid-config responds with an error because a message remains in the queue.
./drain -c 0 "browse-acquire-demo"
qpid-config:
qpid-config del queue browse-acquire-demo
drain demo is the fact that browsers see a message only once. Because each time drain is run it creates a different browser, it sees the message in the queue each time. The same browser, however, sees the message only once, no matter how many times it looks.
- Python
import sys from qpid.messaging import * def msgfetch(rx): try: msg = rx.fetch(timeout=1) except MessagingError, m: msg = m return msg connection = Connection("localhost:5672") connection.open() try: session = connection.session() tx = session.sender("browse-acquire-demo;{create:always}") rxbrowse1 = session.receiver("browse-acquire-demo;{mode:browse}") rxbrowse2 = session.receiver("browse-acquire-demo;{mode:browse}") rxbrowse3 = session.receiver("browse-acquire-demo;{mode:browse}") rxacquire = session.receiver("browse-acquire-demo") tx.send("Hello World") print "\nBrowser 1 saw message:" print msgfetch(rxbrowse1) print "Browser 1 then saw message:" print msgfetch(rxbrowse1) print "\nBrowser 2 saw message:" print msgfetch(rxbrowse2) print "Browser 2 then saw message:" print msgfetch(rxbrowse2) print "\nAcquired message:" print msgfetch(rxacquire) print "\nBrowser 3 saw message:" print msgfetch(rxbrowse3) except MessagingError, m: print m finally: connection.close()
drain to examine the queue:
./drain -c 0 browse-acquire-demo
When our receiver acquired the message from the queue, the broker set the message to acquired. When a message is acquired, the broker treats the message as if it has been delivered, but it does not delete it from the queue. One of a number of things happen from here: the consumer who acquired the message acknowledges the message, releases the message, or rejects the message, or the consumer might disconnect through a network failure.
acquired, and message consumers browsing or fetching from the queue will not see the message. When our application disconnects without acknowledging receipt, the broker switches the message out of acquired state and sets a header redelivered=True. The message is then made available to other consumers, such as the drain browser that we ran after our application closed.
redelivered=true'. This alerts the other nodes that this message may have already been acted on, and they can perform checks to see if that is so. This narrows the window for exceptions even further, when the applications are designed to take advantage of these features.
connection.close() line:
- Python
connection.open() try: session=connection.session() rxacquire2 = session.receiver("browse-acquire-demo") print "\nAcquirer 2 saw message:" print msgfetch(rxacquire2) except MessagingError, m: print m finally: session.acknowledge() connection.close()
redelivered to inform us that another consumer acquired this message previously. We have now acquired this message, and it will again disappear for other consumers browsing or fetching from this queue. This time, however, we call session.acknowledge() before closing the connection. This method acknowledges receipt of the message (it acknowledges all messages as-yet unacknowledged for the session). Since we have acknowledged receipt of the message, the message is acquired, and it is removed from the queue.
drain now, you will see that there are no messages in the queue.
A consumer can explicitly release a message. When this happens, the message is returned to the queue for redelivery. The effect is the same as if the consumer lost its connection to the broker.
acknowledge() method with the message and Disposition(RELEASED) as parameters:
session.acknowledge(msg, Disposition(RELEASED))
release() method.
Note that this two-phase acquisition and acceptance behavior is the behavior over a reliable link (technically an at-least-once link), which is the default link for receiver connections to the broker. If you explicitly connect your receiver to a queue using an unreliable link, or directly connect to an exchange, then received messages are immediately acquired with no need to acknowledge them.
To delete the queue we used for this demo, you can either restart the broker (all non-durable queues are deleted when the broker is restarted), or you can use qpid-config:
qpid-config del queue browse-acquire-demo
--force switch to override this check and delete a queue with messages in it, or you can use drain to empty the queue, and then reissue the command on the now-empty queue.
5.2.2. Message Acquisition and Acceptance on an Unreliable Link
link: {reliability: unreliable} in the address. For example, to create a receiver with an unreliable link to a queue named "browse-acquire-demo":
- Python
rxacquire = session.receiver("browse-acquire-demo; {link:{reliability: unreliable}")
- Python
import sys from qpid.messaging import * def msgfetch(rx): try: msg = rx.fetch(timeout=1) except MessagingError, m: msg = m return msg linktype="" while linktype != "R" and linktype !="U": response = raw_input("Use (R)eliable or (U)nreliable link [R/U]?") linktype = response.upper() connection = Connection("localhost:5672") connection.open() try: session = connection.session() tx = session.sender("browse-acquire-demo;{create: always}") rxbrowse1 = session.receiver("browse-acquire-demo;{mode:browse}") rxbrowse2 = session.receiver("browse-acquire-demo;{mode:browse}") rxbrowse3 = session.receiver("browse-acquire-demo;{mode:browse}") if linktype == "R": rxacquire = session.receiver("browse-acquire-demo") else: rxacquire = session.receiver("browse-acquire-demo; {link:{reliability:unreliable}}") tx.send("Hello World") print "\nBrowser 1 saw message:" print msgfetch(rxbrowse1) print "Browser 1 then saw message:" print msgfetch(rxbrowse1) print "\nBrowser 2 saw message:" print msgfetch(rxbrowse2) print "Browser 2 then saw message:" print msgfetch(rxbrowse2) print "\nAcquired message:" print msgfetch(rxacquire) rxacquire.close() print "\nBrowser 3 saw message:" print msgfetch(rxbrowse3) except MessagingError, m: print m finally: connection.close() connection.open() try: session=connection.session() rxacquire2 = session.receiver("browse-acquire-demo") print "\nAcquirer 2 saw message:" print msgfetch(rxacquire2) except MessagingError, m: print m finally: session.acknowledge() connection.close()
Acquirer 2 saw message:
Message(redelivered=True, properties={'x-amqp-0-10.routing-key': u'browse-acquire-demo'}, content='Hello World')Acquirer 2 saw message: None
unreliable.
It is not possible to release or reject messages acquired over an unreliable link. Over an unreliable link messages are implicitly acknowledged when they are fetched.
5.2.3. Message Rejection
alternate exchange, then the rejected message is routed there; otherwise it is discarded.
acknowledge() method of the session, passing in the message that you wish to reject, and specify REJECTED as the Disposition parameter:
- Python
msg = rx.fetch(timeout = 1) if msg.content == "something we don't like": ssn.acknowledge(msg, Disposition(REJECTED)) else: ssn.acknowledge(msg)
unreliable link, mesages are implicitly acknowledged when they are fetched.
5.2.4. Receiving Messages from Multiple Sources
Prerequisites:
Receiver object receives messages from a single subscription. An application can create many receivers, and may wish to deal with messages from these various receivers in the order that the messages are received. The session object provides a method nextReceiver that allows an application to read messages from multiple receivers in a federated order.
prefetch must be enabled for the receivers, and the receivers must be using the same session.
- Python
receiver1 = session.receiver(address1) receiver1.capacity = 10 receiver2 = session.receiver(address) receiver2.capacity = 10 message = session.next_receiver().fetch() print message.content session.acknowledge()
- C++
Receiver receiver1 = session.createReceiver(address1); receiver1.setCapacity(10); Receiver receiver2 = session.createReceiver(address2); receiver2.setCapacity(10); Message message = session.nextReceiver().fetch(); std::cout << message.getContent() << std::endl; session.acknowledge(); // acknowledge message receipt
- .NET/C#
Receiver receiver1 = session.CreateReceiver(address1); receiver1.SetCapacity(10); Receiver receiver2 = session.CreateReceiver(address2); receiver2.SetCapacity(10); Message message = new Message(); message = session.NextReceiver().Fetch(); Console.WriteLine("{0}", message.GetContent()); session.Acknowledge();
5.2.5. Rejected and Orphaned Messages
5.2.6. Alternate Exchange
- Messages that are acquired and then rejected by a message consumer (rejected messages).
- Unacknowledged messages in a queue that is deleted (orphaned messages).
- Messages sent to the exchange with a routing key for which there is no matching binding on the exchange.
Chapter 6. Advanced Queue Features
6.1. Browse-only Queues
spout and drain programs are part of the client libraries package and when installed can be found at:
/usr/share/doc/python-qpid-${version}/examples/api/./spout \
-c 10 \
--broker "localhost:${PORT}" \
'q; {create: always, node:{type:queue , x-declare:{arguments:{"qpid.browse-only":1}}}}' \
"All work and no play makes Mick a dull boy."
./drain --broker 'localhost:${PORT}' 'q'
6.2. Ignore Locally Published Messages
no-local key in the queue declaration as a key:value pair. The value of the key is ignored; the presence of the key is sufficient.
qpid-config:
qpid-config add queue noloopbackqueue1 --argument no-local=true
6.3. Exclusive Queues
6.4. Server-side Selectors
6.4.1. Select messages using a filter
selector in the link portion of the connection URL.
green, red, or blue as the value of the color property:
queue_name;{link:{selector:"color in ('green', 'red', 'blue')"}}queue_name;{link:{selector:"amqp.priority = 1"}}
queue_name;{link:{selector:"amqp.priority IS BETWEEN 3 AND 6"}}
queue_name;{link:{selector:"myflag AND amqp.redelivered"}}
queue_name;{link:{selector:"msg_title LIKE '%news%'"}With Python, selectors can be used by temporary syntax. For example, the C++ address with selector:
queue_name;{link:{selector:"myproperty = 1"}}queue_name;{link:{'x-subscribe': {'arguments': {'x-apache-selector': "myproperty = 1"}}}}The Qpid Java client does not currently support server-side selectors, only JMS selectors. JMS selectors function differently than server-side selectors. Consult the JMS specification for more detail on JMS slectors.
6.4.2. Server-side selector syntax
SelectExpression ::= OrExpression? // Note 0 // Lexical Elements Alpha ::= [a-zA-Z] Digit ::= [0-9] IdentifierInitial ::= Alpha | "_" | "$" IdentifierPart ::= IdentifierInitial | Digit | "." Identifier ::= IdentifierInitial IdentifierPart* Constraint : Identifier NOT IN ("NULL", "TRUE", "FALSE", "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE") // Note 1 LiteralString ::= ("'" [^']* "'")+ // Note 2 LiteralExactNumeric ::= Digit+ Exponent ::= ("+"|"-")? LiteralExactNumeric LiteralApproxNumeric ::= Digit "." Digit* ( "E" Exponent )? | "." Digit+ ( "E" Exponent )? | Digit+ "E" Exponent // Note 1 LiteralBool ::= "TRUE" | "FALSE" // Note 1 Literal ::= LiteralBool | LiteralString | LiteralApproxNumeric | LiteralExactNumeric EqOps ::= "=" | "<>" ComparisonOps ::= EqOps | ">" | ">=" | "<" | "<=" AddOps ::= "+" | "-" MultiplyOps ::= "*" | "/" // Expression syntax OrExpression ::= AndExpression ( "OR" AndExpression )* AndExpression ::= ComparisonExpression ( "AND" ComparisonExpression )* ComparisonExpression ::= AddExpression "IS" "NOT"? "NULL" | AddExpression "NOT"? "LIKE" LiteralString ( "ESCAPE" LiteralString )? | AddExpression "NOT"? "BETWEEN" AddExpression "AND" AddExpression | AddExpression "NOT"? "IN" "(" PrimaryExpression ("," PrimaryExpression)* ")" | AddExpression ComparisonOps AddExpression | "NOT" ComparisonExpression | AddExpression // Note 3 AddExpression ::= MultiplyExpression ( AddOps MultiplyExpression )* MultiplyExpression ::= UnaryArithExpression ( MultiplyOps UnaryArithExpression )* UnaryArithExpression ::= AddOps AddExpression | "(" OrExpression ")" | PrimaryExpression PrimaryExpression ::= Identifier | Literal
E" for exponent and the boolean values true and false, are case-insensitive.
'' becomes '.
( "ESCAPE" LiteralString ) clause, LiteralString is limited to a one character string. The characters % and _ are not allowed.
6.5. Automatically Deleted Queues
6.5.1. Automatically Deleted Queues
qpid-config utility to receive information from the message broker are an example of this pattern.
auto-delete is deleted by the broker after the last consumer has released its subscription to the queue. After the auto-delete queue is created, it becomes eligible for deletion as soon as a consumer subscribes to the queue. When the number of consumers subscribed to the queue reaches zero, the queue is deleted.
- Python
responsequeue = session.receiver('my-response-queue; {create:always, node:{x-declare:{auto-delete:True}}}')
Note
default exchange: a pre-configured nameless direct exchange.
A custom timeout can be configured to provide a grace period before the deletion occurs.
Note
qpid.auto_delete_timeout:0 is specified, the parameter has no effect: setting the parameter to 0 turns off the delayed auto-delete function.
- Python
responsequeue = session.receiver("my-response-queue; {create:always, node:{x-declare:{auto-delete:True, arguments:{'qpid.auto_delete_timeout':120}}}}")
- Python
testqueue = session.sender("my-test-queue; {create:always, node:{x-declare:{auto-delete:True}}}") testqueuehandle = session.receiver("my-test-queue") ..... connection.close() # testqueuehandle is now released
exclusive and auto-delete; these queues are deleted by the broker when the session that declared the queue ends, since the session that declared the queue is only possible subscriber.
6.5.2. Automatically Deleted Queue Example
auto-delete-producer.py. It can be run using a Python interpreter.
- Python
import sys from qpid.messaging import * connection=Connection("localhost:5672") connection.open() try: session=connection.session() tx=session.sender("test-queue; {create:always, node:{x-declare:{auto-delete:True}}}") tx.send("test message!") x = raw_input("Press Enter to continue") tx.send("test message 2") except MessagingError, m: print m connection.close()
durable queues are deleted. This allows you to start this test with a clean slate.
qpid-config queues
exclusive and auto-del. This is the queue that qpid-config is using to retrieve the list of queues, and will change each time you run the command.
auto-delete-producer.py program using a Python interpreter:
python auto-delete-producer.py
qpid-config queues again to list the queues on the broker. This time you will see the test-queue that our program created. Our program has exited, but the queue has not been deleted because so far no-one has subscribed to it.
amqp1.0 behaviour. Using amqp0-10 the queue is deleted when not in use only if there have been consumers, using amqp1.0 the queue is deleted when not in use even if there have never been any consumers.
drain utility to examine the messages on the queue. The drain utility is part of the C++ and Python client library packages.
drain runs, it subscribes to the queue, retrieves messages, and then unsubscribes. Run:
drain -c 0 test-queue
qpid-config queues now, you will see that the test-queue has been deleted. A consumer subscribed to the queue, and then unsubscribed.
drain to browse the queue, rather than acquire the messages:
drain -c 0 "test-queue;{mode:browse}"auto-delete-subscribe.py:
- Python
import sys from qpid.messaging import * connection=Connection("localhost:5672") connection.open() try: session=connection.session() rx=session.receiver("test-queue") print rx.fetch(timeout = 1) session.acknowledge() except MessagingError,m: print m connection.close()
auto-delete-producer.py. When it pauses, run auto-delete-subscriber.py, then check qpid-config queues. You'll see that the queue has been deleted.
drain to browse the test-queue. It doesn't exist.
auto-delete-producer.py was deleted when our consumer program subscribed to the queue by creating and attaching a receiver, and then unsubscribed by closing the connection. The second message sent by our message producer was never delivered and no exception was raised.
- Python
import sys from qpid.messaging import * connection=Connection("localhost:5672") connection.open() try: session=connection.session() tx=session.sender("test-queue; {create:always, node:{x-declare:{auto-delete:True}}}") rx=session.receiver("test-queue") tx.send("test message!") x = raw_input("Press Enter to continue") tx.send("test message 2") x = raw_input("Press Enter to continue") except MessagingError, m: print m connection.close()
auto-delete-producer.py program. Run auto-delete-subscriber.py in the first pause. Previously, this would delete the queue, and the second message would go nowhere. This time our producer's own subscription is keeping the queue alive. Press Enter to have auto-delete-producer.py send the second message. Now check the queue using either drain or auto-delete-subscriber.py. This time you'll see that the queue exists and the message has been delivered as expected.
6.5.3. Queue Deletion Checks
- If ACL is enabled, the broker will check that the user who initiated the deletion has permission to do so.
- If the
ifEmptyflag is passed the broker will raise an exception if the queue is not empty - If the
ifUnusedflag is passed the broker will raise an exception if the queue has subscribers - If the queue is exclusive the broker will check that the user who initiated the deletion owns the queue
6.6. Last Value (LV) Queues
6.6.1. Last Value Queues
6.6.2. Declaring a Last Value Queue
qpid.last_value_queue_key when creating the queue.
stock-ticker that uses stock-symbol as the key, using qpid-config:
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
- Python
myLastValueQueue = mySession.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
RHT", "JAVA", and other string values; and also 3, 15, and other integer values.
6.6.3. Last Value Queue Example
- Python
import sys from qpid.messaging import *
- Python
connection = Connection("localhost:5672") connection.open()
- Python
session = connection.session()
- Python
stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}")
qpid-config command line tool:
qpid-config add queue stock-ticker --argument qpid.last_value_queue_key=stock-symbol
- Python
msg1 = Message("10") msg1.properties = {'stock-symbol':'RHT'} msg2 = Message("10") msg2.properties = {'stock-symbol':'JAVA'} msg3 = Message("10") msg3.properties = {'stock-symbol':'MSFT'} msg4 = Message("12") msg4.properties = {'stock-symbol':'RHT'}
msg4 updating msg1. To contrast the behavior of the last value queue with a standard FIFO queue, we'll send our messages to a control queue, called control-queue at the same time:
- Python
controlSender = session.sender("control-queue;{create:always, node:{type:queue}}")
- Python
stockSender.send(msg1) controlSender.send(msg1) stockSender.send(msg2) controlSender.send(msg2) stockSender.send(msg3) controlSender.send(msg3) stockSender.send(msg4) controlSender.send(msg4)
- Python
stockBrowser = session.receiver("stock-ticker; {mode:browse}") controlBrowser = session.receiver("control-queue; {mode:browse}")
session.receiver("stock-ticker"), and run the demo again. With the receivers browsing, you will be able to see more distinctly the effect of a Last Value Queue over time by running the demo several times in succession without clearing the queues.
available() method. We do this by setting the receivers' prefetch capacity to a value higher than the default of 0:
- Python
stockBrowser.capacity = 20 controlBrowser.capacity = 20
- Python
sleep 10
sleep from the time library:
- Python
from time import sleep
available() property of the receiver with certainty that this represents the number of messages in the queue. When operating asynchronously available() reports the number of messages available locally. After a ten second delay, we can be reasonably certain that this is the total number of messages in the queue. In an actual asynchronous operation you would not want to block execution of your application. Instead you would use a pattern like this:
- Python
while True: try: msg = stockBrowser.fetch(timeout = 10) print msg.properties["stock-symbol"] + ":" + msg.content except Empty: break
- Python
print "Last Value Queue has " + str(stockBrowser.available()) + " messages" print "\nLast Value Queue messages:" for x in range(stockBrowser.available()): try: msg = stockBrowser.fetch(timeout = 1) print msg.properties["stock-symbol"] + ":" + msg.content except MessagingError, m: pass print "Control Queue has " + str(controlBrowser.available()) + " messages" print "\nControl Queue messages:" for x in range(controlBrowser.available()): try: msg = controlBrowser.fetch(timeout = 1) print msg.properties["stock-symbol"] + ":" + msg.content except MessagingError, m: pass
- Python
session.acknowledge() connection.close()
- Python
import sys from qpid.messaging import * from time import sleep connection = Connection("localhost:5672") try: connection.open() session = connection.session() stockSender = session.sender("stock-ticker;{create:always, node:{type:queue, x-declare:{arguments:{'qpid.last_value_queue_key': 'stock-symbol'}}}}") controlSender = session.sender("control-queue;{create:always, node:{type:queue}}") stockBrowser = session.receiver("stock-ticker;{mode:browse}") controlBrowser = session.receiver("control-queue;{mode:browse}") controlBrowser = session.receiver("control-queue") msg1 = Message("10") msg1.properties = {'stock-symbol':'RHT'} msg2 = Message("10") msg2.properties = {'stock-symbol':'JAVA'} msg3 = Message("10") msg3.properties = {'stock-symbol':'MSFT'} msg4 = Message("12") msg4.properties = {'stock-symbol':'RHT'} stockSender.send(msg1) controlSender.send(msg1) stockSender.send(msg2) controlSender.send(msg2) stockSender.send(msg3) controlSender.send(msg3) stockSender.send(msg4) controlSender.send(msg4) stockBrowser.capacity = 20 controlBrowser.capacity = 20 sleep(10) print "\nLast Value Queue has " + str(stockBrowser.available()) + " messages" print "Last Value Queue messages:" for x in range(stockBrowser.available()): try: msg = stockBrowser.fetch(timeout = 1) print msg.properties["stock-symbol"] + ":" + msg.content except MessagingError, m: pass print "\nControl Queue has " + str(controlBrowser.available()) + " messages" print "Control Queue messages:" for x in range(controlBrowser.available()): try: msg = controlBrowser.fetch(timeout = 1) print msg.properties["stock-symbol"] + ":" + msg.content except MessagingError, m: pass session.acknowledge() except MessagingError,m: print m finally: connection.close()
6.6.4. Last Value Queue Command-line Example
drain and spout can be used for sending and receiving messages for testing purposes. The source code for the two utilities is included in the Python and C++ client library packages. The Python version can be run uncompiled using a Python interpreter.
qpid-config command to create a Last Value Queue:
qpid-config add queue my-queue --argument qpid.last_value_queue_key=type
type' is used to match messages in the queue.
drain command:
./drain -f -c 0 'my-queue; {mode: browse}'spout to send messages to the queue, setting a header value for the key 'type':
./spout -P type=a my-queue a1 ./spout -P type=a my-queue a2 ./spout -P type=a my-queue a3 ./spout -P type=b my-queue b1 ./spout -P type=c my-queue c1 ./spout -P type=c my-queue c2 ./spout -P type=a my-queue a4
./drain -c 0 'my-queue; {mode: browse}'type' values.
6.7. Priority Queuing
6.7.1. Priority Queuing
qpid.priority attribute. This attribute is an integer value between 1 and 10, and defines the number of distinct priority levels for the queue.
qpid.priority attribute of a queue is set to 10, there are ten distinct priority levels for the queue. In this case a message with a priority level of 10 is delivered before a message with a priority of 9, which is delivered before a message with a priority level of 5, which is delivered before a message with a priority level of 1.
qpid.priority attribute of a queue is set to 2, there are two distinct priority levels for the queue. In this case message priorities 6-10 is the queue priority level 1, and message priorities 1-5 is the queue priority level 2. Messages in the same priority band are delivered based on their priority and the order in which they are received.
6.7.2. Declaring a Priority Queue
qpid.priorities in the x-declare arguments of the node declaration. For example:
- Python
sender = session.sender('my-queue; {create: always, node:{x-declare:{arguments:{qpid.priorities:10}}}}')
qpid-config:
qpid-config add queue 'my-queue; {create: always, node:{x-declare:{arguments:{qpid.priorities:10}}}}'6.7.3. Considerations when using Priority Queues
Priority Queues deliver messages to acquiring consumers in order of priority, rather than the usual First-In-First-Out (FIFO) order of a queue. The delivery order for browsing consumers is "undefined". At the time of writing, browsing consumers receive messages from a priority queue in FIFO order; however, you should not rely on this behavior in your applications, as it may change in the future.
If the message enqueue rate sufficient outpaces the dequeue rate in a priority queue, it is possible that lower priority messages may never be removed from the queue. To avoid this situation the Fairshare feature allows a consumer to take a specified block of message from each priority level in turn.
6.7.4. Priority Queue Demonstration
- Python
#!/usr/bin/python import sys from qpid.messaging import * connection = Connection("localhost:5672") connection.open() try: ssn = connection.session() x = 0 print "\n" while True: print "Create queue with 2 or 10 priority levels?" x = raw_input() if (x == "2") or (x == "10"): break tx = ssn.sender("nonpriority-demo-queue; {create: always, node: {type: 'queue'}}") print "Creating a priority queue with " + x + " priority levels:" address = "priority-demo-queue; {create: always, " address = address + "node:{x-declare: {auto-delete:True, " address = address + "arguments: {'qpid.priorities': " address = address + x + "}}}}" print address txpriority = ssn.sender(address) rx = ssn.receiver('nonpriority-demo-queue') rxpriority = ssn.receiver("priority-demo-queue") rxbrowse = ssn.receiver("priority-demo-queue; {mode: browse}") print "\nPress Enter to continue\n" x = raw_input() print "First message sent:" msg = Message("priority 1") msg.priority = 1 tx.send(msg) txpriority.send(msg) print msg print "Second message sent:" msg = Message('priority 4') msg.priority = 4 tx.send(msg) txpriority.send(msg) print msg print "\nPress Enter to continue\n" x = raw_input() print "BROWSE PRIORITY QUEUE" print "First browse in priority queue:" print rxbrowse.fetch() print "Second browse in priority queue:" print rxbrowse.fetch() print "\nPress Enter to continue\n" x = raw_input() print "ACQUIRE PRIORITY QUEUE" print "First message in priority queue:" print rxpriority.fetch() print "Second message in priority queue:" print rxpriority.fetch() print "\nPress Enter to continue\n" x = raw_input() print "ACQUIRE NON-PRIORITY QUEUE" print "First message in non-priority queue:" print rx.fetch() print "Second message in non-priority queue:" print rx.fetch() ssn.acknowledge() finally: connection.close()
Create queue with 2 or 10 priority levels?
10
Creating a priority queue with 10 priority levels:
priority-demo-queue; {create: always, node:{x-declare: {auto-delete:True, arguments: {'qpid.priorities': 10}}}}auto-delete: True to allow the program to be run multiple times with different values for qpid.priorities. If the queue already exists when the sender is created, the value given for qpid.priorities has no effect. This value only has an effect when the queue is created.
First message sent: Message(priority=1, content='priority 1') Second message sent: Message(priority=4, content='priority 4')
BROWSE PRIORITY QUEUE
First browse in priority queue:
Message(priority=1, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 1')
Second browse in priority queue:
Message(priority=4, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 4')ACQUIRE PRIORITY QUEUE
First message in priority queue:
Message(priority=4, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 4')
Second message in priority queue:
Message(priority=1, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 1')ACQUIRE NON-PRIORITY QUEUE
First message in non-priority queue:
Message(priority=1, properties={'x-amqp-0-10.routing-key': u'nonpriority-demo-queue'}, content='priority 1')
Second message in non-priority queue:
Message(priority=4, properties={'x-amqp-0-10.routing-key': u'nonpriority-demo-queue'}, content='priority 4')Create queue with 2 or 10 priority levels?
2
Creating a priority queue with 2 priority levels:
priority-demo-queue; {create: always, node:{x-declare: {auto-delete:True, arguments: {'qpid.priorities': 2}}}}
....
ACQUIRE PRIORITY QUEUE
First message in priority queue:
Message(priority=1, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 1')
Second message in priority queue:
Message(priority=4, properties={'x-amqp-0-10.routing-key': u'priority-demo-queue'}, content='priority 4')6.8. Message Groups
6.8.1. Message Groups
6.8.2. Create a Queue with Message Groups enabled
qpid.group_header_key and qpid.shared_msg_group in the queue creation arguments.
qpid.group_header_key is the header key that will be used to match messages on. Messages with the same value for this key in their header belong to the same group.
qpid.shared_msg_group should be set to 1.
- Python
groupedSender = session.sender("my-grouped-msg-queue; {create: always, node: {x-declare: {auto-delete: True, arguments: {'qpid.group_header_key': 'msgGroupID', 'qpid.shared_msg_group': 1}}}}")- C++
Sender groupedSender = session.createSender("my-grouped-msg-queue; {create:always, node: {x-declare: {auto-delete: True, arguments: {'qpid.group_header_key':'msgGroupID', 'qpid.shared_msg_group':1}}}}")
6.8.3. Message Group Consumer Requirements
redelivered=True, and the rest of the group is missing.
6.8.4. Configure a Queue for Message Groups using qpid-config
qpid-config command creates a queue called "MyMsgQueue", with message grouping enabled and using the header key "GROUP_KEY" to identify message groups.
qpid-config add queue MyMsgQueue --group-header="GROUP_KEY" --shared-groups
6.8.5. Default Group
qpid.no-group. If a message cannot be assigned to any other group, it is assigned to this group.
6.8.6. Override the Default Group Name
qpid.no-group. You can change this default group name by supplying a value for the default-message-group configuration parameter to the broker at start-up. For example, using the command line:
qpidd --default-message-group "EMPTY-GROUP"
6.8.7. Message Groups Demonstration
message-groups.py, then run it using Python on a machine with the messaging broker started.
- Python
import sys from qpid.messaging import * def sendmsg(group, num): # send the message to the broker and add it to our in-memory representation of the broker queue global memoryqueue global tx msg = Message(group + num) msg.properties = {'ourGroupID': group} tx.send(msg) memoryqueue.append(group + num) def pullmsg(consumer): # fetch a message from the broker and print it to the console global counter global memoryqueue msg = consumers[consumer - 1].fetch(timeout = 1) print "\nQueued message: " + memoryqueue[counter] print "Consumer " + str(consumer) + " got: " + msg.content counter +=1 return msg # Two connections are used to simulate two distinct consumers connection = Connection("localhost:5672") connection2 = Connection("localhost:5672") connection.open() connection2.open() try: session = connection.session() session2 = connection2.session() x = raw_input('Enable message grouping [Y/n]?') if x == 'N' or x == 'n': # Create the queue without message groups tx = session.sender("test-nogroup-queue; {create: always, node:{x-declare:{auto-delete:True}}}") rx1 = session.receiver("test-nogroup-queue") rx2 = session2.receiver("test-nogroup-queue") print "\nMessage grouping is disabled" msggroup = False else: # Create the queue with message groups enabled tx = session.sender("test-group-queue; {create: always, node:{x-declare:{auto-delete: True, arguments: {'qpid.group_header_key': 'ourGroupID', 'qpid.shared_msg_group' : 1}}}}") rx1 = session.receiver("test-group-queue") rx2 = session2.receiver("test-group-queue") print "\nMessage grouping is enabled" msggroup = True # Put the receivers in an array so we can use a function to fetch messages consumers = [] consumers.append(rx1) consumers.append(rx2) print "Sending interleaved messages from two different groups to the queue..." # We create an in-memory picture of the queue, to see what order the messages are on the broker memoryqueue = [] sendmsg('A', '1') sendmsg('B', '1') sendmsg('B', '2') sendmsg('A', '2') sendmsg('B', '3') sendmsg('A', '3') counter = 0 pullmsg(1) pullmsg(2) if msggroup: print "\nConsumer 1 now owns message group A. Consumer 2 now owns message group B." msgc1 = pullmsg(1) msgc2 = pullmsg(2) if msggroup: print "\nThe consumers will now acknowledge all the messages, or only the last one." resp = raw_input('Should they acknowlege all messages? [Y/n]') if resp == 'N' or resp == 'n': print "\nAcknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details." session.acknowledge(msgc1) session2.acknowledge(msgc2) antipattern = True # Acknowledging only part of a group is an anti-pattern. Messages are grouped to ensure that a single consumer can deal with the whole group. If this consumer now fails before completing the rest of the group, the unacknowledged messages in the group will be released and redelivered by the broker, but the acknowledged messages in the group are now missing in action! else: print "\nAcknowledging all fetched messages. The consumers will release ownership of the groups." session.acknowledge() session2.acknowledge() antipattern = False print "\nPulling more messages from the queue:" pullmsg(1) pullmsg(2) if msggroup: if antipattern == False: print "\nConsumer 1 now owns message group B. Consumer 2 now owns message group A." print "\nSending some more messages to the queue..." sendmsg('B', '4') sendmsg('B', '5') sendmsg('A', '4') sendmsg('A', '5') pullmsg(1) pullmsg(2) pullmsg(1) pullmsg(2) finally: connection.close() connection2.close()
The program sends messages from two different Groups - A and B - to a queue. Here is an example of the output when message groups are disabled:
$ python message-groups.py Enable message grouping [Y/n]?n Message grouping is disabled Sending interleaved messages from two different groups to the queue... Queued message: A1 Consumer 1 got: A1 Queued message: B1 Consumer 2 got: B1 Queued message: B2 Consumer 1 got: B2 Queued message: A2 Consumer 2 got: A2 Queued message: B3 Consumer 1 got: B3 Queued message: A3 Consumer 2 got: A3 Queued message: B4 Consumer 1 got: B4 Queued message: B5 Consumer 2 got: B5 Queued message: A4 Consumer 1 got: A4 Queued message: A5 Consumer 2 got: A5
$ python message-groups.py Enable message grouping [Y/n]?y Message grouping is enabled Sending interleaved messages from two different groups to the queue... Queued message: A1 Consumer 1 got: A1 Queued message: B1 Consumer 2 got: B1 Consumer 1 now owns message group A. Consumer 2 now owns message group B. Queued message: B2 Consumer 1 got: A2 Queued message: A2 Consumer 2 got: B2
The consumers will now acknowledge all the messages, or only the last one. Should they acknowlege all messages? [Y/n]y Acknowledging all fetched messages. The consumers will release ownership of the groups. Pulling more messages from the queue: Queued message: B3 Consumer 1 got: B3 Queued message: A3 Consumer 2 got: A3
Consumer 1 now owns message group B. Consumer 2 now owns message group A. Sending some more messages to the queue... Queued message: B4 Consumer 1 got: B4 Queued message: B5 Consumer 2 got: A4 Queued message: A4 Consumer 1 got: B5 Queued message: A5 Consumer 2 got: A5
The consumers will now acknowledge all the messages, or only the last one. Should they acknowlege all messages? [Y/n]n Acknowledging only part of the group. The consumers retain ownership of the group. This is an anti-pattern! See the source code comments for details. Pulling more messages from the queue: Queued message: B3 Consumer 1 got: A3 Queued message: A3 Consumer 2 got: B3 Sending some more messages to the queue... Queued message: B4 Consumer 1 got: A4 Queued message: B5 Consumer 2 got: B4 Queued message: A4 Consumer 1 got: A5 Queued message: A5 Consumer 2 got: B5
Chapter 7. Asynchronous Messaging
7.1. Asynchronous Operations
7.2. Asynchronous Sending
7.2.1. Synchronous and Asynchronous Send
send() method of a send object is asynchronous - it returns immediately, without waiting for a receipt from the broker:
- Python
sender.send(message, sync = False)
- C++
sender.send(message, false)
Note that this is the default behavior for the C++ API.
7.2.2. Sender Capacity
capacity is the property of a sender object that controls the number of asynchronous sends pending acknowledgement from the server that the sender will permit. These unacknowledged messages are buffered in memory for retransmission in the event of a link failure, so the sender capacity is also known as the sender replay buffer size.
UNLIMITED, meaning that the sender will allow an unlimited number of asynchronous calls to be made, and buffer a number of messages that is limited only by the memory limits of the system.
capacity is set to a number other than UNLIMITED, the sender will allow only that many asynchronous send operations to be outstanding at the same time.
capacity is set to 10, then a maximum of 10 asynchronous send operations can be awaiting acknowledgement at the same time for the sender. If 10 asynchronous send operations are invoked, and an 11th operation is attempted before any of those 10 are acknowledged by the broker, then the sender will block until one of the asynchronous send operations is acknowledged by the broker.
7.2.3. Set Sender Capacity
capacity property of a sender. In C++, the sender capacity is set using the setCapacity method.
- Python
sender.capacity = 20
- C++
sender.setCapacity(20)
7.2.4. Query Sender Capacity
- Sender Capacity
- The maximum number of asynchronously sent messages that can be pending acknowledgement at any given time. By default this is
UNLIMITED, but it can be changed to constrain the number of unsettled asynchronous calls. An attempt to make a further asynchronous call when the sender is at capacity will block until another sent message is acknowledged by the broker.- C++
sender.getCapacity()
- Python
sender.capacity
- Sender Unsettled
- The number of asynchronous sends pending acknowledgement from the broker.
- C++
sender.getUnsettled()
- Python
sender.unsettled()
- Sender Available
- The number of additional asynchronous calls that the sender can accept at the moment. This value is available as a property, but can also be computed from
sender.capacity-sender.unsettled.- C++
sender.getAvailable()
- Python
sender.available()
7.2.5. Avoiding a Blocked Asynchronous Send
- C++
if (sender.getAvailable() > 0) sender.send(message, false) // else drop the message- Python
if sender.available() > 0: sender.send(message, sync=False) else: # drop the message
- C++
sender.setCapacity(SOME_LARGE_NUMBER)
- Python
sender.capacity = SOME_LARGE_NUMBER
7.2.6. Asynchronous Message Sending Example
- C++
sender.setCapacity(MY_CAPACITY); // Later bool resend = true; while (resend) { if (sender.getAvailable()>0) { sender.send(message,false); resend = false; } else { // May wish to do nothing here // or send to log file std::cout << "Warning: Capacity \ full. Retry" << std::endl; } } // Later if (sender.getUnsettled()) { session.sync(); }- Python
snd.capacity = MY_CAPACITY # Later resend = True while (resend): if (snd.available()>0): snd.send(msg, sync = False) resend = False else: print "Warning: Capacity full" # Later if (snd.unsettled()): ssn.sync()
7.2.7. Asynchronous Send and Link Reliability
sender.capacity is the number of unacknowledged sends that a sender will allow when sending asynchronously. The two-phase send/acknowledge behavior is a characteristic of a reliable link (technically known as a link with at-least-once reliability). The sender sends a message, and buffers that message locally until the server responds to acknowledge receipt of the message. This buffering of unacknowledged sent messages enables the sender to resend messages (sender replay) if the link is dropped and then re-established. When a reliable link is dropped and then transparently re-established, messages that were sent asynchronously but not acknowledged by the server are resent from the sender replay buffer.
unreliable link when creating a sender. For example:
- Python
sender = session.sender("amq.topic;{link: {'reliability': 'unreliable'}}")
unreliable link, sender capacity has no meaning. On an unreliable link the server does not acknowledge receipt of messages. All messages are considered as good as acknowledged once they are sent. This is the meaning of unreliable for a sender. If the link is dropped there is no way for the sender to know which messages made it to the broker and which were lost. This also means that over an unreliable link asynchronous senders will not block, as their capacity is never utilized.
Sender.capacity is used to limit the exposure of an application to data loss, and the amount of memory that senders can consume with their replay buffer. It can also be used to throttle producers. You can use an unreliable link along with asynchronous send to maximize throughput without the implications of local memory required for the sender replay buffer, and no throttling of producers. However, you must be aware of the reduced reliability and employ this pattern in situations where the potential for data loss is not important.
- Python
import sys from qpid.messaging import * connection = Connection("localhost:5672") try: connection.open() session = connection.session() linktype="" while linktype != "R" and linktype !="U": response = raw_input("Use (R)eliable or (U)nreliable link [R/U]? ") linktype = response.upper() if linktype == "U": sender = session.sender("amq.topic;{link: {'reliability': 'unreliable'}}") else: sender = session.sender("amq.topic") message = Message("Hello World:") print sender.capacity sender.capacity = 5 for x in range (1000): if sender.available() == 0: print "Sender is blocking..." sender.send("Hello World: " + str(x), sync=False) print str(x) +" : " + str(sender.unsettled()) + " : " + str(sender.available()) except MessagingError,m: print m finally: connection.close()
message number : unacknowledged messages : further async send capacity
Use (R)eliable or (U)nreliable link [R/U]? R ... 918 : 1 : 4 919 : 2 : 3 920 : 3 : 2 921 : 4 : 1 922 : 5 : 0 Sender is blocking...
sender.capacity (set to 5 in the program code) to see the impact it has on sender blocking.
sender.capacity has no impact on the performance of the sender. Remember, however, that it is now unreliable:
Use (R)eliable or (U)nreliable link [R/U]? U ... 984 : 0 : 5 985 : 0 : 5 986 : 0 : 5 987 : 0 : 5 988 : 0 : 5 989 : 0 : 5
7.3. Asynchronous Receiving
7.3.1. Asynchronous Message Retrieval (Prefetch)
fetch() call. The receiver's capacity to prefetch messages is 0 by default.
capacity property of a receiver.
- Prefetched messages are available locally when requested by the application, without the overhead of a synchronous call to retrieve a message from the broker.
- A receiver with prefetching enabled has an
available()method that can be invoked to determine how many prefetched messages are available.
available() method:
available() as an absolute indicator of the state of the queue. For example, calling available() immediately after setting the capacity of a receiver to something other than 0 is likely to return a value of 0 messages available. This does not necessarily mean that the queue has no messages, but rather that no prefetched messages are locally available yet.
available method of a receiver with prefetching enabled will be the capacity of the receiver. The available() method reports the number of prefetched messages available, not the number of messages in the queue. If the number of available messages is less than the capacity of the receiver, however, you can infer that this is the number of messages in the queue, with the above caveat about the asynchronous nature of prefetching.
7.3.2. Enable Receiver Prefetch
- Python
import sys from qpid.messaging import * connection = Connection("localhost:5672") connection.open() ssn = connection.session() prefetchingReceiver = ssn.receiver("testqueue; {create:always}"); prefetchingReceiver.capacity = 100
7.3.3. Asynchronously Acknowledging Received Messages
acquired on the broker until they are acknowledged by the consumer. When a message is in acquired mode it is not visible in the queue. If the consumer disconnects without acknowledging receipt, the message will be moved out of acquired and again become available to consumers, with the header redelivered=true.
acknowledge() method of the session object:
- Python
session.acknowledge()
acknowledge() method with no arguments acknowledges receipt of all as-yet-unacknowledged messages fetched using that session. To acknowledge a specific message, pass the message as an argument. For example:
- Python
msg = rx.fetch(timeout = 1) session.acknowledge(msg)
sync = False parameter:
- Python
session.acknowledge(msg, sync = False)
When an unreliable link is requested for a receiver, acknowledgement is implicit when a message is fetched. This means that the broker marks the message as acquired as soon as the receiver fetches it. No acknowledgement is necessary, and no release or rejection of messages is possible.
7.3.4. Asynchronous Receive and Link Reliability
unreliable link is a potentially lossy situation. Over an unreliable link, when an application is consuming (as opposed to browsing the queue) the broker deletes the message from the queue as soon as it is prefetched. It does not wait for acknowledgement from the consumer. If the consumer fails before it dispatches prefetched messages, the broker will not redeliver them.
unreliable link - be aware of the implications.
Chapter 8. Reliability and Quality of Service
8.1. Link Reliability
8.1.1. Reliable Link
An acquiring message consumer (also known as a competing message consumer) is a message consumer who removes messages from a queue, and makes them unavailable to other consumers. When an acquiring message consumer fetches a message from the broker over a reliable link, the message is set to acquired. In the acquired state the message is not visible to other consumers. It is to all intents and purposes acquired by the consumer, but the broker maintains its copy in acquired state until the consumer acknowledges acquisition. At that point the broker considers the message reliably delivered, and will delete its copy.
redelivered: true.
alternate exchange, if one has been configured for this queue or exchange. If no alternate exchange is configured, the message will be discarded.
When a message is sent to the broker over a reliable link, the sender maintains its local copy until the broker acknowledges receipt. At that time the sender deletes the local copy. When sending synchronously this causes the application to block until this exchange has taken place. When sending asynchronously these unacknowledged sent messages are stored in the sender replay buffer.
All links to queues are reliable by default. It is not necessary to explicitly request a reliable link when connecting to a queue.
link: {'reliability': 'at-least-once'} in the address. For example:
sender = session.sender("amq.topic;{link: {'reliability': 'at-least-once'}}")8.1.2. Unreliable Link
unreliable link when establishing a connection to a queue.
unreliable link, the broker deletes it immediately, without waiting for the consumer to acknowledge that it received and successfully actioned a message.
unreliable link, although this is be no means certain. The most obvious use for an unreliable link is when a large volume of data is being transmitted at high speed and data loss is not an issue.
unreliable link
To request an unreliable link, specify link: {'reliability': 'unreliable'} in the address for the receiver or sender. For example:
- Python
sender = session.sender("amq.topic;{link: {'reliability': 'unreliable'}}")
8.2. Queue Sizing
8.2.1. Controlling Queue Size
qpid.max_size) and maximum message count (qpid.max_count) for the queue.
qpid.max_size is specified in bytes. qpid.max_count is specified as the number of messages.
qpid-config creates a queue with a maximum size in memory of 200MB, and a maximum number of 5000 messages:
qpid-config add queue my-queue --max-queue-size=204800000 --max-queue-count 5000
qpid.max_count and qpid.max_size directives go inside the arguments of the x-declare of the node. For example, the following address will create the queue as the qpid-config command above:
- Python
tx = ssn.sender("my-queue; {create: always, node: {x-declare: {'auto-delete': True, arguments:{'qpid.max_count': 5000, 'qpid.max_size': 204800000}}}}")
qpid.max_count attribute will only be applied if the queue does not exist when this code is executed.
qpid.policy_type
The behavior when a queue reaches these limits is configurable. By default, on non-durable queues the behavior is reject: further attempts to send to the queue result in a TargetCapacityExceeded exception being thrown at the sender.
qpid.policy_type option. The possible values are:
- reject
- Message publishers throw an exception
TargetCapacityExceeded. This is the default behavior for non-durablequeues. - ring
- The oldest messages are removed to make room for newer messages.
qpid-config command sets the limit policy to ring:
qpid-config add queue my-queue --max-queue-size=204800 --max-queue-count 5000 --limit-policy ring
- Python
tx = ssn.sender("my-queue; {create: always, node: {x-declare: {'auto-delete': True, arguments:{'qpid.max_count': 5000, 'qpid.max_size': 204800, 'qpid.policy_type': 'ring'}}}}")
See Also:
8.2.2. Queue Threshold Alerts
qpid.max_size or qpid.max_count) approaches 80% of its limit. The figure of 80% is configurable across the server using the broker option --default-event-threshold-ratio. If you set this to zero, alerts are disabled for all queues by default. Additionally, you can override the default alert threshold per-queue using qpid.alert_count and qpid.alert_size when creating the queue.
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#. Alerts are sent as map messages.
- Python
conn = Connection.establish("localhost:5672") session = conn.session() rcv = session.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#") while True: event = rcv.fetch() print "Threshold exceeded on queue %s" % event.content[0]["_values"]["qName"] print " at a depth of %s messages, %s bytes" % (event.content[0]["_values"]["msgDepth"], event.content[0]["_values"]["byteDepth"]) session.acknowledge()
To avoid alert message flooding, there is a 60 second gap between alert messages. This can be overridden on a per-queue basis using the qpid.alert_repeat_gap to specify a different value in seconds.
The following aliases are maintained for compatibility with earlier clients:
x-qpid-maximum-message-countis equivalent toqpid.alert_countx-qpid-maximum-message-sizeis equivalent toqpid.alert_sizex-qpid-minimum-alert-repeat-gapis equivalent toqpid.alert_repeat_gap
8.3. Producer Flow Control
8.3.1. Flow Control
ring do not have queue flow thresholds enabled. These queues deal with reaching capacity through the ring mechanism. All other queues with limits have two threshold values that are set by the broker when the queue is created:
- flow_stop_threshold
- the queue resource utilization level that enables flow control when exceeded. Once crossed, the queue is considered in danger of overflow, and the broker will cease acknowledging sent messages to induce producer flow control. Note that either queue size or message count capacity utilization can trigger this.
- flow_resume_threshold
- the queue resource utilization level that disables flow control when dropped below. Once crossed, the queue is no longer considered in danger of overflow, and the broker again acknowledges sent messages. Note that once trigger by either, both queue size and message count must fall below this threshold before producer flow control is deactivated.
qpid.max_size of 204800 (200MB), and a flow_stop_threshold of 80, then the broker will initiate producer flow control if the queue reaches 80% of 204800, or 163840 bytes of enqueued messages.
flow_resume_threshold, producer flow control is stopped. Setting the flow_resume_threshold above the flow_stop_threshold has the obvious consequence of locking producer flow control on, so don't do it.
8.3.2. Queue Flow State
flowState boolean in the queue's QMF management object. When this is true flow control is active.
flowStoppedCount that increments each time flow control becomes active for the queue.
8.3.3. Broker Default Flow Thresholds
--default-flow-stop-threshold= flow control activated at this percentage of capacity (size or count)--default-flow-resume-threshold= flow control de-activated at this percentage of capacity (size or count)
qpidd --default-flow-stop-threshold=90 --default-flow-resume-threshold=75
8.3.4. Disable Broker-wide Default Flow Thresholds
qpidd --default-flow-stop-threshold=100 --default-flow-resume-threshold=100
8.3.5. Per-Queue Flow Thresholds
qpid.flow_stop_sizeintegerflow stop threshold value in bytes.qpid.flow_resume_sizeintegerflow resume threshold value in bytes.qpid.flow_stop_countintegerflow stop threshold value as a message count.qpid.flow_resume_countintegerflow resume threshold value as a message count.
8.4. Credit-based Flow Control
8.4.1. Flow Control Using Credit
8.4.2. Credit Allocation Modes
- In credit mode, credit must be explicitly re-issued by the subscriber before the broker can recommence sending messages
- In window mode, the credit is automatically reissued for received messages. In this mode, the client indicates that a message has been received by informing the broker of the completion of the transfer. Though completion is essentially a form of acknowledgment, it should not be confused with acceptance which is an confirmation of ownership transfer.
8.5. Durable Queues
8.5.1. Durable Queues
8.5.2. Persistent Messages
Message.setDurable(true) to mark a message as persistent.
8.5.3. Create a durable queue in an application
- C++
Sender sender = session.createSender("important-messages; {create:always, node:{durable: True}")- Python
newqueue = session.sender("important-messages; {create:always, node:{durable: True}")
durable and auto-delete, it is only durable until it gets auto-deleted! Carefully consider if this is the behavior that you want.
8.5.4. Mark a message as persistent
PERSISTENT. For instance, in C++, the following code makes a message persistent:
message.getDeliveryProperties().setDeliveryMode(PERSISTENT);
Table 8.1. Persistent Message and Durable Queue Disk States
| A persistent message AND durable queue | Written to disk |
| A persistent message AND non-durable queue | Not written to disk |
| A non-persistent message AND non-durable queue | Not written to disk |
| A non-persistent message AND durable queue | Not written to disk |
8.5.5. Durable Message State After Restart
redelivered flag on all recovered persistent messages.
redelivered flag as a suggestion.
8.5.6. Journal Description
8.5.7. Configure the Message Journal in an application
qpid.file_size and qpid.file_count in the x-declare arguments of the address used to create a queue:
- Python
tx = ssn.sender("my-queue;{create: always, node: {durable: True, x-declare: {arguments: {'qpid.file_size': 20, 'qpid.file_count': 12}}}}")
8.6. Transactions
8.6.1. Transactions
8.6.2. Transactions Example
- .NET/C#
Connection connection = new Connection(broker); Session session = connection.createTransactionalSession(); ... if (smellsOk()) session.Commit(); else session.Rollback();
- C++
Connection connection(broker); Session session = connection.createTransactionalSession(); ... if (smellsOk()) session.commit(); else session.rollback();
Chapter 9. Qpid Management Framework (QMF)
9.1. QMF - Qpid Management Framework
qmf.default.direct/broker where qmf.default.direct is the exchange, with a routing key or subject of broker. The message should contain a reply-to address from which the sender can receive responses.
9.2. QMF Versions
9.3. Creating Exchanges from an Application
test-fanout
Message(subject='broker', reply_to='qmf.default.topic/direct.6da5bfc3-44fb-4441-b834-6c5897b9606a;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'exchange', 'name': u'test-fanout', 'properties': {'exchange-type': u'fanout'}}})9.4. Broker Exchange and Queue Configuration via QMF
qpid-config command-line utility uses QMF messages to perform many of its administration tasks.
9.5. Command Messages
qmf.default.direct/broker.
See Also:
9.6. QMF Command Message Structure
QMF Command Messages are map messages. A QMF command message contains the keys _object_id, _method_name and _arguments.
_object_id is mandatory. Its value is a nested map identifying the target of the command. For QMF commands that administer the broker and its resources, the _object_id map contains a single value with the key _object_name containing the value org.apache.qpid.broker:broker:amqp-broker. The _object_name value has the following syntax 'package:class:id'. The desired value may be obtained from the schema, using qpid-tool.
_method_name has the name of the command as its value and the key _arguments contains a nested map of command arguments.
Two message properties, x-amqp-0-10.app-id and qmf.opcode must be set. The property x-amqp-0-10.app-id should always have the value qmf2 and qmf.opcode contains the value _method_request.
To receive a response from the server, set the reply-to address of the QMF command message to an address where you can receive messages. After the command message is sent to the broker's QMF address, the response arrives from the reply-to address specified. The response message has the x-amqp-0-10.app-id property set to qmf2 when using amqp0-10.
qmf.opcode property is set to _method_response. If an error was encountered, qmf.opcode property will contain the value _exception.
_arguments. In the case of an exception, details of the exception are within a nested map against the key _values.
9.7. Create Command
create command takes five arguments:
- type
- The type of object to be created, this can be a queue, exchange or binding.
- name
- The name of the object to be created. The
nameargument of a queue or exchange is a single value, for example a queue namedmy-queuesets the name argument to a string of that value. The name of a binding uses the pattern exchange/queue/key, for example:amq.topic/my-queue/my-keyidentifies a binding betweenmy-queueand the exchangeamq.topicwith the binding keymy-key. - properties
- The specific properties for the object to be created, value is a nested map.
- strict
- The strict argument takes a boolean value that is presently ignored. This value is intended to indicate whether the command will fail if any unrecognized properties have been specified.
- auto_delete_timeout
- Optional. If specified upon first declaring an auto-delete queue, specifies a delay, in seconds, after which the deletion will take place. Note: If the queue is re-declared after becoming eligible for deletion, but before the delay expires, then the queue will be not be deleted.
my-queue. In this example my-queue is configured to be auto-deleted after 10 seconds.
- Python
conn = Connection(opts.broker) try: conn.open() ssn = conn.session() snd = ssn.sender("qmf.default.direct/broker") reply_to = "reply-queue; {create:always, node:{x-declare:{auto-delete:true}}}" rcv = ssn.receiver(reply_to) content = { "_object_id": {"_object_name": "org.apache.qpid.broker:broker:amqp-broker"}, "_method_name": "create", "_arguments": {"type":"queue", "name":"my-queue", "properties":{"auto-delete":True, "qpid.auto_delete_timeout":10}} } request = Message(reply_to=reply_to, content=content) request.properties["x-amqp-0-10.app-id"] = "qmf2" request.properties["qmf.opcode"] = "_method_request" snd.send(request) try: response = rcv.fetch(timeout=opts.timeout) if response.properties['x-amqp-0-10.app-id'] == 'qmf2': if response.properties['qmf.opcode'] == '_method_response': return response.content['_arguments'] elif response.properties['qmf.opcode'] == '_exception': raise Exception("Error: %s" % response.content['_values']) else: raise Exception("Invalid response received, unexpected opcode: %s" % m) else: raise Exception("Invalid response received, not a qmfv2 method: %s" % m) except Empty: print "No response received!" except Exception, e: print e except ReceiverError, e: print e except KeyboardInterrupt: pass conn.close()
9.8. Delete Command
delete command takes three arguments:
- type
- The type of object to be deleted, this can be a queue, exchange or binding.
- name
- The name of the object to be deleted. The
nameargument of a queue or exchange is a single value, for examplemy-queue. The name of a binding uses the pattern exchange/queue/key, for example:amq.topic/my-queue/my-keyidentifies a binding betweenmy-queueand the exchangeamq.topicwith the binding keymy-key. - options
- A nested map with the key
options. This is presently unused.
9.9. List Command
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.8b59a7ae-93f1-4450-9e43-1b0665bf622b;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_query_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_what': 'OBJECT', '_schema_id': {'_class_name': 'exchange'}})
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.7f703720-c815-4c79-986c-354b3963bc76;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_query_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_what': 'OBJECT', '_schema_id': {'_class_name': 'queue'}})
9.10. Queue and Exchange Creation using QMF
test:
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.8702f596-b112-427d-b93e-7e0ae28f2ae8;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'queue', 'name': u'test', 'properties': {}}})
test-fanout:
- Python
Message(subject='broker', reply_to='qmf.default.topic/direct.81915d0a-d2e1-4cf9-9369-921bac725aab;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}', correlation_id='1', properties={'qmf.opcode': '_method_request', 'x-amqp-0-10.app-id': 'qmf2', 'method': 'request'}, content={'_object_id': {'_object_name': 'org.apache.qpid.broker:broker:amqp-broker'}, '_method_name': 'create', '_arguments': {'strict': True, 'type': 'exchange', 'name': u'test-fanout', 'properties': {'exchange-type': u'fanout'}}})
9.11. QMF Events
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.$QMF_Event.#, where $QMF_Event is one of the provided QMF Events from the following table:
Table 9.1. QMF Events
| QMF Event | Severity | Arguments |
|---|---|---|
|
clientConnect
|
inform
|
rhost, user, properties
|
|
clientConnectFail
|
warn
|
rhost, user, reason, properties
|
|
clientDisconnect
|
inform
|
rhost, user, properties
|
|
brokerLinkUp
|
inform
|
rhost
|
|
brokerLinkDown
|
warn
|
rhost
|
|
queueDeclare
|
inform
|
rhost, user, qName, durable, excl, autoDel, altEx, args, disp
|
|
queueDelete
|
inform
|
rhost, user, qName
|
|
exchangeDeclare
|
inform
|
rhost, user, exName, exType, altEx, durable, autoDel, args, disp
|
|
exchangeDelete
|
inform
|
rhost, user, exName
|
|
bind
|
inform
|
rhost, user, exName, qName, key, args
|
|
unbind
|
inform
|
rhost, user, exName, qName, key
|
|
subscribe
|
inform
|
rhost, user, qName, dest, excl, args
|
|
unsubscribe
|
inform
|
rhost, user, dest
|
|
queueThresholdExceeded
|
warn
|
qName, msgDepth, byteDepth
|
See Also:
9.12. QMF Client Connection Events
Table 9.2. QMF Client Connection Event Topics
| QMF queue | Purpose |
|---|---|
|
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.clientConnect.#
|
Client connections
|
|
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.clientConnectFail.#
|
Failed connection attempts
|
|
qmf.default.topic/agent.ind.event.org_apache_qpid_broker.clientDisconnect.#
|
Client disconnections
|
client_ppid[1]client_pidclient_process
Fetched Message(
properties={
u'qmf.agent': u'apache.org:qpidd:a2ff61bc-19b2-4078-8a7e-9c007151c79c',
'x-amqp-0-10.routing-key': u'agent.ind.event.org_apache_qpid_broker.clientConnect.info.apache_org.qpidd.a2ff61bc-19b2-4078-8a7e-9c007151c79c',
'x-amqp-0-10.app-id': 'qmf2',
u'qmf.content': u'_event',
u'qmf.opcode': u'_data_indication',
u'method': u'indication'},
content=[{
u'_schema_id': {
u'_package_name': 'org.apache.qpid.broker',
u'_class_name': 'clientConnect',
u'_type': '_event',
u'_hash': UUID('476930ed-01dd-9629-7f84-f42b4b0bc410')},
u'_timestamp': 1347032560197086881,
u'_values': {
u'user': 'anonymous',
u'properties': {
u'qpid.session_flow': 1,
u'qpid.client_ppid': 26139,
u'qpid.client_pid': 26876,
u'qpid.client_process': u'spout'},
u'rhost': '127.0.0.1:5672-127.0.0.1:43276'},
u'_severity': 6}])
Fri Sep 7 15:42:40 2012 org.apache.qpid.broker:clientConnect user=anonymous properties={
u'qpid.session_flow': 1,
u'qpid.client_ppid': 26139,
u'qpid.client_pid': 26876,
u'qpid.client_process': u'spout'}
rhost=127.0.0.1:5672-127.0.0.1:432769.13. ACL Lookup Query Methods
# Catch 22: allow anonymous to access the lookup debug functions acl allow-log anonymous create queue acl allow-log anonymous all exchange name=qmf.* acl allow-log anonymous all exchange name=amq.direct acl allow-log anonymous all exchange name=qpid.management acl allow-log anonymous access method name=Lookup*
Lookup and LookupPublish.
Lookup method is a general query for any action, object, and set of properties. The LookupPublish method is the optimized, per-message fastpath query.
allow, deny, allow-log, or deny-log.
Method: Lookup
Table 9.3. Method: Lookup
| Argument | Type | Direction |
|---|---|---|
|
userId
|
long-string
|
I
|
|
action
|
long-string
|
I
|
|
object
|
long-string
|
I
|
|
objectName
|
long-string
|
I
|
|
propertyMap
|
field-table
|
I
|
|
result
|
long-string
|
O
|
Method: LookupPublish
Table 9.4. Method: LookupPublish
| Argument | Type | Direction |
|---|---|---|
|
userId
|
long-string
|
I
|
|
exchangeName
|
long-string
|
I
|
|
routingKey
|
long-string
|
I
|
|
result
|
long-string
|
O
|
Management Properties and Statistics
Table 9.5. Broker Management Quota Property
| Element | Type | Access | Description |
|---|---|---|---|
|
maxConnections
|
uint16
|
ReadOnly
|
Maximum allowed connections
|
Table 9.6. ACL Management Properties
| Element | Type | Access | Description |
|---|---|---|---|
|
maxConnectionsPerIp
|
uint16
|
ReadOnly
|
Maximum allowed connections
|
|
maxConnectionsPerUser
|
uint16
|
ReadOnly
|
Maximum allowed connections
|
|
maxQueuesPerUser
|
uint16
|
ReadOnly
|
Maximum allowed queues
|
|
connectionDenyCount
|
uint64
| |
Number of connections denied
|
|
queueQuotaDenyCount
|
uint64
| |
Number of queue creations denied
|
Example
Procedure 9.1. ACL Lookup Example
- Start the broker using the example ACL file
acl-test-01-rules.aclreproduced below, and withQPID_LOG_ENABLE=debug+:acl. - Run the Python script
acl-test-01.py. - Examine the Python program output and the broker log.
ACL File acl-test-01-rules.acl
# acl-test-rules-00.acl
# 27-march-2012
group admins moe@COMPANY.COM \
larry@COMPANY.COM \
curly@COMPANY.COM \
shemp@COMPANY.COM
group auditors aaudit@COMPANY.COM baudit@COMPANY.COM caudit@COMPANY.COM \
daudit@COMPANY.COM eaduit@COMPANY.COM eaudit@COMPANY.COM
group tatunghosts tatung01@COMPANY.COM \
tatung02/x86.build.company.com@COMPANY.COM \
tatung03/x86.build.company.com@COMPANY.COM \
tatung04/x86.build.company.com@COMPANY.COM \
HTTP/tatung-test1.eng.company.com@COMPANY.COM
group publishusers publish@COMPANY.COM x-pubs@COMPANY.COM
# Admins: This should be the *only* group which ever gets "all" access
# to anything. Everything/everyone else must not be as permissive
acl allow-log admins all all
# Catch 22: allow anonymous to access the lookup debug functions
acl allow-log anonymous create queue
acl allow-log anonymous all exchange name=qmf.*
acl allow-log anonymous all exchange name=amq.direct
acl allow-log anonymous all exchange name=qpid.management
acl allow-log anonymous access method name=Lookup*
acl allow all publish exchange name=''
# Auditors
acl allow-log auditors all exchange name=company.topic routingkey=private.audit.*
# Tatung
acl allow-log tatunghosts publish exchange name=company.topic routingkey=tatung.*
acl allow-log tatunghosts publish exchange name=company.direct routingkey=tatung-service-queue
# Publish
acl allow-log publishusers create queue
acl allow-log publishusers publish exchange name=qpid.management routingkey=broker
acl allow-log publishusers publish exchange name=qmf.default.topic routingkey=*
acl allow-log publishusers publish exchange name=qmf.default.direct routingkey=*
# Consumers - everyone
acl allow-log all bind exchange name=company.topic routingkey=tatung.*
acl allow-log all bind exchange name=company.direct routingkey=tatung-service-queue
acl allow-log all consume queue
acl allow-log all access exchange
acl allow-log all access queue
acl allow-log all create queue name=tmp.* durable=false autodelete=true exclusive=true policytype=ring
# All else is denied
acl deny-log all all
Python Script acl-test-01.py
# acl-test-00.py
# test driver for QPID-3918 lookup hooks.
#
# The broker is to use acl-test-00-rules.acl.
#
import sys
import qpid
import qmf
totalLookups = 0
failLookups = 0
exitOnError = True
#
# Run a type 1 lookup
# This is the general lookup
#
def Lookup(acl, userName, action, aclObj, aclObjName, propMap, expectedResult = ''):
global totalLookups
global failLookups
totalLookups += 1
result = acl.Lookup(userName, action, aclObj, aclObjName, propMap)
suffix = ''
if (expectedResult != ''):
if (result.result != expectedResult):
failLookups += 1
suffix = ', [ERROR: Expected ' + expectedResult + "]"
if (result.result is None):
suffix = suffix + ', [' + result.text + ']'
print 'Lookup : [name:', userName, ", action: ", action, ", object: ", aclObj, \
", objName: '", aclObjName, "', properties: ", propMap, \
"], [Result: ", result.result, "]", suffix
if (exitOnError and failLookups > 0):
sys.exit()
#
# Run a type 2 lookup
# This is a specific PUBLISH EXCHANGE ['user', 'exchangeName', 'routingKey'] lookup
#
def LookupPublish(acl, userName, exchName, keyName, expectedResult = ''):
global totalLookups
global failLookups
totalLookups += 1
result = acl.LookupPublish(userName, exchName, keyName)
suffix = ''
if (expectedResult != ''):
if (result.result != expectedResult):
failLookups += 1
suffix = ', [ERROR: Expected ' + expectedResult + "]"
if (result.result is None):
suffix = suffix + ', [' + result.text + ']'
print 'LookupPublish : [name:', userName, \
", exchName: '", exchName, "', key: ", keyName, \
"], [Result: ", result.result, "]", suffix
if (exitOnError and failLookups > 0):
sys.exit()
#
# AllBut
#
# Given All names and some names we don't want,
# return the All list with the targets removed
#
def AllBut(allList, removeList):
tmpList = allList[:]
for item in removeList:
try:
tmpList.remove(item)
except Exception, e:
print "ERROR in AllBut() \nallList = %s \nremoveList = %s \nerror = %s " \
% (allList, removeList, e)
return tmpList
#
# Main
#
# Fire up a session and get the acl methods
#
from qmf.console import Session
sess = Session()
broker = sess.addBroker()
acls = sess.getObjects(_class="acl", _package="org.apache.qpid.acl")
acl = acls[0]
# print acl.getMethods() # just to see the method names available
#
# define some group lists
#
g_admins = ['moe@COMPANY.COM', \
'larry@COMPANY.COM', \
'curly@COMPANY.COM', \
'shemp@COMPANY.COM']
g_auditors = [ 'aaudit@COMPANY.COM','baudit@COMPANY.COM','caudit@COMPANY.COM', \
'daudit@COMPANY.COM','eaduit@COMPANY.COM','eaudit@COMPANY.COM']
g_tatunghosts = ['tatung01@COMPANY.COM', \
'tatung02/x86.build.company.com@COMPANY.COM', \
'tatung03/x86.build.company.com@COMPANY.COM', \
'tatung04/x86.build.company.com@COMPANY.COM', \
'HTTP/tatung-test1.eng.company.com@COMPANY.COM']
g_publishusers = ['publish@COMPANY.COM', 'x-pubs@COMPANY.COM']
g_public = ['jpublic@COMPANY.COM', 'me@yahoo.com']
g_all = g_admins + g_auditors + g_tatunghosts + g_publishusers + g_public
action_all = ['consume','publish','create','access','bind','unbind','delete','purge','update']
#
# Run some tests
#
print '#'
print '# admin'
print '#'
for u in g_admins:
Lookup(acl, u, "create", "queue", "anything", {"durable":"true"}, "allow-log")
print '#'
print '# auditors'
print '#'
uInTest = g_auditors + g_admins
uOutTest = AllBut(g_all, uInTest)
for u in uInTest:
LookupPublish(acl, u, "company.topic", "private.audit.This", "allow-log")
for u in uInTest:
for a in action_all:
Lookup(acl, u, a, "exchange", "company.topic", {"routingkey":"private.audit.This"}, "allow-log")
for u in uOutTest:
LookupPublish(acl, u, "company.topic", "private.audit.This", "deny-log")
Lookup(acl, u, "bind", "exchange", "company.topic", {"routingkey":"private.audit.This"}, "deny-log")
print '#'
print '# tatungs'
print '#'
uInTest = g_admins + g_tatunghosts
uOutTest = AllBut(g_all, uInTest)
for u in uInTest:
LookupPublish(acl, u, "company.topic", "tatung.this2", "allow-log")
LookupPublish(acl, u, "company.direct", "tatung-service-queue", "allow-log")
for u in uOutTest:
LookupPublish(acl, u, "company.topic", "tatung.this2", "deny-log")
LookupPublish(acl, u, "company.direct", "tatung-service-queue", "deny-log")
for u in uOutTest:
for a in ["bind", "access"]:
Lookup(acl, u, a, "exchange", "company.topic", {"routingkey":"tatung.this2"}, "allow-log")
Lookup(acl, u, a, "exchange", "company.direct", {"routingkey":"tatung-service-queue"}, "allow-log")
print '#'
print '# publishusers'
print '#'
uInTest = g_admins + g_publishusers
uOutTest = AllBut(g_all, uInTest)
for u in uInTest:
LookupPublish(acl, u, "qpid.management", "broker", "allow-log")
LookupPublish(acl, u, "qmf.default.topic", "this3", "allow-log")
LookupPublish(acl, u, "qmf.default.direct", "this4", "allow-log")
for u in uOutTest:
LookupPublish(acl, u, "qpid.management", "broker", "deny-log")
LookupPublish(acl, u, "qmf.default.topic", "this3", "deny-log")
LookupPublish(acl, u, "qmf.default.direct", "this4", "deny-log")
for u in uOutTest:
for a in ["bind"]:
Lookup(acl, u, a, "exchange", "qpid.management", {"routingkey":"broker"}, "deny-log")
Lookup(acl, u, a, "exchange", "qmf.default.topic", {"routingkey":"this3"}, "deny-log")
Lookup(acl, u, a, "exchange", "qmf.default.direct", {"routingkey":"this4"}, "deny-log")
for a in ["access"]:
Lookup(acl, u, a, "exchange", "qpid.management", {"routingkey":"broker"}, "allow-log")
Lookup(acl, u, a, "exchange", "qmf.default.topic", {"routingkey":"this3"}, "allow-log")
Lookup(acl, u, a, "exchange", "qmf.default.direct", {"routingkey":"this4"}, "allow-log")
#
# Report statistics
#
print 'Total Lookups: ', totalLookups
print 'Failed Lookups: ', failLookups
#
# Close the session
#
sess.close()9.14. Using QMF in a Cluster
Chapter 10. The Qpid Messaging API
10.1. Handling Exceptions
10.1.1. Messaging Exceptions Reference
10.1.2. C++ Messaging Exceptions Class Hierarchy
- MessagingException
- The base class for Messaging exceptions.
- InvalidOptionString : public MessagingException
- Thrown when the syntax of the option string used to configure a connection is not valid.
- KeyError : public MessagingException
- Thrown to indicate a failed lookup of some local object. For example when attempting to retrieve a session, sender or receiver by name.
- LinkError : public MessagingException
- Base class for exceptions thrown to indicate a failed lookup of some local object.
- AddressError : public LinkError
- Thrown to indicate a failed lookup of some local object. For example when attempting to retrieve a session, sender or receiver by name.
- ResolutionError : public AddressError
- Thrown when a syntactically correct address cannot be resolved or used.
- AssertionFailed : public ResolutionError
- Thrown when creating a sender or receiver for an address for which some asserted property of the node is not matched.
- NotFound : public ResolutionError
- Thrown on attempts to create a sender or receiver to a non-existent node.
- MalformedAddress : public AddressError
- Thrown when an address string with invalid syntax is used.
- ReceiverError : public LinkError
- FetchError : public ReceiverError
- NoMessageAvailable : public FetchError
- Thrown by Receiver::fetch(), Receiver::get() and Session::nextReceiver() to indicate that there no message was available before the timeout specified.
- SenderError : public LinkError
- SendError : public SenderError
- TargetCapacityExceeded : public SendError
- Thrown to indicate that the sender attempted to send a message that would result in the target node on the peer exceeding a preconfigured capacity.
- SessionError : public MessagingException
- TransactionError : public SessionError
- TransactionAborted : public TransactionError
- Thrown on Session::commit() if reconnection results in the transaction being automatically aborted.
- TransactionUnknown : public TransactionError
- The outcome of the transaction on the broker (commit or roll-back) is not known. This occurs when the connection fails after the commit was sent, but before a response is received.
- UnauthorizedAccess : public SessionError
- Thrown to indicate that the application attempted to do something for which it was not authorized by its peer.
- UnauthorizedAccess : public SessionError
- ConnectionError : public MessagingException
- TransportFailure : public MessagingException
- Thrown to indicate loss of underlying connection. When auto-reconnect is used this will be caught by the library and used to trigger reconnection attempts. If reconnection fails (according to whatever settings have been configured), then an instance of this class will be thrown to signal that.
10.1.3. Connection Exceptions
qpid::messaging namespace.
- Connection::Connection(const std::string&, const qpid::types::Variant::Map&)
MessagingExceptionif any of the options in the supplied map are not recognised.qpid::types::InvalidConversionif any of the option values are of the wrong type.- Connection::Connection(const std::string& url, const std::string& options)
MessagingExceptionif any of the options in the supplied map are not recognised.qpid::types::InvalidConversionif any of the option values are of the wrong type.InvalidOptionStringif the format of the option string is invalid.- Connection::setOption(const std::string& name, const qpid::types::Variant& value)
MessagingExceptionif the named option is not recognised.qpid::types::InvalidConversionif the option value is of the wrong type.- Connection::open()
qpid::Url::Invalidif the url is not valid (this may be the url supplied on construction or any of the reconnect_urls supplied via options).TransportFailureif a connection could not be established.ConnectionErrorfor any other failure, including where the broker sends a connection.close control before the AMQP 0-10 defined connection handshake completes.qpid::types::InvalidConversionif the broker sends an improperly encoded value for the 'known-host' field of theconnection.open-ok controlas defined by AMQP 0-10 specification.- Connection::isOpen()
- Does not throw exceptions.
- Connection::close()
TargetCapacityExceededif any of the sessions established for the connection have attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif any of the sessions established for the connection have attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends a connection.close control (i.e. if broker initiates closing of an active connection just before the client does).MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session while the close is in progress).TransportFailureif a connection was lost while trying to perform the close 'handshake' with the broker.- Connection::createTransactionalSession(const std::string& name)
SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected which could happen on enabling transactions for the session (e.g. if the broker in question did not support transactions).ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of the session before it becomes active).TransportFailureif the connection was lost (and if automatic reconnect is enabled could not be re-established).qpid::Url::Invalidif reconnect is enabled and a url in thereconnect_urlsoption list is invalid.qpid::types::InvalidConversionif the broker were to send an improperly encoded value for the 'known-host' field of theconnection.open-okcontrol as defined by AMQP 0-10 specification.- Connection::createSession(const std::string&)
ConnectionErrorif the broker to which the client is connected sends a connection.close control (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of the session before it becomes active).TransportFailureif the connection was lost (and if automatic reconnect is enabled could not be re-established).qpid::Url::Invalidif reconnect is enabled and a url in thereconnect_urlsoption list is invalid.qpid::types::InvalidConversionif the broker were to send an improperly encoded value for the 'known-host' field of the connection.open-ok control as defined by AMQP 0-10 specification.- Connection::getSession(const std::string&)
KeyErrorif no session for the specified name exists.- Connection::getAuthenticatedUsername()
- Does not throw any exception.
10.1.4. Session Exceptions
qpid::messaging namespace.
- Session::close()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::commit()
TransactionAbortedif the original AMQP 0-10 session is lost, e.g. due to failover, forcing an automatic rollback.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::rollback()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::acknowledge(bool)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::acknowledge(Message&, bool)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::acknowledgeUpTo(Message&, bool)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::reject(Message&)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.ThrowsSessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::release(Message&)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailure if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::sync(bool)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::getReceivable()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::getUnsettledAcks()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailure if a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::nextReceiver(Receiver&, Duration)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::nextReceiver(Duration)
Receiver::NoMessageAvailableif no message became available in time.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.ThrowsSessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createSender(const Address&)
ResolutionErrorif there is an error in resolving the address.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createSender(const std::string&)
ResolutionErrorif there is an error in resolving the address.MalformedAddressif the syntax of the address string is not valid.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createReceiver(const Address&)
ResolutionErrorif there is an error in resolving the address.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends a connection.close control (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::createReceiver(const std::string&)
ResolutionErrorif there is an error in resolving the address.MalformedAddressif the syntax of the address string is not valid.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Session::getSender(const std::string&)
KeyErrorif there is no sender for the specified name.- Session::getReceiver(const std::string&)
- KeyError if there is no receiver for the specified name.
- Session::checkError()
qpid::messaging::SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.qpid::messaging::ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).qpid::messaging::MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).- Session::getConnection()
- Does not throw exceptions.
- Session::hasError()
- Does not throw exceptions.
10.1.5. Sender Exceptions
qpid::messaging namespace.
- Sender::send(const Message& message, bool)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends a session.detached control (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::close()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::setCapacity(uint32_t)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::getUnsettled()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::getAvailable()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Sender::getCapacity()
- Does not throw exceptions.
- Sender::getName()
- Does not throw exceptions.
- Sender::getSession()
- Does not throw exceptions.
10.1.6. Receiver Exceptions
qpid::messaging namespace.
- Receiver::get(Message& message, Duration timeout=Duration::FOREVER)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::Message get(Duration timeout=Duration::FOREVER)
NoMessageAvailableif there is no message to give after waiting for the specified timeout, or if the Receiver is closed, in which caseisClose()will be true.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::fetch(Message& message, Duration timeout=Duration::FOREVER)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif an execution.exception command, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::fetch(Duration timeout=Duration::FOREVER)
NoMessageAvailableif there is no message to give after waiting for the specified timeout, or if the Receiver is closed, in which caseisClose()will be true.TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::setCapacity(uint32_t)
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::getAvailable()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::getUnsettled()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::close()
TargetCapacityExceededif the session has attempted to send a message that would result in a queue exceeding configured limits.UnauthorizedAccessif the session has attempted to perform an operation for which it has not been granted permission.SessionErrorif anexecution.exceptioncommand, as defined in AMQP 0-10, is received from the broker to which the client is connected.ConnectionErrorif the broker to which the client is connected sends aconnection.closecontrol (i.e. if broker initiates closing of an active connection).MessagingExceptionif the broker to which the client is connected sends asession.detachedcontrol (i.e. if broker initiates closing of an active session).TransportFailureif a connection was lost (and if automatic reconnect is enabled could not be re-established).- Receiver::isClosed()
- Does not throw exceptions.
- Receiver::getCapacity()
- Does not throw exceptions.
- Receiver::getName()
- Does not throw exceptions.
- Receiver::getSession()
- Does not throw exceptions.
Chapter 11. Addresses
11.1. x-declare Parameters
x-declare part of an address string:
Table 11.1.
| Parameter | Usage |
|---|---|
auto-delete
| boolean specifying if the queue/exchange should be auto-deleted
|
exclusive
| boolean specifying exclusiveness of the queue/exchange
|
alternate-exchange
|
alternate exchange where messages shall be routed to when this queue is deleted / the exchange fails to find a matching bind for a message
|
arguments
|
a nested map with arguments available specifically for the queue / exchange. Refer to https://cwiki.apache.org/confluence/display/qpid/Qpid+extensions+to+AMQP for further details.
|
11.2. Address String Options Reference
Table 11.2.
| Option | Value | Semantics |
|---|---|---|
assert
|
one of:
always, never, sender or receiver
|
Asserts that the properties specified in the node option match whatever the address resolves to. If they do not, resolution fails and an exception is raised.
|
create
|
one of:
always, never, sender or receiver
|
Creates the node to which an address refers if it does not exist. No error is raised if the node does exist. The details of the node may be specified in the node option.
|
delete
|
one of:
always, never, sender or receiver
|
Delete the node when the sender or receiver is closed.
|
node
|
A nested map containing
node properties.
|
Specifies properties of the node to which the address refers. These are used in conjunction with the
assert or create options.
|
link
|
A nested map containing
link properties.
|
Used to control the establishment of a conceptual link from the client application to or from the target/source address.
|
mode
|
one of:
browse, consume
|
This option is only of relevance for source addresses that resolve to a queue. If browse is specified the messages delivered to the receiver are left on the queue rather than being removed. If consume is specified the normal behavior applies; messages are removed from the queue once the client acknowledges their receipt.
|
11.3. Node Properties
Table 11.3.
| Property | Value | Semantics |
|---|---|---|
type
|
one of:
topic, queue
|
Indicates the type of the node.
|
durable
|
one of:
True, False
|
Indicates whether the node survives a loss of volatile storage e.g. if the broker is restarted.
|
x-declare
|
A nested map whose values correspond to the valid fields on an AMQP 0-10
queue-declare or exchange-declare command.
|
These values are used to fine tune the creation or assertion process. Note however that they are protocol specific.
|
x-bindings
|
A nested list in which each binding is represented by a map. The entries of the map for a binding contain the fields that describe an AMQP 0-10 binding. Here is the format for x-bindings:
[
{
exchange: <exchange>,
queue: <queue>,
key: <key>,
arguments: {
<key_1>: <value_1>,
...,
<key_n>: <value_n> }
},
...
]
|
In conjunction with the create option, each of these bindings is established as the address is resolved. In conjunction with the assert option, the existence of each of these bindings is verified during resolution. Again, these are protocol specific.
|
properties |
A nested map of AMQP 1.0 properties.
| A nested map of properties specified through properties is recommended over use of x-declare, which generates the nested map of properties when it is used. |
capabilities | A single string or list of strings representing AMQP 1.0 capabilities. | A list containing the AMQP 1.0 capabilities requested from the source or target. |
11.4. Link Properties
Table 11.4.
| Option | Value | Semantics |
|---|---|---|
reliability
|
Currently only
unreliable and at-least-once are supported. See the footnotes for further details.
Reliability indicates the level of link reliability requested by the sender or receiver.
unreliable and at-most-once are currently treated as synonyms, and allow messages to be lost if a broker crashes or the connection to a broker is lost. at-least-once guarantees that a message is not lost, but duplicates may be received. exactly-once guarantees that a message is not lost, and is delivered precisely once.
| |
durable
|
One of:
True, False.
|
Indicates whether the link survives a loss of volatile storage e.g. if the broker is restarted.
|
x-declare
|
A nested map whose values correspond to the valid fields of an AMQP 0-10
queue-declare command.
|
These values can be used to customize the subscription queue in the case of receiving from an exchange. Note however that they are protocol specific.
|
x-subscribe
|
A nested map whose values correspond to the valid fields of an AMQP 0-10
message-subscribe command.
|
These values can be used to customize the subscription.
|
x-bindings
|
A nested list each of whose entries is a map that may contain fields (
queue, exchange, key and arguments) describing an AMQP 0-10 binding.
|
These bindings are established during resolution independent of the create option. They are considered logically part of the linking process rather than of node creation.
|
filter
|
A map containing
name, descriptor, and value, describing an AMQP 1.0 filter.
| name
descriptor is a string descriptor identifying the filter type; value is value for the filter, whose type is dictated by the type of filter (for example: string for legacy-amqp-direct-binding, and map for legacy-amqp-headers-binding). |
11.5. Address String Grammar
The following regular expressions define the tokens used to parse address strings:
LBRACE: \\{
RBRACE: \\}
LBRACK: \\[
RBRACK: \\]
COLON: :
SEMI: ;
SLASH: /
COMMA: ,
NUMBER: [+-]?[0-9]*\\.?[0-9]+
ID: [a-zA-Z_](?:[a-zA-Z0-9_-]*[a-zA-Z0-9_])?
STRING: "(?:[^\\\\"]|\\\\.)*"|\'(?:[^\\\\\']|\\\\.)*\'
ESC: \\\\[^ux]|\\\\x[0-9a-fA-F][0-9a-fA-F]|\\\\u[0-9a-fA-F][0-9a-fA-F][0-9a-fA-F][0-9a-fA-F]
SYM: [.#*%@$^!+-]
WSPACE: [ \\n\\r\\t]+The formal grammar for addresses is given below:
address := name [ SLASH subject ] [ ";" options ]
name := ( part | quoted )+
subject := ( part | quoted | SLASH )*
quoted := STRING / ESC
part := LBRACE / RBRACE / COLON / COMMA / NUMBER / ID / SYM
options := map
map := "{" ( keyval ( "," keyval )* )? "}"
keyval "= ID ":" value
value := NUMBER / STRING / ID / map / list
list := "[" ( value ( "," value )* )? "]"The address string options map supports the following parameters:
<name> [ / <subject> ] ; {
create: always | sender | receiver | never,
delete: always | sender | receiver | never,
assert: always | sender | receiver | never,
mode: browse | consume,
node: {
type: queue | topic,
durable: True | False,
x-declare: { ... <declare-overrides> ... },
x-bindings: [<binding_1>, ... <binding_n>]
},
link: {
name: <link-name>,
durable: True | False,
reliability: unreliable | at-most-once | at-least-once | exactly-once,
x-declare: { ... <declare-overrides> ... },
x-bindings: [<binding_1>, ... <binding_n>],
x-subscribe: { ... <subscribe-overrides> ... }
}
}
<name> [ / <subject> ] ; {
create: always | sender | receiver | never,
assert: always | sender | receiver | never,
mode: browse | consume,
node: {
type: queue | topic,
durable: True | False,
properties: { ... <nested-map> ... }[2],
capabilities: [<capability_1>, ... <capability_n>]
},
link: {
name: <link-name>,
durable: True | False,
reliability: unreliable | at-most-once | at-least-once | exactly-once,
filter: { name: <name>, descriptor: <filter-descriptor>, value: <filter-value> }
}
}
The create, delete (AMQP 0-10 only), and assert policies specify who should perform the associated action:
- always
- the action is performed by any messaging client
- sender
- the action is only performed by a sender
- receiver
- the action is only performed by a receiver
- never
- the action is never performed (this is the default)
The node-type is one of:
- topic
- in the AMQP 0-10 mapping, a topic node defaults to the topic exchange, x-declare may be used to specify other exchange types
- queue
- this is the default
node-type
The following AMQP 1.0 filters are implemented in MRG 3:
legacy-amqp-direct-bindinglegacy-amqp-topic-bindinglegacy-amqp-headers-bindingselector-filterxquery-filter
11.6. Connection Options
11.7. Setting Connection Options
- Python
connection = Connection("localhost:5672", reconnect = True, reconnect_urls = "amqp:tcp:127.0.0.1:5674", heartbeat = 1) try: connection.open()- C++
Connection connection("localhost:5672", "{reconnect: true, reconnect_urls:'amqp:tcp:127.0.0.1:5674', reconnect:true, heartbeat: 1}"); try { connection.open();- .NET/C#
Connection connection= new Connection("localhost:5672", "{reconnect: true, reconnect_urls:'amqp:tcp:127.0.0.1:5674', reconnect:true, heartbeat: 1}"); try { connection.Open();
- Python
connection = Connection("localhost:5672") connection.reconnect = True try: connection.Open()- C++
Connection connection("localhost:5672"); connection.setOption("reconnect", true); try { connection.open();- .NET/C#
Connection connection = new Connection("localhost:5672"); connection.SetOption("reconnect", true); try { connection.Open();
11.8. Connection Options Reference
Table 11.5. Connection Options (General)
| Option name | Value type | Semantics |
|---|---|---|
username | string | The username to use when authenticating to the broker. |
password | string | The password to use when authenticating to the broker. |
heartbeat | integer | Requests that heartbeats be sent every N seconds. If two successive heartbeats are missed, the connection is considered lost and will fail or start the reconnect process if configured to do so. |
max-channels | integer | Restricts the maximum number of supported channels, to assist with tuning the Messaging API. Not supported in AMPQ 1.0. |
max-frame-size | integer |
Restricts the maximum frame size, to assist with tuning the Messaging API. Not supported in AMPQ 1.0.
The minimum value should be at least 4096B; anything lower will cause authentication failures. The product does not enforce this restriction.
|
protocol | string | The AMQP protocol to use. The recognized values are 'amqp1.0' and 'amqp0-10'. AMQP 0-10 is the default. Note: Not supported in Python client. |
reconnect | boolean | Transparently reconnect if the connection is lost. |
reconnect_urls | Broker address list | A list of one or more brokers to attempt communication with when a connection fails. |
reconnect_urls_replace | boolean |
Controls how setting the reconnect_urls option is treated. If true, setting reconnect_urls causes the old list to be replaced with the new one. If false, the new list is appended to the old list. The default value is false. |
reconnect_timeout | float | Total number of seconds to continue reconnection attempts before giving up and raising an exception. |
reconnect_limit | integer | Maximum number of reconnection attempts before giving up and raising an exception. |
reconnect_interval_min | float | Minimum number of seconds between reconnection attempts. The first reconnection attempt is made immediately; if that fails, the first reconnection delay is set to the value of reconnect_interval_min; if that attempt fails, the reconnect interval increases exponentially until a reconnection attempt succeeds or reconnect_interval_max is reached. This value can be fractional. For example, 0.001 sets the maximum reconnect interval to one millisecond. |
reconnect_interval_max | float | Maximum reconnect interval in seconds. This value can be fractional. For example, 0.001 sets the maximum reconnect interval to one millisecond. |
reconnect_interval | float | Sets both reconnection_interval_min and reconnection_interval_max to the same number of seconds. |
sasl_mechanisms | string | The specific SASL mechanisms to use when authenticating to the broker as a space separated list. |
sasl_service | string | The service name if needed by the SASL mechanism in use. |
sasl_min_ssf | integer | The minimum acceptable security strength factor. |
sasl_max_ssf | integer | The maximum acceptable security strength factor. |
ssl_cert_name | string | Name of the certificate to use for a given client. |
ssl_ignore_hostname_verification_failure | boolean | Disables authentication of the server to the client (and should be used only as a last resort). If set to true, the client can connect to the server even if the hostname used (or IP address) does not match what is in the servers certificate. |
tcp_nodelay | boolean | Set tcp_no_delay, i.e. disable Nagle algorithm. Note: Not Supported in Python client. |
transport | string | Sets the underlying transport protocol used. The default option is tcp. To enable ssl, set to ssl. The C++ client additionally supports rdma. |
Table 11.6. Connection Options (Python Client Only)
| Option name | Value type | Semantics |
|---|---|---|
address_ttl | float | Time until cached address resolution expires. |
host | string | The name or ip address of the remote host (overridden by url). |
port | integer | The port number of the remote host (overridden by url). |
ssl_certfile | string | File with client's public key (PEM format). |
ssl_keyfile | string | File with client's private key (PEM format). |
ssl_trustfile | string | File with trusted certificates to validate the server. |
url | string | [ <username> [ / <password> ] @ ] <host> [ : <port> ]. |
Table 11.7. Connection Options (AMPQ 1.0 Only)
| Option name | Value type | Semantics |
|---|---|---|
container_id | string | The container ID to use for the connection. |
nest_annotations | boolean | If true, annotations in received messages are presented as properties with keys x-amqp-delivery-annotations or x-amqp-delivery-annotations. The values consist of nested maps containing the annotations. If false, the annotations are merged in with the properties. |
set_to_on_send | boolean | If true, all sent messages will have the to field set to the node name of the sender. |
properties or client_properties | integer | The properties to include in the open frame sent. |
properties nested map is recommended. The x-declare map is supported as a convenience and is automatically converted to a properties map before sending to the broker.
Chapter 12. Message Timestamping
12.1. Message Timestamping
12.2. Enable Message Timestamping at Broker Start-up
--enable-timestamp yes argument:
./qpidd --enable-timestamp yes
12.3. Enable Message Timestamping from an Application
getTimestampConfig and setTimestampConfig get and set the timestamping configuration.
- getTimestampConfig
- Returns
Trueif received messages are timestamped. - setTimestampConfig
- Set
Trueto enable timestamping received messages,Falseto disable timestamping.
12.4. Access a Message Timestamp in Python
try:
msg = receiver.fetch(timeout=1)
if "x-amqp-0-10.timestamp" in msg.properties:
print("Timestamp=%s" % str(msg.properties["x-amqp-0-10.timestamp"]))
except Empty:
pass
12.5. Access a Message Timestamp in C++
messaging::Message msg;
if (receiver.fetch(msg, messaging::Duration::SECOND*1)) {
if (msg.getProperties().find("x-amqp-0-10.timestamp") !=
msg.getProperties().end()) {
std::cout << "Timestamp=" <<
msg.getProperties()["x-amqp-0-10.timestamp"].asString() << std::endl;
}
}
12.6. Using AMQ 0-10 Message Property Keys for Timestamping
delivery-properties.timestamp), the timestamp value can be accessed using the x-amqp-0-10.timestamp message property.
See Also:
Chapter 13. Maps and Lists
13.1. Maps and Lists in Message Content
13.2. Map and List Representation in Native Data Types
Table 13.1. Map and List Representation in Supported Languages
| Language | map | list |
|---|---|---|
| Python | dict | list |
| C++ | Variant::Map | Variant::List |
| Java | MapMessage | ListMessage |
| .NET | Dictionary<string, object> | Collection<object> |
13.3. Qpid Maps and Lists in Python
- Python
from qpid.messaging import * # !!! SNIP !!! content = {'Id' : 987654321, 'name' : 'Widget', 'percent' : 0.99} content['colours'] = ['red', 'green', 'white'] content['dimensions'] = {'length' : 10.2, 'width' : 5.1,'depth' : 2.0}; content['parts'] = [ [1,2,5], [8,2,5] ] content['specs'] = {'colors' : content['colours'], 'dimensions' : content['dimensions'], 'parts' : content['parts'] } message = Message(content=content) sender.send(message)
13.4. Python Data Types in Maps
Table 13.2. Python Data Types in Maps
| Python Data Type | → C++ | → Java |
|---|---|---|
bool | bool | boolean |
int | int64 | long |
long | int64 | long |
float | double | double |
unicode | string | java.lang.String |
uuid | qpid::types::Uuid | java.util.UUID |
dict | Variant::Map | java.util.Map |
list | Variant::List | java.util.List |
13.5. Qpid Maps and Lists in C++
using namespace qpid::types;
// !!! SNIP !!!
Message message;
Variant::Map content;
content["id"] = 987654321;
content["name"] = "Widget";
content["percent"] = 0.99;
Variant::List colours;
colours.push_back(Variant("red"));
colours.push_back(Variant("green"));
colours.push_back(Variant("white"));
content["colours"] = colours;
Variant::Map dimensions;
dimensions["length"] = 10.2;
dimensions["width"] = 5.1;
dimensions["depth"] = 2.0;
content["dimensions"]= dimensions;
Variant::List part1;
part1.push_back(Variant(1));
part1.push_back(Variant(2));
part1.push_back(Variant(5));
Variant::List part2;
part2.push_back(Variant(8));
part2.push_back(Variant(2));
part2.push_back(Variant(5));
Variant::List parts;
parts.push_back(part1);
parts.push_back(part2);
content["parts"]= parts;
Variant::Map specs;
specs["colours"] = colours;
specs["dimensions"] = dimensions;
specs["parts"] = parts;
content["specs"] = specs;
message.setContentObject(content);
sender.send(message, true);13.6. C++ Data Types in Maps
Table 13.3. C++ Data Types in Maps
C++ Data Type | → Python | → Java |
|---|---|---|
bool | bool | boolean |
uint16 | int | long | short |
uint32 | int | long | int |
uint64 | int | long | long |
int16 | int | long | short |
int32 | int | long | int |
int64 | int | long | long |
float | float | float |
double | float | double |
string | unicode | java.lang.String |
qpid::types::Uuid | uuid | java.util.UUID |
Variant::Map | dict | java.util.Map |
Variant::List | list | java.util.List |
13.7. Qpid Maps and Lists in .NET C#
- .NET/C#
using System; using System.Collections.Generic; using System.Collections.ObjectModel; using Org.Apache.Qpid.Messaging; namespace Org.Apache.Qpid.Messaging.examples { class MapSender { // csharp.map.sender example // // Send an amqp/map message // The map message contains simple types, a nested amqp/map, // an ampq/list, and specific instances of each supported type. // static int Main(string[] args) { string url = "amqp:tcp:localhost:5672"; string address = "message_queue; {create: always}"; string connectionOptions = ""; if (args.Length > 0) url = args[0]; if (args.Length > 1) address = args[1]; if (args.Length > 2) connectionOptions = args[2]; // // Create and open an AMQP connection to the broker URL // Connection connection = new Connection(url, connectionOptions); connection.Open(); // // Create a session and a sender // Session session = connection.CreateSession(); Sender sender = session.CreateSender(address); // // Create structured content for the message. This example builds a // map of items including a nested map and a list of values. // Dictionary<string, object> content = new Dictionary<string, object>(); Dictionary<string, object> subMap = new Dictionary<string, object>(); Collection<object> colors = new Collection<object>(); // add simple types content["id"] = 987654321; content["name"] = "Widget"; content["percent"] = 0.99; // add nested amqp/map subMap["name"] = "Smith"; subMap["number"] = 354; content["nestedMap"] = subMap; // add an amqp/list colors.Add("red"); colors.Add("green"); colors.Add("white"); // list contains null value colors.Add(null); content["colorsList"] = colors; // add one of each supported amqp data type bool mybool = true; content["mybool"] = mybool; byte mybyte = 4; content["mybyte"] = mybyte; UInt16 myUInt16 = 5 ; content["myUInt16"] = myUInt16; UInt32 myUInt32 = 6; content["myUInt32"] = myUInt32; UInt64 myUInt64 = 7; content["myUInt64"] = myUInt64; char mychar = 'h'; content["mychar"] = mychar; Int16 myInt16 = 9; content["myInt16"] = myInt16; Int32 myInt32 = 10; content["myInt32"] = myInt32; Int64 myInt64 = 11; content["myInt64"] = myInt64; Single mySingle = (Single)12.12; content["mySingle"] = mySingle; Double myDouble = 13.13; content["myDouble"] = myDouble; Guid myGuid = new Guid("000102030405060708090a0b0c0d0e0f"); content["myGuid"] = myGuid; content["myNull"] = null; // // Construct a message with the map content and send it synchronously // via the sender. // Message message = new Message(content); sender.Send(message, true); // // Wait until broker receives all messages. // session.Sync(); // // Close the connection. // connection.Close(); return 0; } } }
13.8. C# Data Types and .NET bindings
Table 13.4. Data Type Mapping between C++ and .NET binding
| C++ Data Type | .NET binding |
|---|---|
void | nullptr |
bool | bool |
uint8 | byte |
uint16 | UInt16 |
uint32 | UInt32 |
uint64 | UInt64 |
int16 | char |
int16 | Int16 |
int32 | Int32 |
int64 | Int64 |
float | Single |
double | Double |
string | string |
qpid::types::Uuid | Guid |
Variant::Map | Dictionary< string, object > |
Variant::List | Collection< object > |
Note
string objects are translated to and from C++ strings using UTF-8 encoding only.
Chapter 14. The Request/Response Pattern
14.1. The Request/Response Pattern
reply-to message property to allow a server to respond to the client that sent a message. A server sets up a service queue, with a name known to clients. A client creates a private queue for the server's response, creates a message for a request, sets the request's reply-to property to the address of the client's response queue, and sends the request to the service queue. The server sends the response to the address specified in the request's reply-to property.
14.2. Request/Response C++ Example
Receiver receiver = session.createReceiver("service_queue; {create: always}");
Message request = receiver.fetch();
const Address& address = request.getReplyTo(); // Get "reply-to" from request ...
if (address) {
Sender sender = session.createSender(address); // ... send response to "reply-to"
Message response("pong!");
sender.send(response);
session.acknowledge();
}
#, it is given a unique name.
Sender sender = session.createSender("service_queue");
Receiver receiver = session.createReceiver("#response-queue; {create:always}");
Address responseQueue = receiver.getAddress();
Message request;
request.setReplyTo(responseQueue);
request.setContent("ping");
sender.send(request);
Message response = receiver.fetch();
std::cout << request.getContent() << " -> " << response.getContent() << std::endl;
Chapter 15. Performance Tips
15.1. Apache Qpid Programming for Performance
- Consider prefetching messages for receivers. This helps eliminate roundtrips and increases throughput. Prefetch is disabled by default, and enabling it is the most effective means of improving throughput of received messages.
- Send messages asynchronously. Again, this helps eliminate roundtrips and increases throughput. The C++ and .NET clients send asynchronously by default, however the python client defaults to synchronous sends.
- Acknowledge messages in batches. Rather than acknowledging each message individually, consider issuing acknowledgments after n messages and/or after a particular duration has elapsed.
- Tune the sender capacity. If the capacity is too low the sender may block waiting for the broker to confirm receipt of messages, before it can free up more capacity.
- If you are setting a reply-to address on messages being sent by the c++ client, make sure the address type is set to either queue or topic as appropriate. This avoids the client having to determine which type of node is being referred to, which is required when handling reply-to in AMQP 0-10.
- For latency-sensitive applications, setting
tcp-nodelayonqpiddand on client connections can help reduce the latency.
Chapter 16. Cluster Failover
16.1. Changes to Clustering in MRG 3
cluster module with the new ha module. This module provides active-passive clustering functionality for high availability.
cluster module in MRG 2 was active-active: clients could connect to any broker in the cluster. The new ha module is active-passive. Exactly one broker acts as primary the other brokers act as backup. Only the primary accepts client connections. If a client attempts to connect to a backup broker, the connection is aborted and the client fails-over until it connects to the primary.
ha module also supports a virtual IP address. Clients can be configured with a single IP address that is automatically routed to the primary broker. This is the recommended configuration.
In MRG 2, a clustered broker would only utilize a single CPU thread. Some users worked around this by running multiple clustered brokers on a single machine, to utilize the multiple cores.
16.2. Active-Passive Messaging Clusters
rgmanager, to detect failures, choose the new primary and handle network partitions.
16.3. Cluster Failover in C++
reconnect to be true. For example:
qpid::messaging::Connection c("node1,node2,node3","{reconnect:true}");heartbeat option. For example:
qpid::messaging::Connection c("node1,node2,node3","{reconnect:true,heartbeat:10}");16.4. Cluster Failover in Python
reconnect=True and a list of host:port addresses as reconnect_urls when calling Connection.establish or Connection.open:
connection = qpid.messaging.Connection.establish("node1", reconnect=True, reconnect_urls=["node1", "node2", "node3"])heartbeat option. For example:
connection = qpid.messaging.Connection.establish("node1", reconnect=True, reconnect_urls=["node1", "node2", "node3"], heartbeat=10)16.5. Failover Behavior in Java JMS Clients
failover property:
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&failover='failover_exchange'
Fail-over Modes
failover_exchange- If the connection fails, fail over to any other broker in the cluster. This is provided for backward compatibility. Use of a Virtual IP (and transparent server-side failover) is recommended.
roundrobin- If the connection fails, fail over to one of the brokers specified in the brokerlist.
singlebroker- Fail-over is not supported; the connection is to a single broker only.
nofailover- Disables all retry and failover logic.
<class>- Any other value is interpreted as a classname which must implement the
org.apache.qpid.jms.failover.FailoverMethodinterface.
idle_timeout property, which is an integer corresponding to the heartbeat period in seconds. For instance, the following line from a JNDI properties file sets the heartbeat time out to 3 seconds:
connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'&idle_timeout=3
Chapter 17. Logging
17.1. Logging in C++
- Use
QPID_LOG_ENABLEto set the level of logging you are interested in (trace,debug,info,notice,warning,error, orcritical):export QPID_LOG_ENABLE="warning+"
- The Qpidd broker and C++ clients use
QPID_LOG_OUTPUTto determine where logging output should be sent. This is either a file name or the special valuesstderr,stdout, orsyslog:export QPID_LOG_TO_FILE="/tmp/myclient.out"
- From a Windows command prompt, use the following command format to set the environment variables:
set QPID_LOG_ENABLE=warning+ set QPID_LOG_TO_FILE=D:\tmp\myclient.out
17.2. Logging in Python
basicConfig() logging method reports all warnings and errors:
from logging import basicConfig basicConfig()
qpidd daemon allows you to specify the level of logging desired. For instance, the following code enables logging at the DEBUG level:
from qpid.log import enable, DEBUG
enable("qpid.messaging.io", DEBUG)
$ pydoc qpid.log.
17.3. Change the logging level at runtime
setLogLevel method to control the logging level. The following C++ code demonstrates calling this method to set the logging level.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Address.h>
#include <iostream>
using namespace std;
using namespace qpid::messaging;
using namespace qpid::types;
int main(int argc, char** argv) {
if (argc < 2) {
cerr << "Invalid number of parameters, expecting log level (info, trace, warning or so)" << endl;
return 1;
}
string log_level = argv[1];
Connection connection(argc>2?argv[2]:"localhost:5672");
connection.open();
Session session = connection.createSession();
Sender sender = session.createSender("qmf.default.direct/broker");
Receiver receiver = session.createReceiver("#reply-queue; {create:always, node:{x-declare:{auto-delete:true}}}");
Address responseQueue = receiver.getAddress();
Message message;
Variant::Map content;
Variant::Map OID;
Variant::Map arguments;
OID["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
arguments["level"] = log_level;
content["_object_id"] = OID;
content["_method_name"] = "setLogLevel";
content["_arguments"] = arguments;
message.setContentObject(content);
message.setReplyTo(responseQueue);
message.setProperty("x-amqp-0-10.app-id", "qmf2");
message.setProperty("qmf.opcode", "_method_request");
message.setContentType("amqp/map");
sender.send(message, true);
/* receive a response from the broker & check our request was successfully processed */
Message response;
if (receiver.fetch(response,qpid::messaging::Duration(30000)) == true) {
qpid::types::Variant::Map recv_props = response.getProperties();
if (recv_props["qmf.opcode"] == "_method_response")
std::cout << "Response: OK" << std::endl;
else if (recv_props["qmf.opcode"] == "_exception")
std::cerr << "Error: " << response.getContent() << std::endl;
else
std::cerr << "Invalid response received!" << std::endl;
}
else
std::cout << "Timeout: No response received within 30 seconds!" << std::endl;
receiver.close();
sender.close();
session.close();
connection.close();
return 0;
}- Save the example code to a file
set_log_level.cpp. - Modify the Connection URL in the code to resolve to your broker. At the moment it is set to connect to a broker running on port 5672 on the local machine.
- Compile the example code:
g++ -Wall -lqpidclient -lqpidcommon -lqpidmessaging -lqpidtypes -o set_log_level set_log_level.cpp
- Use the complied program to change the log level of the broker:
./set_log_level "trace+"
- To observe the change in the logging level, tail the server log as you run the program.
Chapter 18. Security
18.1. Security features provided by Qpid
18.2. Authentication
18.3. SASL Support in Windows Clients
ANONYMOUS and PLAIN and EXTERNAL authentication mechanisms.
18.4. Enable Kerberos authentication
kinit, there is no need to supply a user name or password. If you are using another form of authentication, or are not already authenticated with Kerberos, you can supply these as connection options:
connection.setOption("username", "mick");
connection.setOption("password", "pa$$word");
18.5. Enable SSL
transport connection option to ssl:
connection.setOption("transport", "ssl");
18.6. SSL Client Environment Variables for C++ Clients
Table 18.1. SSL Client Environment Variables for C++ clients
| SSL Client Options for C++ clients | |
|---|---|
QPID_SSL_USE_EXPORT_POLICY | Use NSS export policy |
QPID_SSL_CERT_PASSWORD_FILE PATH | File containing password to use for accessing certificate database |
QPID_SSL_CERT_DB PATH | Path to directory containing certificate database |
QPID_SSL_CERT_NAME NAME | Name of the certificate to use. When SSL client authentication is enabled, a certificate name should normally be provided. |
Chapter 19. The AMQP 0-10 mapping
19.1. The AMQP 0-10 mapping
qpid.subject entry in the application-headers field of the message-properties.
message-subscribe request for the queue in question. The accept-mode is determined by the reliability option in the link properties; for unreliable links the accept-mode is none, for reliable links it is explicit. The default for a queue is reliable. The acquire-mode is determined by the value of the mode option. If the mode is set to browse the acquire mode is not-acquired, otherwise it is set to pre-acquired. The exclusive and arguments fields in the message-subscribe command can be controlled using the x-subscribe map.
x-declare map within the link properties. The reliability option determines most of the other parameters. If the reliability is set to unreliable then an auto-deleted, exclusive queue is used meaning that if the client or connection fails messages may be lost. For exactly-once the queue is not set to be auto-deleted. The durability of the subscription queue is determined by the durable option in the link properties. The binding process depends on the type of the exchange the source address resolves to.
- For a topic exchange, if no subject is specified and no
x-bindingsare defined for the link, the subscription queue is bound using a wildcard matching any routing key (thus satisfying the expectation that any message sent to that address will be received from it). If a subject is specified in the source address however, it is used for the binding key (this means that the subject in the source address may be a binding pattern including wildcards). - For a fanout exchange the binding key is irrelevant to matching. A receiver created from a source address that resolves to a fanout exchange receives all messages sent to that exchange regardless of any subject the source address may contain. An
x-bindingselement in the link properties should be used if there is any need to set the arguments to the bind. - For a direct exchange, the subject is used as the binding key. If no subject is specified an empty string is used as the binding key.
- For a headers exchange, if no subject is specified the binding arguments simply contain an
x-matchentry and no other entries, causing all messages to match. If a subject is specified then the binding arguments contain anx-matchentry set to all and an entry forqpid.subjectwhose value is the subject in the source address (this means the subject in the source address must match the message subject exactly). For more control thex-bindingselement in the link properties must be used. - For the XML exchange, if a subject is specified it is used as the binding key and an XQuery is defined that matches any message with that value for
qpid.subject. Again this means that only messages whose subject exactly match that specified in the source address are received. If no subject is specified then the empty string is used as the binding key with an xquery that will match any message (this means that only messages with an empty string as the routing key will be received). For more control the x-bindings element in the link properties must be used. A source address that resolves to the XML exchange must contain either a subject or an x-bindings element in the link properties as there is no way at present to receive any message regardless of routing key.
queue, exchange, key, or arguments. If the queue value is absent the queue name the address resolves to is implied. If the exchange value is absent the exchange name the address resolves to is implied.
msg refers to the Message class defined in the Qpid Messaging API, mp refers to an AMQP 0-10 message-properties struct, and dp refers to an AMQP 0-10 delivery-properties struct.
Table 19.1. Mapping to AMQP 0-10 Message Properties
| Python API | C++ API [a] | AMQP 0-10 Property [b] |
|---|---|---|
msg.id | msg.{get,set}MessageId() | mp.message_id |
msg.subject | msg.{get,set}Subject() | mp.application_headers ["qpid.subject"] |
msg.user_id | msg.{get,set}UserId() | mp.user_id |
msg.reply_to | msg.{get,set}ReplyTo() | mp.reply_to [c] |
msg.correlation_id | msg.{get,set}CorrelationId() | mp.correlation_id |
msg.durable | msg.{get,set}Durable() | dp.delivery_mode == delivery_mode.persistent [d] |
msg.priority | msg.{get,set}Priority() | dp.priority |
msg.ttl | msg.{get,set}Ttl() | dp.ttl |
msg.redelivered | msg.{get,set}Redelivered() | dp.redelivered |
msg.properties | msg.{get,set}Properties() | mp.application_headers |
msg.content_type | msg.{get,set}ContentType() | mp.content_type |
[a]
The .NET Binding for C++ Messaging provides all the message and delivery properties described in the C++ API.
[b]
In these entries, mp refers to an AMQP message property, and dp refers to an AMQP delivery property.
[c]
The reply_to is converted from the protocol representation into an address.
[d]
Note that msg.durable is a boolean, not an enum.
| ||
19.2. AMQ 0-10 Message Property Keys
x-amqp-0-10.app-id, its value will be used to set the message-properties.app-id property in the outgoing message. Likewise, if an incoming message has message-properties.app-id set, its value can be accessed via the x-amqp-0-10.app-id message property key.
x-amqp-0-10.content-encoding, its value will be used to set the message-properties.content-encoding property in the outgoing message. Likewise, if an incoming message has message-properties.content-encoding set, its value can be accessed via the x-amqp-0-10.content-encoding message property key.
delivery-properties.routing-key) in an incoming messages can be accessed via the x-amqp-0-10.routing-key message property.
19.3. AMQP Routing Key and Message Subject
x-amqp-0-10.routing-key property is set to the value of the message subject, with one exception.
sender = session.sender('amq.topic/SubjectX')msg1 = Message('A message with no subject')
msg2 = Message('A message with a subject')
msg2.subject = 'SubjectY'msg1 has its subject and AMQP routing key set to 'SubjectX'. msg2 retains its subject 'SubjectY', and has its AMQP routing key set to 'SubjectY'.
sender = session('amq.topic')
msg = Message('No subject, and none assigned by the sender')
sender.send(msg)sender = session('amq.topic')
msg = Message('No subject, but a manually assigned AMQP routing key')
msg.properties['x-amqp-0-10.routing-key'] = 'amqp-SubjectX'
sender.send(msg)amqp-0-10.routing-key may be useful in an interoperability scenario, but in Red Hat Enterprise Messaging the message subject is used for routing.
import sys
from qpid.messaging import *
# This program demonstrates that the x-amqp-0-10.routing-key
# (1) is (re)set to the message subject when the message has a subject or
# is sent via a sender that has a subject
# (2) is not a valid basis for routing in a topic exchange
# - the topic exchange will not route a message to a queue
def sendmsg(msg, note = ''):
global rxplain, rxsubject, txplain, txsubject, ssn, testcount
msg.properties['sender'] = 'Plain Sender'
txplain.send(msg)
msg.properties['sender'] = 'SubjectX Sender'
txsubject.send(msg)
if testcount > 0:
x = raw_input('\nPress Enter for the next test message')
print '\n================================================\n'
testcount = testcount + 1
print '\nScenario ' + str(testcount)
print '\nSent message:\n'
subject = 'Blank'
if msg.subject:
subject = msg.subject
print 'Subject:\t' + subject
routekey = 'Blank'
if 'x-amqp-0-10.routing-key' in msg.properties:
routekey = msg.properties['x-amqp-0-10.routing-key']
print 'Routing Key:\t' + routekey
msgcount = 0
print '\nThe queue listening for all messages received:'
try:
while True:
rxmsg = rxplain.fetch(timeout = 1)
subject ='Blank'
if rxmsg.subject:
subject = rxmsg.subject
routekey = 'Blank'
if 'x-amqp-0-10.routing-key' in rxmsg.properties:
routekey = rxmsg.properties['x-amqp-0-10.routing-key']
print '\nSubject:\t' + subject
print 'Routing Key:\t' + routekey
print 'Sent via:\t' + rxmsg.properties['sender']
msgcount = 1
ssn.acknowledge(rxmsg)
except:
pass
if msgcount == 0:
print 'Nothing\n'
else:
msgcount = 0
print '\nThe queue listening for SubjectX messages received:'
try:
while True:
rxmsg = rxsubject.fetch(timeout = 1)
subject ='Blank'
if rxmsg.subject:
subject = rxmsg.subject
routekey = 'Blank'
if 'x-amqp-0-10.routing-key' in rxmsg.properties:
routekey = rxmsg.properties['x-amqp-0-10.routing-key']
print '\nSubject:\t' + subject
print 'Routing Key:\t' + routekey
print 'Sent via:\t' + rxmsg.properties['sender']
msgcount = 1
ssn.acknowledge(rxmsg)
except:
pass
if msgcount == 0:
print 'Nothing\n'
if note != '':
print '\nNote: ' + note + "\n"
connection = Connection("localhost:5672")
connection.open()
try:
ssn = connection.session()
# we create our receivers here so that queues are created to hold the messages sent
rxplain = ssn.receiver("amq.topic")
rxsubject = ssn.receiver("amq.topic/SubjectX")
txplain = ssn.sender("amq.topic")
txsubject = ssn.sender("amq.topic/SubjectX")
testcount = 0
msg = Message("Plain message, no subject")
sendmsg(msg, "a subject sender writes the subject and routing key when a message has no subject, a plain sender does not")
msg = Message("Message with subject")
msg.subject = "SubjectX"
sendmsg(msg, "a plain sender writes the routing key if the message has a subject")
msg = Message("Message with a different subject")
msg.subject = "SubjectY"
sendmsg(msg, "a subject sender does not rewrite a subject, both senders use the message subject to write routing key")
msg = Message("Message with routing key")
msg.properties["x-amqp-0-10.routing-key"] = "SubjectX"
sendmsg(msg, "a routing key is not sufficient to route to a queue - the match is on subject")
msg = Message("Message with different routing key")
msg.properties["x-amqp-0-10.routing-key"] = "SubjectY"
sendmsg(msg, "the only case where you can manually set a non-blank routing key is a message with a blank subject, sent via a plain sender")
msg = Message("Message with different routing key and subject")
msg.properties["x-amqp-0-10.routing-key"] = "SubjectY"
msg.subject = "SubjectZ"
sendmsg(msg, "all messages with subjects and all messages sent via a subject sender have their routing key rewritten")
finally:
connection.close()19.4. Using AMQ 0-10 Message Property Keys for Timestamping
delivery-properties.timestamp), the timestamp value can be accessed using the x-amqp-0-10.timestamp message property.
See Also:
Chapter 20. Using the qpid-java AMQP 0-10 client
20.1. A Simple Messaging Program in Java JMS
package org.apache.qpid.example.jmsexample.hello;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.util.Properties;
public class Hello {
public Hello() {
}
public static void main(String[] args) {
Hello producer = new Hello();
producer.runTest();
}
private void runTest() {
try {
Properties properties = new Properties();
properties.load(this.getClass().getResourceAsStream("hello.properties"));
Context context = new InitialContext(properties);
ConnectionFactory connectionFactory
= (ConnectionFactory) context.lookup("qpidConnectionfactory");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination destination = (Destination) context.lookup("topicExchange");
MessageProducer messageProducer = session.createProducer(destination);
MessageConsumer messageConsumer = session.createConsumer(destination);
TextMessage message = session.createTextMessage("Hello world!");
messageProducer.send(message);
message = (TextMessage)messageConsumer.receive();
System.out.println(message.getText());
connection.close();
context.close();
}
catch (Exception exp) {
exp.printStackTrace();
}
}
}Here is an explanation of the program code:
properties.load(this.getClass().getResourceAsStream("hello.properties"));Context context = new InitialContext(properties);
ConnectionFactory connectionFactory
= (ConnectionFactory) context.lookup("qpidConnectionfactory");Connection connection = connectionFactory.createConnection();
connection.start();
Session session=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Destination destination = (Destination) context.lookup("topicExchange");MessageProducer messageProducer = session.createProducer(destination);
MessageConsumer messageConsumer = session.createConsumer(destination);
message = (TextMessage)messageConsumer.receive();
connection.close();
context.close();
The contents of the hello.properties file are shown below.
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory # connectionfactory.[jndiname] = [ConnectionURL] connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' # destination.[jndiname] = [address_string] destination.topicExchange = amq.topic
20.2. Apache Qpid JNDI Properties for AMQP Messaging
- connectionfactory.<jndiname>
- The Connection URL that the connection factory uses to perform connections.
- queue.<jndiname>
- A JMS queue. Implemented as an
amq.directexchange in Apache Qpid. - topic.<jndiname>
- A JMS topic. Implemented as an
amq.topicexchange in Apache Qpid. - destination.<jndiname>
- Can be used for defining all amq destinations, queues, topics and header matching, using an address string (or a binding URL, for backward-compatibility with earlier implementations).
20.3. JNDI Properties for Apache Qpid
java.naming.factory.initial = org.apache.qpid.jndi.PropertiesFileInitialContextFactory # connectionfactory.[jndiname] = [ConnectionURL] connectionfactory.qpidConnectionfactory = amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672' # destination.[jndiname] = [address_string] destination.topicExchange = amq.topic
20.4. Durable Subscription Queues in MRG 3
# java -cp ${CLASSPATH} org.apache.qpid.example.Drain "amq.topic/some_subject;{ link: { durable: true } }"
javax.jms.JMSException: Error registering consumer: org.apache.qpid.AMQException: You cannot mark a subscription queue as durable without providing a name for the link.
# java -cp ${CLASSPATH} org.apache.qpid.example.Drain "amq.topic/some_subject;{ link: { name: some_name, durable: true } }"20.5. Connection URLs
amqp://[<user>:<pass>@][<clientid>]<virtualhost>[?<option>='<value>'[&<option>='<value>']]
amqp://username:password@clientid/test?brokerlist='tcp://localhost:5672'
Table 20.1. Connection URL Properties
| Option | Type | Description |
|---|---|---|
brokerlist
|
The broker to use for this connection. In the current release, precisely one broker must be specified.
| |
max_prefetch
|
Integer
|
The maximum number of pre-fetched messages per destination.
|
sync_publish
|
{'persistent' | 'transient' | 'all' | ''}
|
A sync command is sent after every persistent or transient message to guarantee that it has been received.
persistent sets this behavior for persistent messages.
transient sets this behavior for transient messages only.
all syncs both type of messages, however the default behavior '' also has the same effect.
|
sync_ack
|
Boolean
|
A sync command is sent after every acknowledgment to guarantee that it has been received.
|
use_legacy_map_msg_format
|
Boolean
|
If you are using JMS Map messages and deploying a new client with any JMS client older than 0.7 release, you must set this to
true to ensure the older clients can understand the map message encoding.
|
failover
| {'roundrobin' | 'failover_exchange' | 'singlebroker' | 'nofailover' | '<class>'}
|
|
ssl |
Boolean
|
If
ssl='true', use SSL for all broker connections. Overrides any per-broker settings in the brokerlist entries. If not specified, the brokerlist entry for each given broker is used to determine whether SSL is used.
|
Broker list URL
brokerlist=<transport>://<host>[:<port>](?<param>=<value>)?(&<param>=<value>)*
brokerlist='tcp://localhost:5672'
Example 20.1. Broker Lists
amqp://guest:guest@test/test?sync_ack='true' &brokerlist='tcp://ip1:5672?sasl_mechs='GSSAPI''
amqp://guest:guest@test/test?sync_ack='true' &brokerlist='tcp://ip1:5672?ssl='true'&ssl_cert_alias='cert1''
amqp://guest:guest@/test?failover='roundrobin?cyclecount='2'' &brokerlist='tcp://ip1:5672?retries='5'&connectdelay='2000';tcp://ip2:5672?retries='5'&connectdelay='2000''
Table 20.2. Broker List URL Options
| Option | Type | Description |
|---|---|---|
idle_timeout
|
Integer
|
Frequency of idle_timeout messages (in seconds)
|
sasl_mechs
|
--
|
For secure applications, we suggest
CRAM-MD5, DIGEST-MD5, or GSSAPI. The ANONYMOUS method is not secure. The PLAIN method is secure only when used together with SSL. For Kerberos, sasl_mechs must be set to GSSAPI, sasl_protocol must be set to the principal for the qpidd broker, e.g. qpidd/, and sasl_server must be set to the host for the SASL server, e.g. sasl.com. SASL External is supported using SSL certification, e.g. ssl='true'&sasl_mechs='EXTERNAL'
|
sasl_encryption
|
Boolean
|
If
sasl_encryption='true', the JMS client attempts to negotiate a security layer with the broker using GSSAPI to encrypt the connection. Note that for this to happen, GSSAPI must be selected as the sasl_mech.
|
ssl
|
Boolean
|
If
ssl='true', the JMS client will encrypt the connection using SSL.
|
tcp_nodelay
|
Boolean
|
If
tcp_nodelay='true', TCP packet batching is disabled.
|
sasl_protocol
|
--
|
Used only for Kerberos.
sasl_protocol must be set to the principal for the qpidd broker, e.g. qpidd/
|
sasl_server
|
--
|
For Kerberos,
sasl_mechs must be set to GSSAPI, sasl_server must be set to the host for the SASL server, e.g. sasl.com.
|
trust_store
|
String
|
Path to Kerberos trust store
|
trust_store_password
|
String
|
Kerberos trust store password
|
key_store
|
String
|
Path to Kerberos key store
|
key_store_password
|
String
|
Kerberos key store password
|
ssl_verify_hostname
|
Boolean
|
When using SSL you can enable hostname verification by using "
ssl_verify_hostname=true" in the broker URL.
|
ssl_cert_alias
|
String
|
If multiple certificates are present in the keystore, the alias will be used to extract the correct certificate.
|
retries | integer |
The number of times to retry connection to each broker in the broker list. Defaults to 1.
|
connectdelay | integer |
Length of time (in milliseconds) to wait before attempting to reconnect. Defaults to 0.
|
connecttimeout | integer |
Length of time (in milliseconds) to wait for the socket connection to succeed. A value of 0 represents an infinite timeout, i.e. the connection attempt will block until established or an error occurs. Defaults to 30000.
|
tcp_nodelay | Boolean | If tcp_nodelay='true', TCP packet batching is disabled. Defaults to true since Qpid 0.14. |
20.6. Java JMS Message Properties
Table 20.3. Mapping JMS Headers to AMQP fields
| JMS Header Name | AMQP Identifier | AMQP Field | AMQP Section | Notes |
|---|---|---|---|---|
JMSCorrelationID | correlation_id
| correlation-id
| properties | |
JMSDeliveryMode | delivery_mode
| durable
| header | Computed value: [durable ? 'PERSISTENT' : 'NON_PERSISTENT'] |
JMSDestination | to
| to
| properties | |
JMSExpiration | absolute_expiry_time
| absolute-expiry-time
| properties | |
JMSMessageID | message_id
| message-id
| properties | |
JMSPriority | priority
| priority
| header | |
JMSRedelivered | redelivered
| delivery-count
| header | computed value: delivery-count > 0 |
JMSReplyTo | reply_to
| reply-to
| properties | |
JMSTimestamp | creation_time
| creation-time
| properties | |
JMSType | subject
| subject
| properties |
Note
JMSDeliveryModeJMSPriorityJMSMessageIDJMSTimestampJMSCorrelationIDJMSType
20.7. JMS MapMessage Types
MapMessage interface, which provides support for maps in messages. The following code shows how to send a MapMessage in Java JMS.
Example 20.2. Sending a Java JMS MapMessage
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import edu.emory.mathcs.backport.java.util.Arrays;
// !!! SNIP !!!
MessageProducer producer = session.createProducer(queue);
MapMessage m = session.createMapMessage();
m.setIntProperty("Id", 987654321);
m.setStringProperty("name", "Widget");
m.setDoubleProperty("price", 0.99);
List<String> colors = new ArrayList<String>();
colors.add("red");
colors.add("green");
colors.add("white");
m.setObject("colours", colors);
Map<String,Double> dimensions = new HashMap<String,Double>();
dimensions.put("length",10.2);
dimensions.put("width",5.1);
dimensions.put("depth",2.0);
m.setObject("dimensions",dimensions);
List<List<Integer>> parts = new ArrayList<List<Integer>>();
parts.add(Arrays.asList(new Integer[] {1,2,5}));
parts.add(Arrays.asList(new Integer[] {8,2,5}));
m.setObject("parts", parts);
Map<String,Object> specs = new HashMap<String,Object>();
specs.put("colours", colors);
specs.put("dimensions", dimensions);
specs.put("parts", parts);
m.setObject("specs",specs);
producer.send(m);
MapMessage, and the corresponding data types that will be received by clients in Python or C++.
Table 20.4. Java Data Types in Maps
Java Data Type | ? Python | ? C++ |
|---|---|---|
boolean | bool | bool |
short | int | long | int16 |
int | int | long | int32 |
long | int | long | int64 |
float | float | float |
double | float | double |
java.lang.String | unicode | std::string |
java.util.UUID | uuid | qpid::types::Uuid |
java.util.Map [a] | dict | Variant::Map |
java.util.List | list | Variant::List |
[a]
In Qpid, maps can nest. This goes beyond the functionality required by the JMS specification.
| ||
20.8. JMS ListMessage
ListMessage type is available for sending lists.
javax.jms.StreamMessagejavax.jms.MapMessageorg.apache.qpid.jms.ListMessage
org.apache.qpid.jms.ListMessage- by creating it viacreateListMessage()inorg.apache.qpid.jms.Session.Example:ListMessage msg = ((org.apache.qpid.jms.Session)ssn).createListMessage();
- If you set
-Dqpid.use_legacy_stream_message=falseany stream message you create will be encoded as a list message.Example:StreamMessage msg = jmsSession.createStreamMessage();
20.9. JMS Client Logging
org.apache.qpid. Otherwise log4j will default to DEBUG which will degrade performance considerably due to excessive logging. The recommended logging level for production is WARN.
log4j.properties file and placed in the CLASSPATH, or they can be set explicitly using the -Dlog4j.configuration property.
Example 20.3. log4j Logging Properties
log4j.logger.org.apache.qpid=WARN, console
log4j.additivity.org.apache.qpid=false
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=all
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n20.10. AMQP 0-10 JMS Client Configuration
20.10.1. Configuration Methods and Granularity
- JVM level using JVM arguments - Affects all connections, sessions, consumers and producers created within the JVM.Example: The
-dmax_prefetch=1000property specifies the message credits to use. - Connection level using connection or broker properties - Affects the respective connection and sessions, consumers and producers created by that connection.Example: The
amqp://guest:guest@test/test?max_prefetch='1000' &brokerlist='tcp://localhost:5672'property specifies the message credits to use. This overrides any value specified via the JVM argumentmax_prefetch. - Destination level using addressing options - Affects the producer(s) and consumer(s) created using the respective destination.Example:
my-queue; {create: always, link:{capacity: 10}}where capacity option specifies the message credits to use. This overrides any connection level configuration.
20.10.2. qpid-java JVM Arguments
Table 20.5. Configuration Options For Connection Behavior
| Property Name | Type | Default Value | Description |
|---|---|---|---|
qpid.amqp.version | string | 0-10 | Sets the AMQP version to be used - currently supports 0-8, 0-9, 0-91, and 0-10. The client will begin negotiation at the specified version and only negotiate downwards if the broker does not support the specified version. |
qpid.heartbeat | int | 120 (seconds) | The heartbeat interval in seconds. Two consecutive missed heartbeats result in the connection timing out. This can also be set per connection. |
ignore_setclientID | boolean | false | If a client ID is specified in the connection URL then it is used, otherwise an ID is generated. If an ID is specified after it has been generated Qpid will throw an exception. Setting this property to 'true' disables that check and allows you to set a client ID at any time. |
Table 20.6. Configuration Options For Session Behavior
| Property Name | Type | Default Value | Description |
|---|---|---|---|
qpid.session.command_limit | int | 65536 | Limits the number of unacknowledged commands. |
qpid.session.byte_limit | int | 1048576 | Limits the number of unacknowledged commands in bytes. |
qpid.use_legacy_map_message | boolean | false | Uses the old map message encoding. By default the map messages are encoded using the 0-10 map encoding. This can also be set per connection as well. |
qpid.jms.daemon.dispatcher | boolean | false | Controls whether the Session dispatcher thread is a daemon thread or not. If this system property is set to true then the Session dispatcher threads will be created as daemon threads. This setting is introduced in version 0.16. |
Table 20.7. Configuration Options For Consumer Behavior
| Property Name | Type | Default Value | Description |
|---|---|---|---|
max_prefetch | int | 500 | Maximum number of messages to credits. Can also be set per connection or per destination. |
qpid.session.max_ack_delay | long | 1000 (ms) | Timer interval to flush message acks in buffer when using AUTO_ACK and DUPS_OK. |
sync_ack | boolean | false | If set, each message will be acknowledged synchronously. When using AUTO_ACK mode, set this to "true". Can also be set per connection. |
Table 20.8. Configuration Options For Producer Behavior
| Property Name | Type | Default Value | Description |
|---|---|---|---|
sync_publish | string | - | Sends messages synchronously. Valid values are persistent, transient, all. Can also be set per connection. |
Table 20.9. Configuration Options For Threading
| Property Name | Type | Default Value | Description |
|---|---|---|---|
qpid.thread_factory | string | org.apache.qpid.thread.DefaultThreadFactory | Specifies the thread factory to use. If using a real time JVM, set to org.apache.qpid.thread.RealtimeThreadFactory. |
qpid.rt_thread_priority | int | 20 | Specifies the priority (1-99) for realtime threads created by the realtime thread factory. |
Table 20.10. Configuration Options For I/O
| Property Name | Type | Default Value | Description |
|---|---|---|---|
qpid.transport | string | org.apache.qpid.transport.network.io.IoNetworkTransport | The transport implementation to be used. You can also specify the org.apache.qpid.transport.network.NetworkTransport transport mechanism. |
qpid.sync_op_timeout | long | 60000 (milliseconds) | The length of time to wait for a synchronous operation to complete. For compatibility with older clients, use amqj.default_syncwrite_timeout. |
qpid.tcp_nodelay | boolean | true |
Sets the TCP_NODELAY property of the underlying socket.
This can also be set per connection using the Connection URL options.
For compatibility with older clients, the synonym
amqj.tcp_nodelay is supported.
|
qpid.send_buffer_size | integer | 65535 |
Sets the SO_SNDBUF property of the underlying socket.
For compatibility with older clients, the synonym
amqj.sendBufferSize is supported.
|
qpid.receive_buffer_size | integer | 65535 |
Sets the SO_RCVBUF property of the underlying socket.
For compatibility with older clients, the synonym
amqj.receiveBufferSize is supported.
|
qpid.failover_method_timeout | long | 60000 |
During failover, this is the timeout for each attempt to try to re-establish the connection. If a reconnection attempt exceeds the timeout, the entire failover process is aborted.
It is only applicable for AMQP 0-8/0-9/0-9-1 clients.
|
Table 20.11. Configuration Options For Security
| Property Name | Type | Default Value | Description |
|---|---|---|---|
qpid.sasl_mechs | string | PLAIN | The SASL mechanism used. More than one can be specified using a comma separated list. Supported values are PLAIN, GSSAPI, and EXTERNAL. |
qpid.sasl_protocol | string | AMQP | When using GSSAPI as the SASL mechanism, sasl_protocol must be set to the principal for the qpidd broker. |
qpid.sasl_server_name | string | localhost | When using GSSAPI as the SASL mechanism, sasl_server must be set to the host for the SASL server. |
Table 20.12. JVM properties for GSSAPI as the SASL mechanism
| Property Name | Type | Default Value | Description |
|---|---|---|---|
javax.security.auth.useSubjectCredsOnly | boolean | true | If set to 'false', forces the SASL GASSPI client to obtain kerberos credentials explicitly. |
java.security.auth.login.config | string | - | Specifies the JASS configuration file. |
Table 20.13. Configuration options for SSL connections
| Property Name | Type | Default Value | Description |
|---|---|---|---|
qpid.ssl_timeout | long | 60000 | Timeout value used by the Java SSL engine when waiting on operations. |
qpid.ssl.KeyManagerFactory.algorithm | string | - |
The key manager factory algorithm name. If not set, defaults to the value returned from the Java runtime call KeyManagerFactory.getDefaultAlgorithm().
For compatibility with older clients, the synonym qpid.ssl.keyStoreCertType is supported.
|
qpid.ssl.TrustManagerFactory.algorithm | string | - |
The trust manager factory algorithm name. If not set, defaults to the value returned from the Java runtime call TrustManagerFactory.getDefaultAlgorithm()
For compatibility with older clients, the synonym qpid.ssl.trustStoreCertType is supported.
|
Table 20.14. JVM Properties for SSL connections
| Property Name | Type | Default Value | Description |
|---|---|---|---|
javax.net.ssl.keyStore | string | jvm default | Specifies the key store path. |
javax.net.ssl.keyStorePassword | string | jvm default | Specifies the key store password. |
javax.net.ssl.trustStore | string | jvm default | Specifies the trust store path. |
javax.net.ssl.trustStorePassword | string | jvm default | Specifies the trust store password. |
20.11. Java Message Service with Filters
20.11.1. No Local filter
<type name="no-local-filter" class="composite" source="list" provides="filter">
<descriptor name="apache.org:no-local-filter:list" code="0x0000468C:0x00000003"/>
</type>20.11.2. Selector filter
<type name="selector-filter" class="restricted" source="string" provides="filter">
<descriptor name="apache.org:selector-filter:string" code="0x0000468C:0x00000004"/>
</type>amqp.field_name where field_name is the appropriate AMQP 1.0 field named in the table above, with the hyphen replaced by an underscore. For example, the selector: JMSCorrelationID = 'abc' AND color = 'blue' AND weight > 2500 would be transferred over the wire as: amqp.correlation_id = 'abc' AND color = 'blue' AND weight > 2500
Table 20.15. Mapping AMQP types to JMS types
| AMQP Type | JMS Selector Type |
|---|---|
|
null
|
null
|
|
boolean
|
boolean
|
|
ubyte
|
short
|
|
ushort
|
int
|
|
uint
|
long
|
|
ulong
|
long
|
|
byte
|
byte
|
|
short
|
short
|
|
int
|
int
|
|
long
|
long
|
|
float
|
float
|
|
double
|
double
|
|
decimal32
|
double
|
|
decimal64
|
double
|
|
decimal128
|
double
|
|
char
|
char
|
|
timestamp
|
long
|
|
uuid
|
byte[16]
|
|
binary
|
byte[]
|
|
string
|
String
|
|
symbol
|
String
|
Chapter 21. Using the qpid-jms AMQP 1.0 client
21.1. QPID AMQP 1.0 JMS Client Configuration
InitialContext, the syntax for its related configuration, and various URI options that can be set when defining a ConnectionFactory.
InitialContext, itself obtained from an InitialContextFactory, to look up JMS objects such as ConnectionFactory. The Qpid JMS client provides an implementation of the InitialContextFactory in class org.apache.qpid.jms.jndi.JmsInitialContextFactory. This may be configured and used in three main ways:
- Via
jndi.propertiesfile on the Java Classpath. - By including a file named
jndi.propertieson the Classpath and setting thejava.naming.factory.initialproperty to valueorg.apache.qpid.jms.jndi.JmsInitialContextFactory, the QpidInitialContextFactoryimplementation will be discovered when instantiating InitialContext object.javax.naming.Context ctx = new javax.naming.InitialContext();
The particularConnectionFactory, Queue and Topic objects you wish the context to contain are configured using properties (the syntax for which is detailed below) either directly within thejndi.propertiesfile, or in a separate file which is referenced injndi.propertiesusing thejava.naming.provider.urlproperty. - Via system properties.
- By setting the
java.naming.factory.initialsystem property to valueorg.apache.qpid.jms.jndi.JmsInitialContextFactory, the QpidInitialContextFactoryimplementation will be discovered when instantiating InitialContext object.javax.naming.Context ctx = new javax.naming.InitialContext();
The particularConnectionFactory, Queue and Topic objects you wish the context to contain are configured as properties in a file, which is passed using thejava.naming.provider.urlsystem property. The syntax for these properties is detailed below. - Programmatically using an environment Hashtable.
- The InitialContext may also be configured directly by passing an environment during creation:
Hashtable<Object, Object> env = new Hashtable<Object, Object>(); env.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); javax.naming.Context context = new javax.naming.InitialContext(env);
The particularConnectionFactory, Queue and Topic objects you wish the context to contain are configured as properties (the syntax for which is detailed below), either directly within the environment Hashtable, or in a separate file which is referenced using thejava.naming.provider.urlproperty within the environment Hashtable.
Table 21.1. Property syntax
| Property | Syntax |
|---|---|
| ConnectionFactory | connectionfactory.lookupName = URI |
| Queue | queue.lookupName = queueName |
| Topic | topic.lookupName = topicName |
ConnectionFactory, Queue, and Topic:
connectionfactory.myFactoryLookup = amqp://localhost:5672 queue.myQueueLookup = queueA topic.myTopicLookup = topicA
ConnectionFactory factory = (ConnectionFactory) context.lookup("myFactoryLookup");
Queue queue = (Queue) context.lookup("myQueueLookup");
Topic topic = (Topic) context.lookup("myTopicLookup");
21.2. QPID AMQP 1.0 JMS Client Connection URLs
amqp://hostname:port[?option=value[&option2=value...]]
ConnectionFactory, these are detailed in the following tables.
Connection, Session, MessageConsumer and MessageProducer.
Table 21.2. JMS Configuration options
| Option | Description |
|---|---|
jms.username
|
User name value used to authenticate the connection
|
jms.password
|
The password value used to authenticate the connection
|
jms.clientID
|
The ClientID value that is applied to the connection.
|
jms.forceAsyncSend
|
Configures whether all Messages sent from a
MessageProducer are sent asynchronously or only those Message that qualify such as Messages inside a transaction or non-persistent messages.
|
jms.alwaysSyncSend
|
Override all asynchronous send conditions and always sends every Message from a
MessageProducer synchronously.
|
jms.sendAcksAsync
|
Causes all Message acknowledgments to be sent asynchronously.
|
jms.localMessageExpiry
|
Controls whether
MessageConsumer instances will locally filter expired Messages or deliver them. By default this value is set to true and expired messages will be filtered.
|
jms.localMessagePriority
|
If enabled pre-fetched messages are reordered locally based on their given Message priority value. Default is false.
|
jms.validatePropertyNames
|
If message property names should be validated as valid Java identifiers. Default is
true.
|
jms.queuePrefix
|
Optional prefix value added to the name of any Queue created from a JMS Session.
|
jms.topicPrefix
|
Optional prefix value added to the name of any Topic created from a JMS Session.
|
jms.closeTimeout
|
Timeout value that controls how long the client waits on Connection close before returning. (By default the client waits 15 seconds for a normal close completion event).
|
jms.connectTimeout
|
Timeout value that controls how long the client waits on Connection establishment before returning with an error. (By default the client waits 15 seconds for a connection to be established before failing).
|
jms.clientIDPrefix
|
Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS
ConnectionFactory. The default prefix is ID:.
|
jms.connectionIDPrefix
|
Optional prefix value that is used for generated Connection ID values when a new Connection is created for the JMS
ConnectionFactory. This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing the logs easier. The default prefix is ID:.
|
Table 21.3. Prefetch Options
| Option | Description |
|---|---|
jms.prefetchPolicy.queuePrefetch
|
defaults to
1000
|
jms.prefetchPolicy.topicPrefetch
|
defaults to
1000
|
jms.prefetchPolicy.queueBrowserPrefetch
|
defaults to
1000
|
jms.prefetchPolicy.durableTopicPrefetch
|
defaults to
1000
|
jms.prefetchPolicy.all
|
used to set all prefetch values at once.
|
RedeliveryPolicy parameter controls how redelivered messages are handled on the client.
Table 21.4. Redelivery Options
| Option | Description |
|---|---|
jms.redeliveryPolicy.maxRedeliveries | Controls when an incoming message is rejected based on the number of times it has been redelivered, the default value is disabled (-1). A value of zero (0) would indicate no message redeliveries are accepted, a value of five (5) would allow a message to be redelivered five times. |
amqp://localhost:5672?jms.clientID=foo&transport.connectTimeout=30000
Table 21.5. TCP Transport Options
| Option | Description |
|---|---|
transport.sendBufferSize
|
default is
64k
|
transport.receiveBufferSize
|
default is
64k
|
transport.trafficClass
|
default is
0
|
transport.connectTimeout
|
default is
60 seconds
|
transport.soTimeout
|
default is
-1
|
transport.soLinger
|
default is
-1
|
transport.tcpKeepAlive
|
default is
false
|
transport.tcpNoDelay
|
default is
true
|
amqps://localhost:5673
Table 21.6. SSL Transport Options
| Option | Description |
|---|---|
transport.keyStoreLocation
|
The default is to read from the system property
javax.net.ssl.keyStore
|
transport.keyStorePassword
|
The default is to read from the system property
javax.net.ssl.keyStorePassword
|
transport.trustStoreLocation
|
The default is to read from the system property
javax.net.ssl.trustStore
|
transport.trustStorePassword
|
The default is to read from the system property
javax.net.ssl.keyStorePassword
|
transport.storeType
|
The type of trust store being used. Default is
JKS.
|
transport.contextProtocol
|
The protocol argument used when getting an SSLContext. Default is
TLS.
|
transport.enabledCipherSuites
|
The cipher suites to enable, comma separated. No default, meaning the context default ciphers are used. Any disabled ciphers are removed from this.
|
transport.disabledCipherSuites
|
The cipher suites to disable, comma separated. Ciphers listed here are removed from the enabled ciphers. No default.
|
transport.enabledProtocols
|
The protocols to enable, comma separated. No default, meaning the context default protocols are used. Any disabled protocols are removed from this.
|
transport.disabledProtocols
|
The protocols to disable, comma separated. Protocols listed here are removed from the enabled protocols. Default is
SSLv2Hello,SSLv3.
|
transport.trustAll
|
Whether to trust the provided server certificate implicitly, regardless of any configured trust store. Defaults to
false.
|
transport.verifyHost
|
Whether to verify that the hostname being connected to matches with the provided server certificate. Defaults to
true.
|
transport.keyAlias
|
The alias to use when selecting a keypair from the keystore if required to send a client certificate to the server. No default.
|
Table 21.7. AMQP Options
| Option | Description |
|---|---|
amqp.idleTimeout
|
The idle timeout in milliseconds after which the connection will be failed if the peer sends no AMQP frames. Default is
60000.
|
amqp.vhost
|
The
vhost to connect to. Used to populate the SASL and Open hostname fields. Default is the main hostname from the Connection URI.
|
amqp.saslLayer
|
Controls whether connections use a SASL layer or not. Default is
true.
|
amqp.saslMechanisms
|
Which SASL mechanism(s) the client allows selection of, if offered by the server and usable with the configured credentials. Comma separated if specifying more than 1 mechanism. Default is to allow selection from all the clients supported mechanisms, which are currently
EXTERNAL, CRAM-MD5, PLAIN, and ANONYMOUS.
|
amqp.maxFrameSize
|
The max-frame-size value in bytes that is advertised to the peer. Default is
1048576.
|
failover prefix and a list of URIs for the brokers is contained inside a set of parentheses.
jms. options are applied to the overall failover URI, outside the parentheses, and affect the JMS Connection object for its lifetime.
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.maxReconnectAttempts=20
transport. or amqp. options defined earlier, with these being applied as each host is connected to:
failover:(amqp://host1:5672?amqp.option=value,amqp://host2:5672?transport.option=value)?jms.clientID=foo
Table 21.8. Failover Options
| Option | Description |
|---|---|
failover.initialReconnectDelay
|
The amount of time the client will wait before the first attempt to reconnect to a remote peer. The default value is zero (
0), meaning the first attempt happens immediately.
|
failover.reconnectDelay
|
Controls the delay between successive reconnection attempts, defaults to
10 milliseconds. If the backoff option is not enabled this value remains constant.
|
failover.maxReconnectDelay
|
The maximum time that the client will wait before attempting a reconnect. This value is only used when the backoff feature is enabled to ensure that the delay does not grow too large. Defaults to
30 seconds as the max time between connect attempts.
|
failover.useReconnectBackOff
|
Controls whether the time between reconnection attempts grows based on a configured multiplier. This option defaults to
true.
|
failover.reconnectBackOffMultiplier
|
The multiplier used to grow the reconnection delay value, defaults to
2.0d.
|
failover.maxReconnectAttempts
|
The number of reconnection attempts allowed before reporting the connection as failed to the client. The default is no limit or (
-1).
|
failover.startupMaxReconnectAttempts
|
For a client that has never connected to a remote peer before this option control how many attempts are made to connect before reporting the connection as failed. The default is to use the value of
maxReconnectAttempts.
|
failover.warnAfterReconnectAttempts
|
Controls how often the client will log a message indicating that failover reconnection is being attempted. The default is to log every
10 connection attempts.
|
transport. and amqp. URI options outlined earlier for a non-failover broker URI but prefixed with failover.nested.. For example, to apply the same value for the amqp.vhost option to every broker connected to you might have a URI like:
failover:(amqp://host1:5672,amqp://host2:5672)?jms.clientID=foo&failover.nested.amqp.vhost=myhost
failover.nested. to discovery.discovered. For example, without the agent URI details, a general discovery URI might look like:
discovery:(<agent-uri>)?discovery.maxReconnectAttempts=20&discovery.discovered.jms.clientID=foo
discovery:(file:///path/to/monitored-file?updateInterval=60000)
updateInterval. It controls the frequency in milliseconds which the file is inspected for change. The default value is 30000.
discovery:(multicast://default?group=default)
239.255.2.3:6155). You may change this to specify the actual IP and port in use with your multicast configuration.
group. It controls which multicast group messages are listened for on. The default value is default.
21.3. QPID AMQP 1.0 JMS Client Logging
org.apache.qpid.jms hierarchy, which you can use to configure a logging implementation based on your needs.
- Set the environment variable (not Java system property)
PN_TRACE_FRMtotrue, which will cause Proton to emit frame logging to stdout. - Add the option
amqp.traceFrames=trueto your connection URI to have the client add a protocol tracer to Proton, and configure theorg.apache.qpid.jms.provider.amqp.FRAMESLogger toTRACElevel to include the output in your logs.
Chapter 22. .NET Binding for Qpid C++ Messaging
22.1. .NET Binding for the C++ Messaging Client Examples
Table 22.1. Client and Server Examples
| Example Name | Example Description |
|---|---|
csharp.example.server | Creates a receiver and listens for messages. Upon receipt, the content of the message is converted to upper case and forwarded to the received message's ReplyTo address. |
csharp.example.client | Sends a series of messages to the server and prints the original message content and the received message content. |
See Also:
22.2. .NET Binding Class Mapping to Underlying C++ Messaging API
Table 22.2. Map Sender and Receiver Examples
| Example Name | Example Description |
|---|---|
csharp.map.receiver | Creates a receiver and listens for a map message. Upon receipt, the message is decoded and displayed on the console. |
csharp.map.sender | Creates a map message and sends it to map.receiver. The map message contains values for every supported .NET messaging binding data type. |
See Also:
22.3. .NET Binding for the C++ Messaging API Class: Address
Table 22.3. .NET Binding for the C++ Messaging API Class: Address
| .NET Binding Class: Address | |
|---|---|
| Language | Syntax |
| C++ | class Address |
| .NET | public ref class Address |
| Constructor | |
| C++ | Address(); |
| .NET | public Address(); |
| Constructor | |
| C++ | Address(const std::string& address); |
| .NET | public Address(string address); |
| Constructor | |
| C++ | Address(const std::string& name, const std::string& subject, const qpid::types::Variant::Map& options, const std::string& type = ""); |
| .NET | public Address(string name, string subject, Dictionary<string, object> options); |
| .NET | public Address(string name, string subject, Dictionary<string, object> options, string type); |
| Copy constructor | |
| C++ | Address(const Address& address); |
| .NET | public Address(Address address); |
| Destructor | |
| C++ | ~Address(); |
| .NET | ~Address(); |
| Finalizer | |
| C++ | not applicable |
| .NET | !Address(); |
| Copy assignment operator | |
| C++ | Address& operator=(const Address&); |
| .NET | public Address op_Assign(Address rhs); |
| Property: Name | |
| C++ | const std::string& getName() const; |
| C++ | void setName(const std::string&); |
| .NET | public string Name { get; set; } |
| Property: Subject | |
| C++ | const std::string& getSubject() const; |
| C++ | void setSubject(const std::string&); |
| .NET | public string Subject { get; set; } |
| Property: Options | |
| C++ | const qpid::types::Variant::Map& getOptions() const; |
| C++ | qpid::types::Variant::Map& getOptions(); |
| C++ | void setOptions(const qpid::types::Variant::Map&); |
| .NET | public Dictionary<string, object> Options { get; set; } |
| Property: Type | |
| C++ | std::string getType() const; |
| C++ | void setType(const std::string&); |
| .NET | public string Type { get; set; } |
| Miscellaneous | |
| C++ | std::string str() const; |
| .NET | public string ToStr(); |
| Miscellaneous | |
| C++ | operator bool() const; |
| .NET | not applicable |
| Miscellaneous | |
| C++ | bool operator !() const; |
| .NET | not applicable |
See Also:
22.4. .NET Binding for the C++ Messaging API Class: Connection
Table 22.4. .NET Binding for the C++ Messaging API Class: Connection
| .NET Binding Class: Connection | |
|---|---|
| Language | Syntax |
| C++ | class Connection : public qpid::messaging::Handle<ConnectionImpl> |
| .NET | public ref class Connection |
| Constructor | |
| C++ | Connection(ConnectionImpl* impl); |
| .NET | not applicable |
| Constructor | |
| C++ | Connection(); |
| .NET | not applicable |
| Constructor | |
| C++ | Connection(const std::string& url, const qpid::types::Variant::Map& options = qpid::types::Variant::Map()); |
| .NET | public Connection(string url); |
| .NET | public Connection(string url, Dictionary<string, object> options); |
| Constructor | |
| C++ | Connection(const std::string& url, const std::string& options); |
| .NET | public Connection(string url, string options); |
| Copy Constructor | |
| C++ | Connection(const Connection&); |
| .NET | public Connection(Connection connection); |
| Destructor | |
| C++ | ~Connection(); |
| .NET | ~Connection(); |
| Finalizer | |
| C++ | not applicable |
| .NET | !Connection(); |
| Copy assignment operator | |
| C++ | Connection& operator=(const Connection&); |
| .NET | public Connection op_Assign(Connection rhs); |
| Method: SetOption | |
| C++ | void setOption(const std::string& name, const qpid::types::Variant& value); |
| .NET | public void SetOption(string name, object value); |
| Method: open | |
| C++ | void open(); |
| .NET | public void Open(); |
| Property: isOpen | |
| C++ | bool isOpen(); |
| .NET | public bool IsOpen { get; } |
| Method: close | |
| C++ | void close(); |
| .NET | public void Close(); |
| Method: createTransactionalSession | |
| C++ | Session createTransactionalSession(const std::string& name = std::string()); |
| .NET | public Session CreateTransactionalSession(); |
| .NET | public Session CreateTransactionalSession(string name); |
| Method: createSession | |
| C++ | Session createSession(const std::string& name = std::string()); |
| .NET | public Session CreateSession(); |
| .NET | public Session CreateSession(string name); |
| Method: getSession | |
| C++ | Session getSession(const std::string& name) const; |
| .NET | public Session GetSession(string name); |
Property: AuthenticatedUsername | |
| C++ | std::string getAuthenticatedUsername(); |
| .NET | public string GetAuthenticatedUsername(); |
See Also:
22.5. .NET Binding for the C++ Messaging API Class: Duration
Table 22.5. .NET Binding for the C++ Messaging API Class: Duration
| .NET Binding Class: Duration | |
|---|---|
| Language | Syntax |
| C++ | class Duration |
| .NET | public ref class Duration |
| Constructor | |
| C++ | explicit Duration(uint64_t milliseconds); |
| .NET | public Duration(ulong mS); |
| Copy constructor | |
| C++ | not applicable |
| .NET | public Duration(Duration rhs); |
| Destructor | |
| C++ | default |
| .NET | default |
| Finalizer | |
| C++ | not applicable |
| .NET | default |
| Property: Milliseconds | |
| C++ | uint64_t getMilliseconds() const; |
| .NET | public ulong Milliseconds { get; } |
| Operator: * | |
| C++ | Duration operator*(const Duration& duration, uint64_t multiplier); |
| .NET | public static Duration operator *(Duration dur, ulong multiplier); |
| .NET | public static Duration Multiply(Duration dur, ulong multiplier); |
| C++ | Duration operator*(uint64_t multiplier, const Duration& duration); |
| .NET | public static Duration operator *(ulong multiplier, Duration dur); |
| .NET | public static Duration Multiply(ulong multiplier, Duration dur); |
| Constants | |
| C++ | static const Duration FOREVER; |
| C++ | static const Duration IMMEDIATE; |
| C++ | static const Duration SECOND; |
| C++ | static const Duration MINUTE; |
| .NET | public sealed class DurationConstants |
| .NET | public static Duration FORVER; |
| .NET | public static Duration IMMEDIATE; |
| .NET | public static Duration MINUTE; |
| .NET | public static Duration SECOND; |
See Also:
22.6. .NET Binding for the C++ Messaging API Class: FailoverUpdates
Table 22.6. .NET Binding for the C++ Messaging API Class: FailoverUpdates
| .NET Binding Class: FailoverUpdates | |
|---|---|
| Language | Syntax |
| C++ | class FailoverUpdates |
| .NET | public ref class FailoverUpdates |
| Constructor | |
| C++ | FailoverUpdates(Connection& connection); |
| .NET | public FailoverUpdates(Connection connection); |
| Destructor | |
| C++ | ~FailoverUpdates(); |
| .NET | ~FailoverUpdates(); |
| Finalizer | |
| C++ | not applicable |
| .NET | !FailoverUpdates(); |
See Also:
22.7. .NET Binding for the C++ Messaging API Class: Message
Table 22.7. .NET Binding for the C++ Messaging API Class: Message
| .NET Binding Class: Message | |
|---|---|
| Language | Syntax |
| C++ | class Message |
| .NET | public ref class Message |
| Constructor | |
| C++ | Message(const std::string& bytes = std::string()); |
| .NET | Message(); |
| .NET | Message(System::String ^ theStr); |
| .NET | Message(System::Object ^ theValue); |
| .NET | Message(array<System::Byte> ^ bytes); |
| Constructor | |
| C++ | Message(const char*, size_t); |
| .NET | public Message(byte[] bytes, int offset, int size); |
| Copy Constructor | |
| C++ | Message(const Message&); |
| .NET | public Message(Message message); |
| Copy assignment operator | |
| C++ | Message& operator=(const Message&); |
| .NET | public Message op_Assign(Message rhs); |
| Destructor | |
| C++ | ~Message(); |
| .NET | ~Message(); |
| Finalizer | |
| C++ | not applicable |
| .NET | !Message() |
| Property: ReplyTo | |
| C++ | void setReplyTo(const Address&); |
| C++ | const Address& getReplyTo() const; |
| .NET | public Address ReplyTo { get; set; } |
| Property: Subject | |
| C++ | void setSubject(const std::string&); |
| C++ | const std::string& getSubject() const; |
| .NET | public string Subject { get; set; } |
| Property: ContentType | |
| C++ | void setContentType(const std::string&); |
| C++ | const std::string& getContentType() const; |
| .NET | public string ContentType { get; set; } |
| Property: MessageId | |
| C++ | void setMessageId(const std::string&); |
| C++ | const std::string& getMessageId() const; |
| .NET | public string MessageId { get; set; } |
| Property: UserId | |
| C++ | void setUserId(const std::string&); |
| C++ | const std::string& getUserId() const; |
| .NET | public string UserId { get; set; } |
| Property: CorrelationId | |
| C++ | void setCorrelationId(const std::string&); |
| C++ | const std::string& getCorrelationId() const; |
| .NET | public string CorrelationId { get; set; } |
| Property: Priority | |
| C++ | void setPriority(uint8_t); |
| C++ | uint8_t getPriority() const; |
| .NET | public byte Priority { get; set; } |
| Property: Ttl | |
| C++ | void setTtl(Duration ttl); |
| C++ | Duration getTtl() const; |
| .NET | public Duration Ttl { get; set; } |
| Property: Durable | |
| C++ | void setDurable(bool durable); |
| C++ | bool getDurable() const; |
| .NET | public bool Durable { get; set; } |
| Property: Redelivered | |
| C++ | bool getRedelivered() const; |
| C++ | void setRedelivered(bool); |
| .NET | public bool Redelivered { get; set; } |
| Method: SetProperty | |
| C++ | void setProperty(const std::string&, const qpid::types::Variant&); |
| .NET | public void SetProperty(string name, object value); |
| Property: Properties | |
| C++ | const qpid::types::Variant::Map& getProperties() const; |
| C++ | qpid::types::Variant::Map& getProperties(); |
| .NET | public Dictionary<string, object> Properties { get; set; } |
| Method: SetContent | |
| C++ | void setContent(const std::string&); |
| C++ | void setContent(const char* chars, size_t count); |
| .NET | public void SetContent(byte[] bytes); |
| .NET | public void SetContent(string content); |
| .NET | public void SetContent(byte[] bytes, int offset, int size); |
| Method: GetContent | |
| C++ | std::string getContent() const; |
| .NET | public string GetContent(); |
| .NET | public void GetContent(byte[] arr); |
| .NET | public void GetContent(Collection<object> __p1); |
| .NET | public void GetContent(Dictionary<string, object> dict); |
| Method: GetContentPtr | |
| C++ | const char* getContentPtr() const; |
| .NET | not applicable |
| Property: ContentSize | |
| C++ | size_t getContentSize() const; |
| .NET | public ulong ContentSize { get; } |
| Struct: EncodingException | |
| C++ | struct EncodingException : qpid::types::Exception |
| .NET | not applicable |
| Method: decode | |
| C++ | void decode(const Message& message, qpid::types::Variant::Map& map, const std::string& encoding = std::string()); |
| C++ | void decode(const Message& message, qpid::types::Variant::List& list, const std::string& encoding = std::string()); |
| .NET | not applicable |
| Method: encode | |
| C++ | void encode(const qpid::types::Variant::Map& map, Message& message, const std::string& encoding = std::string()); |
| C++ | void encode(const qpid::types::Variant::List& list, Message& message, const std::string& encoding = std::string()); |
| .NET | not applicable |
| Method: AsString | |
| C++ | not applicable |
| .NET | public string AsString(object obj); |
| .NET | public string ListAsString(Collection<object> list); |
| .NET | public string MapAsString(Dictionary<string, object> dict); |
See Also:
22.8. .NET Binding for the C++ Messaging API Class: Receiver
Table 22.8. .NET Binding for the C++ Messaging API Class: Receiver
| .NET Binding Class: Receiver | |
|---|---|
| Language | Syntax |
| C++ | class Receiver |
| .NET | public ref class Receiver |
| Constructor | |
| .NET | Constructed object is returned by Session.CreateReceiver |
| Copy constructor | |
| C++ | Receiver(const Receiver&); |
| .NET | public Receiver(Receiver receiver); |
| Destructor | |
| C++ | ~Receiver(); |
| .NET | ~Receiver(); |
| Finalizer | |
| C++ | not applicable |
| .NET | !Receiver() |
| Copy assignment operator | |
| C++ | Receiver& operator=(const Receiver&); |
| .NET | public Receiver op_Assign(Receiver rhs); |
| Method: Get | |
| C++ | bool get(Message& message, Duration timeout=Duration::FOREVER); |
| .NET | public bool Get(Message mmsgp); |
| .NET | public bool Get(Message mmsgp, Duration durationp); |
| Method: Get | |
| C++ | Message get(Duration timeout=Duration::FOREVER); |
| .NET | public Message Get(); |
| .NET | public Message Get(Duration durationp); |
| Method: Fetch | |
| C++ | bool fetch(Message& message, Duration timeout=Duration::FOREVER); |
| .NET | public bool Fetch(Message mmsgp); |
| .NET | public bool Fetch(Message mmsgp, Duration duration); |
| Method: Fetch | |
| C++ | Message fetch(Duration timeout=Duration::FOREVER); |
| .NET | public Message Fetch(); |
| .NET | public Message Fetch(Duration durationp); |
| Property: Capacity | |
| C++ | void setCapacity(uint32_t); |
| C++ | uint32_t getCapacity(); |
| .NET | public uint Capacity { get; set; } |
| Property: Available | |
| C++ | uint32_t getAvailable(); |
| .NET | public uint Available { get; } |
| Property: Unsettled | |
| C++ | uint32_t getUnsettled(); |
| .NET | public uint Unsettled { get; } |
| Method: Close | |
| C++ | void close(); |
| .NET | public void Close(); |
| Property: IsClosed | |
| C++ | bool isClosed() const; |
| .NET | public bool IsClosed { get; } |
| Property: Name | |
| C++ | const std::string& getName() const; |
| .NET | public string Name { get; } |
| Property: Session | |
| C++ | Session getSession() const; |
| .NET | public Session Session { get; } |
See Also:
22.9. .NET Binding for the C++ Messaging API Class: Sender
Table 22.9. .NET Binding for the C++ Messaging API Class: Sender
| .NET Binding Class: Sender | |
|---|---|
| Language | Syntax |
| C++ | class Sender |
| .NET | public ref class Sender |
| Constructor | |
| .NET | Constructed object is returned by session.createSender |
| Copy constructor | |
| C++ | Sender(const Sender&); |
| .NET | public Sender(Sender sender); |
| Destructor | |
| C++ | ~Sender(); |
| .NET | ~Sender(); |
| Finalizer | |
| C++ | not applicable |
| .NET | !Sender() |
| Copy assignment operator | |
| C++ | Sender& operator=(const Sender&); |
| .NET | public Sender op_Assign(Sender rhs); |
| Method: Send | |
| C++ | void send(const Message& message, bool sync=false); |
| .NET | public void Send(Message mmsgp); |
| .NET | public void Send(Message mmsgp, bool sync); |
| Method: Close | |
| C++ | void close(); |
| .NET | public void Close(); |
| Property: Capacity | |
| C++ | void setCapacity(uint32_t); |
| C++ | uint32_t getCapacity(); |
| .NET | public uint Capacity { get; set; } |
| Property: Available | |
| C++ | uint32_t getAvailable(); |
| .NET | public uint Available { get; } |
| Property: Unsettled | |
| C++ | uint32_t getUnsettled(); |
| .NET | public uint Unsettled { get; } |
| Property: Name | |
| C++ | const std::string& getName() const; |
| .NET | public string Name { get; } |
| Property: Session | |
| C++ | Session getSession() const; |
| .NET | public Session Session { get; } |
See Also:
22.10. .NET Binding for the C++ Messaging API Class: Session
Table 22.10. .NET Binding for the C++ Messaging API Class: Session
| Language | Syntax |
|---|---|
| C++ | class Session |
| .NET | public ref class Session |
| Constructor | |
| .NET | Constructed object is returned by Connection.CreateSession |
| Copy constructor | |
| C++ | Session(const Session&); |
| .NET | public Session(Session session); |
| Destructor | |
| C++ | ~Session(); |
| .NET | ~Session(); |
| Finalizer | |
| C++ | not applicable |
| .NET | !Session() |
| Copy assignment operator | |
| C++ | Session& operator=(const Session&); |
| .NET | public Session op_Assign(Session rhs); |
| Method: Close | |
| C++ | void close(); |
| .NET | public void Close(); |
| Method: Commit | |
| C++ | void commit(); |
| .NET | public void Commit(); |
| Method: Rollback | |
| C++ | void rollback(); |
| .NET | public void Rollback(); |
| Method: Acknowledge | |
| C++ | void acknowledge(bool sync=false); |
| C++ | void acknowledge(Message&, bool sync=false); |
| .NET | public void Acknowledge(); |
| .NET | public void Acknowledge(bool sync); |
| .NET | public void Acknowledge(Message __p1); |
| .NET | public void Acknowledge(Message __p1, bool __p2); |
| Method: Reject | |
| C++ | void reject(Message&); |
| .NET | public void Reject(Message __p1); |
| Method: Release | |
| C++ | void release(Message&); |
| .NET | public void Release(Message __p1); |
| Method: Sync | |
| C++ | void sync(bool block=true); |
| .NET | public void Sync(); |
| .NET | public void Sync(bool block); |
| Property: Receivable | |
| C++ | uint32_t getReceivable(); |
| .NET | public uint Receivable { get; } |
| Property: UnsettledAcks | |
| C++ | uint32_t getUnsettledAcks(); |
| .NET | public uint UnsettledAcks { get; } |
| Method: NextReceiver | |
| C++ | bool nextReceiver(Receiver&, Duration timeout=Duration::FOREVER); |
| .NET | public bool NextReceiver(Receiver rcvr); |
| .NET | public bool NextReceiver(Receiver rcvr, Duration timeout); |
| Method: NextReceiver | |
| C++ | Receiver nextReceiver(Duration timeout=Duration::FOREVER); |
| .NET | public Receiver NextReceiver(); |
| .NET | public Receiver NextReceiver(Duration timeout); |
| Method: CreateSender | |
| C++ | Sender createSender(const Address& address); |
| .NET | public Sender CreateSender(Address address); |
| Method: CreateSender | |
| C++ | Sender createSender(const std::string& address); |
| .NET | public Sender CreateSender(string address); |
| Method: CreateReceiver | |
| C++ | Receiver createReceiver(const Address& address); |
| .NET | public Receiver CreateReceiver(Address address); |
| Method: CreateReceiver | |
| C++ | Receiver createReceiver(const std::string& address); |
| .NET | public Receiver CreateReceiver(string address); |
| Method: GetSender | |
| C++ | Sender getSender(const std::string& name) const; |
| .NET | public Sender GetSender(string name); |
| Method: GetReceiver | |
| C++ | Receiver getReceiver(const std::string& name) const; |
| .NET | public Receiver GetReceiver(string name); |
| Property: Connection | |
| C++ | Connection getConnection() const; |
| .NET | public Connection Connection { get; } |
| Property: HasError | |
| C++ | bool hasError(); |
| .NET | public bool HasError { get; } |
| Method: CheckError | |
| C++ | void checkError(); |
| .NET | public void CheckError(); |
See Also:
22.11. .NET Class: SessionReceiver
SessionReceiver class provides a convenient callback mechanism for messages received by all receivers on a given session.
using Org.Apache.Qpid.Messaging;
using System;
namespace Org.Apache.Qpid.Messaging.SessionReceiver
{
public interface ISessionReceiver
{
void SessionReceiver(Receiver receiver, Message message);
}
public class CallbackServer
{
public CallbackServer(Session session, ISessionReceiver callback);
public void Close();
}
}
Org.Apache.Qpid.Messaging and Org.Apache.Qpid.Messaging.SessionReceiver. The calling program creates a function that implements the ISessionReceiver interface. This function will be called whenever a message is received by the session. The callback process is started by creating a CallbackServer and will continue to run until the client program calls the CallbackServer.Close function.
SessionReceiver callback is contained in cpp/bindings/qpid/dotnet/examples/csharp.map.callback.receiver.
See Also:
Appendix A. Exchange and Queue Declaration Arguments
A.1. Exchange and Queue Argument Reference
- Changes
qpid.last_value_queueandqpid.last_value_queue_no_browsedeprecated and removed.qpid.msg_sequencequeue argument replaced byqpid.queue_msg_sequence.ring_strictandflow_to_diskare no longer validqpid.policy_typevalues.qpid.persist_last_nodedeprecated and removed.
Exchange options
qpid.exclusive-binding(bool)- Ensures that a given binding key is associated with only one queue.
qpid.ive(bool)- If set to “true”, the exchange is an initial value exchange, which differs from other exchanges in only one way: the last message sent to the exchange is cached, and if a new queue is bound to the exchange, it attempts to route this message to the queue, if the message matches the binding criteria. This allows a new queue to use the last received message as an initial value.
qpid.msg_sequence(bool)- If set to “true”, the exchange inserts a sequence number named “qpid.msg_sequence” into the message headers of each message. The type of this sequence number is int64. The sequence number for the first message routed from the exchange is 1, it is incremented sequentially for each subsequent message. The sequence number is reset to 1 when the qpid broker is restarted.
qpid.sequence_counter(int64)- Start
qpid.msg_sequencecounting at the given number.
Queue options
no-local(bool)- Specifies that the queue should discard any messages enqueued by sessions on the same connection as that which declares the queue.
qpid.alert_count(uint32_t)- If the queue message count goes above this size an alert should be sent.
qpid.alert_repeat_gap(int64_t)- Controls the minimum interval between events in seconds. The default value is 60 seconds.
qpid.alert_size(int64_t)- If the queue size in bytes goes above this size an alert should be sent.
qpid.auto_delete_timeout(bool)- If a queue is configured to be automatically deleted, it will be deleted after the amount of seconds specified here.
qpid.browse-only(bool)- All users of queue are forced to browse. Limit queue size with ring, LVQ, or TTL. Note that this argument name uses a hyphen rather than an underscore.
qpid.file_count(int)- Set the number of files in the persistence journal for the queue. Default value is 8.
qpid.file_size(int64)- Set the number of pages in the file (each page is 64KB). Default value is 24.
qpid.flow_resume_count(uint32_t)- Flow resume threshold value as a message count.
qpid.flow_resume_size(uint64_t)- Flow resume threshold value in bytes.
qpid.flow_stop_count(uint32_t)- Flow stop threshold value as a message count.
qpid.flow_stop_size(uint64_t)- Flow stop threshold value in bytes.
qpid.last_value_queue_key(string)- Defines the key to use for a last value queue.
qpid.max_count(uint32_t)- The maximum byte size of message data that a queue can contain before the action dictated by the
policy_typeis taken. qpid.max_size(uint64_t)- The maximum number of messages that a queue can contain before the action dictated by the
policy_typeis taken. qpid.policy_type(string)- Sets default behavior for controlling queue size. Valid values are
rejectandring. qpid.priorities(size_t)- The number of distinct priority levels recognized by the queue (up to a maximum of 10). The default value is 1 level.
qpid.queue_msg_sequence(string)- Causes a custom header with the specified name to be added to enqueued messages. This header is automatically populated with a sequence number.
qpid.trace.exclude(string)- Does not send on messages which include one of the given (comma separated) trace ids.
qpid.trace.id(string)- Adds the given trace id as to the application header "
x-qpid.trace" in messages sent from the queue. x-qpid-maximum-message-count- This is an alias for
qpid.alert_count. x-qpid-maximum-message-size- This is an alias for
qpid.alert_size. x-qpid-minimum-alert-repeat-gap- This is an alias for
qpid.alert_repeat_gap. x-qpid-priorities- This is an alias for
qpid.priorities.
Appendix B. Revision History
| Revision History | |||
|---|---|---|---|
| Revision 3.2.0-6 | Fri Oct 16 2015 | ||
| |||
| Revision 3.2.0-5 | Thu Oct 8 2015 | ||
| |||
| Revision 3.2.0-3 | Tue Sep 29 2015 | ||
| |||
| Revision 3.2.0-1 | Tue Jul 14 2015 | ||
| |||
| Revision 3.1.0-5 | Wed Apr 01 2015 | ||
| |||
| Revision 3.0.0-4 | Tue Sep 23 2014 | ||
| |||
