Chapter 4. Connecting to AMQP

In an integration, you can obtain messages from or publish messages to an Advanced Message Queuing Protocol (AMQP) broker. AMQP defines communication between clients and message brokers. To communicate with one of the following broker types, use the AMQP connector to create a connection to that broker:

  • Apache ActiveMQ broker that supports AMQP
  • Apache ActiveMQ Artemis
  • AMQ 7 broker
  • EnMasse, which is an open source messaging platform

To communicate with one of the following broker types, use the Red Hat AMQ connector to create a connection to the broker of interest:

  • Apache ActiveMQ broker that does not support AMQP
  • AMQ 6 broker

Note

It is possible to use the AMQP connector to create a connection to an Apache ActiveMQ broker that does not support AMQP or to an AMQ 6 broker. Doing this requires transport configuration in the broker. For information about configuring the broker, see Red Hat JBoss A-MQ Managing and Monitoring Brokers, Adding Client Connection Points. For information about the configuration values to specify, see Red Hat JBoss A-MQ Connection Reference, Advanced Message Queuing Protocol (AMQP).

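To use the AMQP connector, see the following sections:

  • Creating an AMQP connection
  • Adding an AMQP connection to trigger integration execution upon receiving messages
  • Publishing messages to AMQP in the middle of a flow or to finish an integration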

4.1. Creating an AMQP connection

In an integration, to obtain messages from or publish messages to an AMQP broker, create an AMQP connection, which you can add to an integration.

Prerequisites

For the AMQP broker that you want to connect to, you have the following:

  • Its URI
  • User account credentials
  • Its PEM certificate text

Procedure

  1. In Fuse Online, in the left panel, click Connections to display any available connections.
  2. In the upper right, click Create Connection to display connectors.
  3. Click the AMQP Message Broker connector.
  4. Configure the connection:

    1. In the Connection URI field, enter the location you want to send data to or obtain data from.
    2. In the User name field, enter the user name for the account that you want to use to access this broker.
    3. In the Password field, enter the password for the account that you want to use to access this broker.
    4. In the Client ID field, enter the ID that allows connections to close and reopen without missing messages. The destination type must be a topic.
    5. If this connection will be used only in a development environment, you can save time by disabling Check certificates. For secure production environments, always enable Check certificates.
    6. In the Broker certificate field, paste the broker’s PEM certificate text. This is required except when you disable the checking of certificates.
    7. In the Client certificate field, paste the client’s PEM certificate text. Content in this field is always optional.
  5. Click Validate. Fuse Online immediately tries to validate the connection and displays a message that indicates whether validation is successful. If validation fails, revise the configuration details as needed and try again.
  6. If validation is successful, click Next.
  7. In the Name field, enter your choice of a name that helps you distinguish this connection from any other connections. For example, you might enter AMQP 1.
  8. In the Description field, optionally enter any information that is helpful to know about this connection. For example, enter Sample AMQP connection.
  9. Click Save to see that the connection you created is now available. If you entered the example name, you would see that AMQP 1 appears as a connection that you can choose to add to an integration.
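
The rules in the configuration step can be sketched as a small pre-flight check that you might run on your own notes before filling in the form. This is a minimal sketch only: the class name, field names, and the assumption that the connection URI uses an amqp:// or amqps:// scheme are illustrative and are not taken from Fuse Online itself. The one rule that comes directly from the procedure is that the broker certificate is required unless Check certificates is disabled.

```python
from dataclasses import dataclass
from typing import List
from urllib.parse import urlparse


@dataclass
class AmqpConnectionSettings:
    """Hypothetical container that mirrors the fields on the connection form."""
    connection_uri: str
    username: str
    password: str
    client_id: str = ""
    check_certificates: bool = True
    broker_certificate: str = ""  # PEM text
    client_certificate: str = ""  # PEM text, always optional


def find_problems(settings: AmqpConnectionSettings) -> List[str]:
    """Return readable problems; an empty list means the values look complete."""
    problems = []
    parsed = urlparse(settings.connection_uri)
    # Assumption: the broker URI uses the conventional amqp:// or amqps:// scheme.
    if parsed.scheme not in ("amqp", "amqps"):
        problems.append("Connection URI should start with amqp:// or amqps://")
    if not parsed.hostname:
        problems.append("Connection URI has no host")
    # From the procedure: the broker certificate is required unless
    # certificate checking is disabled.
    if settings.check_certificates and not settings.broker_certificate.strip():
        problems.append("Broker certificate is required when Check certificates is enabled")
    return problems
```

For example, settings with check_certificates=False and no broker certificate produce no problems, which matches the development-environment shortcut described above. The check does not contact the broker; only the Validate button in Fuse Online does that.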

4.2. Adding an AMQP connection to trigger integration execution upon receiving messages

To trigger execution of an integration upon receiving messages from an AMQP broker, add an AMQP connection as the integration’s start connection.

Prerequisites

  • You created a connection to the AMQP broker that you want to receive messages from.
  • You are creating an integration and Fuse Online is prompting you to choose the start connection.

Procedure

  1. On the Choose a connection page, click the AMQP connection that you want to use to start the integration.
  2. On the Choose an action page, select the Subscribe for messages action to receive messages from the queue or topic that you specify.
  3. Configure the action:

    1. In the Destination name field, enter the name of the queue or topic to receive data from.
    2. For the Destination type, accept Queue or select Topic.
    3. In the Durable subscription ID field, to allow connections to close and reopen without missing messages, enter the durable subscription ID. The destination type must be a topic.
    4. In the Message selector field, if you want to receive only data that satisfies a particular condition, enter a filter expression.

      A message selector is a string that contains an expression. The syntax of the expression is based on a subset of the SQL92 conditional expression syntax. The message selector in the following example selects any message that has a NewsType property whose value is set to Sports or Opinion:

      NewsType = 'Sports' OR NewsType = 'Opinion'

      The message consumer receives only those messages whose headers and properties match the message selector expression. A message selector cannot select messages on the basis of the content of the message body.

  4. Click Next to specify the action’s output type.
  5. In the Select Type field, if the data type does not need to be known, accept Type specification not required and click Next. You do not need to follow the rest of these instructions.

    Otherwise, click in the Select Type field and select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  6. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema then you would paste the content of a JSON schema file, which has a media type of application/schema+json.
  7. In the Data Type Name field, enter a name that you choose for the data type. For example, if you are specifying a JSON schema for vendors then you might specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization and in the data mapper.

  8. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  9. Click Next.
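
To make the message selector behavior in the procedure above concrete, the following sketch models the example selector as a Python predicate. It is an illustration of the semantics only, not how the broker evaluates selectors; the message layout (a dictionary with properties and body keys) and the function name are assumptions made for the example.

```python
def matches_selector(properties: dict) -> bool:
    """Python model of the selector: NewsType = 'Sports' OR NewsType = 'Opinion'."""
    # A message without a NewsType property does not match.
    return properties.get("NewsType") in ("Sports", "Opinion")


messages = [
    {"properties": {"NewsType": "Sports"}, "body": "Final score 3-1"},
    {"properties": {"NewsType": "Weather"}, "body": "Sports day is rained off"},
    {"properties": {}, "body": "Opinion: no property set"},
]

# Only properties are inspected; the word "Sports" or "Opinion" in a body
# has no effect on selection.
delivered = [m["body"] for m in messages if matches_selector(m["properties"])]
```

Here only the first message is delivered. The second and third mention Sports and Opinion in the body, but a selector cannot see the body, so they are filtered out.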

Result

The connection appears at the beginning of the integration flow.
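
The effect of the Durable subscription ID field in the procedure above can be illustrated with a toy in-memory topic. This is a conceptual sketch and not a real broker or client API: the Topic class and its methods are hypothetical. It shows why a durable subscriber on a topic can close and reopen its connection without missing messages, while a non-durable subscriber cannot.

```python
class Topic:
    """Toy in-memory topic used only to illustrate durable subscriptions."""

    def __init__(self):
        self._connected = {}        # subscription ID -> (inbox, durable flag)
        self._durable_backlog = {}  # durable subscription ID -> messages held while offline

    def subscribe(self, sub_id, durable=False):
        """Connect a subscriber and return its inbox, including any held messages."""
        inbox = self._durable_backlog.pop(sub_id, []) if durable else []
        self._connected[sub_id] = (inbox, durable)
        return inbox

    def disconnect(self, sub_id):
        """Close a subscriber; only durable subscriptions keep collecting messages."""
        _, durable = self._connected.pop(sub_id)
        if durable:
            self._durable_backlog[sub_id] = []

    def publish(self, message):
        for inbox, _ in self._connected.values():
            inbox.append(message)
        for backlog in self._durable_backlog.values():
            backlog.append(message)


topic = Topic()
topic.subscribe("news-durable", durable=True)
topic.subscribe("news-plain")
topic.disconnect("news-durable")
topic.disconnect("news-plain")
topic.publish("published while both subscribers were offline")

durable_inbox = topic.subscribe("news-durable", durable=True)
plain_inbox = topic.subscribe("news-plain")
```

After reconnecting, durable_inbox holds the message that was published while the subscriber was offline, and plain_inbox is empty.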

4.3. Publishing messages to AMQP in the middle of a flow or to finish an integration

You can publish messages to an AMQP broker in the middle of a flow or to finish a simple integration. To do this, add an AMQP connection to the middle of a flow or as a simple integration’s finish connection.

Prerequisites

  • You created a connection to the AMQP broker that you want to publish messages to.
  • You are creating or editing a flow and Fuse Online is prompting you to add to the integration. Or, Fuse Online is prompting you to choose a simple integration’s finish connection.

Procedure

  1. On the Add to Integration page, click the plus sign where you want to add the connection. Skip this step if Fuse Online is prompting you to choose the finish connection.
  2. Click the AMQP connection that you want to use to publish messages.
  3. On the Choose an action page, select one of the following actions:

    • Publish messages: publishes messages to the queue or topic that you specify without receiving a response. To configure this action:

      1. In the Destination name field, enter the name of the queue or topic to send messages to.
      2. For the Destination type, accept Queue or select Topic.
      3. Select Persistent to guarantee message delivery even if a connection fails.
    • Request response using messages: publishes messages to the queue or topic that you specify and receives a response.

      Note

      When the finish connection in a simple integration is an AMQP connection that performs the Request response using messages action, the connection publishes the messages but the response is discarded. To avoid losing the response, add an AMQP connection as a middle connection that performs the Request response using messages action and finish the simple integration with a log step.

      To configure this action:

      1. In the Destination name field, enter the name of the queue or topic to send messages to.
      2. For the Destination type, accept Queue or select Topic.
      3. In the Durable subscription ID field, to allow connections to close and reopen without missing messages, enter the durable subscription ID. The destination type must be a topic.
      4. In the Message selector field, if you want to receive only responses that satisfy a particular condition, enter a filter expression.

        A message selector is a string that contains an expression. The syntax of the expression is based on a subset of the SQL92 conditional expression syntax. The message selector in the following example selects any message that has a NewsType property whose value is set to Sports or Opinion:

        NewsType = 'Sports' OR NewsType = 'Opinion'

        The message consumer receives only those messages whose headers and properties match the message selector expression. A message selector cannot select messages on the basis of the content of the message body.

  4. Click Next to specify the action’s input and output type.
  5. In the Select Type field, if the data type does not need to be known, accept Type specification not required and click Next. You do not need to follow the rest of these instructions.

    Otherwise, click in the Select Type field and select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  6. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema then you would paste the content of a JSON schema file, which has a media type of application/schema+json.
  7. In the Data Type Name field, enter a name that you choose for the data type. For example, if you are specifying a JSON schema for vendors then you might specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization and in the data mapper.

  8. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  9. Click Next.
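
The difference between the JSON schema and JSON instance options in the procedure above can be shown with a small example. The Vendor type follows the example data type name used in the procedure; the field names (name, region) and the sample values are assumptions made for illustration. Either document could be pasted into the Definition input box, depending on which schema type you select.

```python
import json

# A JSON schema describes the structure of the data (media type application/schema+json).
vendor_schema = json.loads("""
{
  "title": "Vendor",
  "type": "object",
  "properties": {
    "name": {"type": "string"},
    "region": {"type": "string"}
  },
  "required": ["name"]
}
""")

# A JSON instance contains actual data (media type application/json).
vendor_instance = json.loads('{"name": "Acme Supplies", "region": "EMEA"}')

# The schema lists field names and types; the instance carries values.
schema_fields = sorted(vendor_schema["properties"])
instance_fields = sorted(vendor_instance)
missing_required = [f for f in vendor_schema["required"] if f not in vendor_instance]
```

In this sketch both documents describe the same two fields, and missing_required is empty because the instance supplies the one field that the schema marks as required. This is a hand-rolled comparison, not full JSON Schema validation.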

Result

The connection appears in the integration visualization where you added it.
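
For the XML schema and XML instance options in the procedure above, the following sketch shows a matching pair of documents and uses the Python standard library to list the element names that each one contains. The vendor element and its children are hypothetical names chosen for the example. The standard library does not validate XML against a schema, so this only compares names.

```python
import xml.etree.ElementTree as ET

XSD_NS = "{http://www.w3.org/2001/XMLSchema}"

# An XML schema (.xsd) describes the structure of XML data.
vendor_xsd = """<?xml version="1.0"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="vendor">
    <xs:complexType>
      <xs:sequence>
        <xs:element name="name" type="xs:string"/>
        <xs:element name="region" type="xs:string"/>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>"""

# An XML instance (.xml) contains actual data.
vendor_xml = """<?xml version="1.0"?>
<vendor>
  <name>Acme Supplies</name>
  <region>EMEA</region>
</vendor>"""

# Element names declared by the schema, in document order.
schema_root = ET.fromstring(vendor_xsd)
declared = [el.get("name") for el in schema_root.iter(XSD_NS + "element")]

# Element names that appear in the instance: the root, then its children.
instance_root = ET.fromstring(vendor_xml)
present = [instance_root.tag] + [child.tag for child in instance_root]
```

Both lists contain vendor, name, and region, which is what you would expect when the instance conforms to the schema.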