Chapter 13. Validating schemas using Kafka client serializers/deserializers

Service Registry provides Kafka client serializers/deserializers for producer and consumer applications. Kafka producer applications use serializers to encode messages that conform to a specific event schema. Kafka consumer applications use deserializers to validate that the messages have been serialized using the correct schema, based on a specific schema ID. This ensures consistent schema use and helps to prevent data errors at runtime.

This chapter provides instructions on how to use the Kafka client serializers and deserializers for Apache Avro, JSON Schema, and Google Protobuf in your Kafka producer and consumer client applications.

Prerequisites

13.1. Kafka client applications and Service Registry

Using Service Registry decouples schema management from client application configuration. You can enable an application to use a schema from the registry by specifying its URL in the client code.

For example, you can store the schemas to serialize and deserialize messages in the registry, which are then referenced from the applications that use them to ensure that the messages that they send and receive are compatible with those schemas. Kafka client applications can push or pull their schemas from Service Registry at runtime.

Schemas can evolve, so you can define rules in Service Registry, for example, to ensure that changes to a schema are valid and do not break previous versions used by applications. Service Registry checks for compatibility by comparing a modified schema with previous schema versions.

Service Registry provides schema registry support for a number of schema technologies such as:

  • Avro
  • Protobuf
  • JSON Schema

These schema technologies can be used by client applications through Kafka client serializer/deserializer (SerDe) services provided by Service Registry. The maturity and usage of the SerDe classes provided by Service Registry may vary. See the type-specific sections below for more details about each.

Producer schema configuration

A producer client application uses a serializer to put the messages that it sends to a specific broker topic into the correct data format.

To enable a producer to use Service Registry for serialization:

After registering your schema, when you start Kafka and Service Registry, you can access the schema to format messages sent to the Kafka broker topic by the producer. Alternatively (depending on configuration), the producer can automatically register the schema on first use.

If a schema already exists, you can create a new version using the REST API based on compatibility rules defined in Service Registry. Versions are used for compatibility checking as a schema evolves. Together, an artifact ID and a schema version form a unique tuple that identifies a schema.

Consumer schema configuration

A consumer client application uses a deserializer to get the messages that it consumes from a specific broker topic into the correct data format.

To enable a consumer to use Service Registry for deserialization:

The schema is then retrieved by the deserializer using a global ID written into the message being consumed. The schema global ID can be located in the message headers or in the message payload itself, depending on the configuration of the producer application.

When locating the global ID in the message payload, the format of the data begins with a magic byte (as a signal to consumers) followed by the global ID and then the message data as normal.

For example:

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]

Now, when you start Kafka and Service Registry, you can access the schema to format messages received from the Kafka broker topic.
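The payload layout described above can be sketched in plain Java. This is a minimal illustration and not the Service Registry implementation: the class name PayloadFrame, the magic byte value 0x0, and the 8-byte big-endian encoding of the global ID are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the [MAGIC_BYTE][GLOBAL_ID][MESSAGE DATA] layout.
class PayloadFrame {
    // Assumed marker value; the real value is defined by the SerDe classes.
    static final byte MAGIC_BYTE = 0x0;

    // Prefix the serialized message with the magic byte and an 8-byte global ID.
    static byte[] frame(long globalId, byte[] data) {
        return ByteBuffer.allocate(1 + Long.BYTES + data.length)
                .put(MAGIC_BYTE)
                .putLong(globalId)
                .put(data)
                .array();
    }

    // Read the global ID back; rejects payloads that do not start with the magic byte.
    static long globalIdOf(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Payload does not start with the magic byte");
        }
        return buf.getLong();
    }

    public static void main(String[] args) {
        byte[] payload = frame(42L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(payload.length);       // 1 + 8 + 5 bytes
        System.out.println(globalIdOf(payload));  // the ID written by frame()
    }
}
```

The message data that follows the ID is whatever the serializer produced for the schema, so a consumer needs only the first bytes of the payload to know which schema to fetch.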

13.2. Strategies to look up a schema

The Kafka client serializer uses lookup strategies to determine the artifact ID and the global ID under which the message schema is registered in Service Registry.

For a given topic and message, you can use implementations of the following Java interfaces:

  • ArtifactIdStrategy to return an artifact ID
  • GlobalIdStrategy to return a global ID

The classes for each strategy are organized in the io.apicurio.registry.utils.serde.strategy package. The default strategy is the artifact ID TopicIdStrategy, which looks for Service Registry artifacts with the same name as the Kafka topic receiving messages.

Example

public String artifactId(String topic, boolean isKey, T schema) {
    return String.format("%s-%s", topic, isKey ? "key" : "value");
}

  • The topic parameter is the name of the Kafka topic receiving the message.
  • The isKey parameter is true when the message key is being serialized, and false when the message value is being serialized.
  • The schema parameter is the schema of the message being serialized or deserialized.
  • The value returned is the artifact ID under which the schema is registered in Service Registry.

Which lookup strategy you use depends on how and where you store your schema. For example, you might use a strategy that uses a record ID if you have different Kafka topics with the same Avro message type.

Artifact ID strategy

The artifact ID strategy provides a way to map the Kafka topic and message information to an artifact ID in Service Registry. The common convention for the mapping is to combine the Kafka topic name with the key or value, depending on whether the serializer is used for the Kafka message key or value.

However, you can use alternative conventions for the mapping by using a strategy provided by Service Registry, or by creating a custom Java class that implements io.apicurio.registry.utils.serde.strategy.ArtifactIdStrategy.

Strategies to return an artifact ID

Service Registry provides the following strategies to return an artifact ID based on an implementation of ArtifactIdStrategy:

RecordIdStrategy
Avro-specific strategy that uses the full name of the schema.
TopicRecordIdStrategy
Avro-specific strategy that uses the topic name and the full name of the schema.
TopicIdStrategy
Default strategy that uses the topic name and key or value suffix.
SimpleTopicIdStrategy
Simple strategy that only uses the topic name.
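To compare the four conventions side by side, the following sketch computes the artifact ID that each would produce for the same topic and schema. The methods are hypothetical stand-ins rather than the Service Registry classes, and the "-" separator used for the topic-and-record case is an assumption.

```java
// Hypothetical stand-ins for the four artifact ID conventions listed above.
class ArtifactIdConventions {
    // TopicIdStrategy convention: topic name plus a key or value suffix.
    static String topicId(String topic, boolean isKey) {
        return String.format("%s-%s", topic, isKey ? "key" : "value");
    }

    // SimpleTopicIdStrategy convention: topic name only.
    static String simpleTopicId(String topic) {
        return topic;
    }

    // RecordIdStrategy convention: full name of the Avro schema.
    static String recordId(String schemaFullName) {
        return schemaFullName;
    }

    // TopicRecordIdStrategy convention: topic name and schema full name
    // (the separator is an assumption for this sketch).
    static String topicRecordId(String topic, String schemaFullName) {
        return topic + "-" + schemaFullName;
    }

    public static void main(String[] args) {
        String topic = "prices";
        String fullName = "com.redhat.price";
        System.out.println(topicId(topic, false));           // prices-value
        System.out.println(simpleTopicId(topic));            // prices
        System.out.println(recordId(fullName));              // com.redhat.price
        System.out.println(topicRecordId(topic, fullName));  // prices-com.redhat.price
    }
}
```

A record-based convention keeps one artifact per message type regardless of topic, which is why it suits the case of several topics carrying the same Avro message type.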

Global ID strategy

The global ID strategy locates and identifies the specific version of the schema registered under the artifact ID provided by the artifact ID strategy. Every version of every artifact has a single globally unique identifier that can be used to retrieve the content of that artifact. This global ID is included in every Kafka message so that a deserializer can properly fetch the schema from Service Registry.

The global ID strategy can look up an existing artifact version, or it can register one if not found, depending on which strategy is used. You can also provide your own strategy by creating a custom Java class that implements io.apicurio.registry.utils.serde.strategy.GlobalIdStrategy.

Strategies to return a global ID

Service Registry provides the following strategies to return a global ID based on an implementation of GlobalIdStrategy:

FindLatestIdStrategy
Strategy that returns the global ID of the latest schema version, based on an artifact ID.
FindBySchemaIdStrategy
Strategy that matches schema content, based on an artifact ID, to return a global ID.
CachedSchemaIdStrategy
Strategy that caches the schema, and uses the global ID of the cached schema.
GetOrCreateIdStrategy
Strategy that tries to get the latest schema, based on an artifact ID, and if it does not exist, creates a new schema.
AutoRegisterIdStrategy
Strategy that updates the schema, and uses the global ID of the updated schema.
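The difference between looking up and registering can be illustrated with a toy in-memory registry. Everything in this sketch is hypothetical (the InMemoryRegistry class, its method names, and the sequential global IDs); it only mirrors the behaviour described for the find-latest, find-by-schema, and get-or-create strategies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory registry: every stored version receives the next global ID.
class InMemoryRegistry {
    private final Map<String, List<Long>> versionsByArtifact = new HashMap<>();
    private final Map<Long, String> contentByGlobalId = new HashMap<>();
    private long nextGlobalId = 1;

    // Store a new version of the artifact and return its global ID.
    long register(String artifactId, String schema) {
        long id = nextGlobalId++;
        versionsByArtifact.computeIfAbsent(artifactId, k -> new ArrayList<>()).add(id);
        contentByGlobalId.put(id, schema);
        return id;
    }

    // "Find latest": global ID of the newest version, if the artifact exists.
    Optional<Long> findLatest(String artifactId) {
        List<Long> ids = versionsByArtifact.get(artifactId);
        return ids == null ? Optional.empty() : Optional.of(ids.get(ids.size() - 1));
    }

    // "Find by schema": global ID of the version whose content matches.
    Optional<Long> findBySchema(String artifactId, String schema) {
        return versionsByArtifact.getOrDefault(artifactId, List.of()).stream()
                .filter(id -> schema.equals(contentByGlobalId.get(id)))
                .findFirst();
    }

    // "Get or create": reuse the latest version, or register the schema if none exists.
    long getOrCreate(String artifactId, String schema) {
        return findLatest(artifactId).orElseGet(() -> register(artifactId, schema));
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        System.out.println(registry.getOrCreate("prices-value", "schema-v1")); // registers
        System.out.println(registry.getOrCreate("prices-value", "schema-v2")); // reuses latest
    }
}
```

Note that in this sketch get-or-create never adds a second version once the artifact exists, whereas a strategy that always updates the schema would call register each time the content changes.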

Global ID strategy configuration

You can configure the following application property for the global ID strategy:

  • apicurio.registry.check-period-ms: Configures the remote schema lookup period in milliseconds

You can configure application properties as Java system properties or include them in the Quarkus application.properties file. For more details, see the Quarkus documentation.

13.3. Service Registry serializer/deserializer constants

You can configure specific client serializer/deserializer (SerDe) services and schema lookup strategies directly into a client using the constants outlined in this section.

Alternatively, you can specify the constants in a properties file, or a properties instance.
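As a rough sketch of the properties-file approach, the following example loads three of the property names listed in this section into a java.util.Properties instance. The registry URL is a placeholder, the class name SerDePropertiesExample is hypothetical, and the content is read from a string only to keep the example self-contained; a real application would read it from a file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical example: SerDe settings expressed as properties-file content.
class SerDePropertiesExample {
    // Property names match the constants in this section; the URL is a placeholder.
    static final String CONFIG = String.join("\n",
        "apicurio.registry.url=https://registry.example.com/api",
        "apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy",
        "apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy");

    static Properties load() throws IOException {
        Properties props = new Properties();
        // A FileReader or InputStream would be used here for a real properties file.
        props.load(new StringReader(CONFIG));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load();
        System.out.println(props.getProperty("apicurio.registry.url"));
    }
}
```

The resulting Properties instance can then be passed to the Kafka producer or consumer constructor together with the standard Kafka settings.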

Constants for serializer/deserializer services

public abstract class AbstractKafkaSerDe<T extends AbstractKafkaSerDe<T>> implements AutoCloseable {
   protected final Logger log = LoggerFactory.getLogger(getClass());

   public static final String REGISTRY_URL_CONFIG_PARAM = "apicurio.registry.url"; 1
   public static final String REGISTRY_CACHED_CONFIG_PARAM = "apicurio.registry.cached"; 2
   public static final String REGISTRY_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.id-handler"; 3
   public static final String REGISTRY_CONFLUENT_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.as-confluent"; 4
1
(Required) The URL of Service Registry.
2
Allows the client to make the request and look up the information from a cache of previous results, to improve processing time. If the cache is empty, the lookup is performed from Service Registry.
3
Extends ID handling to support other ID formats and make them compatible with Service Registry SerDe services. For example, changing the ID format from Long to Integer supports the Confluent ID format.
4
A flag to simplify the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup.

Constants for lookup strategies

public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> {
   public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id"; 1
   public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id"; 2

Constants for converters

public class SchemalessConverter<T> extends AbstractKafkaSerDe<SchemalessConverter<T>> implements Converter {
   public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; 1
   public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; 2
1
(Required) Serializer to use with the converter.
2
(Required) Deserializer to use with the converter.

Constants for Avro data providers

public interface AvroDatumProvider<T> {
   String REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM = "apicurio.registry.avro-datum-provider"; 1
   String REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM = "apicurio.registry.use-specific-avro-reader"; 2
1
Avro Datum provider to write data to a schema, with or without reflection.
2
Flag to set to use an Avro-specific datum reader.
DefaultAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 1
ReflectAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 2
1
Default datum reader.
2
Datum reader using reflection.

13.4. Using different client serializer/deserializer types

When using a schema technology in your Kafka applications, you must choose which specific schema type to use. Common options include:

  • Apache Avro
  • JSON Schema
  • Google Protobuf

Which schema technology you choose depends on your use case and preference. Because Kafka lets you implement custom serializer and deserializer classes, you can also write your own, optionally using the Service Registry REST Java client to access Service Registry functionality.

For your convenience, Service Registry provides out-of-the-box SerDe classes for Avro, JSON Schema, and Protobuf schema technologies. The following sections explain how to configure Kafka applications to use each type.

Kafka application configuration for serializers/deserializers

Using one of the serializer or deserializer classes provided by Service Registry in your Kafka application involves setting the correct configuration properties. The following simple examples show how to configure a serializer in a Kafka producer application and how to configure a deserializer in a Kafka consumer application.

Example serializer configuration in a Kafka producer

public Producer<Object,Object> createKafkaProducer(String kafkaBootstrapServers, String topicName) {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + topicName);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use a Service Registry-provided Kafka serializer
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaSerializer.class.getName());

    // Configure Service Registry location
    props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);

    // Map the topic name (plus -key/value) to the artifactId in the registry
    props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM,
        io.apicurio.registry.utils.serde.strategy.TopicIdStrategy.class.getName());

    // Get an existing schema or auto-register if not found
    props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM,
        io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy.class.getName());

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}

Example deserializer configuration in a Kafka consumer

public Consumer<Object,Object> createKafkaConsumer(String kafkaBootstrapServers, String topicName) {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + topicName);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Use a Service Registry-provided Kafka deserializer
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        io.apicurio.registry.utils.serde.AvroKafkaDeserializer.class.getName());

    // Configure Service Registry location
    props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, REGISTRY_URL);

    // No other configuration is needed for the deserializer because the globalId of
    // the schema is sent as part of the message. The deserializer simply extracts
    // that globalId and uses it to look up the schema from the registry.

    // Create the Kafka consumer (types match the declared return type)
    Consumer<Object, Object> consumer = new KafkaConsumer<>(props);
    return consumer;
}

13.4.1. Configure Avro SerDe with Service Registry

Service Registry provides Kafka client serializer and deserializer classes for Apache Avro to make using Avro as easy as possible:

  • io.apicurio.registry.utils.serde.AvroKafkaSerializer
  • io.apicurio.registry.utils.serde.AvroKafkaDeserializer

Configure the Avro serializer

You can configure the Avro serializer class in the following ways:

  • Service Registry location as a URL
  • Artifact ID strategy
  • Global ID strategy
  • Global ID location
  • Global ID handler
  • Avro datum provider
  • Avro encoding

Global ID location

The serializer passes the unique global ID of the schema as part of the Kafka message so that consumers can use the right schema for deserialization. The location of that global ID can be in the payload of the message or in the message headers. The default approach is to pass the global ID in the message payload. If you want the ID sent in the message headers instead, you can set the following configuration property:

props.putIfAbsent(AbstractKafkaSerDe.USE_HEADERS, "true");

The property name is apicurio.registry.use.headers.

Global ID handler

You can customize precisely how the global ID is encoded when passing it in the Kafka message body. Set the configuration property apicurio.registry.id-handler to be a class that implements the io.apicurio.registry.utils.serde.strategy.IdHandler interface. Service Registry provides two implementations of that interface:

  • io.apicurio.registry.utils.serde.strategy.DefaultIdHandler - stores the ID as an 8-byte long
  • io.apicurio.registry.utils.serde.strategy.Legacy4ByteIdHandler - stores the ID as a 4-byte int

Service Registry represents the global ID of an artifact as a long, but for legacy reasons (or for compatibility with other registries or serde classes) you may want to use 4 bytes when sending the ID.
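The trade-off between the two widths can be shown with a short sketch. The IdWidths class and its methods are hypothetical and do not reproduce the IdHandler implementations; they only illustrate that a 4-byte encoding is smaller but cannot represent every long value.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of encoding a global ID in 8 bytes or in 4 bytes.
class IdWidths {
    // 8-byte encoding: any long value fits.
    static byte[] asLong(long globalId) {
        return ByteBuffer.allocate(Long.BYTES).putLong(globalId).array();
    }

    // 4-byte encoding: Math.toIntExact throws ArithmeticException
    // if the ID does not fit in an int.
    static byte[] asInt(long globalId) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(Math.toIntExact(globalId)).array();
    }

    public static void main(String[] args) {
        System.out.println(asLong(7L).length); // 8
        System.out.println(asInt(7L).length);  // 4
    }
}
```

Whichever width the serializer uses, the deserializer must be configured with the matching handler so that it reads the same number of bytes.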

Avro datum provider

Avro provides different datum writers and readers to write and read data. Service Registry supports three different types:

  • Generic
  • Specific
  • Reflect

The Service Registry AvroDatumProvider is the abstraction that determines which type is actually used. DefaultAvroDatumProvider is used by default.

There are two configuration options you can set:

  • apicurio.registry.avro-datum-provider - provide a fully qualified Java class name of the AvroDatumProvider implementation, for example io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
  • apicurio.registry.use-specific-avro-reader - true or false, to select whether the specific type is used with DefaultAvroDatumProvider

Avro encoding

When using Apache Avro to serialize data, it is common to use the Avro binary encoding format so that the data is encoded as efficiently as possible. However, Avro also supports encoding the data as JSON. Encoding as JSON is useful because it is much easier to inspect the payload of each message, often for logging, debugging, or other similar use cases. The Service Registry Avro serializer can be configured to change the encoding to JSON from the default (binary).

Set the Avro encoding to use by configuring the apicurio.avro.encoding property. The value must be either JSON or BINARY.
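The Avro-specific options can be collected into a Properties instance as in the following sketch. The property names are the ones given in this section; the helper class AvroSerializerOptions and its up-front check of the encoding value are assumptions added for illustration.

```java
import java.util.Properties;

// Hypothetical helper that gathers the Avro-specific serializer options.
class AvroSerializerOptions {
    static Properties avroOptions(String encoding) {
        // The documented values are JSON and BINARY; reject anything else early.
        if (!encoding.equals("JSON") && !encoding.equals("BINARY")) {
            throw new IllegalArgumentException("Encoding must be JSON or BINARY: " + encoding);
        }
        Properties props = new Properties();
        props.put("apicurio.registry.avro-datum-provider",
            "io.apicurio.registry.utils.serde.avro.DefaultAvroDatumProvider");
        props.put("apicurio.registry.use-specific-avro-reader", "true");
        props.put("apicurio.avro.encoding", encoding);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(avroOptions("JSON").getProperty("apicurio.avro.encoding"));
    }
}
```

These entries would be merged with the registry URL and strategy settings before the producer is created.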

Configure the Avro deserializer

You must configure the Avro deserializer class to match the configuration settings of the serializer. As a result, you can configure the Avro deserializer class in the following ways:

  • Service Registry location as a URL
  • Global ID handler
  • Avro datum provider
  • Avro encoding

See the serializer section for these configuration options - the property names and values are the same.

Note

The following options are not needed when configuring the deserializer:

  • Artifact ID strategy
  • Global ID strategy
  • Global ID location

The reason these options are not necessary is that the deserializer class can figure this information out from the message itself. In the case of the two strategies, they are not needed because the serializer is responsible for sending the global ID of the schema as part of the message.

The location of that global ID is determined by the deserializer by simply checking for the magic byte at the start of the message payload. If that byte is found, the global ID is read from the message payload using the configured handler. If the magic byte is not found, the global ID is read from the message headers.
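The lookup order described above can be sketched as follows. This is not the deserializer source: the magic byte value, the header name example.globalId, and the 8-byte ID width are all assumptions chosen to make the example runnable.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical sketch of how a deserializer might locate the global ID.
class GlobalIdLocator {
    static final byte MAGIC_BYTE = 0x0;                         // assumed value
    static final String GLOBAL_ID_HEADER = "example.globalId";  // hypothetical header name

    static long locate(byte[] payload, Map<String, byte[]> headers) {
        if (payload.length > 0 && payload[0] == MAGIC_BYTE) {
            // Skip the magic byte, then read an 8-byte ID from the payload.
            return ByteBuffer.wrap(payload, 1, Long.BYTES).getLong();
        }
        // No magic byte: fall back to the message headers.
        byte[] header = headers.get(GLOBAL_ID_HEADER);
        if (header == null) {
            throw new IllegalStateException("No global ID in payload or headers");
        }
        return ByteBuffer.wrap(header).getLong();
    }

    public static void main(String[] args) {
        byte[] framed = ByteBuffer.allocate(1 + Long.BYTES).put(MAGIC_BYTE).putLong(5L).array();
        System.out.println(locate(framed, Map.of())); // ID read from the payload
    }
}
```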

13.4.2. Configure JSON Schema SerDe with Service Registry

Service Registry provides Kafka client serializer and deserializer classes for JSON Schema to make using JSON Schema as easy as possible:

  • io.apicurio.registry.utils.serde.JsonSchemaKafkaSerializer
  • io.apicurio.registry.utils.serde.JsonSchemaKafkaDeserializer

Unlike Apache Avro, JSON Schema is not actually a serialization technology - it is instead a validation technology. As a result, configuration options for JSON Schema are quite different. For example, there is no encoding option, because data is always encoded as JSON.

Configure the JSON Schema serializer

You can configure the JSON Schema serializer class in the following ways:

  • Service Registry location as a URL
  • Artifact ID strategy
  • Global ID strategy
  • Validation enabled/disabled

The only non-standard configuration property is whether JSON Schema validation is enabled or disabled. The validation feature is disabled by default but can be enabled by setting apicurio.registry.serdes.json-schema.validation-enabled to "true". For example:

props.putIfAbsent(JsonSchemaSerDeConstants.REGISTRY_JSON_SCHEMA_VALIDATION_ENABLED, "true");

Configure the JSON Schema deserializer

You can configure the JSON Schema deserializer class in the following ways:

  • Service Registry location as a URL
  • Validation enabled/disabled

The deserializer is simple to configure. You must provide the location of Service Registry so that the schema can be loaded. The only other configuration is whether or not to perform validation. These configuration properties are the same as for the serializer.

Note

Deserializer validation only works if the serializer passes the global ID in the Kafka message, which will only happen when validation is enabled in the serializer.

13.4.3. Configure Protobuf SerDe with Service Registry

Service Registry provides Kafka client serializer and deserializer classes for Google Protobuf to make using Protobuf as easy as possible:

  • io.apicurio.registry.utils.serde.ProtobufKafkaSerializer
  • io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer

Configure the Protobuf serializer

You can configure the Protobuf serializer class in the following ways:

  • Service Registry location as a URL
  • Artifact ID strategy
  • Global ID strategy
  • Global ID location
  • Global ID handler

Configure the Protobuf deserializer

You must configure the Protobuf deserializer class to match the configuration settings of the serializer. As a result, you can configure the Protobuf deserializer class in the following ways:

  • Service Registry location as a URL
  • Global ID handler

See the serializer section for these configuration options - the property names and values are the same.

Note

The following options are not needed when configuring the deserializer:

  • Artifact ID strategy
  • Global ID strategy
  • Global ID location

The reason these options are not necessary is that the deserializer class can figure this information out from the message itself. In the case of the two strategies, they are not needed because the serializer is responsible for sending the global ID of the schema as part of the message.

The location of that global ID is determined (by the deserializer) by simply checking for the magic byte at the start of the message payload. If that byte is found, the global ID is read from the message payload (using the configured handler). If the magic byte is not found, the global ID is read from the message headers.

Note

The Protobuf deserializer does not deserialize to your exact Protobuf Message implementation, but rather to a DynamicMessage instance (because there is no appropriate API to do otherwise).

13.5. Registering a schema in Service Registry

After you have defined a schema in the appropriate format, such as Apache Avro, you can add the schema to Service Registry.

You can add the schema using:

  • The Service Registry web console
  • A curl command using the Service Registry API
  • A Maven plugin supplied with Service Registry
  • Schema configuration added to your client code

Client applications cannot use Service Registry until you have registered your schemas.

Service Registry web console

Having installed Service Registry, you connect to the web console from the ui endpoint:

http://MY-REGISTRY-URL/ui

From the console, you can add, view and configure schemas. You can also create the rules that prevent invalid content being added to the registry.

Curl command example

curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
  -H "X-Registry-ArtifactId: prices-value" \
  --data '{ 1
      "type":"record",
      "name":"price",
      "namespace":"com.redhat",
      "fields":[{"name":"symbol","type":"string"},
      {"name":"price","type":"string"}]
    }' \
  https://my-cluster-service-registry-myproject.example.com/api/artifacts -s 2
1
Avro schema artifact.
2
OpenShift route name that exposes Service Registry.
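For applications that prefer to issue the same request from Java, the curl command above could be expressed with the JDK HTTP client (Java 11 or later). This is a sketch only: the class name RegisterSchemaRequest is hypothetical, the headers and body are copied from the curl example, and the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical equivalent of the curl example, built with java.net.http.
class RegisterSchemaRequest {
    // Same Avro schema as in the curl example.
    static final String SCHEMA =
        "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.redhat\","
        + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},"
        + "{\"name\":\"price\",\"type\":\"string\"}]}";

    static HttpRequest build(String registryApiUrl) {
        return HttpRequest.newBuilder(URI.create(registryApiUrl + "/artifacts"))
            .header("Content-Type", "application/json; artifactType=AVRO")
            .header("X-Registry-ArtifactId", "prices-value")
            .POST(HttpRequest.BodyPublishers.ofString(SCHEMA))
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request =
            build("https://my-cluster-service-registry-myproject.example.com/api");
        System.out.println(request.method() + " " + request.uri());
        // To send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```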

Maven plugin example

<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>${registry.version}</version>
<executions>
  <execution>
    <phase>generate-sources</phase>
    <goals>
      <goal>register</goal>
    </goals>
    <configuration>
      <registryUrl>https://my-cluster-service-registry-myproject.example.com/api</registryUrl>
      <artifactType>AVRO</artifactType>
      <artifacts>
        <schema1>${project.basedir}/schemas/schema1.avsc</schema1>
      </artifacts>
    </configuration>
  </execution>
</executions>
</plugin>

Configuration using a producer client example

String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", 1
    "https://my-cluster-service-registry-myproject.example.com/api");
try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        service.getArtifactMetaData(artifactId); 2
    } catch (WebApplicationException e) {
        CompletionStage<ArtifactMetaData> csa = service.createArtifact(
            ArtifactType.AVRO,
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}
1
The properties are registered. You can register properties against more than one node.
2
Check to see if the schema already exists based on the artifact ID.

13.6. Using a schema from a Kafka consumer client

This procedure describes how to configure a Kafka consumer client written in Java to use a schema from Service Registry.

Prerequisites

  • Service Registry is installed
  • The schema is registered with Service Registry

Procedure

  1. Configure the client with the URL of Service Registry. For example:

    String registryUrl = "https://registry.example.com/api";
    Properties props = new Properties();
    props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, registryUrl);
  2. Configure the client with the Service Registry deserializer. For example:

    // Configure Kafka
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); 1
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        AvroKafkaDeserializer.class.getName()); 2
    1
    The deserializer provided by Service Registry.
    2
    The deserialization is in Apache Avro JSON format.

13.7. Using a schema from a Kafka producer client

This procedure describes how to configure a Kafka producer client written in Java to use a schema from Service Registry.

Prerequisites

  • Service Registry is installed
  • The schema is registered with Service Registry

Procedure

  1. Configure the client with the URL of Service Registry. For example:

    String registryUrl = "https://registry.example.com/api";
    Properties props = new Properties();
    props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, registryUrl);
  2. Configure the client with the serializer, and the strategy to look up the schema in Service Registry. For example:

    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 1
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 2
    props.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class.getName()); 3
    1
    The serializer for the message key provided by Service Registry.
    2
    The serializer for the message value provided by Service Registry.
    3
    Lookup strategy to find the global ID for the schema.

13.8. Using a schema from a Kafka Streams application

This procedure describes how to configure a Kafka Streams client written in Java to use a schema from Service Registry.

Prerequisites

  • Service Registry is installed
  • The schema is registered with Service Registry

Procedure

  1. Create and configure a REST client with the Service Registry. For example:

    String registryUrl = "https://registry.example.com/api";
    RegistryService client = RegistryClient.cached(registryUrl);
  2. Configure the serializer, deserializer, and create the Kafka Streams client. For example:

    Serializer<LogInput> serializer = new AvroKafkaSerializer<>( 1
        client,
        new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true)
    );
    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer<>( 2
        client,
        new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true)
    );
    Serde<LogInput> logSerde = Serdes.serdeFrom( 3
        serializer,
        deserializer
    );
    KStream<String, LogInput> input = builder.stream( 4
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );
    1
    The serializer provided by Service Registry.
    2
    The deserializer provided by Service Registry.
    3
    The deserialization is in Apache Avro format.
    4
    The Kafka Streams client application.