第10章 分散トレーシング

分散トレーシングを使用すると、分散システムのアプリケーション間で実行されるトランザクションの進捗を追跡できます。マイクロサービスのアーキテクチャーでは、トレーシングはサービス間のトランザクションの進捗を追跡します。トレースデータは、アプリケーションのパフォーマンスを監視し、ターゲットシステムおよびエンドユーザーアプリケーションの問題を調べるのに役立ちます。

AMQ Streams では、トレーシングによってメッセージのエンドツーエンドの追跡が容易になります。これは、ソースシステムから Kafka、さらに Kafka からターゲットシステムおよびアプリケーションへのメッセージの追跡です。これは、Grafana ダッシュボード で表示できるメトリクスやコンポーネントロガーを補います。

AMQ Streams によるトレーシングのサポート方法

トレーシングのサポートは、以下のコンポーネントに組み込まれています。

  • Kafka Connect (Source2Image がサポートされる Kafka Connect を含む)
  • MirrorMaker
  • MirrorMaker 2.0
  • AMQ Streams Kafka Bridge

カスタムリソースのテンプレート設定プロパティーを使用して、これらのコンポーネントのトレーシングを有効化および設定します。

Kafka プロデューサー、コンシューマー、および Kafka Streams API アプリケーションでトレーシングを有効にするには、AMQ Streams に含まれる OpenTracing Apache Kafka Client Instrumentation ライブラリーを使用してアプリケーションコードを インストルメント化 します。インストルメント化されると、クライアントはメッセージのトレースデータを生成します (メッセージの作成時やログへのオフセットの書き込み時など)。

トレースは、サンプリングストラテジーに従いサンプル化され、Jaeger ユーザーインターフェースで可視化されます。

注記

トレーシングは Kafka ブローカーではサポートされません。

AMQ Streams 以外のアプリケーションおよびシステムにトレーシングを設定する方法については、本章の対象外となります。この件についての詳細は、OpenTracing ドキュメント を参照し、「inject and extrac」を検索してください。

手順の概要

AMQ Streams のトレーシングを設定するには、以下の手順を順番に行います。

前提条件

  • Jaeger バックエンドコンポーネントが OpenShift クラスターにデプロイされている必要があります。デプロイメント手順の詳細は、Jaeger デプロイメントのドキュメントを参照してください。

10.1. OpenTracing および Jaeger の概要

AMQ Streams では OpenTracing および Jaeger プロジェクトが使用されます。

OpenTracing は、トレーシングまたは監視システムに依存しない API 仕様です。

  • OpenTracing API は、アプリケーションコードを インストルメント化 するために使用されます。
  • インストルメント化されたアプリケーションは、分散システム全体で個別のトランザクションの トレース を生成します。
  • トレースは、特定の作業単位を定義する スパン で構成されます。

Jaeger はマイクロサービスベースの分散システムのトレーシングシステムです。

  • Jaeger は OpenTracing API を実装し、インストルメント化のクライアントライブラリーを提供します。
  • Jaeger ユーザーインターフェースを使用すると、トレースデータをクエリー、フィルター、および分析できます。

The Jaeger user interface showing a simple query

その他のリソース

10.2. Kafka クライアントのトレーシング設定

Jaeger トレーサーを初期化し、分散トレーシング用にクライアントアプリケーションをインストルメント化します。

10.2.1. Kafka クライアント用の Jaeger トレーサーの初期化

一連のトレーシング環境変数を使用して、Jaeger トレーサーを設定および初期化します。

手順

各クライアントアプリケーションで以下を行います。

  1. Jaeger の Maven 依存関係をクライアントアプリケーションの pom.xml ファイルに追加します。

    <dependency>
        <groupId>io.jaegertracing</groupId>
        <artifactId>jaeger-client</artifactId>
        <version>1.1.0.redhat-00002</version>
    </dependency>
  2. トレーシング環境変数を使用して Jaeger トレーサーの設定を定義します。
  3. 2. で定義した環境変数から、Jaeger トレーサーを作成します。

    Tracer tracer = Configuration.fromEnv().getTracer();
    注記

    別の Jaeger トレーサーの初期化方法については、Java OpenTracing ライブラリーのドキュメントを参照してください。

  4. Jaeger トレーサーをグローバルトレーサーとして登録します。

    GlobalTracer.register(tracer);

これで、Jaeger トレーサーはクライアントアプリケーションが使用できるように初期化されました。

10.2.2. トレーシングの環境変数

ここに示す環境変数は、Kafka クライアントに Jaeger トレーサーを設定するときに使用します。

注記

トレーシング環境変数は Jaeger プロジェクトの一部で、変更される場合があります。最新の環境変数については、Jaeger ドキュメントを参照してください。

プロパティー必要性説明

JAEGER_SERVICE_NAME

必要

Jaeger トレーサーサービスの名前。

JAEGER_AGENT_HOST

不要

UDP (User Datagram Protocol) を介した jaeger-agent との通信のためのホスト名。

JAEGER_AGENT_PORT

不要

UDP を介した jaeger-agent との通信に使用されるポート。

JAEGER_ENDPOINT

不要

traces エンドポイント。クライアントアプリケーションが jaeger-agent を迂回し、jaeger-collector に直接接続する場合にのみ、この変数を定義します。

JAEGER_AUTH_TOKEN

不要

エンドポイントに bearer トークンとして送信する認証トークン。

JAEGER_USER

不要

Basic 認証を使用する場合にエンドポイントに送信するユーザー名。

JAEGER_PASSWORD

不要

Basic 認証を使用する場合にエンドポイントに送信するパスワード。

JAEGER_PROPAGATION

不要

トレースコンテキストの伝播に使用するカンマ区切りの形式リスト。デフォルトは標準の Jaeger 形式です。有効な値は jaegerb3、および w3c です。

JAEGER_REPORTER_LOG_SPANS

不要

レポーターがスパンも記録する必要があるかどうかを示します。

JAEGER_REPORTER_MAX_QUEUE_SIZE

不要

レポーターの最大キューサイズ。

JAEGER_REPORTER_FLUSH_INTERVAL

不要

レポーターのフラッシュ間隔 (ミリ秒単位)。Jaeger レポーターがスパンバッチをフラッシュする頻度を定義します。

JAEGER_SAMPLER_TYPE

不要

クライアントトレースに使用するサンプリングストラテジー。

  • Constant
  • Probabilistic
  • Rate Limiting
  • Remote (デフォルト)

すべてのトレースをサンプリングするには、Constant サンプリングストラテジーを使用し、パラメーターを 1 にします。

詳細は、Jaeger ドキュメントを参照してください。

JAEGER_SAMPLER_PARAM

不要

サンプラーのパラメーター (数値)。

JAEGER_SAMPLER_MANAGER_HOST_PORT

不要

リモートサンプリングストラテジーを選択する場合に使用するホスト名およびポート。

JAEGER_TAGS

不要

報告されたすべてのスパンに追加されるトレーサーレベルのタグのカンマ区切りリスト。

この値に ${envVarName:default} 形式を使用して環境変数を参照することもできます。:default は任意の設定で、環境変数が見つからない場合に使用する値を特定します。

10.3. トレーサーでの Kafka クライアントのインストルメント化

Kafka プロデューサーとコンシューマークライアント、および Kafka Streams API アプリケーションを分散トレーシング用にインストルメント化します。

10.3.1. トレーシングのための Kafka プロデューサーおよびコンシューマーのインストルメント化

Decorator パターンまたは Interceptor を使用して、Java プロデューサーおよびコンシューマーアプリケーションコードをトレーシング用にインストルメント化します。

手順

各プロデューサーおよびコンシューマーアプリケーションのアプリケーションコードで以下を行います。

  1. OpenTracing の Maven 依存関係を、プロデューサーまたはコンシューマーの pom.xml ファイルに追加します。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.15.redhat-00001</version>
    </dependency>
  2. Decorator パターンまたは Interceptor のいずれかを使用して、クライアントアプリケーションコードをインストルメント化します。

    • Decorator パターンを使用する場合は以下を行います。

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
              tracer);
      
      // Send:
      tracingProducer.send(...);
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
              tracer);
      
      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
      
      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    • インターセプターを使用する場合は以下を使用します。

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);
      
      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingProducerInterceptor.class.getName());
      
      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Send:
      producer.send(...);
      
      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingConsumerInterceptor.class.getName());
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);
      
      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

10.3.1.1. Decorator パターンのカスタムスパン名

スパン は Jaeger の論理作業単位で、操作名、開始時間、および期間が含まれます。

Decorator パターンを使用してプロデューサーおよびコンシューマーの各アプリケーションをインストルメント化する場合、TracingKafkaProducer および TracingKafkaConsumer オブジェクトの作成時に BiFunction オブジェクトを追加の引数として渡すと、カスタムスパン名を定義できます。OpenTracing の Apache Kafka Client Instrumentation ライブラリーには、複数の組み込みスパン名が含まれています。

例: カスタムスパン名を使用した Decorator パターンでのクライアントアプリケーションコードのインストルメント化

// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
        tracer,
        producerSpanNameProvider);

// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:

TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);

// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"

10.3.1.2. ビルトインスパン名

カスタムスパン名を定義するとき、ClientSpanNameProvider クラスで以下の BiFunctions を使用できます。spanNameProvider の指定がない場合は、CONSUMER_OPERATION_NAME および PRODUCER_OPERATION_NAME が使用されます。

BiFunction説明

CONSUMER_OPERATION_NAME, PRODUCER_OPERATION_NAME

operationName をスパン名として返します。コンシューマーには「receive」、プロデューサーには「send」を返します。

CONSUMER_PREFIXED_OPERATION_NAME(String prefix), PRODUCER_PREFIXED_OPERATION_NAME(String prefix)

prefix および operationName の文字列連結を返します。

CONSUMER_TOPIC, PRODUCER_TOPIC

メッセージの送信先または送信元となったトピックの名前を(record.topic()) 形式で返します。

PREFIXED_CONSUMER_TOPIC(String prefix), PREFIXED_PRODUCER_TOPIC(String prefix)

prefix およびトピック名の文字列連結を (record.topic()) 形式で返します。

CONSUMER_OPERATION_NAME_TOPIC, PRODUCER_OPERATION_NAME_TOPIC

操作名およびトピック名を "operationName - record.topic()" 形式で返します。

CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(String prefix), PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(String prefix)

prefix および "operationName - record.topic()" の文字列連結を返します。

10.3.2. Kafka Streams アプリケーションのトレーシングのインストルメント化

本セクションでは、分散トレーシングのために Kafka Streams API アプリケーションをインストルメント化する方法を説明します。

手順

各 Kafka Streams API アプリケーションで以下を行います。

  1. opentracing-kafka-streams 依存関係を、Kafka Streams API アプリケーションの pom.xml ファイルに追加します。

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-streams</artifactId>
        <version>0.1.15.redhat-00001</version>
    </dependency>
  2. TracingKafkaClientSupplier サプライヤーインターフェースのインスタンスを作成します。

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
  3. サプライヤーインターフェースを KafkaStreams に提供します。

    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();

10.4. MirrorMaker、Kafka Connect、および Kafka Bridge のトレーシング設定

分散トレーシングは、MirrorMaker、MirrorMaker 2.0、Kafka Connect (Source2Image がサポートされる Kafka Connect を含む)、および AMQ Streams Kafka Bridge でサポートされます。

MirrorMaker および MirrorMaker 2.0 でのトレーシング

MirrorMaker および MirrorMaker 2.0 では、メッセージはソースクラスターからターゲットクラスターにトレーシングされます。トレースデータは、MirrorMaker または MirrorMaker 2.0 コンポーネントを出入りするメッセージを記録します。

Kafka Connect でのトレーシング

Kafka Connect により生成および消費されるメッセージのみがトレーシングされます。Kafka Connect と外部システム間で送信されるメッセージをトレーシングするには、これらのシステムのコネクターでトレーシングを設定する必要があります。詳細は、「Kafka Connect の設定」 を参照してください。

Kafka Bridge でのトレーシング

Kafka Bridge によって生成および消費されるメッセージがトレーシングされます。Kafka Bridge を介してメッセージを送受信するクライアントアプリケーションから受信する HTTP リクエストもトレーシングされます。エンドツーエンドのトレーシングを設定するために、HTTP クライアントでトレーシングを設定する必要があります。

10.4.1. MirrorMaker、Kafka Connect、および Kafka Bridge リソースでのトレーシングの有効化

KafkaMirrorMakerKafkaMirrorMaker2KafkaConnectKafkaConnectS2I、および KafkaBridge カスタムリソースの設定を更新して、リソースごとに Jaeger トレーサーサービスを指定および設定します。OpenShift クラスターでトレーシングが有効になっているリソースを更新すると、2 つのイベントがトリガーされます。

  • インターセプタークラスは、MirrorMaker、MirrorMaker 2.0、Kafka Connect、または AMQ Streams Kafka Bridge の統合されたコンシューマーおよびプロデューサーで更新されます。
  • MirrorMaker、MirrorMaker 2.0 および Kafka Connect では、リソースに定義されたトレーシング設定に基づいて、Jaeger トレーサーがトレーシングエージェントによって初期化されます。
  • Kafka Bridge では、リソースに定義されたトレーシング設定に基づいて、Jaeger トレーサーが Kafka Bridge によって初期化されます。

手順

KafkaMirrorMakerKafkaMirrorMaker2KafkaConnectKafkaConnectS2I、および KafkaBridge リソースにこれらのステップを実行します。

  1. spec.template プロパティーで、Jaeger トレーサーサービスを設定します。以下に例を示します。

    Kafka Connect の Jaeger トレーサー設定

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
    spec:
      #...
      template:
        connectContainer: 1
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing: 2
        type: jaeger
      #...

    MirrorMaker の Jaeger トレーサー設定

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker
    metadata:
      name: my-mirror-maker
    spec:
      #...
      template:
        mirrorMakerContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...

    MirrorMaker 2.0 の Jaeger トレーサー設定

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaMirrorMaker2
    metadata:
      name: my-mm2-cluster
    spec:
      #...
      template:
        connectContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...

    Kafka Bridge の Jaeger トレーサー設定

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaBridge
    metadata:
      name: my-bridge
    spec:
      #...
      template:
        bridgeContainer:
          env:
            - name: JAEGER_SERVICE_NAME
              value: my-jaeger-service
            - name: JAEGER_AGENT_HOST
              value: jaeger-agent-name
            - name: JAEGER_AGENT_PORT
              value: "6831"
      tracing:
        type: jaeger
    #...

    1
    トレーシング環境変数をテンプレートの設定プロパティーとして使用します。
    2
    spec.tracing.type プロパティーを jaeger に設定します。
  2. リソースを作成または更新します。

    oc apply -f your-file