第11章 アプリケーション用 Debezium コネクターの設定

Debezium コネクターのデフォルトの動作がアプリケーションに適していない場合、以下の Debezium 機能を使用して必要な動作を設定することができます。

Kafka Connect 自動トピック作成
Connect が実行時にトピックを作成し、トピックの名前に基づいて設定設定をトピックに適用するのを許可します。
Avro シリアライゼーション
Debezium PostgreSQL、MongoDB、または SQL Server コネクターが Avro を使用してメッセージのキーと値をシリアライズする設定をサポートします。これにより、変更イベントレコードのコンシューマーがレコードスキーマの変更に容易に適応できるようにします。
CloudEvents コンバーター
Debezium コネクターが CloudEvents 仕様に準拠する変更イベントレコードを出力できるようにします。
Debezium コネクターへのシグナル送信
コネクターの動作を変更したり、アドホックスナップショットの開始などのアクションをトリガーしたりする方法を提供します。

11.1. Kafka Connect 自動トピック作成のカスタマイズ

Kafka には、トピックを自動的に作成するメカニズムが 2 つ用意されています。Kafka ブローカーの自動トピック作成を有効にすることができます。また、Kafka 2.6.0 以降では、Kafka Connect のトピック作成を有効にすることもできます。Kafka ブローカーは、auto.create.topics.enable プロパティーを使用してトピックの自動作成を制御します。Kafka Connect では、topic.creation.enable プロパティーで、Kafka Connect がトピックを作成することを許可するかどうかを指定します。いずれの場合も、プロパティーのデフォルト設定ではトピックの自動作成が有効です。

トピックの自動作成が有効な場合、Debezium ソースコネクターがまだルーティング先トピックが存在しないテーブルの変更イベントレコードを出力すると、イベントレコードが Kafka に取り込まれる際にトピックが実行時に作成されます。

ブローカーと Kafka Connect での自動トピック作成の違い

ブローカーが作成するトピックは、1 つのデフォルト設定の共有に制限されます。ブローカーは、異なるトピックまたはトピックのセットに一意の設定を適用することはできません。対照的に、Kafka Connect では、トピックの作成時に任意のさまざまな設定を適用し、Debezium コネクター設定で指定したレプリケーション係数、パーティション数、およびその他のトピック固有の設定を定義することができます。コネクター設定はトピック作成グループのセットを定義し、トピック設定のプロパティーセットを各グループに関連付けます。

ブローカー設定と Kafka Connect 設定は、互いに独立しています。ブローカーでトピック作成を無効にしたかどうかに関係なく、Kafka Connect はトピックを作成することができます。ブローカーと Kafka Connect の両方でトピックの自動作成を有効にすると Connect の設定が優先され、Kafka Connect のいずれの設定も適用されない場合に限り、ブローカーはトピックを作成します。

詳細については、以下のトピックを参照してください。

11.1.1. Kafka ブローカーの自動トピック作成の無効化

デフォルトでは、トピックがまだ存在しない場合、Kafka ブローカー設定によりブローカーは実行時にトピックを作成することができます。ブローカーによって作成されたトピックにカスタムプロパティーを設定することはできません。2.6.0 より前のバージョンの Kafka を使用し、特定の設定でトピックを作成する場合は、ブローカーの自動トピック作成を無効にし、手動またはカスタムデプロイプロセスのいずれにより明示的にトピックを作成する必要があります。

手順

  • ブローカーの設定で、auto.create.topics.enable の値を false にします。

11.1.2. Kafka Connect の自動トピック作成の設定

Kafka Connect でのトピックの自動作成は、topic.creation.enable プロパティーによって制御されます。次の例に示すように、プロパティーのデフォルト値は true であり、トピックの自動作成を有効にします。

topic.creation.enable = true

topic.creation.enable プロパティーの設定は、Connect クラスター内のすべてのワーカーに適用されます。

Kafka Connect の自動トピック作成では、トピックの作成時に Kafka Connect が適用する設定プロパティーを定義する必要があります。トピックグループを定義して Debezium コネクター設定でトピックの設定プロパティーを指定し、続いてそれぞれのグループに適用するプロパティーを指定します。コネクター設定では、デフォルトのトピック作成グループ、およびオプションで 1 つまたは複数のカスタムトピック作成グループを定義します。カスタムトピック作成グループは、トピック名パターンのリストを使用してグループの設定が適用されるトピックを指定します。

Kafka Connect がどのようにトピックをトピック作成グループと照合するかについての詳細は、トピック作成グループ を参照してください。設定プロパティーがどのようにグループに割り当てられるかについての詳細は、トピック作成グループの設定プロパティー を参照してください。

デフォルトでは、Kafka Connect が作成するトピックは、パターンserver.schema.table に基づいて名前が付けられます (例: dbserver.myschema.inventory)。

手順

  • Kafka Connect がトピックを自動的に作成しないようにするには、次の例のように、Kafka Connect カスタムリソースで topic.creation.enable の値を false に設定します。
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect-cluster

...

spec:
  config:
    topic.creation.enable: "false"
注記

Kafka Connect の自動トピック作成では、 replication.factor プロパティーと partitions プロパティーを少なくとも default のトピック作成グループに設定する必要があります。グループは、Kafka ブローカーのデフォルト値から必要なプロパティーの値を取得することができます。

11.1.3. 自動的に作成されたトピックの設定

Kafka Connect でトピックを自動的に作成するには、トピックの作成時に適用する設定プロパティーに関するソースコネクターからの情報が必要です。それぞれの Debezium コネクターの設定で、トピックの作成を制御するプロパティーを定義します。Kafka Connect がコネクターから出力されるイベントレコード用のトピックを作成すると、作成されるトピックは該当するグループから設定を取得します。この設定は、そのコネクターによって出力されたイベントレコードにのみ適用されます。

11.1.3.1. トピック作成グループ

トピックプロパティーのセットが、トピック作成グループに関連付けられます。少なくとも default トピック作成グループを定義し、その設定プロパティーを指定する必要があります。それ以外に、オプションとして 1 つまたは複数のカスタムトピック作成グループを定義し、それぞれに一意のプロパティーを指定することができます。

カスタムトピック作成グループを作成する場合、トピック名パターンに基づいて各グループにメンバートピックを定義します。各グループに含めるトピックまたはグループから除外するトピックを記述する命名パターンを指定することができます。include および exclude プロパティーには、トピック名パターンを定義する正規表現のコンマ区切りリストを指定します。たとえば、文字列 dbserver1.inventory で始まるすべてのトピックをグループに含める場合は、その topic.creation.inventory.include プロパティーの値をdbserver1\\.inventory\\.* に設定します。

注記

カスタムトピックグループに include および exclude プロパティーの両方を指定すると、除外ルールが優先され包含ルールがオーバーライドされます。

11.1.3.2. トピック作成グループの設定プロパティー

default トピック作成グループおよびそれぞれのカスタムグループは、一意の設定プロパティーのセットに関連付けられます。任意の Kafka トピックレベルの設定プロパティー をグループの設定に含めることができます。たとえば、トピックグループに 古いトピックセグメントのクリーンアップポリシー保持時間、または トピックの圧縮タイプ を指定することができます。少なくとも、作成するトピックの設定を記述するプロパティーの最小セットを定義する必要があります。

カスタムグループが登録されていない場合、または登録されているグループの include パターンが作成するトピックの名前とマッチしない場合、Kafka Connect は default グループの設定を使用してトピックを作成します。

トピック設定についての概要は、Installing Debezium on OpenShift の Kafka topic creation recommendations を参照してください。

11.1.3.3. Debezium デフォルトトピック作成グループ設定の指定

Kafka Connect の自動トピック作成を使用するためには、デフォルトのトピック作成グループを作成し、その設定を定義する必要があります。デフォルトのトピック作成グループの設定は、カスタムトピック作成グループの include リストのパターンにマッチしない名前のすべてのトピックに適用されます。

前提条件

  • Kafka Connect のカスタムリソースで、metadata.annotationsuse-connector-resources の値により、クラスターの Operator が KafkaConnector カスタムリソースを使用してクラスター内のコネクターを設定するように指定されている。以下に例を示します。

     ...
        metadata:
          name: my-connect-cluster
          annotations: strimzi.io/use-connector-resources: "true"
     ...

手順

  • topic.creation.default グループのプロパティーを定義するには、以下の例に示すように、コネクターのカスタムリソースの spec.config にプロパティーを追加します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
      labels:
        strimzi.io/cluster: my-connect-cluster
    spec:
    ...
    
       config:
    ...
         topic.creation.default.replication.factor: 3  1
         topic.creation.default.partitions: 10  2
         topic.creation.default.cleanup.policy: compact  3
         topic.creation.default.compression.type: lz4  4
    ...

    任意の Kafka トピックレベルの設定プロパティーdefault グループの設定に含めることができます。

表11.1 default のトピック作成グループのコネクター設定

項目説明

1

topic.creation.default.replication.factor は、デフォルトグループによって作成されるトピックのレプリケーション係数を定義します。
replication.factor グループの場合 default の設定は必須ですが、カスタムグループの場合は任意です。カスタムグループは、設定されていない場合、default グループの値にフォールバックします。Kafka ブローカーのデフォルト値を使用する場合は -1 を使用します。

2

topic.creation.default.partitions は、デフォルトグループによって作成されるトピックのパーティション数を定義します。
default グループの場合 partitions の設定は必須ですが、カスタムグループの場合は任意です。カスタムグループは、設定されていない場合、default グループの値にフォールバックします。Kafka ブローカーのデフォルト値を使用する場合は -1 を使用します。

3

topic.creation.default.cleanup.policyトピックレベルの設定パラメーターcleanup.policy プロパティーにマッピングされ、ログの保存ポリシーを定義します。

4

topic.creation.default.compression.type は、トピックレベルの設定パラメーターcompression.type プロパティーにマッピングされており、メッセージをハードディスク上でどのように圧縮するかを定義します。

注記

カスタムグループは、必要なreplication.factor および partitions プロパティーのみに対して、default グループの設定が戻ります。カスタムトピックグループ設定の他のプロパティーが定義されていない場合、default グループで指定された値は適用されません。

11.1.3.4. Debezium カスタムトピック作成グループ設定の指定

複数のカスタムトピックグループを、それぞれ個別の設定で定義することができます。

手順

  • カスタムトピックグループを定義するには、コネクターのカスタムリソースの spec.configtopic.creation.<group_name>.include プロパティーを追加し、続いてカスタムグループのトピックに適用する設定プロパティーを定義します。

    次の例では、カスタムトピック作成グループ inventoryapplicationlogs を定義するカスタムリソースの抜粋を示しています。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
    ... 1
        topic.creation.inventory.include: dbserver1\\.inventory\\.*  2
        topic.creation.inventory.partitions: 20
        topic.creation.inventory.cleanup.policy: compact
        topic.creation.inventory.delete.retention.ms: 7776000000
    
        3
        topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.* 4
        topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*  5
        topic.creation.applicationlogs.replication.factor: 1
        topic.creation.applicationlogs.partitions: 20
        topic.creation.applicationlogs.cleanup.policy: delete
        topic.creation.applicationlogs.retention.ms: 7776000000
        topic.creation.applicationlogs.compression.type: lz4
    ...
    ...

表11.2 inventory および applicationlogs カスタムトピック作成グループのコネクター設定

項目説明

1

inventory グループの設定を定義します。
カスタムグループでは、replication.factor および partitions プロパティーはオプションです。値が設定されていない場合、カスタムグループは、default グループに設定されている値にフォールバックします。Kafka ブローカーに設定されている値を使用する場合は、-1 に設定します。

2

topic.creation.inventory.include は、dbserver1.inventory. で始まるすべてのトピックにマッチする正規表現を定義します。inventory グループに定義された設定は、指定した正規表現にマッチする名前のトピックにのみ適用されます。

3

applicationlogs グループの設定を定義します。
カスタムグループでは、replication.factor および partitions プロパティーはオプションです。値が設定されていない場合、カスタムグループは、default グループに設定されている値にフォールバックします。Kafka ブローカーに設定されている値を使用する場合は、-1 に設定します。

4

topic.creation.applicationlogs.include では、dbserver1.logs.applog- で始まるすべてのトピックにマッチする正規表現を定義します。applicationlogs グループに定義された設定は、指定された正規表現にマッチする名前のトピックにのみ適用されます。このグループには、exclude プロパティーも定義されているため、include 正規表現に一致するトピックは、exclude プロパティーによってさらに制限される可能性があります。

5

topic.creation.applicationlogs.exclude では、dbserver1.logs.applog-old- で始まるすべてのトピックに一致する正規表現を定義します。applicationlogs グループに定義された設定は、指定された正規表現にマッチしない名前のトピックにのみ適用されます。このグループには include プロパティーも定義されているため、applicationlogs グループの設定は、指定されたinclude 正規表現にマッチし指定された exclude 正規表現にマッチしない名前のトピックにのみ適用されます。

11.1.3.5. Debezium カスタムトピック作成グループの登録

カスタムトピック作成グループの設定を指定したら、グループを登録します。

手順

  • カスタムグループを登録するには、コネクターのカスタムリソースに topic.creation.groups プロパティーを追加し、カスタムトピック作成グループをコンマで区切って指定します。

    カスタムトピック作成グループ inventoryapplicationlogs を登録するコネクターカスタムリソースの抜粋を以下に示します。

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnector
    metadata:
      name: inventory-connector
    ...
    spec:
    ...
    
       config:
         topic.creation.groups: inventory,applicationlogs
    
    ...

設定の完了

default トピックグループの設定に加えて inventory および applicationlogs カスタムトピック作成グループの設定が含まれる完了した設定の例を以下に示します。

以下に例を示します。デフォルトのトピック作成グループと 2 つのカスタムグループの設定

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnector
metadata:
  name: inventory-connector
...
spec:
...

   config:
...
    topic.creation.default.replication.factor: 3,
    topic.creation.default.partitions: 10,
    topic.creation.default.cleanup.policy: compact
    topic.creation.default.compression.type: lz4
    topic.creation.groups: inventory,applicationlogs
    topic.creation.inventory.include: dbserver1\\.inventory\\.*
    topic.creation.inventory.partitions: 20
    topic.creation.inventory.cleanup.policy: compact
    topic.creation.inventory.delete.retention.ms: 7776000000
    topic.creation.applicationlogs.include: dbserver1\\.logs\\.applog-.*
    topic.creation.applicationlogs.exclude": dbserver1\\.logs\\.applog-old-.*
    topic.creation.applicationlogs.replication.factor: 1
    topic.creation.applicationlogs.partitions: 20
    topic.creation.applicationlogs.cleanup.policy: delete
    topic.creation.applicationlogs.retention.ms: 7776000000
    topic.creation.applicationlogs.compression.type: lz4
...