19.3. Kafka メッセージを送信するカスタムタスクの追加
KafkaPublishMessages カスタムタスクをプロセスに追加できます。このタスクは Kafka メッセージを送信します。KIE Server の Kafka 機能を使用しないため、KIE Server で実行されないプロセスでこのタスクを使用できます。ただし、このタスクは、他の Red Hat AMQ Streams 統合オプションよりも設定が複雑になる可能性があります。
手順
- Business Central 管理設定メニューで管理ユーザーとして Custom Tasks Administration を選択します。
- KafkaPublishMessages が On に設定されていることを確認します。
- Business Central で Menu → Design → Projects の順に選択し、スペース名とプロジェクト名をクリックします。
- Settings → Custom Tasks タブを選択します。
- KafkaPublishMessages 行で Install をクリックします。
以下の情報を入力します。
-
Bootstrap Servers: Kafka ブローカーのホストおよびポート (例:
localhost:9092)。複数の host:port ペアのコンマ区切りリストを使用できます。 - Client ID: 要求の実行時にブローカーに渡す識別子文字列。Red Hat AMQ Streams では、この文字列を使用してロギングを行います。
-
Key Serializer クラス: キーシリアライザーを提供するクラス。標準のシリアライザークラス名
org.apache.kafka.common.serialization.StringSerializerを使用できます。または、専用のカスタムシリアライザークラスを使用できます。 Value Serializer クラス: 値シリアライザーを提供するクラス。標準のシリアライザークラス名
org.apache.kafka.common.serialization.StringSerializerを使用できます。または、専用のカスタムシリアライザークラスを使用できます。これらのフィールドのいずれかで、
env[property]値を入力します。この場合、プロセスエンジンはランタイム時にシステムプロパティーから設定を読み取ります。たとえば、Client ID をenv[application.client.id]に設定し、プロセスサービスを実行する前に、application.client.idシステムプロパティーにクライアント ID の値を設定します。
-
Bootstrap Servers: Kafka ブローカーのホストおよびポート (例:
- Assets タブを選択します。
- ビジネスプロセスを選択し、ビジネスプロセスデザイナーを開きます。
-
BPMN モデラーパレットの Custom Tasks で利用可能な
KafkaPublishMessagesカスタムタスクを追加します。 - カスタムタスクのプロパティーで、データの割り当てを開きます。
- Key、Topic、および Value 入力を割り当て、メッセージを定義します。
次のステップ
カスタムシリアライザークラスを設定している場合は、「KafkaPublishMessages カスタムタスクのカスタムシリアライザークラスの追加」 で説明されているようにソースコードをアップロードし、クラスを設定する必要があります。
19.3.1. KafkaPublishMessages カスタムタスクのカスタムシリアライザークラスの追加
KafkaPublishMessages カスタムタスクにカスタムシリアライザークラスを使用する場合は、ソースコードをアップロードし、クラスを設定する必要があります。
手順
-
カスタムシリアライザークラス (例:
MyCustomSerializer) で Java ソースファイルを準備します。スペースおよびプロジェクトのパッケージ名 (例:com.myspace.test) を使用します。その他の必要なカスタムクラスのソースファイルも準備します。 - Business Central でプロジェクトを入力し、Settings → Dependencies タブを選択します。
-
カスタムクラスに必要な依存関係 (例:
org.apache.kafka.kafka-clients) を追加します。 - Assets タブを選択します。
各クラスソースファイルについて、以下の手順を実行します。
- Import Asset をクリックします。
- Please select a file to upload フィールドで、カスタムシリアライザークラスの Java ソースファイルの場所を選択します。
- OK をクリックしてファイルをアップロードします。
- Settings → Deployments → Work Item Handlers タブを選択します。
KafkaPublishMessages行で、Valueフィールドを変更してclassLoaderパラメーターを追加します。たとえば、このフィールドの最初の値は以下の文字列になります。new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer")この例では、値を以下の文字列に変更します。
new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer", classLoader)