Chapter 8. Configuring Kafka serializers/deserializers in Java clients

This chapter provides detailed information on how to configure Kafka serializers/deserializers (SerDes) in your producer and consumer Java client applications.

8.1. Service Registry serializer/deserializer configuration in client applications

You can configure specific client serializer/deserializer (SerDe) services and schema lookup strategies directly in a client application using the example constants shown in this section. Alternatively, you can configure the corresponding Service Registry application properties in a file or an instance.

The following sections show examples of commonly used SerDe constants and configuration options.

Configuration for SerDe services

public class SerdeConfig {

   public static final String REGISTRY_URL = "apicurio.registry.url"; // (1)
   public static final String ID_HANDLER = "apicurio.registry.id-handler"; // (2)
   public static final String ENABLE_CONFLUENT_ID_HANDLER = "apicurio.registry.as-confluent"; // (3)
...

(1) The required URL of Service Registry.
(2) Extends ID handling to support other ID formats and make them compatible with Service Registry SerDe services. For example, changing the default ID format from Long to Integer supports the Confluent ID format.
(3) Simplifies the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup. The setting should not be used with the ID_HANDLER option.
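
Because the as-confluent setting should not be combined with the ID_HANDLER option, an application could check its own configuration before creating a client. The following is a minimal sketch of such a check. The class name ConfluentIdCheck and the handler class com.example.MyIdHandler are hypothetical, and this is an application-side check, not something the SerDe classes are stated to do.

```java
import java.util.Properties;

// Hypothetical application-side check; only the property keys come from this section.
final class ConfluentIdCheck {

    // Throws if as-confluent is explicitly enabled while a custom ID handler is also set.
    static void validate(Properties props) {
        // Only an explicit "true" counts here; unset values are ignored.
        boolean asConfluent =
                "true".equalsIgnoreCase(props.getProperty("apicurio.registry.as-confluent"));
        boolean hasIdHandler = props.getProperty("apicurio.registry.id-handler") != null;
        if (asConfluent && hasIdHandler) {
            throw new IllegalArgumentException(
                    "apicurio.registry.as-confluent should not be used with apicurio.registry.id-handler");
        }
    }
}
```

Calling ConfluentIdCheck.validate(props) before constructing the producer or consumer surfaces the conflicting combination early.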

Configuration for SerDe lookup strategies

public class SerdeConfig {

   public static final String ARTIFACT_RESOLVER_STRATEGY = "apicurio.registry.artifact-resolver-strategy"; // (1)
   public static final String SCHEMA_RESOLVER = "apicurio.registry.schema-resolver"; // (2)
...

(1) Java class that implements the artifact resolver strategy and maps between the Kafka SerDe and artifact ID. Defaults to the topic ID strategy. This is only used by the serializer class.
(2) Java class that implements the schema resolver. Defaults to DefaultSchemaResolver. This is used by the serializer and deserializer classes.

Configuration for Kafka converters

public class SerdeBasedConverter<S, T> extends SchemaResolverConfigurer<S, T> implements Converter, Closeable {

   public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; // (1)
   public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; // (2)
...

(1) The required serializer to use with the Service Registry Kafka converter.
(2) The required deserializer to use with the Service Registry Kafka converter.

Configuration for different schema types

For details on how to configure SerDe for different schema technologies, see Section 8.3, "How to configure different client serializer/deserializer types".

8.2. Service Registry serializer/deserializer configuration properties

This section provides reference information on Java configuration properties for Service Registry Kafka serializers/deserializers (SerDes).

SchemaResolver interface

Service Registry SerDes are based on the SchemaResolver interface, which abstracts access to the registry and applies the same lookup logic for the SerDes classes of all supported formats.

Table 8.1. Configuration property for SchemaResolver interface

| Constant | Property | Description | Type | Default |
|---|---|---|---|---|
| SCHEMA_RESOLVER | apicurio.registry.schema-resolver | Used by serializers and deserializers. Fully-qualified Java classname that implements SchemaResolver. | String | io.apicurio.registry.serde.DefaultSchemaResolver |

Note

The DefaultSchemaResolver is recommended and provides useful features for most use cases. For some advanced use cases, you might use a custom implementation of SchemaResolver.

DefaultSchemaResolver class

You can use the DefaultSchemaResolver to configure features such as:

  • Access to the registry API
  • How to look up artifacts in the registry
  • How to write and read artifact information to and from Kafka
  • Fall-back options for deserializers

Configuration for registry API access options

The DefaultSchemaResolver provides the following properties to configure access to the core registry API:

Table 8.2. Configuration properties for access to registry API

| Constant | Property | Description | Type | Default |
|---|---|---|---|---|
| REGISTRY_URL | apicurio.registry.url | Used by serializers and deserializers. URL to access the registry API. | String | None |
| AUTH_SERVICE_URL | apicurio.auth.service.url | Used by serializers and deserializers. URL of the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. | String | None |
| AUTH_REALM | apicurio.auth.realm | Used by serializers and deserializers. Realm to access the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. | String | None |
| AUTH_CLIENT_ID | apicurio.auth.client.id | Used by serializers and deserializers. Client ID to access the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. | String | None |
| AUTH_CLIENT_SECRET | apicurio.auth.client.secret | Used by serializers and deserializers. Client secret to access the authentication service. Required when accessing a secure registry using the OAuth client credentials flow. | String | None |
| AUTH_USERNAME | apicurio.auth.username | Used by serializers and deserializers. Username to access the registry. Required when accessing a secure registry using HTTP basic authentication. | String | None |
| AUTH_PASSWORD | apicurio.auth.password | Used by serializers and deserializers. Password to access the registry. Required when accessing a secure registry using HTTP basic authentication. | String | None |
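
As a rough sketch of how the properties in Table 8.2 group together, the following helper builds client properties for either the OAuth client credentials flow or HTTP basic authentication. The class name RegistryAccessProps, its method names, and any URLs or credentials passed in are hypothetical. Only the property keys come from the table.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys are taken from Table 8.2.
final class RegistryAccessProps {

    // Properties for a registry secured with the OAuth client credentials flow.
    static Properties oauth(String registryUrl, String authServiceUrl,
                            String realm, String clientId, String clientSecret) {
        Properties props = new Properties();
        props.setProperty("apicurio.registry.url", registryUrl);
        props.setProperty("apicurio.auth.service.url", authServiceUrl);
        props.setProperty("apicurio.auth.realm", realm);
        props.setProperty("apicurio.auth.client.id", clientId);
        props.setProperty("apicurio.auth.client.secret", clientSecret);
        return props;
    }

    // Properties for a registry secured with HTTP basic authentication.
    static Properties basic(String registryUrl, String username, String password) {
        Properties props = new Properties();
        props.setProperty("apicurio.registry.url", registryUrl);
        props.setProperty("apicurio.auth.username", username);
        props.setProperty("apicurio.auth.password", password);
        return props;
    }
}
```

The returned Properties object can then be extended with the standard Kafka settings and passed to the producer or consumer constructor.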

Configuration for registry lookup options

The DefaultSchemaResolver uses the following properties to configure how to look up artifacts in Service Registry.

Table 8.3. Configuration properties for registry artifact lookup

| Constant | Property | Description | Type | Default |
|---|---|---|---|---|
| ARTIFACT_RESOLVER_STRATEGY | apicurio.registry.artifact-resolver-strategy | Used by serializers only. Fully-qualified Java classname that implements ArtifactResolverStrategy and maps each Kafka message to an ArtifactReference (groupId, artifactId, and version). For example, the default strategy uses the topic name as the schema artifactId. | String | io.apicurio.registry.serde.strategy.TopicIdStrategy |
| EXPLICIT_ARTIFACT_GROUP_ID | apicurio.registry.artifact.group-id | Used by serializers only. Sets the groupId used for querying or creating an artifact. Overrides the groupId returned by the ArtifactResolverStrategy. | String | None |
| EXPLICIT_ARTIFACT_ID | apicurio.registry.artifact.artifact-id | Used by serializers only. Sets the artifactId used for querying or creating an artifact. Overrides the artifactId returned by the ArtifactResolverStrategy. | String | None |
| EXPLICIT_ARTIFACT_VERSION | apicurio.registry.artifact.version | Used by serializers only. Sets the artifact version used for querying or creating an artifact. Overrides the version returned by the ArtifactResolverStrategy. | String | None |
| FIND_LATEST_ARTIFACT | apicurio.registry.find-latest | Used by serializers only. Specifies whether the serializer tries to find the latest artifact in the registry for the corresponding group ID and artifact ID. | boolean | false |
| AUTO_REGISTER_ARTIFACT | apicurio.registry.auto-register | Used by serializers only. Specifies whether the serializer tries to create an artifact in the registry. The JSON Schema serializer does not support this feature. | boolean | false |
| AUTO_REGISTER_ARTIFACT_IF_EXISTS | apicurio.registry.auto-register.if-exists | Used by serializers only. Configures the behavior of the client when there is a conflict creating an artifact because the artifact already exists. Available values are FAIL, UPDATE, RETURN, or RETURN_OR_UPDATE. | String | RETURN_OR_UPDATE |
| CHECK_PERIOD_MS | apicurio.registry.check-period-ms | Used by serializers and deserializers. Specifies how long to cache artifacts before auto-eviction. If not set, artifacts are fetched every time. | String | None |
| USE_ID | apicurio.registry.use-id | Used by serializers and deserializers. Configures to use the specified IdOption as the identifier for artifacts. Options are globalId and contentId. Instructs the serializer to write the specified ID to Kafka, and instructs the deserializer to use this ID to find the schema. | String | globalId |
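
The serializer-side lookup options in Table 8.3 are often set together. The following sketch, under the assumption that the application assembles its own Properties, sets an explicit group ID and artifact ID, enables auto-registration, and rejects any if-exists value that the table does not list. The class name LookupProps and its method are hypothetical.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical helper; property keys and allowed values are taken from Table 8.3.
final class LookupProps {

    // Values listed for apicurio.registry.auto-register.if-exists.
    private static final Set<String> IF_EXISTS_VALUES =
            Set.of("FAIL", "UPDATE", "RETURN", "RETURN_OR_UPDATE");

    // Builds serializer lookup properties with auto-registration switched on.
    static Properties serializerLookup(String groupId, String artifactId,
                                       String ifExists, long checkPeriodMs) {
        if (!IF_EXISTS_VALUES.contains(ifExists)) {
            throw new IllegalArgumentException("Unsupported if-exists value: " + ifExists);
        }
        Properties props = new Properties();
        // Explicit IDs override whatever the artifact resolver strategy returns.
        props.setProperty("apicurio.registry.artifact.group-id", groupId);
        props.setProperty("apicurio.registry.artifact.artifact-id", artifactId);
        props.setProperty("apicurio.registry.auto-register", "true");
        props.setProperty("apicurio.registry.auto-register.if-exists", ifExists);
        // Cache looked-up artifacts for this many milliseconds.
        props.setProperty("apicurio.registry.check-period-ms", Long.toString(checkPeriodMs));
        return props;
    }
}
```

For example, LookupProps.serializerLookup("orders", "order-value", "RETURN", 30000L) would cache artifacts for 30 seconds and return the existing artifact on conflict. The group and artifact names here are placeholders.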

Configuration to read/write registry artifacts in Kafka

The DefaultSchemaResolver uses the following properties to configure how artifact information is written to and read from Kafka.

Table 8.4. Configuration properties to read/write artifact information in Kafka

| Constant | Property | Description | Type | Default |
|---|---|---|---|---|
| ENABLE_HEADERS | apicurio.registry.headers.enabled | Used by serializers and deserializers. Configures to read/write the artifact identifier to Kafka message headers instead of in the message payload. | boolean | true |
| HEADERS_HANDLER | apicurio.registry.headers.handler | Used by serializers and deserializers. Fully-qualified Java classname that implements HeadersHandler and writes/reads the artifact identifier to/from the Kafka message headers. | String | io.apicurio.registry.serde.headers.DefaultHeadersHandler |
| ID_HANDLER | apicurio.registry.id-handler | Used by serializers and deserializers. Fully-qualified Java classname of a class that implements IdHandler and writes/reads the artifact identifier to/from the message payload. Only used if apicurio.registry.headers.enabled is set to false. | String | io.apicurio.registry.serde.DefaultIdHandler |
| ENABLE_CONFLUENT_ID_HANDLER | apicurio.registry.as-confluent | Used by serializers and deserializers. Shortcut for enabling the legacy Confluent-compatible implementation of IdHandler. Only used if apicurio.registry.headers.enabled is set to false. | boolean | true |
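
The two ways of carrying the artifact identifier in Table 8.4 can be sketched as two small configurations. The class name IdLocationProps and its methods are hypothetical. The property keys and the DefaultIdHandler class name come from the table.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys are taken from Table 8.4.
final class IdLocationProps {

    // Carry the artifact identifier in the Kafka message headers.
    static Properties inHeaders() {
        Properties props = new Properties();
        props.setProperty("apicurio.registry.headers.enabled", "true");
        return props;
    }

    // Carry the artifact identifier in the message payload, encoded by the given IdHandler class.
    static Properties inPayload(String idHandlerClassName) {
        Properties props = new Properties();
        // The ID handler is only used when headers are disabled.
        props.setProperty("apicurio.registry.headers.enabled", "false");
        props.setProperty("apicurio.registry.id-handler", idHandlerClassName);
        return props;
    }
}
```

For example, IdLocationProps.inPayload("io.apicurio.registry.serde.DefaultIdHandler") selects the payload location with the default handler.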

Configuration for deserializer fall-back options

The DefaultSchemaResolver uses the following property to configure a fall-back provider for all deserializers.

Table 8.5. Configuration property for deserializer fall-back provider

| Constant | Property | Description | Type | Default |
|---|---|---|---|---|
| FALLBACK_ARTIFACT_PROVIDER | apicurio.registry.fallback.provider | Only used by deserializers. Sets a custom implementation of FallbackArtifactProvider for resolving the artifact used for deserialization. FallbackArtifactProvider configures a fallback artifact to fetch from the registry in case the lookup fails. | String | io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider |

The DefaultFallbackArtifactProvider uses the following properties to configure deserializer fall-back options:

Table 8.6. Configuration properties for deserializer fall-back options

| Constant | Property | Description | Type | Default |
|---|---|---|---|---|
| FALLBACK_ARTIFACT_ID | apicurio.registry.fallback.artifact-id | Used by deserializers only. Sets the artifactId used as fallback for resolving the artifact used for deserialization. | String | None |
| FALLBACK_ARTIFACT_GROUP_ID | apicurio.registry.fallback.group-id | Used by deserializers only. Sets the groupId used as fallback for resolving the group used for deserialization. | String | None |
| FALLBACK_ARTIFACT_VERSION | apicurio.registry.fallback.version | Used by deserializers only. Sets the version used as fallback for resolving the artifact used for deserialization. | String | None |
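
A deserializer fall-back configuration based on Table 8.6 might look like the following sketch. The class name FallbackProps is hypothetical, and the choice to skip unset (null) values is an assumption of this example rather than documented behavior.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys are taken from Table 8.6.
final class FallbackProps {

    // Builds fall-back properties for a deserializer, skipping any value that is null.
    static Properties fallback(String groupId, String artifactId, String version) {
        Properties props = new Properties();
        putIfPresent(props, "apicurio.registry.fallback.group-id", groupId);
        putIfPresent(props, "apicurio.registry.fallback.artifact-id", artifactId);
        putIfPresent(props, "apicurio.registry.fallback.version", version);
        return props;
    }

    private static void putIfPresent(Properties props, String key, String value) {
        if (value != null) {
            props.setProperty(key, value);
        }
    }
}
```

Merging the result into the consumer properties (for example with putAll) leaves the registry URL and other settings unchanged.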

Additional resources

  • For more details, see the SerdeConfig Java class
  • You can configure application properties as Java system properties or include them in the Quarkus application.properties file. For more details, see the Quarkus documentation.
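
If your application builds the client Properties itself, one possible way to pick up values passed as Java system properties (for example, -Dapicurio.registry.url=...) is to copy them across, as in the sketch below. The class name SystemPropsBridge is hypothetical, and this is an application-side approach. Whether the SerDe classes read system properties on their own is not covered here.

```java
import java.util.Properties;

// Hypothetical application-side bridge from JVM system properties to client properties.
final class SystemPropsBridge {

    // Copies every system property whose name starts with "apicurio." into a new Properties object.
    static Properties apicurioSystemProperties() {
        Properties props = new Properties();
        for (String name : System.getProperties().stringPropertyNames()) {
            if (name.startsWith("apicurio.")) {
                props.setProperty(name, System.getProperty(name));
            }
        }
        return props;
    }
}
```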

8.3. How to configure different client serializer/deserializer types

When using schemas in your Kafka client applications, you must choose which specific schema type to use, depending on your use case. Service Registry provides SerDe Java classes for Apache Avro, JSON Schema, and Google Protobuf. The following sections explain how to configure Kafka applications to use each type.

You can also use Kafka to implement custom serializer and deserializer classes, and leverage Service Registry functionality using the Service Registry REST Java client.

Kafka application configuration for serializers/deserializers

Using the SerDe classes provided by Service Registry in your Kafka application involves setting the correct configuration properties. The following simple Avro examples show how to configure a serializer in a Kafka producer application and how to configure a deserializer in a Kafka consumer application.

Example serializer configuration in a Kafka producer

// Create the Kafka producer
private static Producer<Object, Object> createKafkaProducer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

    // Use Service Registry-provided Kafka serializer for Avro
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // Register the schema artifact if not found in the registry.
    props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

    // Create the Kafka producer
    Producer<Object, Object> producer = new KafkaProducer<>(props);
    return producer;
}

Example deserializer configuration in a Kafka consumer

// Create the Kafka consumer
private static KafkaConsumer<String, GenericRecord> createKafkaConsumer() {
    Properties props = new Properties();

    // Configure standard Kafka settings
    props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
    props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
    props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Keys are plain strings; values use the Service Registry-provided Kafka deserializer for Avro
    props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

    // Configure the Service Registry location
    props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

    // No other configuration needed because the schema globalId the deserializer uses is sent
    // in the payload. The deserializer extracts the globalId and uses it to look up the schema
    // from the registry.

    // Create the Kafka consumer; the key type matches the StringDeserializer configured above
    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    return consumer;
}

8.3.1. Configure Avro SerDe with Service Registry

Service Registry provides the following Kafka client serializer and deserializer classes for Apache Avro:

  • io.apicurio.registry.serde.avro.AvroKafkaSerializer
  • io.apicurio.registry.serde.avro.AvroKafkaDeserializer

Configure the Avro serializer

You can configure the Avro serializer class with the following:

  • Service Registry URL
  • Artifact resolver strategy
  • ID location
  • ID encoding
  • Avro datum provider
  • Avro encoding

ID location

The serializer passes the unique ID of the schema as part of the Kafka message so that consumers can use the correct schema for deserialization. The ID can be in the message payload or in the message headers. The default location is the message payload. To send the ID in the message headers, set the following configuration property:

props.putIfAbsent(SerdeConfig.ENABLE_HEADERS, "true");

The property name is apicurio.registry.headers.enabled.

ID encoding

You can customize how the schema ID is encoded when passing it in the Kafka message body. Set the apicurio.registry.id-handler configuration property to a class that implements the io.apicurio.registry.serde.IdHandler interface. Service Registry provides the following implementations:

  • io.apicurio.registry.serde.DefaultIdHandler: Stores the ID as an 8-byte long
  • io.apicurio.registry.serde.Legacy4ByteIdHandler: Stores the ID as a 4-byte integer

Service Registry represents the schema ID as a long, but for legacy reasons, or for compatibility with other registries or SerDe classes, you might want to use 4 bytes when sending the ID.
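
The difference between the two encodings can be illustrated with plain java.nio.ByteBuffer calls. This is only a sketch of the byte widths involved, with a hypothetical class name, and is not the implementation of DefaultIdHandler or Legacy4ByteIdHandler. Big-endian byte order (the ByteBuffer default) is an assumption of the example.

```java
import java.nio.ByteBuffer;

// Illustration of the two ID widths; not the library's IdHandler implementations.
final class IdEncodingSketch {

    // Encodes the ID as an 8-byte long.
    static byte[] encodeAsLong(long id) {
        return ByteBuffer.allocate(Long.BYTES).putLong(id).array();
    }

    // Encodes the ID as a 4-byte integer; throws ArithmeticException if the ID does not fit.
    static byte[] encodeAsInt(long id) {
        int narrowed = Math.toIntExact(id);
        return ByteBuffer.allocate(Integer.BYTES).putInt(narrowed).array();
    }
}
```

The 4-byte form saves space and matches consumers that expect an integer ID, but it cannot represent IDs above Integer.MAX_VALUE, which is why the sketch fails loudly on overflow rather than truncating.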

Avro datum provider

Avro provides different datum writers and readers to write and read data. Service Registry supports three different types:

  • Generic
  • Specific
  • Reflect

The Service Registry AvroDatumProvider abstracts which of these types is used. DefaultAvroDatumProvider is used by default.

You can set the following configuration options:

  • apicurio.registry.avro-datum-provider: Specifies a fully-qualified Java class name of the AvroDatumProvider implementation, for example io.apicurio.registry.serde.avro.ReflectAvroDatumProvider
  • apicurio.registry.use-specific-avro-reader: Set to true to use a specific type when using DefaultAvroDatumProvider

Avro encoding

When using Avro to serialize data, you can use the Avro binary encoding format to ensure the data is encoded in as efficient a format as possible. Avro also supports encoding the data as JSON, which makes it easier to inspect the payload of each message, for example, for logging or debugging.

You can set the Avro encoding by configuring the apicurio.registry.avro.encoding property with a value of JSON or BINARY. The default is BINARY.
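
The datum provider and encoding options might be set as in the following sketch. The class name AvroOptionProps and its method are hypothetical, and the property keys are the ones named in this section. Because the deserializer must match the serializer for these settings, the same helper could feed both the producer and the consumer configuration.

```java
import java.util.Properties;

// Hypothetical helper; only the property keys are taken from this section.
final class AvroOptionProps {

    // Builds Avro options; a null datumProviderClass leaves the default provider in place.
    static Properties avroOptions(String datumProviderClass, boolean jsonEncoding) {
        Properties props = new Properties();
        if (datumProviderClass != null) {
            props.setProperty("apicurio.registry.avro-datum-provider", datumProviderClass);
        }
        // JSON is easier to inspect; BINARY is the more compact default.
        props.setProperty("apicurio.registry.avro.encoding", jsonEncoding ? "JSON" : "BINARY");
        return props;
    }
}
```

For example, AvroOptionProps.avroOptions("io.apicurio.registry.serde.avro.ReflectAvroDatumProvider", true) selects the reflect datum provider with JSON encoding.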

Configure the Avro deserializer

You must configure the Avro deserializer class to match the following configuration settings of the serializer:

  • Service Registry URL
  • ID encoding
  • Avro datum provider
  • Avro encoding

See the serializer section for these configuration options. The property names and values are the same.

Note

The following options are not required when configuring the deserializer:

  • Artifact resolver strategy
  • ID location

The deserializer class can determine the values for these options from the message. The strategy is not required because the serializer is responsible for sending the ID as part of the message.

The ID location is determined by checking for the magic byte at the start of the message payload. If that byte is found, the ID is read from the message payload using the configured handler. If the magic byte is not found, the ID is read from the message headers.
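
The detection logic described above can be sketched as follows. This is an illustration only: the magic byte value (0), the header name example.globalId, the 8-byte ID width, and the class name are all assumptions of this example, because the text does not specify them.

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Illustration of payload-versus-header ID detection; not the library's implementation.
final class IdLocationSketch {

    static final byte MAGIC_BYTE = 0x0;                 // assumed value
    static final String ID_HEADER = "example.globalId"; // hypothetical header name

    // Reads the ID from the payload if it starts with the magic byte, otherwise from the headers.
    static long readId(byte[] payload, Map<String, byte[]> headers) {
        if (payload.length >= 1 + Long.BYTES && payload[0] == MAGIC_BYTE) {
            // Skip the magic byte, then read the 8-byte ID that follows it.
            return ByteBuffer.wrap(payload, 1, Long.BYTES).getLong();
        }
        byte[] header = headers.get(ID_HEADER);
        if (header == null || header.length < Long.BYTES) {
            throw new IllegalStateException("No schema ID found in payload or headers");
        }
        return ByteBuffer.wrap(header).getLong();
    }
}
```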

8.3.2. Configure JSON Schema SerDe with Service Registry

Service Registry provides the following Kafka client serializer and deserializer classes for JSON Schema:

  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer
  • io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer

Unlike Apache Avro, JSON Schema is not a serialization technology, but is instead a validation technology. As a result, configuration options for JSON Schema are quite different. For example, there is no encoding option, because data is always encoded as JSON.

Configure the JSON Schema serializer

You can configure the JSON Schema serializer class as follows:

  • Service Registry URL
  • Artifact resolver strategy
  • Schema validation

The only non-standard configuration property is JSON Schema validation, which is enabled by default. You can disable this by setting apicurio.registry.serde.validation-enabled to "false". For example:

props.putIfAbsent(SerdeConfig.VALIDATION_ENABLED, Boolean.FALSE);

Configure the JSON Schema deserializer

You can configure the JSON Schema deserializer class as follows:

  • Service Registry URL
  • Schema validation
  • Class for deserializing data

You must provide the location of Service Registry so that the schema can be loaded. The other configuration is optional.

Note

Deserializer validation works only if the serializer passes the global ID in the Kafka message, which happens only when validation is enabled in the serializer.

8.3.3. Configure Protobuf SerDe with Service Registry

Service Registry provides the following Kafka client serializer and deserializer classes for Google Protobuf:

  • io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer
  • io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer

Configure the Protobuf serializer

You can configure the Protobuf serializer class as follows:

  • Service Registry URL
  • Artifact resolver strategy
  • ID location
  • ID encoding
  • Schema validation

For details on these configuration options, see the ID location and ID encoding descriptions in Section 8.3.1 and the schema validation description in Section 8.3.2.

Configure the Protobuf deserializer

You must configure the Protobuf deserializer class to match the following configuration settings in the serializer:

  • Service Registry URL
  • ID encoding

The configuration property names and values are the same as for the serializer.

Note

The following options are not required when configuring the deserializer:

  • Artifact resolver strategy
  • ID location

The deserializer class can determine the values for these options from the message. The strategy is not required because the serializer is responsible for sending the ID as part of the message.

The ID location is determined by checking for the magic byte at the start of the message payload. If that byte is found, the ID is read from the message payload using the configured handler. If the magic byte is not found, the ID is read from the message headers.

Note

The Protobuf deserializer does not deserialize to your exact Protobuf Message implementation, but rather to a DynamicMessage instance. There is no appropriate API to do otherwise.
