19.3. Kafka メッセージを送信するカスタムタスクの追加

KafkaPublishMessages カスタムタスクをプロセスに追加できます。このタスクは Kafka メッセージを送信します。KIE Server の Kafka 機能を使用しないため、KIE Server で実行されないプロセスでこのタスクを使用できます。ただし、このタスクは、他の Red Hat AMQ Streams 統合オプションよりも設定が複雑になる可能性があります。

手順

  1. Business Central 管理設定メニューで管理ユーザーとして Custom Tasks Administration を選択します。
  2. KafkaPublishMessagesOn に設定されていることを確認します。
  3. Business Central で MenuDesignProjects の順に選択し、スペース名とプロジェクト名をクリックします。
  4. SettingsCustom Tasks タブを選択します。
  5. KafkaPublishMessages 行で Install をクリックします。
  6. 以下の情報を入力します。

    • Bootstrap Servers: Kafka ブローカーのホストおよびポート (例: localhost:9092)。複数の host:port ペアのコンマ区切りリストを使用できます。
    • Client ID: 要求の実行時にブローカーに渡す識別子文字列。Red Hat AMQ Streams では、この文字列を使用してロギングを行います。
    • Key Serializer クラス: キーシリアライザーを提供するクラス。標準のシリアライザークラス名 org.apache.kafka.common.serialization.StringSerializer を使用できます。または、専用のカスタムシリアライザークラスを使用できます。
    • Value Serializer クラス: 値シリアライザーを提供するクラス。標準のシリアライザークラス名 org.apache.kafka.common.serialization.StringSerializer を使用できます。または、専用のカスタムシリアライザークラスを使用できます。

      これらのフィールドのいずれかで、env[property] 値を入力します。この場合、プロセスエンジンはランタイム時にシステムプロパティーから設定を読み取ります。たとえば、Client IDenv[application.client.id] に設定し、プロセスサービスを実行する前に、application.client.id システムプロパティーにクライアント ID の値を設定します。

  7. Assets タブを選択します。
  8. ビジネスプロセスを選択し、ビジネスプロセスデザイナーを開きます。
  9. BPMN モデラーパレットの Custom Tasks で利用可能な KafkaPublishMessages カスタムタスクを追加します。
  10. カスタムタスクのプロパティーで、データの割り当てを開きます。
  11. KeyTopic、および Value 入力を割り当て、メッセージを定義します。

次のステップ

カスタムシリアライザークラスを設定している場合は、KafkaPublishMessages カスタムタスクのカスタムシリアライザークラスの追加」 で説明されているようにソースコードをアップロードし、クラスを設定する必要があります。

19.3.1. KafkaPublishMessages カスタムタスクのカスタムシリアライザークラスの追加

KafkaPublishMessages カスタムタスクにカスタムシリアライザークラスを使用する場合は、ソースコードをアップロードし、クラスを設定する必要があります。

手順

  1. カスタムシリアライザークラス (例: MyCustomSerializer) で Java ソースファイルを準備します。スペースおよびプロジェクトのパッケージ名 (例: com.myspace.test) を使用します。その他の必要なカスタムクラスのソースファイルも準備します。
  2. Business Central でプロジェクトを入力し、SettingsDependencies タブを選択します。
  3. カスタムクラスに必要な依存関係 (例: org.apache.kafka.kafka-clients) を追加します。
  4. Assets タブを選択します。
  5. 各クラスソースファイルについて、以下の手順を実行します。

    1. Import Asset をクリックします。
    2. Please select a file to upload フィールドで、カスタムシリアライザークラスの Java ソースファイルの場所を選択します。
    3. OK をクリックしてファイルをアップロードします。
  6. SettingsDeploymentsWork Item Handlers タブを選択します。
  7. KafkaPublishMessages 行で、Value フィールドを変更して classLoader パラメーターを追加します。たとえば、このフィールドの最初の値は以下の文字列になります。

    new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer")

    この例では、値を以下の文字列に変更します。

    new org.jbpm.process.workitem.kafka.KafkaWorkItemHandler("127.0.0.1:9092", "jbpm", "com.myspace.test.MyCustomSerializer", "com.myspace.test.MyCustomSerializer", classLoader)