第12章 Apache Kafka で交換されたメッセージを修正するためのトランスフォームの適用

Debezium には、変更イベントレコードを修正するために使用できるいくつかのシングルメッセージ変換 (SMT) があります。Apache Kafka にレコードを送信する前に、レコードを修正する変換を適用するようにコネクターを設定することができます。また、Debezium SMT をシンクコネクターに適用して、コネクターが Kafka トピックから読み込む前にレコードを修正することもできます。

特定のメッセージのみに選択的に変換を適用したい場合は、SMT を適用する条件を定義する Kafka Connect プレディケートを設定することができます。

Debezium は以下の SMT を提供しています。

トピックルーター SMT
元のトピック名に適用される正規表現に基づいて、変更イベントレコードを特定のトピックに再ルーティングします。
コンテンツベースルーター SMT
イベントの内容に基づいて、指定された変更イベントのレコードを再送します。
メッセージフィルターリング SMT
イベントレコードのサブセットを宛先の Kafka トピックに伝搬することができます。この変換では、イベントレコードの内容に基づいて、コネクターが発信する変更イベントレコードに正規表現を適用します。式にマッチしたレコードのみが対象のトピックに書き込まれます。その他の記録は無視されます。
新記録の状態抽出 SMT
Debezium の変更イベントレコードの複雑な構造をシンプルなフォーマットにフラット化します。構造を簡略化することで、元の構造を消費できないシンクコネクターでの処理が可能になります。
送信トレイ (Outbox) イベントルーター SMT
複数のサービス間での安全で信頼性の高いデータ交換を可能にするアウトボックスパターンのサポートを提供します。
MongoDB outbox event router SMT
複数のサービス間で安全で信頼性の高いデータ交換を可能にするために、送信トレイパターンを使用するためのサポートを提供します。

12.1. SMT 述語を使用した変換の選択的適用

コネクターに単一メッセージ変換 (SMT) を設定する場合、変換の述語を定義できます。述語は、コネクターが処理するメッセージのサブセットに変換を条件的に適用する方法を指定します。Debezium などのソースコネクターまたはシンクコネクターに対して設定する変換に、述語を割り当てることができます。

12.1.1. SMT 述語について

Debezium は、Kafka Connect がレコードを Kafka トピックに保存する前に、イベントレコードを変更するために使用できるさまざまな単一メッセージ変換 (SMT) を提供します。デフォルトでは、Debezium コネクターにこれらの SMT のいずれかを設定する場合、Kafka Connect はコネクターが出力するすべてのレコードに変換を適用します。ただし、共通の特徴を共有する変更イベントメッセージのサブセットのみが変更されるように、一部の変換を適用する場合があります。

たとえば、Debezium コネクターでは、特定のテーブルからのイベントメッセージまたは特定のヘッダーキーが含まれるイベントメッセージでのみ変換を実行する必要がある場合があります。Apache Kafka 2.6 以降を実行する環境では、変換に述語ステートメントを追加して、特定のレコードだけに SMT を適用するように Kafka Connect に指示できます。述語では、Kafka Connect が処理する各メッセージを評価するために使用する条件を指定します。Debezium コネクターが変更イベントメッセージを出力すると、Kafka Connect はメッセージを設定済みの述語条件に対して確認します。イベントメッセージで条件が満たされる場合、Kafka Connect は変換を適用し、メッセージを Kafka トピックに書き込みます。条件に一致しないメッセージは、そのまま Kafka に送信されます。

この状況は、シンクコネクター SMT に定義する述語に類似しています。コネクターは Kafka トピックからメッセージを読み取り、Kafka Connect はメッセージを述語条件に対して評価します。メッセージが条件と一致する場合、Kafka Connect は変換を適用し、メッセージをシンクコネクターに渡します。

述語を定義したら、それを再利用し、複数の変換に適用できます。述語には negate オプションがあり、これを使うと述語の条件を反転させて、述語文で定義された条件に一致しないレコードにのみ適用することができます。negate オプションを使うと、条件を否定することを前提とした他の変換と述語をペアにすることができます。

述語要素

述語には、以下の要素が含まれます。

  • predicates 接頭辞
  • エイリアス (例:isOutbox Table)
  • タイプ (例えば、org.apache.kafka.connect.transforms.predicates.Topic Name Matches)Kafka Connect は、デフォルトの述語型のセットを提供します。これは、独自のカスタム述語を定義することで補足できます。
  • 条件ステートメントと追加の設定プロパティー (述語の型 (正規表現の命名パターンなど) による)

デフォルトの述語型

デフォルトでは、以下の述語型を利用できます。

HasHeaderKey
Kafka Connect が評価するイベントメッセージのヘッダーのキー名を指定します。述語は、指定された名前を持つヘッダーキーが含まれるレコードを true と評価します。
RecordIsTombstone

Kafka 廃棄 レコードとマッチします。述語は、null 値を持つすべてのレコードに対してtrue と評価されます。この述語をフィルター SMT と組み合わせて使用して廃棄レコードを削除します。この述語には設定パラメーターはありません。

Kafka の tombstone は、0 バイトの null ペイロードを持つキーを持つレコードです。Debezium コネクターがソースデータベースで削除操作を処理すると、コネクターは削除操作に対して 2 つの変更イベントを出力します。

  • データベースレコードの以前の値を提供する削除操作 ("op" : "d") イベント。
  • キーは同じだが、値が null の墓石イベント。

    tombstone は、行の削除マーカーを表します。ログコンパクション が Kafka に対して有効になっている場合、コンパクト時に Kafka は tombstone と同じキーを共有するすべてのイベントを削除します。ログコンパクションは、トピックの delete.retention.ms 設定で制御されるコンパクト化の間隔で定期的に行われます。

    廃棄 (tombstone) イベントを出力しないように Debezium を設定する ことは可能ですが、ログコンパクション中に想定される動作を維持するために Debezium が tombstone を出力するのを許可することを推奨します。tombstone を抑制することにより、ログコンパクション中に削除されたキーのレコードを Kafka が削除しなくなります。環境に tombstone を処理できないシンクコネクターが含まれる場合は、RecordIsTombstone 述語で SMT を使用して廃棄レコードをフィルターリングするようにシンクコネクターを設定できます。

TopicNameMatches
Kafka Connect が照合するトピックの名前を指定する正規表現。トピック名が指定の正規表現と一致するコネクターレコードの場合、述語は true になります。この述語を使用して、ソーステーブルの名前に基づいてレコードに SMT を適用します。

12.1.2. SMT 述語の定義

デフォルトでは、Kafka Connect は Debezium コネクター設定の各単一メッセージ変換を、Debezium から受け取るすべての変更イベントレコードに適用します。Apache Kafka 2.6 以降では、Kafka Connect による変換の適用方法を制御するコネクター設定で変換に SMT 述語を定義できます。述語ステートメントは、Kafka Connect が Debezium によって出力されるイベントレコードに変換を適用する条件を定義します。Kafka Connect は述語ステートメントを評価し、SMT を選択的に、述語で定義される条件に一致するレコードのサブセットに適用します。Kafka Connect 述語の設定は、変換の設定と似ています。述語エイリアスを指定し、エイリアスを変換に関連付け、述語の型および設定を定義します。

前提条件

  • Debezium 環境が Apache Kafka 2.6 以降を実行している。
  • SMT が Debezium コネクター用に設定されている。

手順

  1. Debezium コネクターの設定で、predicates パラメーターに、IsOutbox Table などの predicate エイリアスを指定します。
  2. コネクター設定の変換エイリアスに述語エイリアスを追加して、条件付きに適用する変換に述語エイリアスを関連付けます。

    transforms.<TRANSFORM_ALIAS>.predicate=<PREDICATE_ALIAS>

    以下に例を示します。

    transforms.outbox.predicate=IsOutboxTable
  3. 型を指定し、設定パラメーターの値を指定して述語を設定します。

    1. 型には、Kafka Connect で使用できる以下のデフォルト型のいずれかを指定します。

      • HasHeaderKey
      • RecordIsTombstone
      • TopicNameMatches

        以下に例を示します。

        predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    2. TopicNameMatch または HasHeaderKey 述語に対して、照合するトピックまたはヘッダー名の正規表現を指定します。

      以下に例を示します。

      predicates.IsOutboxTable.pattern=outbox.event.*
  4. 条件を反転する場合は、negate キーワードを変換エイリアスに追加し、true に設定します。

    以下に例を示します。

    transforms.outbox.negate=true

    前述のプロパティーは、述語がマッチするレコードセットを反転し、Kafka Connect は述語で指定された条件に一致しないレコードに変換を適用するようにします。

例: 送信トレイイベントルーターの変換用の TopicNameMatch 述語

以下の例に示す Debezium コネクター設定は、送信トレイイベントルーター変換を Debezium が Kafka outbox.event.order トピックに出力するメッセージにだけ適用します。

TopicNameMatch 述語は送信トレイテーブルからのメッセージだけを true と評価するため (outbox.event.*)、データベースの他のテーブルから送信されるメッセージに変換は適用されません。

transforms=outbox
transforms.outbox.predicate=IsOutboxTable
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
predicates=IsOutboxTable
predicates.IsOutboxTable.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.IsOutboxTable.pattern=outbox.event.*

12.1.3. 廃棄 (tombstone) イベントの無視

Debezium が廃棄 (tombstone) イベントを生成するかどうかや、Kafka がそれらを保持する期間を制御できます。データパイプラインによっては、Debezium が廃棄 (tombstone) イベントを出力しないように、コネクターの tombstones.on.delete プロパティーを設定する必要がある場合があります。

Debezium が tombstone を出力できるようにするかどうかは、トピックがどのように環境で使用されるかと、シンクコンシューマーの特性によって異なります。一部のシンクコネクターは、廃棄 (tombstone) イベントに依存してダウンストリームデータストアからレコードを削除します。シンクコネクターが廃棄 (tombstone) レコードに依存してダウンストリームデータストアのレコードを削除するタイミングを示す場合は、Debezium がそれらを出力するように設定します。

tombstone を生成するように Debezium を設定する場合、シンクコネクターが廃棄 (tombstone) イベントを受信するように追加の設定が必要になります。ログコンパクション中に Kafka がイベントメッセージを削除する前に、コネクターがイベントメッセージを読み取るために、トピックの保持ポリシーを設定する必要があります。コンパクション前にトピックが tombstone を保持する時間の長さは、トピックの delete.retention.ms プロパティーによって制御されます。

デフォルトでは、コネクターの tombstones.on.delete プロパティーが true に設定されているため、削除イベントが発生するたびに、コネクターは墓石を生成します。このプロパティーを false に設定して、Debezium が Kafka トピックに墓石の記録を保存しないようにすると、墓石の記録がないために意図しない結果になる可能性があります。Kafka はログコンパクション時に tombstone に依存して、削除されたキーに関連するレコードを削除します。

null 値のレコードを処理できないシンクコネクターやダウンストリームの Kafka コンシューマーをサポートする必要がある場合、Debezium が廃棄を出力するのを防止するのではなく、コンシューマーが読み取る前に RecordIsTombstone 述語型を使用して廃棄メッセージを削除する述語を使用するコネクターの SMT を設定することを検討してください。

手順

  • Debezium が削除されたデータベースレコードの墓石イベントを発行しないようにするには、コネクターオプション tombstones.on.deletefalse に設定します。

    以下に例を示します。

    “tombstones.on.delete”: “false”