第11章 分散トレーシング

本章では、Jaeger を使用した AMQ Streams での分散トレーシングのサポートについて説明します。

分散トレーシングの設定方法は、AMQ Streams クライアントとコンポーネントによって異なります。

  • OpenTracing クライアントライブラリーを使用して、Kafka Producer、Consumer、および Streams API の各アプリケーションを分散トレーシング向けに インストルメント化 します。これには、インストルメント化コードをこれらのクライアントに追加することが含まれ、レースデータを生成するために個々のトランザクションの実行が監視されます。
  • 分散トレーシングのサポートは、AMQ Streams の Kafka Connect、MirrorMaker、および Kafka Bridge コンポーネントに組み込まれています。これらのコンポーネントを分散トレーシング向けに設定するには、関連するカスタムリソースを設定および更新します。

AMQ Streams クライアントおよびコンポーネントで分散トレーシングを設定する前に、「Kafka クライアント用の Jaeger トレーサーの初期化」の手順に従い、最初に Kafka クラスターで Jaeger トレーサーを初期化して設定する必要があります。

注記

分散トレーシングは Kafka ブローカーではサポートされません。

11.1. AMQ Streams での分散トレーシングの概要

分散トレーシングを使用すると、開発者およびシステム管理者は、分散システム内のアプリケーション (およびマイクロサービスアーキテクチャー内のサービス) 間のトランザクションの進捗を追跡できます。この情報は、アプリケーションのパフォーマンスを監視し、ターゲットシステムおよびエンドユーザーアプリケーションの問題を調べるのに役立ちます。

AMQ Streams およびデータストリーミングのプラットフォームでは、通常、分散トレーシングによって、メッセージのエンドツーエンドでの追跡 (ソースシステムから Kafka クラスターへ、さらにターゲットシステムおよびアプリケーションへ) が容易になります。

分散トレーシングは、システムの可観測性の要素として、Grafana ダッシュボードで表示可能なメトリクスと各コンポーネントで利用可能なロガーを補完します。

OpenTracing の概要

AMQ Streams の分散トレーシングは、オープンソースの OpenTracing および Jaeger プロジェクトを使用して実装されます。

OpenTracing 仕様では、分散トレーシングのアプリケーションをインストルメント化するために開発者が使用する API が定義されます。これは、トレーシングシステムに依存しません。

アプリケーションがインストルメント化されると、個々のトランザクションの トレース が生成されます。トレースは、特定の作業単位を定義する スパン で構成されます。

Kafka Bridge と、Kafka Producer、Consumer、および Streams API アプリケーションのインストルメンテーションを簡素化するために、AMQ Streams には OpenTracing Apache Kafka Client Instrumentation ライブラリーが含まれています。

注記

OpenTracing プロジェクトは OpenCensus プロジェクトと統合されました。新たに統合されたプロジェクトの名前は OpenTelemetry です。OpenTelemetry は、OpenTracing API を使用してインストルメント化されたアプリケーションの互換性を維持します。

Jaeger の概要

Jaeger はトレーシングシステムで、マイクロサービスベースの分散システムの監視およびトラブルシューティングに使用される OpenTracing API の実装です。4 つの主要コンポーネントで構成され、アプリケーションのインストルメント化するためのクライアントライブラリーが提供されます。Jaeger ユーザーインターフェースを使用すると、トレースデータを視覚化、クエリー、フィルタリング、および分析できます。

Jaeger ユーザーインターフェースのクエリー例

Simple Jaeger query

11.1.1. AMQ Streams での分散トレーシングのサポート

AMQ Streams では、分散トレーシングは以下でサポートされます。

  • Kafka Connect (Source2Image がサポートされる Kafka Connect を含む)
  • MirrorMaker
  • AMQ Streams Kafka Bridge

これらのコンポーネントの分散トレーシングを有効化および設定するには、関連するカスタムリソース (例: KafkaConnectKafkaBridge) でテンプレート設定プロパティーを設定します。

Kafka Producer、Consumer、および Streams API の各アプリケーションで分散トレーシングを有効にするには、OpenTracing Apache Kafka Client Instrumentation ライブラリーを使用してアプリケーションコードをインストルメント化します。インストルメント化されると、クライアントはメッセージのトレースを生成します (メッセージの作成時やログへのオフセットの書き込み時など)。

トレースは、サンプリングストラテジーに従いサンプル化され、Jaeger ユーザーインターフェースで可視化されます。このトレースデータは、Kafka クラスターのパフォーマンスの監視や、ターゲットシステムおよびアプリケーションの問題のデバッグに便利です。

手順の概要

AMQ Streams の分散トレーシングを設定するには、以下の手順に従います。

本章では、AMQ Streams クライアントおよびコンポーネントの分散トレーシングの設定についてのみ説明します。AMQ Streams 以外のアプリケーションおよびシステムに分散トレーシングを設定する方法については、本章の対象外となります。この件についての詳細は、OpenTracing ドキュメントを参照し、「inject and extrac」を検索してください。

作業を開始する前の注意事項

AMQ Streams の分散トレーシングを設定する前に、以下を理解しておくと便利です。

前提条件

  • Jaeger バックエンドコンポーネントが OpenShift クラスターにデプロイされている必要があります。デプロイメント手順の詳細は、Jaeger デプロイメントのドキュメントを参照してください。

11.2. Kafka クライアントのトレース設定

本セクションでは、分散トレーシング用にクライアントアプリケーションをインストルメント化できるように、Jaeger トレーサーを初期化する方法について説明します。

11.2.1. Kafka クライアント用の Jaeger トレーサーの初期化

一連のトレーシング環境変数を使用して、Jaeger トレーサーを設定および初期化します。

手順

クライアントアプリケーションごとに以下の手順を実行します。

  1. Jaeger の Maven 依存関係をクライアントアプリケーションの pom.xml ファイルに追加します。

    <dependency>
        <groupId>io.jaegertracing</groupId>
        <artifactId>jaeger-client</artifactId>
        <version>1.1.0.redhat-00002</version>
    </dependency>
  2. トレーシング環境変数を使用して Jaeger トレーサーの設定を定義します。
  3. 2. で定義した環境変数から、Jaeger トレーサーを作成します。

    Tracer tracer = Configuration.fromEnv().getTracer();
    注記

    別の Jaeger トレーサーの初期化方法については、Java OpenTracing ライブラリーのドキュメントを参照してください。

  4. Jaeger トレーサーをグローバルトレーサーとして登録します。

    GlobalTracer.register(tracer);

これで、Jaeger トレーサーはクライアントアプリケーションが使用できるように初期化されました。

11.2.2. トレーシング環境変数

ここに示す環境変数は、Kafka クライアントに Jaeger トレーサーを設定するときに使用します。

注記

トレーシング環境変数は Jaeger プロジェクトの一部で、変更される場合があります。最新の環境変数については、Jaeger ドキュメントを参照してください。

プロパティー必要性説明

JAEGER_SERVICE_NAME

必要

Jaeger トレーサーサービスの名前。

JAEGER_AGENT_HOST

不要

UDP (User Datagram Protocol) を介した jaeger-agent との通信のためのホスト名。

JAEGER_AGENT_PORT

不要

UDP を介した jaeger-agent との通信に使用されるポート。

JAEGER_ENDPOINT

不要

traces エンドポイント。クライアントアプリケーションが jaeger-agent を迂回し、jaeger-collector に直接接続する場合にのみ、この変数を定義します。

JAEGER_AUTH_TOKEN

不要

エンドポイントに bearer トークンとして送信する認証トークン。

JAEGER_USER

不要

Basic 認証を使用する場合にエンドポイントに送信するユーザー名。

JAEGER_PASSWORD

不要

Basic 認証を使用する場合にエンドポイントに送信するパスワード。

JAEGER_PROPAGATION

不要

トレースコンテキストの伝播に使用するカンマ区切りの形式リスト。デフォルトは標準の Jaeger 形式です。有効な値は jaeger および b3 です。

JAEGER_REPORTER_LOG_SPANS

不要

レポーターがスパンも記録する必要があるかどうかを示します。

JAEGER_REPORTER_MAX_QUEUE_SIZE

不要

レポーターの最大キューサイズ。

JAEGER_REPORTER_FLUSH_INTERVAL

不要

レポーターのフラッシュ間隔 (ミリ秒単位)。Jaeger レポーターがスパンバッチをフラッシュする頻度を定義します。

JAEGER_SAMPLER_TYPE

不要

クライアントトレースに使用するサンプリングストラテジー: Constant、Probabilistic、Rate Limiting、または Remote (デフォルトタイプ)

全トレースをサンプリングするには、パラメーターに 1 を指定して Constant サンプリングストラテジーを使用します。

詳細は、Jaeger ドキュメントを参照してください。

JAEGER_SAMPLER_PARAM

不要

サンプラーのパラメーター (数値)。

JAEGER_SAMPLER_MANAGER_HOST_PORT

不要

リモートサンプリングストラテジーを選択する場合に使用するホスト名およびポート。

JAEGER_TAGS

不要

報告されたすべてのスパンに追加されるトレーサーレベルのタグのカンマ区切りリスト。

この値に ${envVarName:default} 形式を使用して環境変数を参照することもできます。:default は任意の設定で、環境変数が見つからない場合に使用する値を特定します。

11.3. トレーサーでの Kafka クライアントのインストルメント化

本セクションでは、分散トレーシングのために Kafka Producer、Consumer、および Streams API アプリケーションをインストルメント化する方法を説明します。

11.3.1. トレーシングのための Kafka Producer および Consumer のインストルメント化

Decorator パターンまたは Interceptor を使用して、Java Producer および Consumer アプリケーションコードを分散トレーシング用にインストルメント化します。

手順

Kafka Producer および Consumer アプリケーションのアプリケーションコードで以下の手順を実行します。

  1. OpenTracing の Maven 依存関係を、Producer または Consumer の pom.xml ファイルに追加します。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.11.redhat-00001</version>
    </dependency>
  2. Decorator パターンまたは Interceptor のいずれかを使用して、クライアントアプリケーションコードをインストルメント化します。

    • デコレーターパターンを使用する場合は、以下の例を使用します。

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
              tracer);
      
      // Send:
      tracingProducer.send(...);
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
              tracer);
      
      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
      
      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    • インターセプターを使用する場合は、以下の例を使用します。

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);
      
      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingProducerInterceptor.class.getName());
      
      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Send:
      producer.send(...);
      
      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingConsumerInterceptor.class.getName());
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);
      
      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

11.3.1.1. Decorator パターンのカスタムスパン名

スパン は Jaeger の論理作業単位で、操作名、開始時間、および期間が含まれます。

Decorator パターンを使用して Kafka Producer および Consumer の各アプリケーションをインストルメント化する場合、TracingKafkaProducer および TracingKafkaConsumer オブジェクトの作成時に BiFunction オブジェクトを追加の引数として渡すと、カスタムスパン名を定義できます。OpenTracing の Apache Kafka Client Instrumentation ライブラリーには、以下のようなビルトインスパン名がいくつか含まれています。

例: カスタムスパン名を使用した Decorator パターンでのクライアントアプリケーションコードのインストルメント化

// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
        tracer,
        producerSpanNameProvider);

// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:

TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);

// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"

11.3.1.2. ビルトインスパン名

カスタムスパン名を定義するとき、ClientSpanNameProvider クラスで以下の BiFunctions を使用できます。spanNameProvider の指定がない場合は、CONSUMER_OPERATION_NAME および PRODUCER_OPERATION_NAME が使用されます。

BiFunction説明

CONSUMER_OPERATION_NAME, PRODUCER_OPERATION_NAME

operationName をスパン名として返します。Consumer には「receive」、Producer には「send」を返します。

CONSUMER_PREFIXED_OPERATION_NAME(String prefix), PRODUCER_PREFIXED_OPERATION_NAME(String prefix)

prefix および operationName の文字列連結を返します。

CONSUMER_TOPIC, PRODUCER_TOPIC

メッセージの送信先または送信元となったトピックの名前を(record.topic()) 形式で返します。

PREFIXED_CONSUMER_TOPIC(String prefix), PREFIXED_PRODUCER_TOPIC(String prefix)

prefix およびトピック名の文字列連結を (record.topic()) 形式で返します。

CONSUMER_OPERATION_NAME_TOPIC, PRODUCER_OPERATION_NAME_TOPIC

操作名およびトピック名を "operationName - record.topic()" 形式で返します。

CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(String prefix), PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(String prefix)

prefix および "operationName - record.topic()" の文字列連結を返します。

11.3.2. Kafka Streams アプリケーションのトレーシングのインストルメント化

本セクションでは、分散トレーシングのために Kafka Streams API アプリケーションをインストルメント化する方法を説明します。

手順

Kafka Streams API アプリケーションごとに以下の手順を実行します。

  1. opentracing-kafka-streams 依存関係を、Kafka Streams API アプリケーションの pom.xml ファイルに追加します。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-streams</artifactId>
        <version>0.1.11.redhat-00001</version>
    </dependency>
  2. TracingKafkaClientSupplier サプライヤーインターフェースのインスタンスを作成します。

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
  3. サプライヤーインターフェースを KafkaStreams に提供します。

    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();

11.4. MirrorMaker、Kafka Connect、および Kafka Bridge のトレース設定

分散トレーシングは、MirrorMaker、Kafka Connect (Source2Image がサポートされる Kafka Connect を含む)、および AMQ Streams Kafka Bridge でサポートされます。

MirrorMaker でのトレース

MirrorMaker では、メッセージはソースクラスターからターゲットクラスターにトレースされます。トレースデータは、MirrorMaker コンポーネントに出入りするメッセージを記録します。

Kafka Connect でのトレース

Kafka Connect により生成および消費されるメッセージのみがトレーシングされます。Kafka Connect と外部システム間で送信されるメッセージをトレーシングするには、これらのシステムのコネクターでトレーシングを設定する必要があります。詳細は「Kafka Connect クラスターの設定」を参照してください。

Kafka Bridge でのトレーシング

Kafka Bridge によって生成および消費されるメッセージがトレーシングされます。Kafka Bridge を介してメッセージを送受信するクライアントアプリケーションから受信する HTTP リクエストもトレースされます。エンドツーエンドのトレースを設定するために、HTTP クライアントでトレースを設定する必要があります。

11.4.1. MirrorMaker、Kafka Connect、および Kafka Bridge リソースでのトレースの有効化

KafkaMirrorMakerKafkaConnectKafkaConnectS2I、およびKafkaBridge カスタムリソースの設定を更新して、リソースごとに Jaeger トレーサーサービスを指定および設定します。OpenShift クラスターでトレースが有効になっているリソースを更新すると、2 つのイベントがトリガーされます。

  • インターセプタークラスは、MirrorMaker、Kafka Connect、または AMQ Streams Kafka Bridge の統合されたコンシューマーおよびプロデューサーで更新されます。
  • MirrorMaker および Kafka Connect では、リソースに定義されたトレース設定に基づいて、Jaeger トレーサーがトレーシングエージェントによって初期化されます。
  • Kafka Bridge では、リソースに定義されたトレース設定に基づいて、Jaeger トレーサーが Kafka Bridge によって初期化されます。

手順

KafkaMirrorMakerKafkaConnectKafkaConnectS2I、および KafkaBridge リソースにこれらのステップを実行します。

  1. spec.template プロパティーで、Jaeger トレーサーサービスを設定します。以下に例を示します。

    Kafka Connect の Jaeger トレーサー設定

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec:
      #...
      template:
        connectContainer: 1
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing: 2
        type: jaeger
      #...

    MirrorMaker の Jaeger トレーサー設定

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaMirrorMaker
    metadata:
      name: my-mirror-maker
    spec:
      #...
      template:
        mirrorMakerContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...

    Kafka Bridge の Jaeger トレーサー設定

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      #...
      template:
        bridgeContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...

    1
    トレーシング環境変数をテンプレートの設定プロパティーとして使用します。
    2
    spec.tracing.type プロパティーを jaeger に設定します。
  2. リソースを作成または更新します。

    oc apply -f your-file