Chapter 7. Using Kafka client serializers/deserializers

Service Registry provides Kafka client serializers/deserializers for Kafka producer and consumer applications. Kafka producer applications use serializers to encode messages that conform to a specific event schema. Kafka consumer applications then use deserializers to validate that the messages have been serialized using the correct schema, based on a specific schema ID. This ensures consistent schema use and helps to prevent data errors at runtime.

This chapter provides instructions on how to use the Kafka client serializer and deserializer for Apache Avro in your AMQ Streams producer and consumer client applications:

Prerequisites

7.1. Kafka client applications and Service Registry

Using Service Registry decouples the process of managing schemas from the configuration of client applications. You enable an application to use a schema from the registry by specifying its URL in the client code.

For example, the schemas to serialize and deserialize messages can be stored in the registry, which are then referenced from the applications that use them to ensure that the messages that they send and receive are compatible with those schemas.

Kafka client applications can push or pull their schemas from Service Registry at runtime.

Schemas can evolve, so you can define rules in Service Registry, for example, to ensure that changes to a schema are valid and do not break previous versions used by applications. Service Registry checks for compatibility by comparing a modified schema with previous versions of schemas.

Service Registry provides full schema registry support for Avro schemas, which are used by client applications through Kafka client serializer/deserializer (SerDe) services provided by Service Registry.

7.2. Producer schema configuration

A producer client application uses a serializer to put the messages it sends to a specific broker topic into the correct data format.

To enable a producer to use Service Registry for serialization, you:

After registering your schema, when you start Kafka and Service Registry, you can access the schema to format messages sent to the Kafka broker topic by the producer.

If a schema already exists, you can create a new version through the REST API based on compatibility rules defined in Service Registry. Versions are used for compatibility checking as a schema evolves. An artifact ID and schema version represents a unique tuple that identifies a schema.

7.3. Consumer schema configuration

A consumer client application uses a deserializer to get the messages it consumes from a specific broker topic into the correct data format.

To enable a consumer to use Service Registry for deserialization, you:

The schema is then retrieved by the deserializer using a global ID written into the message being consumed. The message received must, therefore, include a global ID as well as the message data.

For example:

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]

Now, when you start Kafka and Service Registry, you can access the schema in order to format messages received from the Kafka broker topic.

7.4. Strategies to lookup a schema

A Service Registry strategy is used by the Kafka client serializer/deserializer to determine the artifact ID or global ID under which the message schema is registered in Service Registry.

For a given topic and message, you can use implementations of the following Java classes:

  • ArtifactIdStrategy to return an artifact ID
  • GlobalIdStrategy to return a global ID

The artifact ID returned depends on whether the key or value in the message is being serialized.

The classes for each strategy are organized in the io.apicurio.registry.utils.serde.strategy package.

The default strategy is TopicIdStrategy, which looks for Service Registry artifacts with the same name as the Kafka topic receiving messages.

For example:

public String artifactId(String topic, boolean isKey, T schema) {
    return String.format("%s-%s", topic, isKey ? "key" : "value");
}
  • The topic parameter is the name of the Kafka topic receiving the message.
  • The isKey parameter is true when the message key is being serialized, and false when the message value is being serialized.
  • The schema parameter is the schema of the message being serialized/deserialized.
  • The artifactID returned is the ID under which the schema is registered in Service Registry.

What lookup strategy you use depends on how and where you store your schema. For example, you might use a strategy that uses a record ID if you have different Kafka topics with the same Avro message type.

Strategies to return an artifact ID

Strategies to return an artifact ID based on an implementation of ArtifactIdStrategy.

RecordIdStrategy
Avro-specific strategy that uses the full name of the schema.
TopicRecordIdStrategy
Avro-specific strategy that uses the topic name and the full name of the schema.
TopicIdStrategy
(Default) strategy that uses the topic name and key or value suffix.
SimpleTopicIdStrategy
Simple strategy that only uses the topic name.

Strategies to return a global ID

Strategies to return a global ID based on an implementation of GlobalIdStrategy.

FindLatestIdStrategy
Strategy that returns the global ID of the latest schema version, based on an artifact ID.
FindBySchemaIdStrategy
Strategy that matches schema content, based on an artifact ID, to return a global ID.
GetOrCreateIdStrategy
Strategy that tries to get the latest schema, based on an artifact ID, and if it does not exist, it creates a new schema.
AutoRegisterIdStrategy
Strategy that updates the schema, and uses the global ID of the updated schema.

7.5. Service Registry constants

You can configure specific client SerDe services and schema lookup strategies directly into a client using the constants outlined here.

Alternatively, you can use specify the constants in a properties file, or a properties instance.

Constants for serializer/deserializer (SerDe) services

public abstract class AbstractKafkaSerDe<T extends AbstractKafkaSerDe<T>> implements AutoCloseable {
      protected final Logger log = LoggerFactory.getLogger(getClass());

      public static final String REGISTRY_URL_CONFIG_PARAM = "apicurio.registry.url"; 1
      public static final String REGISTRY_CACHED_CONFIG_PARAM = "apicurio.registry.cached"; 2
      public static final String REGISTRY_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.id-handler"; 3
      public static final String REGISTRY_CONFLUENT_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.as-confluent"; 4
1
(Required) The URL of Service Registry.
2
Allows the client to make the request and look up the information from a cache of previous results, to improve processing time. If the cache is empty, the lookup is performed from Service Registry.
3
Extends ID handling to support other ID formats and make them compatible with Service Registry SerDe services. For example, changing the ID format from Long to Integer supports the Confluent ID format.
4
A flag to simplify the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup.

Constants for lookup strategies

public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> {
      public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id"; 1
      public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id"; 2

Constants for converters

public class SchemalessConverter<T> extends AbstractKafkaSerDe<SchemalessConverter<T>> implements Converter {
      public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; 1
      public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; 2
1
(Required) Serializer to use with the converter.
2
(Required) Deserializer to use with the converter.

Constants for Avro data providers

public interface AvroDatumProvider<T> {
      String REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM = "apicurio.registry.avro-datum-provider"; 1
      String REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM = "apicurio.registry.use-specific-avro-reader"; 2
1
Avro Datum provider to write data to a schema, with or without reflection.
2
Flag to set to use an Avro-specific datum reader.
DefaultAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 1
ReflectAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 2
1
Default datum reader.
2
Datum reader using reflection.

7.6. Registering a schema to Service Registry

After you have defined a schema in the appropriate format, such as Apache Avro, you can add the schema to Service Registry.

You can add the schema through:

  • The Service Registry web console
  • A curl command using the Service Registry API
  • A Maven plugin supplied with Service Registry
  • Schema configuration added to your client code

Client applications cannot use Service Registry until you have registered your schemas.

Service Registry web console

Having installed Service Registry, you connect to the web console from the ui endpoint:

http://MY-REGISTRY-URL/ui

From the console, you can add, view and configure schemas. You can also create the rules that prevent invalid content being added to the registry.

For more information on using the Service Registry web console, see the Chapter 3, Installing Service Registry on OpenShift.

Curl example

curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
  -H "X-Registry-ArtifactId: prices-value" \
  --data '{ 1
      "type":"record",
      "name":"price",
      "namespace":"com.redhat",
      "fields":[{"name":"symbol","type":"string"},
      {"name":"price","type":"string"}]
    }'
  https://my-cluster-service-registry-myproject.example.com/api/artifacts -s 2
1
Avro schema
2
OpenShift route name that exposes Service Registry

Plugin example

<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>${registry.version}</version>
<executions>
  <execution>
    <phase>generate-sources</phase>
    <goals>
      <goal>register</goal>
    </goals>
    <configuration>
      <registryUrl>https://my-cluster-service-registry-myproject.example.com/api</registryUrl>
      <artifactType>AVRO</artifactType>
      <artifacts>
        <schema1>${project.basedir}/schemas/schema1.avsc</schema1>
      </artifacts>
    </configuration>
  </execution>
</executions>
</plugin>

Configuration through a (producer) client example

String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", 1
    "https://my-cluster-service-registry-myproject.example.com/api");
try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        service.getArtifactMetaData(artifactId); 2
    } catch (WebApplicationException e) {
        CompletionStage <ArtifactMetaData> csa = service.createArtifact(
            ArtifactType.AVRO,
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}
1
The properties are registered. You can register properties against more than one node.
2
Check to see if the schema already exists based on the artifact ID.

7.7. Using a Service Registry schema from a consumer client

This procedure describes how to configure a Java consumer client to use a schema from Service Registry.

Procedure

  1. Configure the client with the URL of Service Registry.

    For example:

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com/api");
    RegistryService service = RegistryClient.cached(registryUrl);
  2. Configure the client with the Service Registry deserializer service.

    For example:

    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer <> ( 1
        service,
        new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true)
    );
    Serde<LogInput> logSerde = Serdes.serdeFrom( 2
        new AvroKafkaSerializer<>(service),
        deserializer
    );
    KStream<String, LogInput> input = builder.stream( 3
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );
    1
    The deserializer service provided by Service Registry.
    2
    The deserialization is in Apache Avro JSON format.
    3
    The input data for deserialization derived from the topic values consumed by the client.

7.8. Using a Service Registry schema from a producer client

This procedure describes how to configure a Java producer client to use a schema from Service Registry.

Procedure

  1. Configure the client with the URL of Service Registry.

    For example:

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com/api");
    RegistryService service = RegistryClient.cached(registryUrl);
  2. Configure the client with the serializer services, and the strategy to look up the schema in Service Registry.

    For example:

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com/api");
    
        clientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, property(clientProperties, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"));
        clientProperties.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, registryUrl_node1); 1
        clientProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 2
        clientProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 3
        clientProperties.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class.getName()); 4
    1
    The Service Registry URL.
    2
    The serializer service for the message key provided by Service Registry.
    3
    The serializer service for the message value provided by Service Registry.
    4
    Lookup strategy to find the global ID for the schema. Matches the schema of the message against its global ID (artifact ID and schema version) in Service Registry.