Chapter 15. Distributed tracing

Distributed tracing allows you to track the progress of transactions between applications in a distributed system. In a microservices architecture, tracing tracks the progress of transactions between services. Trace data is useful for monitoring application performance and investigating issues with target systems and end-user applications.

In AMQ Streams on Red Hat Enterprise Linux, tracing facilitates the end-to-end tracking of messages: from source systems to Kafka, and then from Kafka to target systems and applications. Tracing complements the available JMX metrics.

How AMQ Streams supports tracing

Support for tracing is provided for the following clients and components.

Kafka clients:

  • Kafka producers and consumers
  • Kafka Streams API applications

Kafka components:

  • Kafka Connect
  • Kafka Bridge
  • MirrorMaker
  • MirrorMaker 2.0

To enable tracing, you perform four high-level tasks:

  1. Enable a Jaeger tracer.
  2. Enable the Interceptors.
  3. Set tracing environment variables.
  4. Deploy the client or component.

When instrumented, clients generate trace data, for example, when producing messages or writing offsets to the log.

Traces are sampled according to a sampling strategy and then visualized in the Jaeger user interface.

Note

Tracing is not supported for Kafka brokers.

Setting up tracing for applications and systems beyond AMQ Streams is outside the scope of this chapter. To learn more about this subject, search for "inject and extract" in the OpenTracing documentation.

Outline of procedures

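To set up tracing for AMQ Streams, follow these procedures in order:

  • Set up tracing for Kafka clients (Section 15.2)
  • Set up tracing for MirrorMaker and Kafka Connect (Section 15.3)
  • Enable tracing for the Kafka Bridge (Section 15.4)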

Prerequisites

15.1. Overview of OpenTracing and Jaeger

AMQ Streams uses the OpenTracing and Jaeger projects.

OpenTracing is an API specification that is independent from the tracing or monitoring system.

  • The OpenTracing APIs are used to instrument application code
  • Instrumented applications generate traces for individual transactions across the distributed system
  • Traces are composed of spans that define specific units of work over time

Jaeger is a tracing system for microservices-based distributed systems.

  • Jaeger implements the OpenTracing APIs and provides client libraries for instrumentation
  • The Jaeger user interface allows you to query, filter, and analyze trace data
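As a rough mental model of how spans make up a trace, the following sketch represents each span by an operation name, a start time, and a duration, and derives the elapsed time of the whole trace. It is a simplified illustration only: the SpanSketch class and its fields are hypothetical and are not the OpenTracing Span interface.

```java
import java.util.List;

// Hypothetical, simplified model; not the OpenTracing Span interface.
final class SpanSketch {
    final String operationName;
    final long startMillis;
    final long durationMillis;

    SpanSketch(String operationName, long startMillis, long durationMillis) {
        this.operationName = operationName;
        this.startMillis = startMillis;
        this.durationMillis = durationMillis;
    }

    long endMillis() {
        return startMillis + durationMillis;
    }

    // Elapsed time from the earliest span start to the latest span end.
    static long traceDurationMillis(List<SpanSketch> trace) {
        if (trace.isEmpty()) {
            return 0;
        }
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        for (SpanSketch span : trace) {
            start = Math.min(start, span.startMillis);
            end = Math.max(end, span.endMillis());
        }
        return end - start;
    }

    public static void main(String[] args) {
        List<SpanSketch> trace = List.of(
            new SpanSketch("send", 0, 5),      // producer writes the message
            new SpanSketch("receive", 12, 3)); // consumer reads it later
        System.out.println("Trace duration (ms): " + traceDurationMillis(trace));
    }
}
```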

[Figure: A simple query in the Jaeger user interface]

Additional resources

15.2. Setting up tracing for Kafka clients

Initialize a Jaeger tracer to instrument your client applications for distributed tracing.

15.2.1. Initializing a Jaeger tracer for Kafka clients

Configure and initialize a Jaeger tracer using a set of tracing environment variables.

Procedure

In each client application:

  1. Add Maven dependencies for Jaeger to the pom.xml file for the client application:

    <dependency>
        <groupId>io.jaegertracing</groupId>
        <artifactId>jaeger-client</artifactId>
        <version>1.1.0.redhat-00002</version>
    </dependency>
  2. Define the configuration of the Jaeger tracer using the tracing environment variables.
  3. Create the Jaeger tracer from the environment variables that you defined in step two:

    Tracer tracer = Configuration.fromEnv().getTracer();

    Note

    For alternative ways to initialize a Jaeger tracer, see the Java OpenTracing library documentation.

  4. Register the Jaeger tracer as a global tracer:

    GlobalTracer.register(tracer);

A Jaeger tracer is now initialized for the client application to use.
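Because the tracer in step 3 is built from environment variables, it can be useful to confirm that JAEGER_SERVICE_NAME (the only variable that Table 15.2 marks as required) is set before the tracer is created. The following minimal sketch uses only the Java standard library. The TracingEnvCheck class and its method are hypothetical helpers, not part of the Jaeger client.

```java
import java.util.Map;

// Hypothetical helper: not part of the Jaeger client library.
final class TracingEnvCheck {

    // The only variable that Table 15.2 marks as required.
    static final String SERVICE_NAME = "JAEGER_SERVICE_NAME";

    // Returns true when the required variable is present and not blank.
    static boolean hasServiceName(Map<String, String> env) {
        String value = env.get(SERVICE_NAME);
        return value != null && !value.trim().isEmpty();
    }

    public static void main(String[] args) {
        if (!hasServiceName(System.getenv())) {
            System.err.println(SERVICE_NAME + " is not set; define it before creating the tracer.");
            System.exit(1);
        }
        System.out.println("Required tracing variable is set.");
    }
}
```

A check like this could run at the start of the client application, before the call to Configuration.fromEnv().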

15.2.2. Instrumenting producers and consumers for tracing

Use a Decorator pattern or Interceptors to instrument your Java producer and consumer application code for tracing.

Procedure

In the application code of each producer and consumer application:

  1. Add a Maven dependency for OpenTracing to the producer or consumer’s pom.xml file.

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-client</artifactId>
        <version>0.1.15.redhat-00001</version>
    </dependency>
  2. Instrument your client application code using either a Decorator pattern or Interceptors.

    • To use a Decorator pattern:

      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Create an instance of the TracingKafkaProducer:
      TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
              tracer);
      
      // Send:
      tracingProducer.send(...);
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Create an instance of the TracingKafkaConsumer:
      TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
              tracer);
      
      // Subscribe:
      tracingConsumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
      
      // Retrieve SpanContext from polled record (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
    • To use Interceptors:

      // Register the tracer with GlobalTracer:
      GlobalTracer.register(tracer);
      
      // Add the TracingProducerInterceptor to the sender properties:
      senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingProducerInterceptor.class.getName());
      
      // Create an instance of the KafkaProducer:
      KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
      
      // Send:
      producer.send(...);
      
      // Add the TracingConsumerInterceptor to the consumer properties:
      consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                TracingConsumerInterceptor.class.getName());
      
      // Create an instance of the KafkaConsumer:
      KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
      
      // Subscribe:
      consumer.subscribe(Collections.singletonList("messages"));
      
      // Get messages:
      ConsumerRecords<Integer, String> records = consumer.poll(1000);
      
      // Retrieve the SpanContext from a polled message (consumer side):
      ConsumerRecord<Integer, String> record = ...
      SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

Custom span names in a Decorator pattern

A span is a logical unit of work in Jaeger, with an operation name, start time, and duration.

To use a Decorator pattern to instrument your producer and consumer applications, define custom span names by passing a BiFunction object as an additional argument when creating the TracingKafkaProducer and TracingKafkaConsumer objects. The OpenTracing Apache Kafka Client Instrumentation library includes several built-in span names.

Example: Using custom span names to instrument client application code in a Decorator pattern

// Create a BiFunction for the KafkaProducer that operates on (String operationName, ProducerRecord producerRecord) and returns a String to be used as the name:

BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
    (operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";

// Create an instance of the KafkaProducer:
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);

// Create an instance of the TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
        tracer,
        producerSpanNameProvider);

// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.

// Create a BiFunction for the KafkaConsumer that operates on (String operationName, ConsumerRecord consumerRecord) and returns a String to be used as the name:

BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
    (operationName, consumerRecord) -> operationName.toUpperCase();

// Create an instance of the KafkaConsumer:
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);

// Create an instance of the TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction:

TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
        tracer,
        consumerSpanNameProvider);

// Spans created by the tracingConsumer will have the operation name as the span name, in upper-case.
// "receive" -> "RECEIVE"

Built-in span names

When defining custom span names, you can use the following BiFunctions in the ClientSpanNameProvider class. If no spanNameProvider is specified, CONSUMER_OPERATION_NAME and PRODUCER_OPERATION_NAME are used.

Table 15.1. BiFunctions to define custom span names

| BiFunction | Description |
| --- | --- |
| CONSUMER_OPERATION_NAME, PRODUCER_OPERATION_NAME | Returns the operationName as the span name: "receive" for consumers and "send" for producers. |
| CONSUMER_PREFIXED_OPERATION_NAME(String prefix), PRODUCER_PREFIXED_OPERATION_NAME(String prefix) | Returns a String concatenation of prefix and operationName. |
| CONSUMER_TOPIC, PRODUCER_TOPIC | Returns the name of the topic that the message was sent to or retrieved from in the format (record.topic()). |
| PREFIXED_CONSUMER_TOPIC(String prefix), PREFIXED_PRODUCER_TOPIC(String prefix) | Returns a String concatenation of prefix and the topic name in the format (record.topic()). |
| CONSUMER_OPERATION_NAME_TOPIC, PRODUCER_OPERATION_NAME_TOPIC | Returns the operation name and the topic name: "operationName - record.topic()". |
| CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(String prefix), PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(String prefix) | Returns a String concatenation of prefix and "operationName - record.topic()". |
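
To make the formats in Table 15.1 concrete, the following sketch reproduces three of them with plain java.util.function.BiFunction objects. It is an illustration only: a String topic name stands in for the Kafka record so that the example runs without the Kafka client library, and the SpanNameFormats class and its members are hypothetical names, not the ClientSpanNameProvider implementation.

```java
import java.util.function.BiFunction;

// Hypothetical stand-ins that mimic the formats described in Table 15.1.
// The second argument is the topic name, standing in for record.topic().
final class SpanNameFormats {

    // Mimics the OPERATION_NAME providers: the operation name is used unchanged.
    static final BiFunction<String, String, String> OPERATION_NAME =
        (operationName, topic) -> operationName;

    // Mimics the OPERATION_NAME_TOPIC providers: "operationName - topic".
    static final BiFunction<String, String, String> OPERATION_NAME_TOPIC =
        (operationName, topic) -> operationName + " - " + topic;

    // Mimics the PREFIXED_OPERATION_NAME_TOPIC providers: prefix + "operationName - topic".
    static BiFunction<String, String, String> prefixedOperationNameTopic(String prefix) {
        return (operationName, topic) -> prefix + operationName + " - " + topic;
    }

    public static void main(String[] args) {
        System.out.println(OPERATION_NAME.apply("send", "messages"));
        System.out.println(OPERATION_NAME_TOPIC.apply("receive", "messages"));
        System.out.println(prefixedOperationNameTopic("orders: ").apply("send", "messages"));
    }
}
```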

15.2.3. Instrumenting Kafka Streams applications for tracing

Instrument Kafka Streams applications for distributed tracing using a supplier interface. This enables the Interceptors in the application.

Procedure

In each Kafka Streams application:

  1. Add the opentracing-kafka-streams dependency to the Kafka Streams application’s pom.xml file.

    <dependency>
        <groupId>io.opentracing.contrib</groupId>
        <artifactId>opentracing-kafka-streams</artifactId>
        <version>0.1.15.redhat-00001</version>
    </dependency>
  2. Create an instance of the TracingKafkaClientSupplier supplier interface:

    KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
  3. Provide the supplier interface to KafkaStreams:

    KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
    streams.start();

15.3. Setting up tracing for MirrorMaker and Kafka Connect

This section describes how to configure MirrorMaker, MirrorMaker 2.0, and Kafka Connect for distributed tracing.

You must enable a Jaeger tracer for each component.

15.3.1. Enabling tracing for MirrorMaker

Enable distributed tracing for MirrorMaker by passing the Interceptor properties as consumer and producer configuration parameters.

Messages are traced from the source cluster to the target cluster. The trace data records messages entering and leaving the MirrorMaker component.

Procedure

  1. Configure and enable a Jaeger tracer.
  2. Edit the /opt/kafka/config/consumer.properties file.

    Add the following Interceptor property:

    consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
  3. Edit the /opt/kafka/config/producer.properties file.

    Add the following Interceptor property:

    producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor
  4. Start MirrorMaker with the consumer and producer configuration files as parameters:

    su - kafka
    /opt/kafka/bin/kafka-mirror-maker.sh --consumer.config /opt/kafka/config/consumer.properties --producer.config /opt/kafka/config/producer.properties --num.streams=2

15.3.2. Enabling tracing for MirrorMaker 2.0

Enable distributed tracing for MirrorMaker 2.0 by defining the Interceptor properties in the MirrorMaker 2.0 properties file.

Messages are traced between Kafka clusters. The trace data records messages entering and leaving the MirrorMaker 2.0 component.

Procedure

  1. Configure and enable a Jaeger tracer.
  2. Edit the MirrorMaker 2.0 configuration properties file, ./config/connect-mirror-maker.properties, and add the following properties:

    header.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
    producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor

    The header.converter property prevents Kafka Connect from converting message headers (containing trace IDs) to base64 encoding. This ensures that messages are the same in both the source and the target clusters.

    The consumer.interceptor.classes and producer.interceptor.classes properties enable the Interceptors for MirrorMaker 2.0.
  3. Start MirrorMaker 2.0 using the instructions in Section 9.4, “Synchronizing data between Kafka clusters using MirrorMaker 2.0”.
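
The reason for setting header.converter can be seen with a small standard-library sketch. It assumes a trace context carried as a text header value in the form traceId:spanId:parentSpanId:flags. The sample value and the HeaderEncodingDemo class are invented for illustration. Once the bytes are base64-encoded, the header no longer matches what was written in the source cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical demo class; the header value below is an invented example.
final class HeaderEncodingDemo {

    // Returns the header bytes as they would look after base64 conversion.
    static byte[] base64Encode(byte[] headerValue) {
        return Base64.getEncoder().encode(headerValue);
    }

    public static void main(String[] args) {
        byte[] original = "5f2c1a9b3d4e6f70:1a2b3c4d5e6f7081:0:1".getBytes(StandardCharsets.UTF_8);
        byte[] converted = base64Encode(original);

        // Passing the bytes through unchanged keeps the header identical.
        System.out.println("unchanged equals original: " + Arrays.equals(original, original.clone()));
        // A base64-converted header carries different bytes.
        System.out.println("base64 equals original:    " + Arrays.equals(original, converted));
        System.out.println("base64 value: " + new String(converted, StandardCharsets.UTF_8));
    }
}
```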

15.3.3. Enabling tracing for Kafka Connect

Enable distributed tracing for Kafka Connect using configuration properties.

Only messages produced and consumed by Kafka Connect itself are traced. To trace messages sent between Kafka Connect and external systems, you must configure tracing in the connectors for those systems.

Procedure

  1. Configure and enable a Jaeger tracer.
  2. Edit the relevant Kafka Connect configuration file.

    • If you are running Kafka Connect in standalone mode, edit the /opt/kafka/config/connect-standalone.properties file.
    • If you are running Kafka Connect in distributed mode, edit the /opt/kafka/config/connect-distributed.properties file.
  3. Add the following properties to the configuration file:

    producer.interceptor.classes=io.opentracing.contrib.kafka.TracingProducerInterceptor
    consumer.interceptor.classes=io.opentracing.contrib.kafka.TracingConsumerInterceptor
  4. Save the configuration file.
  5. Set tracing environment variables and then run Kafka Connect in standalone or distributed mode.

The Interceptors in Kafka Connect’s internal consumers and producers are now enabled.
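
Step 5 can be scripted in several ways. As one possibility, the following sketch prepares a Kafka Connect process in distributed mode with tracing variables set in its environment, using java.lang.ProcessBuilder. The service name my-kafka-connect, the agent host and port, and the ConnectLauncher class are assumptions for illustration. The sampler values const and 1 select the Constant strategy so that every trace is sampled. The script path /opt/kafka/bin/connect-distributed.sh is assumed to match your installation.

```java
import java.util.List;
import java.util.Map;

// Hypothetical launcher; all values below are example assumptions.
final class ConnectLauncher {

    static ProcessBuilder distributedConnect() {
        ProcessBuilder builder = new ProcessBuilder(List.of(
            "/opt/kafka/bin/connect-distributed.sh",
            "/opt/kafka/config/connect-distributed.properties"));

        // Tracing variables from Table 15.2, visible only to the child process.
        Map<String, String> env = builder.environment();
        env.put("JAEGER_SERVICE_NAME", "my-kafka-connect");
        env.put("JAEGER_AGENT_HOST", "localhost");
        env.put("JAEGER_AGENT_PORT", "6831");
        env.put("JAEGER_SAMPLER_TYPE", "const"); // Constant sampling strategy
        env.put("JAEGER_SAMPLER_PARAM", "1");    // sample all traces

        // Forward the child's output to this process's console.
        return builder.inheritIO();
    }

    public static void main(String[] args) throws Exception {
        Process connect = distributedConnect().start();
        System.exit(connect.waitFor());
    }
}
```

Setting the variables on the child process only, rather than system-wide, keeps the tracing configuration of each component separate.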

15.4. Enabling tracing for the Kafka Bridge

Enable distributed tracing for the Kafka Bridge by editing the Kafka Bridge configuration file. You can then deploy a Kafka Bridge instance that is configured for distributed tracing to the host operating system.

Traces are generated when:

  • The Kafka Bridge sends messages to HTTP clients and consumes messages from HTTP clients
  • HTTP clients send HTTP requests to send and receive messages through the Kafka Bridge

To have end-to-end tracing, you must configure tracing in your HTTP clients.

Procedure

  1. Edit the config/application.properties file in the Kafka Bridge installation directory.

    Uncomment the following line:

    bridge.tracing=jaeger
  2. Save the configuration file.
  3. Run the bin/kafka_bridge_run.sh script using the configuration properties as a parameter:

    cd kafka-bridge-0.xy.x.redhat-0000x
    ./bin/kafka_bridge_run.sh --config-file=config/application.properties

    The Interceptors in the Kafka Bridge’s internal consumers and producers are now enabled.

15.5. Environment variables for tracing

Use these environment variables when configuring a Jaeger tracer for Kafka clients and components.

Note

The tracing environment variables are part of the Jaeger project and are subject to change. For the latest environment variables, see the Jaeger documentation.

Table 15.2. Jaeger tracer environment variables

| Property | Required | Description |
| --- | --- | --- |
| JAEGER_SERVICE_NAME | Yes | The name of the Jaeger tracer service. |
| JAEGER_AGENT_HOST | No | The hostname for communicating with the jaeger-agent through the User Datagram Protocol (UDP). |
| JAEGER_AGENT_PORT | No | The port used for communicating with the jaeger-agent through UDP. |
| JAEGER_ENDPOINT | No | The traces endpoint. Only define this variable if the client application will bypass the jaeger-agent and connect directly to the jaeger-collector. |
| JAEGER_AUTH_TOKEN | No | The authentication token to send to the endpoint as a bearer token. |
| JAEGER_USER | No | The username to send to the endpoint if using basic authentication. |
| JAEGER_PASSWORD | No | The password to send to the endpoint if using basic authentication. |
| JAEGER_PROPAGATION | No | A comma-separated list of formats to use for propagating the trace context. Defaults to the standard Jaeger format. Valid values are jaeger and b3. |
| JAEGER_REPORTER_LOG_SPANS | No | Indicates whether the reporter should also log the spans. |
| JAEGER_REPORTER_MAX_QUEUE_SIZE | No | The reporter’s maximum queue size. |
| JAEGER_REPORTER_FLUSH_INTERVAL | No | The reporter’s flush interval, in ms. Defines how frequently the Jaeger reporter flushes span batches. |
| JAEGER_SAMPLER_TYPE | No | The sampling strategy to use for client traces: Constant, Probabilistic, Rate Limiting, or Remote (the default). To sample all traces, use the Constant sampling strategy with a parameter of 1. For more information, see the Jaeger documentation. |
| JAEGER_SAMPLER_PARAM | No | The sampler parameter (number). |
| JAEGER_SAMPLER_MANAGER_HOST_PORT | No | The hostname and port to use if a Remote sampling strategy is selected. |
| JAEGER_TAGS | No | A comma-separated list of tracer-level tags that are added to all reported spans. The value can also refer to an environment variable using the format ${envVarName:default}. :default is optional and identifies a value to use if the environment variable cannot be found. |
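
To illustrate the ${envVarName:default} form used by JAEGER_TAGS, the following sketch resolves a tag list against a map of environment variables. It assumes that each tag is written as key=value, and it is a simplified approximation for explanation, not the Jaeger client's own parser. The TagResolver class, the DEPLOY_REGION variable, and the choice to return an empty string when neither the variable nor a default is available are all assumptions of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical resolver; not the Jaeger client implementation.
final class TagResolver {

    // Parses "k1=v1,k2=${VAR:default}" into a map, resolving ${...} references against env.
    static Map<String, String> resolve(String tags, Map<String, String> env) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String pair : tags.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length == 2) {
                result.put(kv[0].trim(), resolveValue(kv[1].trim(), env));
            }
        }
        return result;
    }

    private static String resolveValue(String value, Map<String, String> env) {
        if (!value.startsWith("${") || !value.endsWith("}")) {
            return value; // literal tag value
        }
        String[] ref = value.substring(2, value.length() - 1).split(":", 2);
        String fromEnv = env.get(ref[0]);
        if (fromEnv != null) {
            return fromEnv;
        }
        // Fall back to the optional default; empty string if none was given.
        return ref.length == 2 ? ref[1] : "";
    }

    public static void main(String[] args) {
        System.out.println(resolve("region=${DEPLOY_REGION:unknown},team=streams", System.getenv()));
    }
}
```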