Chapter 9. Managing schemas with Service Registry

This chapter outlines how to deploy and integrate AMQ Streams with Red Hat Service Registry. You can use Service Registry as a centralized store of service schemas for data streaming. For Kafka, you can use Service Registry to store Apache Avro or JSON schemas.

Service Registry provides a REST API and a Java REST client to register and query the schemas from client applications through server-side endpoints. You can configure producer and consumer clients to use Service Registry.

A Maven plugin is also provided so that you can upload and download schemas as part of your build. The Maven plugin is useful for testing and validation, when checking that your schema updates are compatible with client applications.

Important

Service Registry is a Technology Preview only. Technology Preview features are not supported with Red Hat production service-level agreements (SLAs) and might not be functionally complete. Red Hat does not recommend implementing any Technology Preview features in production environments. This Technology Preview feature provides early access to upcoming product innovations, enabling you to test functionality and provide feedback during the development process. For more information about the support scope of Red Hat Technology Preview features, see Technology Preview Features Support Scope.

9.1. Why use Service Registry?

Using Service Registry decouples the process of managing schemas from the configuration of client applications. You enable an application to use a schema from the registry by specifying its URL in the client code.

For example, the schemas used to serialize and deserialize messages can be stored in the registry. The applications that use the schemas then reference them from the registry, which ensures that the messages they send and receive are compatible with those schemas.

Kafka client applications can push or pull their schemas from Service Registry at runtime.

Schemas can evolve, so you can define rules in Service Registry, for example, to ensure that changes to a schema are valid and do not break previous versions used by applications. Service Registry checks for compatibility by comparing a modified schema with previous versions of schemas.

Service Registry provides full schema registry support for Avro schemas, which are used by client applications through Kafka client serializer/deserializer (SerDe) services provided by Service Registry.

9.2. Producer schema configuration

A producer client application uses a serializer to put the messages it sends to a specific broker topic into the correct data format.

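To enable a producer to use Service Registry for serialization, you:

  • Define and register your schema with Service Registry
  • Configure the producer client code with the URL of Service Registry, the serializer services to use with the messages, and the strategy to look up the schema in Service Registry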

After registering your schema, when you start Kafka and Service Registry, you can access the schema to format messages sent to the Kafka broker topic by the producer.

If a schema already exists, you can create a new version through the REST API, subject to the compatibility rules defined in Service Registry. Versions are used for compatibility checking as a schema evolves. Together, an artifact ID and a schema version form a unique tuple that identifies a schema.

9.3. Consumer schema configuration

A consumer client application uses a deserializer to get the messages it consumes from a specific broker topic into the correct data format.

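To enable a consumer to use Service Registry for deserialization, you:

  • Define and register your schema with Service Registry
  • Configure the consumer client code with the URL of Service Registry and the Service Registry deserializer service to use with the messages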

The schema is then retrieved by the deserializer using a global ID written into the message being consumed. The message received must, therefore, include a global ID as well as the message data.

For example:

# ...
[MAGIC_BYTE]
[GLOBAL_ID]
[MESSAGE DATA]

Now, when you start Kafka and Service Registry, you can access the schema in order to format messages received from the Kafka broker topic.
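
The message layout shown above (magic byte, global ID, message data) can be sketched in plain Java. This is a minimal illustration, not the Service Registry implementation: the class name MessageFraming, the magic byte value of 0, and the 8-byte (Long) width of the global ID are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: the magic byte value and the 8-byte ID width are assumptions.
class MessageFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 1 + Long.BYTES;

    // Prepend the magic byte and the global ID to the serialized message data.
    static byte[] frame(long globalId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_LENGTH + payload.length)
                .put(MAGIC_BYTE)
                .putLong(globalId)
                .put(payload)
                .array();
    }

    // Read the global ID, rejecting data that does not start with the magic byte.
    static long globalIdOf(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getLong();
    }

    // Return the message data that follows the header.
    static byte[] payloadOf(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    public static void main(String[] args) {
        byte[] message = frame(42L, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(globalIdOf(message));
        System.out.println(new String(payloadOf(message), StandardCharsets.UTF_8));
    }
}
```

A deserializer following this layout would use the extracted global ID to fetch the schema before decoding the remaining bytes.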

9.4. Strategies to look up a schema

A Service Registry strategy is used by the Kafka client serializer/deserializer to determine the artifact ID or global ID under which the message schema is registered in Service Registry.

For a given topic and message, you can use implementations of the following Java classes:

  • ArtifactIdStrategy to return an artifact ID
  • GlobalIdStrategy to return a global ID

The artifact ID returned depends on whether the key or value in the message is being serialized.

The classes for each strategy are organized in the io.apicurio.registry.utils.serde.strategy package.

The default strategy is TopicIdStrategy, which looks for Service Registry artifacts with the same name as the Kafka topic receiving messages.

For example:

public String artifactId(String topic, boolean isKey, T schema) {
    return String.format("%s-%s", topic, isKey ? "key" : "value");
}
  • The topic parameter is the name of the Kafka topic receiving the message.
  • The isKey parameter is true when the message key is being serialized, and false when the message value is being serialized.
  • The schema parameter is the schema of the message being serialized/deserialized.
  • The artifact ID returned is the ID under which the schema is registered in Service Registry.

Which lookup strategy you use depends on how and where you store your schema. For example, you might use a strategy that uses a record ID if you have different Kafka topics with the same Avro message type.

Strategies to return an artifact ID

The following strategies return an artifact ID and are based on an implementation of ArtifactIdStrategy.

RecordIdStrategy
Avro-specific strategy that uses the full name of the schema.
TopicRecordIdStrategy
Avro-specific strategy that uses the topic name and the full name of the schema.
TopicIdStrategy
(Default) strategy that uses the topic name and key or value suffix.
SimpleTopicIdStrategy
Simple strategy that only uses the topic name.
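
The naming produced by each artifact ID strategy can be compared side by side with a small sketch. This is not the code from the io.apicurio.registry.utils.serde.strategy package: the class ArtifactIdNaming is hypothetical, the record name is passed as a plain string instead of an Avro schema object, and the "-" separator used for TopicRecordIdStrategy is an assumption.

```java
// Hypothetical illustration of the artifact IDs that each strategy might return.
class ArtifactIdNaming {

    // TopicIdStrategy (default): topic name plus a "key" or "value" suffix.
    static String topicId(String topic, boolean isKey) {
        return String.format("%s-%s", topic, isKey ? "key" : "value");
    }

    // SimpleTopicIdStrategy: topic name only.
    static String simpleTopicId(String topic) {
        return topic;
    }

    // RecordIdStrategy: full name (namespace and name) of the Avro schema.
    static String recordId(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordIdStrategy: topic name and full schema name; the separator is assumed.
    static String topicRecordId(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        String topic = "prices";
        String record = "com.redhat.price";
        System.out.println(topicId(topic, false));
        System.out.println(simpleTopicId(topic));
        System.out.println(recordId(record));
        System.out.println(topicRecordId(topic, record));
    }
}
```

With RecordIdStrategy, two topics that carry the same Avro record type resolve to the same artifact, whereas the topic-based strategies give each topic its own artifact.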

Strategies to return a global ID

The following strategies return a global ID and are based on an implementation of GlobalIdStrategy.

FindLatestIdStrategy
Strategy that returns the global ID of the latest schema version, based on an artifact ID.
FindBySchemaIdStrategy
Strategy that matches schema content, based on an artifact ID, to return a global ID.
GetOrCreateIdStrategy
Strategy that tries to get the latest schema, based on an artifact ID, and if it does not exist, it creates a new schema.
AutoRegisterIdStrategy
Strategy that updates the schema, and uses the global ID of the updated schema.
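
The difference between the global ID strategies is easier to see against a toy, in-memory model of a registry. The following sketch is an assumption-laden illustration only: the ToyRegistry class and its methods are hypothetical, schemas are plain strings, and global IDs are simply a counter. It does not reflect how Service Registry stores artifacts or calls its REST API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical in-memory stand-in for a registry, used only to contrast the strategies.
class ToyRegistry {
    private static final class Version {
        final long globalId;
        final String content;
        Version(long globalId, String content) {
            this.globalId = globalId;
            this.content = content;
        }
    }

    private final Map<String, List<Version>> artifacts = new HashMap<>();
    private long nextGlobalId = 1;

    // Store a new version of the artifact and return its global ID.
    private long register(String artifactId, String content) {
        long id = nextGlobalId++;
        artifacts.computeIfAbsent(artifactId, k -> new ArrayList<>()).add(new Version(id, content));
        return id;
    }

    // Like FindLatestIdStrategy: global ID of the latest version, if the artifact exists.
    OptionalLong findLatest(String artifactId) {
        List<Version> versions = artifacts.get(artifactId);
        if (versions == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(versions.get(versions.size() - 1).globalId);
    }

    // Like FindBySchemaIdStrategy: global ID of the version whose content matches.
    OptionalLong findBySchema(String artifactId, String content) {
        for (Version v : artifacts.getOrDefault(artifactId, Collections.emptyList())) {
            if (v.content.equals(content)) {
                return OptionalLong.of(v.globalId);
            }
        }
        return OptionalLong.empty();
    }

    // Like GetOrCreateIdStrategy: use the latest version, or create the artifact if missing.
    long getOrCreate(String artifactId, String content) {
        OptionalLong latest = findLatest(artifactId);
        return latest.isPresent() ? latest.getAsLong() : register(artifactId, content);
    }

    // Like AutoRegisterIdStrategy: always store the schema and use the new global ID.
    long autoRegister(String artifactId, String content) {
        return register(artifactId, content);
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        System.out.println(registry.getOrCreate("prices-value", "schema v1"));
        System.out.println(registry.autoRegister("prices-value", "schema v2"));
        System.out.println(registry.findLatest("prices-value").getAsLong());
    }
}
```

In this model, the two "find" strategies never write to the registry, so they suit environments where schemas are registered ahead of time, while the other two can create schema versions from the client.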

9.5. Service Registry constants

You can configure specific client SerDe services and schema lookup strategies directly into a client using the constants outlined here.

Alternatively, you can specify the constants in a properties file or a properties instance.

Constants for serializer/deserializer (SerDe) services

public abstract class AbstractKafkaSerDe<T extends AbstractKafkaSerDe<T>> implements AutoCloseable {
      protected final Logger log = LoggerFactory.getLogger(getClass());

      public static final String REGISTRY_URL_CONFIG_PARAM = "apicurio.registry.url"; 1
      public static final String REGISTRY_CACHED_CONFIG_PARAM = "apicurio.registry.cached"; 2
      public static final String REGISTRY_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.id-handler"; 3
      public static final String REGISTRY_CONFLUENT_ID_HANDLER_CONFIG_PARAM = "apicurio.registry.as-confluent"; 4
1
(Required) The URL of Service Registry.
2
Allows the client to make the request and look up the information from a cache of previous results, to improve processing time. If the cache is empty, the lookup is performed from Service Registry.
3
Extends ID handling to support other ID formats and make them compatible with Service Registry SerDe services. For example, changing the ID format from Long to Integer supports the Confluent ID format.
4
A flag to simplify the handling of Confluent IDs. If set to true, an Integer is used for the global ID lookup.

Constants for lookup strategies

public abstract class AbstractKafkaStrategyAwareSerDe<T, S extends AbstractKafkaStrategyAwareSerDe<T, S>> extends AbstractKafkaSerDe<S> {
      public static final String REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.artifact-id"; 1
      public static final String REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM = "apicurio.registry.global-id"; 2
1
Strategy used to return the artifact ID.
2
Strategy used to return the global ID.

Constants for converters

public class SchemalessConverter<T> extends AbstractKafkaSerDe<SchemalessConverter<T>> implements Converter {
      public static final String REGISTRY_CONVERTER_SERIALIZER_PARAM = "apicurio.registry.converter.serializer"; 1
      public static final String REGISTRY_CONVERTER_DESERIALIZER_PARAM = "apicurio.registry.converter.deserializer"; 2
1
(Required) Serializer to use with the converter.
2
(Required) Deserializer to use with the converter.

Constants for Avro data providers

public interface AvroDatumProvider<T> {
      String REGISTRY_AVRO_DATUM_PROVIDER_CONFIG_PARAM = "apicurio.registry.avro-datum-provider"; 1
      String REGISTRY_USE_SPECIFIC_AVRO_READER_CONFIG_PARAM = "apicurio.registry.use-specific-avro-reader"; 2
1
Avro Datum provider to write data to a schema, with or without reflection.
2
Flag to set to use an Avro-specific datum reader.
DefaultAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 1
ReflectAvroDatumProvider (io.apicurio.registry.utils.serde.avro) 2
1
Default datum reader.
2
Datum reader using reflection.
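
As an illustration of supplying these settings through a properties instance, the following sketch loads the literal property keys listed in this section using only java.util.Properties. The class name RegistryClientProperties and the sample values are assumptions; the strategy class names follow the package named earlier in this chapter, and the URL is the example route used elsewhere in it.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical example: the keys come from the constants above, the values are samples.
class RegistryClientProperties {

    // Contents that could equally live in a client .properties file.
    static final String EXAMPLE = String.join("\n",
            "apicurio.registry.url=https://my-cluster-service-registry-myproject.example.com",
            "apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy",
            "apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy");

    // Parse properties text into a Properties instance.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(EXAMPLE);
        // Print each configured key and value.
        props.stringPropertyNames().forEach(
                name -> System.out.println(name + " = " + props.getProperty(name)));
    }
}
```

The resulting Properties object could then be merged with the usual Kafka client settings before the producer or consumer is created.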

9.6. Installing Service Registry

The instructions to install Service Registry with AMQ Streams storage are described in the Service Registry documentation.

You can install more than one instance of Service Registry depending on your cluster configuration. The number of instances depends on the storage type you use and how many schemas you need to handle.

9.7. Registering a schema to Service Registry

After you have defined a schema in the appropriate format, such as Apache Avro, you can add the schema to Service Registry.

You can add the schema through:

  • A curl command using the Service Registry API
  • A Maven plugin supplied with Service Registry
  • Schema configuration added to your client code

Client applications cannot use Service Registry until you have registered your schemas.

Curl example

curl -X POST -H "Content-type: application/json; artifactType=AVRO" \
  -H "X-Registry-ArtifactId: prices-value" \
  --data '{ 1
      "type":"record",
      "name":"price",
      "namespace":"com.redhat",
      "fields":[{"name":"symbol","type":"string"},
      {"name":"price","type":"string"}]
    }'
  my-cluster-service-registry-myproject.example.com/artifacts -s 2
1
Avro schema
2
OpenShift route name that exposes Service Registry
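
The same request can be assembled from Java with the standard java.net.http API (Java 11 or later). This sketch only builds the request and mirrors the headers and path of the curl example; the class name RegisterSchemaRequest and the http:// scheme in the sample URL are assumptions, and sending the request is left to the caller.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that mirrors the curl example; it builds but does not send the request.
class RegisterSchemaRequest {

    // The Avro schema from the curl example, as a JSON string.
    static final String SCHEMA =
            "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.redhat\","
            + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},"
            + "{\"name\":\"price\",\"type\":\"string\"}]}";

    // Build a POST to the /artifacts endpoint with the artifact type and artifact ID headers.
    static HttpRequest build(String registryBaseUrl, String artifactId, String schema) {
        return HttpRequest.newBuilder(URI.create(registryBaseUrl + "/artifacts"))
                .header("Content-Type", "application/json; artifactType=AVRO")
                .header("X-Registry-ArtifactId", artifactId)
                .POST(HttpRequest.BodyPublishers.ofString(schema))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build(
                "http://my-cluster-service-registry-myproject.example.com", // assumed scheme
                "prices-value",
                SCHEMA);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To submit the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and check the response status.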

Plugin example

<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>${registry.version}</version>
<executions>
  <execution>
    <phase>generate-sources</phase>
    <goals>
      <goal>register</goal>
    </goals>
    <configuration>
      <registryUrl>https://my-cluster-service-registry-myproject.example.com</registryUrl>
      <artifactType>AVRO</artifactType>
      <artifacts>
        <schema1>${project.basedir}/schemas/schema1.avsc</schema1>
      </artifacts>
    </configuration>
  </execution>
</executions>
</plugin>

Configuration through a (producer) client example

String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1", 1
    "https://my-cluster-service-registry-myproject.example.com");
try (RegistryService service = RegistryClient.create(registryUrl_node1)) {
    String artifactId = ApplicationImpl.INPUT_TOPIC + "-value";
    try {
        service.getArtifactMetaData(artifactId); 2
    } catch (WebApplicationException e) {
        CompletionStage<ArtifactMetaData> csa = service.createArtifact(
            ArtifactType.AVRO,
            artifactId,
            new ByteArrayInputStream(LogInput.SCHEMA$.toString().getBytes())
        );
        csa.toCompletableFuture().get();
    }
}
1
The Service Registry URL is read from the client properties. You can define URL properties for more than one node.
2
Check to see if the schema already exists based on the artifact ID.

9.8. Using a Service Registry schema from a producer client

This procedure describes how to configure a Java producer client to use a schema from Service Registry.

Procedure

  1. Configure the client with the URL of Service Registry.

    For example:

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com");
    RegistryService service = RegistryClient.cached(registryUrl_node1);
  2. Configure the client with the serializer services, and the strategy to look up the schema in Service Registry.

    For example:

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com");
    
        clientProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, property(clientProperties, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092"));
        clientProperties.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, registryUrl_node1); 1
        clientProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 2
        clientProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName()); 3
        clientProperties.put(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class.getName()); 4
    1
    The Service Registry URL.
    2
    The serializer for the message key. This example uses the Kafka StringSerializer.
    3
    The serializer service for the message value provided by Service Registry.
    4
    Lookup strategy to find the global ID for the schema. Matches the schema of the message against its global ID (artifact ID and schema version) in Service Registry.

9.9. Using a Service Registry schema from a consumer client

This procedure describes how to configure a Java consumer client to use a schema from Service Registry.

Procedure

  1. Configure the client with the URL of Service Registry.

    For example:

    String registryUrl_node1 = PropertiesUtil.property(clientProperties, "registry.url.node1",
        "https://my-cluster-service-registry-myproject.example.com");
    RegistryService service = RegistryClient.cached(registryUrl_node1);
  2. Configure the client with the Service Registry deserializer service.

    For example:

    Deserializer<LogInput> deserializer = new AvroKafkaDeserializer<>( 1
        service,
        new DefaultAvroDatumProvider<LogInput>().setUseSpecificAvroReader(true)
    );
    Serde<LogInput> logSerde = Serdes.serdeFrom( 2
        new AvroKafkaSerializer<>(service),
        deserializer
    );
    KStream<String, LogInput> input = builder.stream( 3
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), logSerde)
    );
    1
    The deserializer service provided by Service Registry.
    2
    The deserialization is in Apache Avro JSON format.
    3
    The input data for deserialization derived from the topic values consumed by the client.