Red Hat Training

A Red Hat training course is available for Red Hat Fuse

Chapter 11. Connecting to Kafka

Apache Kafka is a distributed streaming platform that you can use to obtain and publish data. In an integration, you can subscribe for data from a Kafka topic that you specify or publish data to a Kafka topic that you specify. To do this, create a connection to Kafka and then add that connection to an integration flow. Details are in the following topics:

11.1. Creating a connection to a Kafka broker

In an integration, to subscribe for data from a Kafka topic or publish data to a Kafka topic, create a connection to Kafka and then add that connection to an integration.

Prerequisite

You must know the URI for the Kafka broker that you want to connect to.

Procedure

  1. In Fuse Online, in the left panel, click Connections to display any available connections.
  2. In the upper right, click Create Connection to display connectors.
  3. Click the Kafka connector.
  4. In the Kafka bootstraps URI field, enter a comma separated list of Kafka broker URIs. Each URI should be in the form host:port.
  5. Click Validate. Fuse Online immediately tries to validate the connection and displays a message that indicates whether validation is successful. If validation fails, revise the input parameter and try again.
  6. If validation is successful, click Next.
  7. In the Connection Name field, enter your choice of a name that helps you distinguish this connection from any other connections. For example, you might enter Kafka West.
  8. In the Description field, optionally enter any information that is helpful to know about this connection.
  9. In the upper right, click Create to see that the connection you created is now available. If you entered the example name, you would see that Kafka West appears as a connection that you can choose to add to an integration.

11.2. Obtaining data from a Kafka broker to trigger integration execution

To trigger execution of an integration upon receiving data from a Kafka broker, add a Kafka connection as the start connection. When the integration is running, the Kafka connection continuously watches for data in the Kafka topic that you specify. When the connection finds new data, it passes that data to the next step in the integration.

Prerequisite

You created a connection to a Kafka broker.

Procedure

  1. In the Fuse Online panel on the left, click Integrations.
  2. Click Create Integration.
  3. On the Choose a Start Connection page, click the Kafka connection that you want to use to start the integration.
  4. On the Choose an Action page, click the Subscribe action to receive data from the topic that you specify.
  5. In the Topic Name field, click the down carat to display a list of topics and click the topic that you want to subscribe to.
  6. Click Next to specify the action’s output type.
  7. In the Select Type field, if the data type does not need to be known, accept Type specification not required and then, at the bottom, click Done. You do not need to follow the rest of these instructions.

    Otherwise, select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  8. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema then you would paste the content of a JSON schema file, which has a media type of application/schema+json.
  9. In the Data Type Name field, enter a name that you choose for the data type. For example, suppose you are specifying a JSON schema for vendors. You can specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization panel and in the data mapper.

  10. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  11. Click Done.

Result

The connection appears at the beginning of the integration flow.

11.3. Publishing data to a Kafka broker

In an integration, you can publish data to a Kafka broker in the middle of a flow or to finish a simple integration. To do this, add a Kafka connection to the middle of a flow or as a simple integration’s finish connection.

Prerequisites

  • You created a connection to a Kafka broker.
  • You are creating or editing a flow and Fuse Online is prompting you to choose a step. Or, Fuse Online is prompting you to choose a finish connection.

Procedure

  1. Click the Kafka connection that you want to use to publish messages.
  2. On the Choose an Action page, click Publish.
  3. In the Topic Name field, click the down carat to display a list of topics and click the topic that you want to publish to.
  4. Click Next to specify the action’s input type.
  5. In the Select Type field, if the data type does not need to be known, accept Type specification not required and then, at the bottom, click Done. You do not need to follow the rest of these instructions.

    Otherwise, select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  6. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema then you would paste the content of a JSON schema file, which has a media type of application/schema+json.
  7. In the Data Type Name field, enter a name that you choose for the data type. For example, suppose you are specifying a JSON schema for vendors. You can specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization panel and in the data mapper.

  8. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  9. Click Done.

Result

The connection appears in the integration flow in the location where you added it.