Chapter 15. Connecting to Kafka

Apache Kafka is a distributed streaming platform that you can use to obtain and publish data. In an integration, you can subscribe to data from a Kafka topic that you specify or publish data to a Kafka topic that you specify. To do this, create a connection to Kafka and add that connection to an integration flow. Details are in the following sections:

15.1. Creating a connection to a Kafka broker

In an integration, to subscribe to data from a Kafka topic or publish data to a Kafka topic, create a connection to Kafka and then add that connection to an integration.

Prerequisite

  • If you want to use Transport Layer Security (TLS) to encrypt your data, you have the Kafka broker’s PEM certificate text. Typically, you obtain the broker certificate text from your Kafka server administrator.
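Before you paste the certificate text, it can help to confirm that it is structurally PEM. The following is a minimal Python sketch, assuming the text holds a single certificate block. The function name looks_like_pem_certificate is hypothetical, and the check covers only the BEGIN/END markers and the Base64 body. It does not confirm that the certificate is valid or that it belongs to your broker.

```python
import base64
import binascii

PEM_HEADER = "-----BEGIN CERTIFICATE-----"
PEM_FOOTER = "-----END CERTIFICATE-----"


def looks_like_pem_certificate(text: str) -> bool:
    """Return True if text has PEM certificate markers around a Base64 body.

    Structural check only: it does not parse the certificate or verify trust.
    """
    stripped = text.strip()
    if not (stripped.startswith(PEM_HEADER) and stripped.endswith(PEM_FOOTER)):
        return False
    body = stripped[len(PEM_HEADER):-len(PEM_FOOTER)]
    # Remove the line breaks and spaces that wrap the Base64 body.
    compact = "".join(body.split())
    if not compact:
        return False
    try:
        base64.b64decode(compact, validate=True)
    except (binascii.Error, ValueError):
        return False
    return True


# Placeholder payload for illustration; this is NOT a real certificate.
sample = "\n".join([
    PEM_HEADER,
    base64.b64encode(b"placeholder bytes, not a certificate").decode("ascii"),
    PEM_FOOTER,
])
```

If the function returns False for the text you received, ask your Kafka server administrator for the certificate in PEM format.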

Procedure

  1. In Fuse Online, in the left panel, click Connections to display any available connections.
  2. Click Create Connection to display connectors.
  3. Click the Kafka Message Broker connector.
  4. In the Kafka broker URIs field, select the broker that you want this connection to access, or enter a comma-separated list of Kafka broker URIs. Each URI should be in the form host:port.
  5. For the Transport Protocol field, select one of the following options:

    • If you want to encrypt your data to protect it in transit, select TLS (Transport Layer Security).
    • If you do not want to encrypt your data, select Plain and then skip to Step 7.
  6. If you selected TLS in Step 5, then in the Broker certificate field, paste the Kafka broker’s PEM certificate text.
  7. Optional. Click Add to specify key:value pairs to configure Kafka producer and consumer options.

    For example, if you want a new integration to be able to consume old messages from a topic, change the auto.offset.reset value from the default (latest) to earliest: enter auto.offset.reset in the Key field and earliest in the Value field.

    For details about Kafka producer configuration options, go to https://kafka.apache.org/documentation/#producerconfigs

    For details about Kafka consumer configuration options, go to https://kafka.apache.org/documentation/#consumerconfigs

    Note: If you add configuration attributes, Fuse Online does not include them as part of its validation process in the next step.

  8. Click Validate. Fuse Online immediately tries to validate the connection and displays a message that indicates whether validation is successful. If validation fails, revise the input parameters and try again.
  9. If validation is successful, click Next.
  10. In the Name field, enter your choice of a name that helps you distinguish this connection from any other connections. For example, you might type Kafka West.
  11. In the Description field, optionally enter any information that is helpful to know about this connection.
  12. Click Save to see that the connection you created is now available. If you entered the example name, you would see that Kafka West appears as a connection that you can choose to add to an integration.
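The host:port format that Step 4 expects can be checked before you enter the value. The following Python sketch is an illustration only: the function name parse_broker_uris and the example host names are hypothetical, and Fuse Online may apply its own, different validation rules.

```python
from typing import List, Tuple


def parse_broker_uris(value: str) -> List[Tuple[str, int]]:
    """Split a comma-separated broker list into (host, port) pairs.

    Raises ValueError if any entry is not in the form host:port.
    """
    brokers = []
    for item in value.split(","):
        item = item.strip()
        # Split on the last colon so that the port is always the final part.
        host, sep, port_text = item.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {item!r}")
        try:
            port = int(port_text)
        except ValueError:
            raise ValueError(f"port is not a number in {item!r}") from None
        if not 0 < port < 65536:
            raise ValueError(f"port out of range in {item!r}")
        brokers.append((host, port))
    return brokers


# Hypothetical values for illustration only.
brokers = parse_broker_uris("kafka-1.example.com:9092, kafka-2.example.com:9092")

# The key:value pair from the auto.offset.reset example in Step 7.
options = {"auto.offset.reset": "earliest"}
```

A list that parses cleanly here is only well formed. Whether the brokers are reachable is what the Validate step checks.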

15.2. Obtaining data from a Kafka broker to trigger integration execution

To trigger execution of an integration upon receiving data from a Kafka broker, add a Kafka connection as the start connection. When the integration is running, the Kafka connection continuously watches for data in the Kafka topic that you specify. When the connection finds new data, it passes that data to the next step in the integration.
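The effect of the auto.offset.reset option on which records reach the next step can be pictured with a toy model. The Python sketch below uses an in-memory list in place of a topic. It is not Kafka client code, all names in it are hypothetical, and it assumes that the subscriber has no previously committed offset, which is the situation in which Kafka consults auto.offset.reset.

```python
from typing import Callable, List


def start_position(log_length: int, auto_offset_reset: str) -> int:
    """Pick the first offset to read when no committed offset exists."""
    if auto_offset_reset == "earliest":
        return 0            # begin with the oldest record in the topic
    if auto_offset_reset == "latest":
        return log_length   # begin with records that arrive from now on
    raise ValueError(f"unsupported value: {auto_offset_reset!r}")


def poll_once(log: List[str], position: int,
              next_step: Callable[[str], None]) -> int:
    """Pass every record at or after position to next_step.

    Returns the new position, which is the end of the log.
    """
    for record in log[position:]:
        next_step(record)
    return len(log)


def run(auto_offset_reset: str) -> List[str]:
    """Simulate one subscriber and return the records it passed on."""
    topic = ["old-1", "old-2"]   # written before the integration started
    received: List[str] = []
    position = start_position(len(topic), auto_offset_reset)
    topic.append("new-1")        # arrives while the integration is running
    poll_once(topic, position, received.append)
    return received
```

In this model, run("latest") passes on only the record that arrived after startup, while run("earliest") also passes on the two older records.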

Prerequisite

You created a connection to a Kafka broker.

Procedure

  1. In the Fuse Online panel on the left, click Integrations.
  2. Click Create Integration.
  3. On the Choose a connection page, click the Kafka connection that you want to use to start the integration.
  4. On the Choose an action page, select the Subscribe action to receive data from the topic that you specify.
  5. In the Topic Name field, click the down caret to display a list of topics and click the topic that you want to subscribe to. Or, type a topic name to create that topic.
  6. Click Next to specify the action’s output type.
  7. In the Select Type field, if the data type does not need to be known, accept Type specification not required and click Next. You do not need to follow the rest of these instructions.

    However, a structured data type is recommended. For example, if you want to map the Kafka connection output to a subsequent step then you must specify the data type. The data mapper does not recognize unstructured data.

    To specify the data type, click in the Select Type field and select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  8. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema then you would paste the content of a JSON schema file, which has a media type of application/schema+json.
  9. In the Data Type Name field, enter a name that you choose for the data type. For example, if you are specifying a JSON schema for vendors then you might specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization and in the data mapper.

  10. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  11. Click Next.

Result

The integration now has a start connection and Fuse Online prompts you to choose the finish connection.
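To make the difference between a JSON schema and a JSON instance concrete, the following Python sketch defines one of each for the Vendor example. The field names (name, id) are hypothetical. The conforms function is a deliberately small illustration that checks only required keys and a few simple types. It is not a JSON Schema validator and does not represent how Fuse Online processes the definition.

```python
import json

# A JSON schema: describes the structure of the data.
VENDOR_SCHEMA = """
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Vendor",
  "type": "object",
  "properties": {
    "name": {"type": "string"},
    "id": {"type": "integer"}
  },
  "required": ["name", "id"]
}
"""

# A JSON instance: contains actual data.
VENDOR_INSTANCE = '{"name": "Acme", "id": 42}'

_JSON_TYPES = {"string": str, "integer": int, "object": dict}


def conforms(instance: dict, schema: dict) -> bool:
    """Check required keys and simple property types only."""
    if any(key not in instance for key in schema.get("required", [])):
        return False
    for key, rule in schema.get("properties", {}).items():
        expected = _JSON_TYPES.get(rule.get("type"))
        if key not in instance or expected is None:
            continue
        value = instance[key]
        # bool is a subclass of int in Python, so reject it for "integer".
        if expected is int and isinstance(value, bool):
            return False
        if not isinstance(value, expected):
            return False
    return True


schema = json.loads(VENDOR_SCHEMA)
instance = json.loads(VENDOR_INSTANCE)
```

Either document could be pasted in the Definition input box, provided that the schema type selected in the Select Type field matches it.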

15.3. Publishing data to a Kafka broker

In an integration, you can publish data to a Kafka broker in the middle of a flow or to finish a simple integration. To do this, add a Kafka connection to the middle of a flow or as a simple integration’s finish connection.

Prerequisites

  • You created a connection to a Kafka broker.
  • You are creating or editing a flow and Fuse Online is prompting you to add to the integration. Or, Fuse Online is prompting you to choose a finish connection.

Procedure

  1. On the Add to Integration page, click the plus sign where you want to add a Kafka connection. Skip this step if you are adding a simple integration’s finish connection.
  2. Click the Kafka connection that you want to use to publish messages.
  3. On the Choose an action page, select Publish.
  4. In the Topic Name field, click the down caret to display a list of topics and click the topic that you want to publish to.
  5. Click Next to specify the action’s input type.
  6. In the Select Type field, if the data type does not need to be known, accept Type specification not required and click Next. You do not need to follow the rest of these instructions.

    However, a structured data type is recommended. For example, if you want to map the connection input in a data mapper step then you must specify the data type. The data mapper cannot display fields for unstructured data.

    To specify the data type, click in the Select Type field and select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  7. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema then you would paste the content of a JSON schema file, which has a media type of application/schema+json.
  8. In the Data Type Name field, enter a name that you choose for the data type. For example, if you are specifying a JSON schema for vendors then you might specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization and in the data mapper.

  9. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  10. Click Next.

Result

The connection appears in the integration visualization where you added it.
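One way to see why a structured type makes fields available for mapping is to list the leaf elements of an XML instance. The Python sketch below does this with the standard library. The vendor document and the function name field_paths are hypothetical, and the code illustrates the idea only. It does not describe how the Fuse Online data mapper is implemented.

```python
import xml.etree.ElementTree as ET
from typing import List

# A hypothetical XML instance for the Vendor example.
VENDOR_XML = "<vendor><name>Acme</name><id>42</id></vendor>"


def field_paths(xml_text: str) -> List[str]:
    """Return the path of every leaf element, in document order."""
    root = ET.fromstring(xml_text)
    paths: List[str] = []

    def walk(element: ET.Element, prefix: str) -> None:
        path = f"{prefix}/{element.tag}"
        children = list(element)
        if not children:
            paths.append(path)   # a leaf element is a mappable field
        for child in children:
            walk(child, path)

    walk(root, "")
    return paths
```

For the document above, field_paths(VENDOR_XML) returns the two paths /vendor/name and /vendor/id. An unstructured payload offers no comparable list of fields.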