12.4. Debezium 変更イベントレコードの絞り込み

デフォルトでは、Debezium は受信するすべてのデータ変更イベントを Kafka ブローカーに配信します。ただし、プロデューサーから出力されるイベントのサブセットだけが必要となるケースがほとんどです。該当するレコードだけを処理できるように、Debezium では フィルター 単一メッセージ変換 (SMT) を利用することができます。

重要

Debezium フィルター SMT はテクノロジープレビュー機能です。テクノロジープレビュー機能は、Red Hat の実稼働環境のサービスレベルアグリーメント (SLA) ではサポートされません。また、機能的に完全ではない可能性があるため、Red Hat はテクノロジープレビュー機能を実稼働環境に実装することは推奨しません。テクノロジープレビューの機能は、最新の技術をいち早く提供して、開発段階で機能のテストやフィードバックの収集を可能にするために提供されます。サポート範囲の詳細は、テクノロジープレビュー機能のサポート範囲 を参照してください。

カスタム SMT を作成してフィルターロジックをエンコードするのに Java を使用することは可能ですが、カスタムコーディングされた SMT の使用にはデメリットがあります。以下に例を示します。

  • 変換を事前にコンパイルし、それを Kafka Connect にデプロイする必要がある。
  • 変更が生じるたびにコードの再コンパイルおよび再デプロイが必要になり、運用の柔軟性が失われる。

フィルター SMT は、JSR 223 (Scripting for the Java™ Platform) と統合するスクリプト言語をサポートしています。

Debezium には、JSR 223 API の実装は同梱されていません。Debezium で式言語を使用するには、その言語の JSR 223 スクリプトエンジン実装をダウンロードする必要があります。Debezium をデプロイする方法によって、必要な成果物を Maven Central から自動的にダウンロードするか、または成果物を手動でダウンロードし、言語実装で使用する他の JAR ファイルと共に Debezium コネクターのプラグインディレクトリーに追加することが可能です。

12.4.1. Debezium フィルター SMT の設定

セキュリティー上の理由から、フィルター SMT は Debezium コネクターアーカイブには含まれていません。代わりに、別のアーティファクト debezium-scripting-2.1.4.Final.tar.gz で提供されます。

Dockerfile からカスタム Kafka Connect コンテナーイメージを構築して Debezium コネクターをデプロイする場合、フィルター SMT を使用するには、明示的に SMT アーカイブをダウンロードし、コネクタープラグインと一緒にファイルをデプロイする必要があります。AMQ Streams を使用してコネクターをデプロイすると、Kafka Connect カスタムリソースで指定した設定パラメーターに基づいて、必要なアーティファクトを自動的にダウンロードすることができます。重要: フィルター SMT が Kafka Connect インスタンスに追加されると、インスタンスにコネクターを追加できる任意のユーザーはスクリプト式を実行することができます。許可されたユーザーだけがスクリプト式を実行できるようにするには、フィルター SMT を追加する前に、Kafka Connect インスタンスおよびその設定インターフェイスをセキュアにする必要があります。

以下の手順は、Dockerfile から Kafka Connect コンテナーイメージを構築する場合に適用されます。AMQ Streams を使用して Kafka Connect イメージを作成する場合は、お使いのコネクターのデプロイメントトピックに記載されている説明に従ってください。

手順

  1. ブラウザーから Red Hat ビルドの Debezium ダウンロードサイト を開き、Debezium スクリプト SMT アーカイブ(debezium-scripting-2.1.4.Final.tar.gz)をダウンロードします。
  2. アーカイブのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーに展開します。
  3. JSR-223 スクリプトエンジンの実装を取得し、そのコンテンツを Kafka Connect 環境の Debezium プラグインのディレクトリーに追加します。
  4. Kafka Connect プロセスを再起動し、新しい JAR ファイルを取得します。

Groovy 言語には、クラスパスで以下のライブラリーが必要です。

  • groovy
  • groovy-json (任意)
  • groovy-jsr223

JavaScript 言語には、クラスパスで以下のライブラリーが必要です。

  • graalvm.js
  • graalvm.js.scriptengine

12.4.2. 例: Debezium フィルター SMT の基本設定

Debezium コネクターの Kafka Connect 設定でフィルター変換を設定します。設定で、ビジネスルールに基づくフィルター条件を定義して、対象のイベントを指定します。フィルター SMT がイベントストリームを処理すると、設定されたフィルター条件に対して各イベントを評価します。フィルター条件の基準を満たすイベントのみがブローカーに渡されます。

変更イベントレコードを絞り込むように Debezium コネクターを設定するには、Debezium コネクターの Kafka Connect 設定で Filter SMT を設定します。フィルター SMT の設定には、フィルター条件を定義する正規表現を指定する必要があります。

たとえば、コネクター設定に以下の設定を追加します。

...
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.op == 'u' && value.before.id == 2
...

上記の例では、Groovy 式言語の使用を指定しています。正規表現 value.op == 'u' && value.before.id == 2 は、更新 (u) レコードで id 値が 2 のメッセージを除き、すべてのメッセージを削除します。

設定のカスタマイズ

前の例は、op フィールドを含む DML イベントのみを処理するように設計された単純な SMT 設定を示しています。コネクターが発行する可能性のある他の種類のメッセージ (ハートビートメッセージ、廃棄メッセージ、またはスキーマの変更とトランザクションに関するメタデータメッセージ) には、このフィールドは含まれません。処理の失敗を回避するために、特定 のイベントにのみ変換を選択的に適用する SMT 述語ステートメント を定義できます。

12.4.3. フィルターの式で使用される変数

Debezium は、特定の変数をフィルター SMT の評価コンテキストにバインドします。フィルター条件を指定する式を作成する場合、Debezium が評価コンテキストにバインドする変数を使用することができます。変数をバインドすることで、Debezium は SMT が式の条件を評価する際に変数の値を検索して解釈できるようにします。

以下の表に、Debezium がフィルター SMT の評価コンテキストにバインドする変数のリストを示します。

表12.3 フィルターの式で使用される変数

名前説明タイプ

鍵 (key)

メッセージのキー。

org.apache.kafka.connect​.data​.Struct

value

メッセージの値。

org.apache.kafka.connect.data​.Struct

keySchema

Schema of the message key.

org.apache.kafka.connect​.data​.Schema

valueSchema

メッセージの値のスキーマ。

org.apache.kafka.connect​.data​.Schema

topic

ルーティング先トピックの名前。

文字列

ヘッダー

メッセージヘッダーの Java マッピング。キーフィールドはヘッダー名です。headers 変数は、以下のプロパティーを公開します。

  • value (タイプ: Object)
  • schema (タイプ: org.apache.kafka​.connect​.data​.Schema)

java.util.Map​<String, ​io.debezium.transforms​.scripting​.RecordHeader>

式は、その変数に対して任意のメソッドを呼び出すことができます。式は、SMT がメッセージをどのように処理するかを定義するブール値に解決する必要があります。式のフィルター条件が true と評価されると、メッセージは維持されます。フィルター条件が false と評価されると、メッセージは削除されます。

式がそれ以外の効果を及ぼすことは許されません。つまり、式が渡す変数を変更することは許されません。

12.4.4. フィルター変換を一部適用するオプション

データベースの変更が発生したときに Debezium コネクターが出力する変更イベントメッセージの他に、コネクターはハートビートメッセージなど、他のタイプのメッセージとスキーマ変更およびトランザクションに関するメタデータメッセージも出力します。これらの他のメッセージの構造は、SMT が処理するように設計された変更イベントメッセージの構造とは異なるため、目的のデータ変更メッセージのみを処理するようにコネクターを SMT を選んで適用することが推奨されます。以下の方法のいずれかを使用して、SMT を選んで適用するようにコネクターを設定できます。

12.4.5. 他のスクリプト言語によるフィルター条件の設定

フィルター条件を記述する方法は、使用するスクリプト言語によって異なります。

たとえば、基本設定の例 に示すように、式言語として Groovy を使用する場合、以下の式は id 値が 2 に設定された更新レコードを除くすべてのメッセージを削除します。

value.op == 'u' && value.before.id == 2

他の言語では、同じ条件を表すのに異なる方法が使用されます。

ヒント

Debezium MongoDB コネクターは、after および patch フィールドを構造体ではなくシリアライズされた JSON ドキュメントとして出力します。
MongoDB コネクターでフィルター SMT を使うには、まず JSON の配列フィールドを個別のドキュメントに展開する必要があります。
式の中で JSON パーサーを使用すると、配列の各項目について個別の出力文書を生成することができます。例えば、表現言語として Groovy を使用している場合、groovy-json アーティファクトをクラスパスに追加し、(new groovy.json.JsonSlurper()).parseText(value.after).last_name == 'Kretchmar' のような表現を追加しています。

JavaScript

式言語に JavaScript を使用する場合、以下の例に示すように、Struct#get() メソッドを呼び出してフィルター条件を指定することができます。

value.get('op') == 'u' && value.get('before').get('id') == 2

JavaScript with Graal.js

JavaScript with Graal.js を使用してフィルター条件を定義する場合、Groovy で使用する方法と類似の方法を使用します。以下はその例です。

value.op == 'u' && value.before.id == 2

12.4.6. フィルター変換設定用のオプション

以下の表に、フィルター SMT で使用することができる設定オプションのリストを示します。

表12.4 フィルター SMT の設定オプション

プロパティー

デフォルト

説明

topic.regex

 

イベントのルーティング先トピックの名前を評価するオプションの正規表現で、フィルターロジックを適用するかどうかを決定します。ルーティング先トピックの名前が topic.regex の値とマッチする場合、変換はイベントをトピックに渡す前にフィルターロジックを適用します。トピックの名前が topic.regex の値とマッチしない場合は、SMT は変更せずにイベントをトピックに渡します。

language

 

式を記述する言語。jsr223. で始まる必要があります。例えば、jsr223.groovyjsr223.graal.js。Debezium では、JSR 223 API (Scripting for the Java ™ Platform) によるブートストラップだけがサポートされます。

condition

 

すべてのメッセージに対して評価される式。Boolean 値に評価されなければならず、結果が true の場合はメッセージを保持し、false の場合はメッセージを削除します。

null.handling.mode

保持

トランスフォーメーションが null (tombstone) メッセージをどのように扱うかを指定します。以下のオプションのいずれかを指定することができます。

保持
(デフォルト) メッセージを通過させます。
drop
メッセージを完全に削除します。
evaluate
メッセージにフィルター条件を適用します。