Chapter 19. Kafka messages in a business process

Red Hat AMQ Streams, based on Apache Kafka, is a streaming platform. It acts as a message broker, passing messages between applications in a software environment. The messages are sorted into topics.

Using Red Hat Process Automation Manager, you can create business processes that send and receive Kafka messages in the following ways:

  • Create a start event, intermediate catch event, or boundary event (attached to a human task) of the type message. The KIE Server automatically subscribes to the Kafka topic that is defined in the message. A message triggers the event. The event node acts as the consumer of the message and can pass the content of the message to the subsequent node in the process.
  • Create an end event or intermediate throw event of the type message. When the process triggers the event, the KIE Server sends a Kafka message in the topic that is defined in the message. The message contains the data that is configured in the event. The event node acts as the producer of the message.
  • Add the KafkaPublishMessages custom task to the process. This task does not require the KIE Server Kafka capability but can be more complicated to configure than message events.
  • Configure your service and the KIE Server to emit Kafka messages about every completed process, case, and task when transactions are committed.

19.1. Creating an event that receives Kafka messages

When designing your business process in Business Central, you can create an event that receives Kafka messages.

This event is triggered each time a message arrives in the configured topic. The message is expected to contain data that matches a predefined data object. The process engine parses the message and provides its content as an output data item of the event.

Procedure

  1. Open the project that contains your business process in Business Central.
  2. Create a data object defining the data that the message will contain. For instructions about creating data objects, see Designing business processes using BPMN models.
  3. Select the business process and open the business process designer.
  4. Add a start event, an intermediate catch event, or a boundary event (attached to a human task) of the type message.
  5. Open the properties of the event.
  6. In the Message field, select New and then enter the name of the message. This name must be the same as the name of the topic from which the event is to receive Kafka messages, or else it must be defined in an org.kie.server.jbpm-kafka.ext.topics.broker-topic-name system property of the KIE Server.

    For instructions about using org.kie.server.jbpm-kafka.ext.topics.* system properties to define topic names, see Chapter 20, Configuring a KIE Server to send and receive Kafka messages from the process.

  7. Add an output data item. Select the data object that you created as its type.
  8. Save the business process.
Note

If a Kafka message event starts a new process instance, the initiator field of the instance is set to unknown by default, because the Kafka consumer extension does not receive the identity of the user associated with the Kafka message.
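As an illustration of the data object in step 2, a minimal class might look like the following sketch. The CustomerOrder name and its fields are hypothetical; in Business Central you normally create the data object with the data modeler rather than writing the source by hand.

```java
import java.io.Serializable;

// Hypothetical data object describing the expected message content.
public class CustomerOrder implements Serializable {

    private String orderId;
    private int quantity;

    public CustomerOrder() {
    }

    public CustomerOrder(String orderId, int quantity) {
        this.orderId = orderId;
        this.quantity = quantity;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int quantity) {
        this.quantity = quantity;
    }
}
```

A Kafka message whose content can be parsed into these fields then triggers the event, and the resulting object is available as the event's output data item.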

Next steps

To enable Red Hat AMQ Streams integration when running the process, you must configure the KIE Server according to instructions in Chapter 20, Configuring a KIE Server to send and receive Kafka messages from the process.

19.2. Creating an event that sends Kafka messages

When designing your business process in Business Central, you can create an event that sends Kafka messages.

The event can have a data object as an input data item. The process engine sends the content of the data object as a message to the configured topic.

Procedure

  1. Open the project that contains your business process in Business Central.
  2. Add the work item handler that is required for message-sending events:

    1. Select the Settings → Deployments → Work Item Handlers tab.
    2. Click Add Work Item Handler.
    3. Enter the following values in the new empty line:

      • Name: Send Task
      • Value: new org.jbpm.bpmn2.handler.SendTaskHandler()
      • Resolver type: MVEL
  3. Create a data object defining the data that the message must contain. For instructions about creating data objects, see Designing business processes using BPMN models.
  4. Select the business process and open the business process designer.
  5. Add an intermediate throw event or an end event of the type message.
  6. Open the properties of the event.
  7. In the Message field, select New and then enter the name of the message. This name must be the same as the name of the topic to which the event is to send Kafka messages, or else it must be defined in an org.kie.server.jbpm-kafka.ext.topics.broker-topic-name system property of the KIE Server.

    For instructions about using org.kie.server.jbpm-kafka.ext.topics.* system properties to define topic names, see Chapter 20, Configuring a KIE Server to send and receive Kafka messages from the process.

  8. Add an input data item. Select the data object that you created as its type.
  9. Save the business process.
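For example, if the input data item is a data object with orderId and quantity fields (hypothetical names), the message published to the topic carries that content, such as a JSON body similar to the following. The exact wire format depends on the KIE Server configuration described in Chapter 20.

```
{"orderId": "o-1", "quantity": 3}
```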

Next steps

To enable Red Hat AMQ Streams integration when running the process, you must configure the KIE Server according to instructions in Chapter 20, Configuring a KIE Server to send and receive Kafka messages from the process.
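As part of that configuration, message names can be mapped to topic names through system properties. For example, assuming the property maps a message name to a topic name (the names below are hypothetical; see Chapter 20 for the exact syntax), the KIE Server startup parameters could include:

```
-Dorg.kie.server.jbpm-kafka.ext.topics.ExampleMessage=example-topic
```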

19.3. Adding a custom task that sends Kafka messages

You can add a KafkaPublishMessages custom task to your process. This task sends Kafka messages. It does not use the KIE Server Kafka capability, so you can use this task in processes that do not run on a KIE Server. However, this task can be more complicated to configure than other Red Hat AMQ Streams integration options.

Procedure

  1. In the Business Central administrative settings menu, as the administrative user, select Custom Tasks Administration.
  2. Ensure that KafkaPublishMessages is set to On.
  3. In Business Central, select Menu → Design → Projects and then click the space name and the project name.
  4. Select the Settings → Custom Tasks tab.
  5. In the KafkaPublishMessages line, click Install.
  6. Enter the following information:

    • Bootstrap Servers: The host and port of the Kafka broker, for example, localhost:9092. You can use a comma-separated list of multiple host:port pairs.
    • Client ID: An identifier string to pass to the broker when making requests. Red Hat AMQ Streams uses this string for logging.
    • Key Serializer class: The class that provides the key serializer. You can use the standard serializer class name, org.apache.kafka.common.serialization.StringSerializer, or your own custom serializer class.
    • Value Serializer class: The class that provides the value serializer. You can use the standard serializer class name, org.apache.kafka.common.serialization.StringSerializer, or your own custom serializer class.

      In any of these fields, you can enter an env[property] value. In this case, the process engine reads the setting from a system property at runtime. For example, you can set Client ID to env[application.client.id] and then, before running the process service, set the client ID value in the application.client.id system property.

  7. Select the Assets tab.
  8. Select the business process and open the business process designer.
  9. Add the KafkaPublishMessages custom task, available under Custom Tasks in the BPMN modeler palette.
  10. In the properties of the custom task, open the data assignments.
  11. Assign the Key, Topic, and Value inputs to define the message.
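For example, to supply the Client ID at runtime instead of hard-coding it, you could enter env[application.client.id] in the Client ID field (the property name application.client.id is hypothetical) and then start the process service with a matching system property:

```
-Dapplication.client.id=my-process-client
```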

Next steps

If you entered a custom serializer class, you must upload the source code and configure the class, as described in Section 19.3.1, “Adding a custom serializer class for the KafkaPublishMessages custom task”.

19.3.1. Adding a custom serializer class for the KafkaPublishMessages custom task

If you want to use a custom serializer class for the KafkaPublishMessages custom task, you must upload the source code and configure the class.

Procedure

  1. Prepare a Java source file with the custom serializer class, for example, MyCustomSerializer. Use the package name for your space and project, for example, com.myspace.test. Also prepare the source files for any other required custom classes.
  2. In Business Central, enter your project and select the Settings → Dependencies tab.
  3. Add any dependencies that your custom classes require, for example, org.apache.kafka:kafka-clients.
  4. Select the Assets tab.
  5. For each of the class source files, complete the following steps:

    1. Click Import Asset.
    2. In the Please select a file to upload field, select the location of the Java source file for the custom serializer class.
    3. Click Ok to upload the file.
  6. Select the Settings → Deployments → Work Item Handlers tab.
  7. In the KafkaPublishMessages line, modify the Value field to add the classLoader parameter. For example, the initial value of this field can be the following string:

    new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer")

    In this example, change the value to the following string:

    new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer", classLoader)
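To illustrate what such a custom class contains, the following sketch shows a hypothetical MyCustomSerializer for a CustomerOrder data object. In your project, the class implements org.apache.kafka.common.serialization.Serializer from the kafka-clients dependency added in step 3; the implements clause is shown as a comment here so that the serialization logic reads independently of the Kafka API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical custom serializer for a CustomerOrder data object.
// In your project, declare the class as:
//   public class MyCustomSerializer
//           implements org.apache.kafka.common.serialization.Serializer<CustomerOrder>
// (requires the kafka-clients dependency added in step 3).
public class MyCustomSerializer {

    // Hypothetical data object with orderId and quantity fields.
    public static class CustomerOrder {
        private final String orderId;
        private final int quantity;

        public CustomerOrder(String orderId, int quantity) {
            this.orderId = orderId;
            this.quantity = quantity;
        }

        public String getOrderId() { return orderId; }
        public int getQuantity() { return quantity; }
    }

    // Matches the signature of Serializer<CustomerOrder>.serialize:
    // receives the topic name and the payload, returns the bytes to publish.
    public byte[] serialize(String topic, CustomerOrder data) {
        if (data == null) {
            return null;
        }
        // Write the object as a small JSON document; a real implementation
        // might delegate to a JSON library such as Jackson instead.
        String json = String.format(
                "{\"orderId\":\"%s\",\"quantity\":%d}",
                data.getOrderId(), data.getQuantity());
        return json.getBytes(StandardCharsets.UTF_8);
    }
}
```

When the work item handler publishes a message, the Kafka producer is expected to call this method to turn the task's Key and Value inputs into the bytes that are sent to the topic.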