Chapter 15. Connecting to Kafka

Apache Kafka is a distributed streaming platform that you can use to obtain and publish data. In an integration, you can subscribe to a Kafka topic that you specify to receive its data, or you can publish data to a Kafka topic that you specify. To do this, optionally enable Fuse Online to discover the Kafka brokers in the cluster, then create a connection to Kafka and add that connection to an integration flow. The following sections provide the details.

15.1. Enabling auto-discovery of Kafka brokers/AMQ Streams

In an integration, if you plan to connect to a Kafka broker that is an AMQ Streams instance, enable auto-discovery of such brokers before you create the Kafka connection. This makes creating the Kafka connection easier because Fuse Online can provide Kafka broker details that are required to configure the connection.

To enable auto-discovery, you grant an OpenShift account privileges that allow it to see resources on the cluster. The privileges that you grant determine which brokers the account can discover.

Note

For OpenShift 4.5, enabling auto-discovery of Kafka brokers is not supported. To connect to a Kafka broker, you must know the URI (in host:port format) for the Kafka broker.

Prerequisites

  • You are running Fuse Online on OpenShift Container Platform (OCP) on-site.
  • You installed the oc client tool and it is connected to the cluster in which you want to enable auto-discovery of Kafka brokers.
  • You have cluster administration permissions.
  • The Kafka brokers (AMQ Streams instances) that you want to connect to in an integration are installed in the same cluster as Fuse Online.

Procedure

  1. Log in to OpenShift with an account that has cluster administration permissions. For example:

    $ oc login -u admin -p admin
  2. Ensure that the current project is the project in which Fuse Online is running. To view the current project, invoke the following command:

    $ oc project
  3. Grant an account the privileges that it needs to access Kafka brokers in the cluster. The privileges that you need to grant depend on how your cluster is configured. For example, for a Strimzi cluster on Minishift, you can do the following:

    1. Create a cluster role that can access Kafka/Strimzi resources in the cluster. In the following example, kafkas.kafka.strimzi.io-view is the name of the new role:

      $ oc create clusterrole kafkas.kafka.strimzi.io-view --verb=get,list --resource=kafkas --resource=crd
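    2. Add this cluster role to the service account that needs to discover the brokers. In the following example, syndesis-server is the name of the service account; the -z option identifies a service account in the current project: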

      $ oc adm policy add-cluster-role-to-user kafkas.kafka.strimzi.io-view -z syndesis-server
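The oc create clusterrole command in step 3 creates a ClusterRole object. If you prefer to keep role definitions in files, the following Python sketch builds a roughly equivalent manifest. It assumes that the kafkas resource belongs to the kafka.strimzi.io API group and that crd is the short name for customresourcedefinitions in the apiextensions.k8s.io group; the helper name build_view_cluster_role is hypothetical. Compare the result with the output of oc get clusterrole kafkas.kafka.strimzi.io-view -o yaml on your cluster before relying on it.

```python
import json


def build_view_cluster_role(name: str) -> dict:
    """Hypothetical helper: return a ClusterRole manifest that grants read-only
    (get, list) access to Strimzi Kafka resources and to CRDs.

    Assumes `kafkas` lives in the kafka.strimzi.io API group and that `crd`
    is the short name for customresourcedefinitions in apiextensions.k8s.io.
    """
    verbs = ["get", "list"]
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRole",
        "metadata": {"name": name},
        "rules": [
            # Lets the account list Kafka clusters managed by Strimzi/AMQ Streams.
            {
                "apiGroups": ["kafka.strimzi.io"],
                "resources": ["kafkas"],
                "verbs": list(verbs),
            },
            # Lets the account check which custom resource definitions exist.
            {
                "apiGroups": ["apiextensions.k8s.io"],
                "resources": ["customresourcedefinitions"],
                "verbs": list(verbs),
            },
        ],
    }


if __name__ == "__main__":
    role = build_view_cluster_role("kafkas.kafka.strimzi.io-view")
    print(json.dumps(role, indent=2))
```

Because oc apply -f - reads a manifest from standard input and accepts JSON, you could pipe the printed manifest to that command instead of running oc create clusterrole.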

Result

In Fuse Online, when you create a Kafka connection, the connection configuration page displays the Kafka brokers that are available. You select the broker that you want the connection to access.

15.2. Creating a connection to a Kafka broker

In an integration, to subscribe to data from a Kafka topic or to publish data to a Kafka topic, create a connection to Kafka and then add that connection to the integration.

Prerequisites

  • You enabled auto-discovery of Kafka brokers in the cluster as described in Enabling auto-discovery of Kafka brokers/AMQ Streams, or you know the URI of the Kafka broker that you want to connect to.
  • If you want to use Transport Layer Security (TLS) to encrypt your data, you have the Kafka broker’s PEM certificate text. Typically, you obtain the broker certificate text from your Kafka server administrator.
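Before you paste the broker certificate, it can help to confirm that the text is a complete PEM block. The following minimal Python sketch uses the standard library function ssl.PEM_cert_to_DER_cert, which requires the text to start with a -----BEGIN CERTIFICATE----- line and end with an -----END CERTIFICATE----- line, and then base64-decodes the body. It does not verify that the certificate is valid, unexpired, or trusted; the name looks_like_pem_certificate is hypothetical.

```python
import ssl


def looks_like_pem_certificate(text: str) -> bool:
    """Return True if `text` has a PEM certificate envelope with a decodable body.

    This is an envelope check only: it does not parse X.509 fields or check
    expiry, issuer, or trust.
    """
    try:
        # Raises ValueError if the BEGIN/END lines are missing. binascii.Error
        # (bad base64) and UnicodeError (non-ASCII text) are ValueError subclasses.
        der = ssl.PEM_cert_to_DER_cert(text.strip())
    except ValueError:
        return False
    # An empty body between the BEGIN and END lines decodes to zero bytes.
    return len(der) > 0
```

For example, certificate text that was copied without its final -----END CERTIFICATE----- line makes the function return False.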

Procedure

  1. In Fuse Online, in the left panel, click Connections to display any available connections.
  2. Click Create Connection to display connectors.
  3. Click the Kafka Message Broker connector.
  4. In the Kafka broker URIs field, select the broker that you want this connection to access, or enter a comma-separated list of Kafka broker URIs. Each URI should be in host:port format.
  5. For the Transport Protocol field, select one of the following options:

    • If you want to encrypt your data to protect it in transit, select TLS (Transport Layer Security).
    • If you do not want to encrypt your data, select Plain and then skip to Step 7.
  6. If you selected TLS in Step 5, then in the Broker certificate field, paste the Kafka broker’s PEM certificate text.
  7. Click Validate. Fuse Online immediately tries to validate the connection and displays a message that indicates whether validation is successful. If validation fails, revise the input parameters and try again.
  8. If validation is successful, click Next.
  9. In the Name field, enter a name that helps you distinguish this connection from any other connections. For example, you might enter Kafka West.
  10. In the Description field, optionally enter any information that is helpful to know about this connection.
  11. Click Save to see that the connection you created is now available. If you entered the example name, you would see that Kafka West appears as a connection that you can choose to add to an integration.
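Step 4 accepts a comma-separated list of host:port entries, which is the same shape as the bootstrap.servers setting in Kafka clients. The following Python sketch splits and checks such a list before you enter it. The helper name parse_broker_uris is hypothetical, and it checks only the syntax: it does not contact the brokers.

```python
from typing import List, Tuple


def parse_broker_uris(value: str) -> List[Tuple[str, int]]:
    """Split a comma-separated list of host:port entries into (host, port) pairs.

    Raises ValueError for an empty list or entry, a missing host or port, or a
    port outside 1-65535. Whitespace around entries is ignored. No network
    connection is attempted.
    """
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            raise ValueError(f"empty entry in broker list: {value!r}")
        # rpartition splits on the last colon, so the port is whatever follows it.
        host, sep, port_text = entry.rpartition(":")
        if not sep or not host or not port_text.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port = int(port_text)
        if not 1 <= port <= 65535:
            raise ValueError(f"port out of range in {entry!r}")
        brokers.append((host, port))
    return brokers
```

For example, parse_broker_uris("kafka-0:9092, kafka-1:9092") returns [("kafka-0", 9092), ("kafka-1", 9092)], while "kafka-0" alone raises ValueError because the port is missing.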

15.3. Obtaining data from a Kafka broker to trigger integration execution

To trigger execution of an integration upon receiving data from a Kafka broker, add a Kafka connection as the start connection. When the integration is running, the Kafka connection continuously watches for data in the Kafka topic that you specify. When the connection finds new data, it passes that data to the next step in the integration.
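The watch-and-forward behavior described above can be pictured with the following conceptual Python model. It is not how Fuse Online or Kafka consumers are implemented; InMemoryTopic and SubscribeTrigger are hypothetical names, and a list stands in for the topic. The trigger remembers the offset of the next record that it has not delivered and, each time it polls, passes every newer record to the next step.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class InMemoryTopic:
    """Hypothetical stand-in for a Kafka topic: an append-only list of records."""
    name: str
    records: List[str] = field(default_factory=list)

    def append(self, record: str) -> int:
        """Add a record and return its offset (its position in the list)."""
        self.records.append(record)
        return len(self.records) - 1


class SubscribeTrigger:
    """Toy model of a start connection that watches one topic."""

    def __init__(self, topic: InMemoryTopic, next_step: Callable[[str], None]):
        self.topic = topic
        self.next_step = next_step
        self.offset = 0  # this toy model starts at the beginning of the topic

    def poll(self) -> int:
        """Deliver all records added since the last poll; return how many."""
        new_records = self.topic.records[self.offset:]
        for record in new_records:
            self.next_step(record)
        self.offset += len(new_records)
        return len(new_records)
```

A running integration would, in effect, call poll repeatedly. In this model, after one record is appended, the first poll delivers it to the next step and returns 1, and an immediate second poll returns 0 because there is no new data.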

Prerequisite

You created a connection to a Kafka broker.

Procedure

  1. In the Fuse Online panel on the left, click Integrations.
  2. Click Create Integration.
  3. On the Choose a connection page, click the Kafka connection that you want to use to start the integration.
  4. On the Choose an action page, select the Subscribe action to receive data from the topic that you specify.
  5. In the Topic Name field, click the down caret to display a list of topics and click the topic that you want to subscribe to. Or, type a topic name to create that topic.
  6. Click Next to specify the action’s output type.
  7. In the Select Type field, if the data type does not need to be known, accept Type specification not required and click Next. You do not need to follow the rest of these instructions.

    However, a structured data type is recommended. For example, if you want to map the Kafka connection output to a subsequent step, you must specify the data type. The data mapper does not recognize unstructured data.

    To specify the data type, click in the Select Type field and select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  8. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema, paste the content of a JSON schema file, which has a media type of application/schema+json.
  9. In the Data Type Name field, enter a name that you choose for the data type. For example, if you are specifying a JSON schema for vendors then you might specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization and in the data mapper.

  10. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  11. Click Next.
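Steps 7 to 9 mention a JSON schema for vendors with the data type name Vendor. The following Python sketch shows what such a definition might look like and prints the text that you would paste into the Definition box. The properties id, name, and country are illustrative assumptions, not a schema that Fuse Online requires. The sketch also prints a matching JSON instance, which you could paste instead if you select JSON instance.

```python
import json

# Illustrative JSON schema for a hypothetical Vendor data type. The property
# names are assumptions made for this example.
VENDOR_SCHEMA = {
    "type": "object",
    "title": "Vendor",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "country": {"type": "string"},
    },
    "required": ["id", "name"],
}

# A JSON instance that conforms to the schema above.
VENDOR_INSTANCE = {"id": 42, "name": "Acme Supplies", "country": "US"}


def missing_required(instance: dict, schema: dict) -> list:
    """Return the required property names that are absent from `instance`.

    This checks only the `required` keyword; it is not a full JSON Schema
    validator.
    """
    return [key for key in schema.get("required", []) if key not in instance]


if __name__ == "__main__":
    # Text to paste into the Definition box when Select Type is JSON schema.
    print(json.dumps(VENDOR_SCHEMA, indent=2))
    # Text to paste when Select Type is JSON instance.
    print(json.dumps(VENDOR_INSTANCE, indent=2))
```

For full validation of an instance against a schema, a third-party library such as jsonschema (its validate function) is a common choice.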

Result

The integration now has a start connection and Fuse Online prompts you to choose the finish connection.

15.4. Publishing data to a Kafka broker

In an integration, you can publish data to a Kafka broker in the middle of a flow or to finish a simple integration. To do this, add a Kafka connection to the middle of a flow or as a simple integration’s finish connection.

Prerequisites

  • You created a connection to a Kafka broker.
  • You are creating or editing a flow and Fuse Online is prompting you to add to the integration. Or, Fuse Online is prompting you to choose a finish connection.

Procedure

  1. On the Add to Integration page, click the plus sign where you want to add a Kafka connection. Skip this step if you are adding a simple integration’s finish connection.
  2. Click the Kafka connection that you want to use to publish messages.
  3. On the Choose an action page, select Publish.
  4. In the Topic Name field, click the down caret to display a list of topics and click the topic that you want to publish to.
  5. Click Next to specify the action’s input type.
  6. In the Select Type field, if the data type does not need to be known, accept Type specification not required and click Next. You do not need to follow the rest of these instructions.

    However, a structured data type is recommended. For example, if you want to map the connection input in a data mapper step, you must specify the data type. The data mapper cannot display fields for unstructured data.

    To specify the data type, click in the Select Type field and select one of the following as the schema type:

    • JSON schema is a document that describes the structure of JSON data. The document’s media type is application/schema+json.
    • JSON instance is a document that contains JSON data. The document’s media type is application/json.
    • XML schema is a document that describes the structure of XML data. The document’s file extension is .xsd.
    • XML instance is a document that contains XML data. The document’s file extension is .xml.
  7. In the Definition input box, paste a definition that conforms to the schema type you selected. For example, if you select JSON schema, paste the content of a JSON schema file, which has a media type of application/schema+json.
  8. In the Data Type Name field, enter a name that you choose for the data type. For example, if you are specifying a JSON schema for vendors then you might specify Vendor as the data type name.

    You will see this data type name when you are creating or editing an integration that uses the connection for which you are specifying this type. Fuse Online displays the type name in the integration visualization and in the data mapper.

  9. In the Data Type Description field, provide information that helps you distinguish this type. This description appears in the data mapper when you hover over the step that processes this type.
  10. Click Next.
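If you select XML instance in step 6, the Definition text must be well-formed XML. One quick way to check this before pasting is to parse the text with Python's standard xml.etree.ElementTree module, as in the following sketch. The vendor element names and the helper name root_tag_if_well_formed are assumptions for illustration. The standard library cannot validate a document against an .xsd file, so this check catches syntax errors only, not schema violations.

```python
import xml.etree.ElementTree as ET
from typing import Optional

# Hypothetical XML instance for a Vendor data type; the element names are
# assumptions made for this example.
VENDOR_XML = """<vendor>
  <id>42</id>
  <name>Acme Supplies</name>
  <country>US</country>
</vendor>
"""


def root_tag_if_well_formed(text: str) -> Optional[str]:
    """Return the root element's tag if `text` is well-formed XML, else None.

    Syntax check only: no .xsd validation is performed.
    """
    try:
        return ET.fromstring(text).tag
    except ET.ParseError:
        return None


if __name__ == "__main__":
    print(root_tag_if_well_formed(VENDOR_XML))
```

If you also need to validate against an XML schema, the third-party lxml package provides lxml.etree.XMLSchema for that purpose.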

Result

The connection appears in the integration visualization where you added it.