Chapter 22. Configuring a KIE Server to send and receive Kafka messages from the process

To run a process that sends or receives Kafka messages using events, you must use a KIE Server. You must configure this KIE Server to integrate with Red Hat AMQ Streams.

Procedure

  1. To enable integration with Red Hat AMQ Streams, set the following system properties according to your environment:

    • If you are using KIE Server on Red Hat JBoss EAP, set the org.kie.kafka.server.ext.disabled KIE Server system property of to false.
    • If you are using Spring Boot, set the kieserver.kafka.enabled system property to true.
  2. To configure the connection to the Kafka broker, set the org.kie.server.jbpm-kafka.ext.bootstrap.servers system property to the host and port of the broker. The default value is localhost:9092. You can use a comma-separated list of multiple host:port pairs.
  3. Optional: Set any of the following system properties to configure sending and receiving Kafka messages:

    • org.kie.server.jbpm-kafka.ext.client.id: An identifier string to pass to the broker when making requests. Red Hat AMQ Streams uses this string for logging.
    • org.kie.server.jbpm-kafka.ext.topics.*: Mapping of message names to topic names. For example, if you want to send or receive a message in the ExampleTopic topic when ExampleName is the name of the message, set the org.kie.server.jbpm-kafka.ext.topics.ExampleName system property to ExampleTopic. You can set any number of such system properties. If a message name is not mapped using a system property, the process engine uses this name as the topic name.
    • org.kie.server.jbpm-kafka.ext.property_name: You can set any Red Hat AMQ Streams consumer or producer property by using the org.kie.server.jbpm-kafka.ext prefix. For example, to set a value for the buffer.memory producer property, set the org.kie.server.jbpm-kafka.ext.buffer.memory KIE Server system property.

      This setting applies to all processes that send or receive Kafka messages using events on this KIE Server.

      For a list of Red Hat AMQ Streams consumer and producer properties, see the Consumer configuration parameters and Producer configuration parameters appendixes in Using AMQ Streams on RHEL.

  4. Optional: Set any of the following system properties to configure receiving Kafka messages:

    • org.kie.server.jbpm-kafka.ext.allow.auto.create.topics: Allow automatic topic creation. Enabled by default.
    • org.kie.server.jbpm-kafka.ext.group.id: A unique string that identifies the group to which this Kafka message consumer belongs. The default value is jbpm-consumer.
  5. Optional: Set any of the following system properties to configure sending Kafka messages:

    • org.kie.server.jbpm-kafka.ext.acks: The number of acknowledgements that the Kafka leader must receive before marking the request as complete. The default value is 1, which means the leader writes the record to its local log and then responds to the process engine, without waiting for full acknowledgement from all followers.
    • org.kie.server.jbpm-kafka.ext.max.block.ms: The number of milliseconds for which the publish method blocks. After this time, the process engine can resume execution of the business process. The default value is 2000 (2 seconds).

22.1. Configuring the use a custom message format

By default, when using message events, the process engine sends and receives messages in a format compliant with the CloudEvents specification version 1.0.

Optionally, you can configure the use of a raw JSON data format or a custom format for the messages. If you want to use a custom format, you need to implement and provide classes.

Prerequisites

  • Your project uses message events to send or receive messages.

Procedure

  1. If you want to use a custom format for sending or receiving messages, implement and provide custom classes:

    1. Develop the source code for the classes:

      • To send messages, develop a class that implements the KafkaEventWriter interface
      • To receive messages, develop a class that implements the KafkaEventReader interface

        You can download the interface definitons from the GitHub repository.

    2. Provide the classes to your business application. For instructions, see Chapter 24, Providing a custom class to your business application in Business Central.
  2. Set the following KIE Server system properties to set the custom writer or reader:

    • org.kie.server.jbpm-kafka.ext.eventWriterClass: the custom event writer class. Set this property to use a different format to send messages. If you want to use a custom format, set the property to the fully qualified name of your custom event writer class. If you want to use a raw JSON data format, set the property to org.kie.server.services.jbpm.kafka.RawJsonEventWriter.
    • org.kie.server.jbpm-kafka.ext.eventReaderClass: the custom event reader class. Set this property to use a different format to receive messages. If you want to use a custom format, set the property to the fully qualified name of your custom event reader class. If you want to use a raw JSON data format, set the property to org.kie.server.services.jbpm.kafka.RawJsonEventReader.