Using Quarkus applications with Kafka instances and Red Hat OpenShift Service Registry

Guide
  • Red Hat OpenShift Service Registry 1
  • Updated 17 December 2021
  • Published 27 October 2021


As a developer of applications and services, you can connect Quarkus applications to Kafka instances in Red Hat OpenShift Streams for Apache Kafka and Service Registry instances in Red Hat OpenShift Service Registry. This makes it easy for development teams to store and reuse schemas in event streaming architectures.

Quarkus is a Kubernetes-native Java framework made for Java virtual machines (JVMs) and native compilation, and optimized for serverless, cloud, and Kubernetes environments.

Quarkus is designed to work with popular Java standards, frameworks, and libraries like Eclipse MicroProfile and Spring, as well as Apache Kafka, RESTEasy (JAX-RS), Hibernate ORM (JPA), Infinispan, Camel, and many more.

Importing the Quarkus sample code

For this quick start, you’ll use the Quarkus Service Registry sample code from the App Services Guides and Samples repository in GitHub. After you understand the concepts and tasks in this quick start, you can use your own Quarkus applications with OpenShift Streams for Apache Kafka and Service Registry in the same way.

Procedure
  1. On the command line, clone the App Services Guides and Samples repository from GitHub.

    Cloning the guides and samples repository
    git clone https://github.com/redhat-developer/app-services-guides app-services-guides
  2. In your IDE, open the code-examples/quarkus-service-registry-quickstart directory from the repository that you cloned.

Configuring the Quarkus application to connect to Kafka and Service Registry instances

To enable your Quarkus applications to access a Kafka instance, configure the connection properties using the Kafka bootstrap server endpoint. To access a Service Registry instance, configure the registry endpoint connection property with the Core Registry API value.

Access to the Service Registry and Kafka instances is managed using the same service account and SASL/OAUTHBEARER token endpoint. For Quarkus, you can configure all connection properties using the application.properties file. This example sets environment variables and references them in this file.
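As a sketch of what such a configuration might look like, the following application.properties fragment references the environment variables set later in this procedure. The property names shown here are illustrative and assume the Strimzi OAuth callback handler and Apicurio Registry 2.x SerDes; verify them against the sample's actual application.properties files.

```properties
# Illustrative sketch only -- verify property names against your Quarkus,
# Strimzi OAuth, and Apicurio SerDes versions. The %dev prefix scopes
# these settings to the Quarkus dev configuration profile.
%dev.kafka.bootstrap.servers=${BOOTSTRAP_SERVER}
%dev.kafka.security.protocol=SASL_SSL
%dev.kafka.sasl.mechanism=OAUTHBEARER
%dev.kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  oauth.client.id="${CLIENT_ID}" \
  oauth.client.secret="${CLIENT_SECRET}" \
  oauth.token.endpoint.uri="${OAUTH_TOKEN_ENDPOINT_URI}";
# Assumes the Strimzi OAuth client library is on the classpath
%dev.kafka.sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
# Apicurio SerDes: point at the Core Registry API endpoint
%dev.mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${REGISTRY_URL}
```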

Quarkus applications use MicroProfile Reactive Messaging to produce messages to and consume messages from your Kafka instances in OpenShift Streams for Apache Kafka. For details on configuration options, see Using Apache Kafka with Reactive Messaging in the Quarkus documentation.

This Quarkus example application includes producer and consumer processes that serialize/deserialize Kafka messages using a schema stored in Service Registry.
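A Reactive Messaging channel for such a producer and consumer might be wired as in the following sketch. The channel names here are hypothetical, for illustration only; the sample application defines its own channels in its application.properties files.

```properties
# Hypothetical channel names for illustration only
mp.messaging.outgoing.generated-quotes.connector=smallrye-kafka
mp.messaging.outgoing.generated-quotes.topic=quotes
mp.messaging.outgoing.generated-quotes.value.serializer=io.apicurio.registry.serde.avro.AvroKafkaSerializer

mp.messaging.incoming.quote-stream.connector=smallrye-kafka
mp.messaging.incoming.quote-stream.topic=quotes
mp.messaging.incoming.quote-stream.value.deserializer=io.apicurio.registry.serde.avro.AvroKafkaDeserializer
```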

Prerequisites
  • You have a service account with write access to Kafka and Service Registry instances and have stored your credentials securely (see Getting started with Kafka and Getting started with Service Registry).

  • You have the Kafka bootstrap server endpoint for the Kafka instance. You copied this information previously for the Kafka instance in OpenShift Streams for Apache Kafka by selecting the options menu (three vertical dots) and clicking Connection.

  • You have the Core Registry API endpoint for the Service Registry instance. You copied this information for the Service Registry instance by selecting the options menu (three vertical dots) and clicking Connection. From the list of endpoints, you copied the Core Registry API endpoint supported by the Apicurio serializer/deserializer (SerDes) used in this example.

  • You copied the Token endpoint URL value from the same list of endpoints, to be used for the OAuth-based authentication method in this example.

Procedure
  1. On the command line, set the following environment variables to use your Kafka and Service Registry instances with Quarkus or other applications. Replace the values with your own server and credential information:

    • The <bootstrap_server> is the bootstrap server endpoint for your Kafka instance.

    • The <core_registry_url> is the Core Registry API endpoint for your Service Registry instance.

    • The <oauth_token_endpoint_uri> is the SASL/OAUTHBEARER token endpoint.

    • The <client_id> and <client_secret> are the generated credentials for your service account.

      Setting environment variables for server and credentials
      $ export BOOTSTRAP_SERVER=<bootstrap_server>
      $ export REGISTRY_URL=<core_registry_url>
      $ export OAUTH_TOKEN_ENDPOINT_URI=<oauth_token_endpoint_uri>
      $ export CLIENT_ID=<client_id>
      $ export CLIENT_SECRET=<client_secret>
  2. In the Quarkus example application, review the /src/main/resources/application.properties file in each of the consumer and producer sub-folders to understand how the environment variables you set in the previous step are used. This example uses the dev configuration profile in the application.properties files.

Creating the quotes Kafka topic in OpenShift Streams for Apache Kafka

For this quick start, the Kafka topic that the Quarkus example application uses is called quotes. You must create this topic in OpenShift Streams for Apache Kafka so that the Quarkus application can interact with it.

Procedure
  1. In the OpenShift Streams for Apache Kafka web console, go to Streams for Apache Kafka > Kafka Instances and click the name of the Kafka instance that you want to add a topic to.

  2. Select the Topics tab, click Create topic, and follow the guided steps to define the topic details. Click Next to complete each step and click Finish to complete the setup.

    Figure 1. Guided steps to define topic details
    • Topic name: Enter quotes as the topic name.

    • Partitions: Set the number of partitions for this topic. This example uses a single partition. Partitions are distinct lists of messages in a topic and enable parts of a topic to be distributed over multiple brokers in the cluster. A topic can contain one or more partitions, enabling producer and consumer loads to be scaled.

    • Message retention: Set the message retention time and size to the relevant value and increment. This example sets the retention time to A week and the retention size to Unlimited. Message retention time is the amount of time that messages are retained in a topic before they are deleted or compacted, depending on the cleanup policy. Retention size is the maximum total size of all log segments in a partition before they are deleted or compacted.

    • Replicas: For this release of OpenShift Streams for Apache Kafka, the replicas are preconfigured. The number of partition replicas for the topic is set to 3 and the minimum number of follower replicas that must be in sync with a partition leader is set to 2.

      Replicas are copies of partitions in a topic. Partition replicas are distributed over multiple brokers in the cluster to ensure topic availability if a broker fails. When a follower replica is in sync with a partition leader, the follower replica can become the new partition leader if needed.

      After you complete the topic setup, the new Kafka topic is listed in the topics table. You can now run the Quarkus application to start producing and consuming messages using this topic.

Verification
  • Verify that the new quotes Kafka topic is listed in the topics table.

Running the Quarkus example application

After you configure your Quarkus application to connect to Kafka and Service Registry instances, and you create the Kafka topic, you can run the Quarkus application to start producing and consuming messages to and from this topic.

The Quarkus application in this quick start consists of two processes:

  • The consumer process is implemented by the QuotesResource class. This class exposes the /quotes REST endpoint that streams quotes from the quotes topic. This process also has a minimal frontend that streams quotes to the web page using Server-Sent Events.

  • The producer process is implemented by the QuotesProducer class. This class produces a new quote periodically (every 5 seconds) with a random quote value that is published to the quotes topic.
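The division of labor between the two processes can be sketched, independently of Kafka and Quarkus, as a simple produce/consume flow. The class and method names below are illustrative, not the sample's actual code; a queue stands in for the Kafka topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Framework-free sketch: a producer that generates random quote values and
// a consumer that drains them for streaming, mirroring the roles of the
// sample's QuotesProducer and QuotesResource classes.
public class QuoteFlowSketch {

    // Stands in for the Kafka "quotes" topic.
    static final BlockingQueue<Integer> quotesTopic = new LinkedBlockingQueue<>();

    // Like QuotesProducer: publish a random quote value (0-99).
    static int produceQuote(Random random) {
        int value = random.nextInt(100);
        quotesTopic.offer(value);
        return value;
    }

    // Like QuotesResource: drain available quotes for streaming to clients.
    static List<Integer> consumeQuotes() {
        List<Integer> quotes = new ArrayList<>();
        quotesTopic.drainTo(quotes);
        return quotes;
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 3; i++) {
            produceQuote(random); // in the sample, this happens every 5 seconds
        }
        System.out.println("Consumed: " + consumeQuotes());
    }
}
```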

Procedure
  1. On the command line, change to the code-examples/quarkus-service-registry-quickstart/consumer directory that you imported and run the consumer process.

    Running the example consumer process
    $ cd ~/app-services-guides/code-examples/quarkus-service-registry-quickstart/consumer
    $ ./mvnw quarkus:dev
  2. After the consumer process is running, in a web browser, go to http://localhost:8080/quotes.html and verify that this process is available.

  3. Leave the consumer process running, and run the producer process on a different terminal.

    Running the example producer process
    $ cd ~/app-services-guides/code-examples/quarkus-service-registry-quickstart/producer
    $ ./mvnw quarkus:dev
  4. When both the consumer and producer processes are running, view the generated quotes in the web browser at http://localhost:8080/quotes.html.

  5. In the web console, go to Service Registry > Service Registry Instances, select your Service Registry instance, and view the automatically generated schema for your application.

What just happened?
  • The Quarkus application is configured to use the io.apicurio.registry.serde.avro.AvroKafkaSerializer Java class to serialize messages to Avro format and the io.apicurio.registry.serde.avro.AvroKafkaDeserializer class to deserialize them. These SerDes classes are configured to use remote schemas in Red Hat OpenShift Service Registry rather than local schemas in the application.

  • Because there are no schemas in the Service Registry instance, the SerDes published the schema for the quotes topic. The name of the schema is managed by the TopicRecordIdStrategy class, which uses the topic_name-value convention. You can find this schema in the Service Registry instance and configure compatibility rules to govern how the schema can evolve for future versions.

  • If the Quarkus application fails to run, review the error log in the terminal and address any problems. Also review the steps in this quick start to ensure that the Quarkus application and Kafka topic are configured correctly.
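For example, with the topic_name-value convention, the schema registered for the quotes topic is named quotes-value. A minimal Avro schema of the kind the SerDes might register looks like the following; the sample's actual record name, namespace, and fields may differ.

```json
{
  "type": "record",
  "name": "Quote",
  "namespace": "org.acme.kafka.quarkus",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "price", "type": "int" }
  ]
}
```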