Chapter 5. Developing reactive applications using Spring Boot with Eclipse Vert.x

This section introduces reactive application development using Spring Boot starters that are based on Spring Boot and Eclipse Vert.x. The following examples demonstrate how you can use the starters to create reactive applications.

5.1. Introduction to Spring Boot with Eclipse Vert.x

The Spring reactive stack is built on Project Reactor, a reactive library that implements backpressure and is compliant with the Reactive Streams specification. It provides the Flux and Mono functional API types that enable asynchronous event stream processing.
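
To make the idea of backpressure more concrete, the following minimal sketch uses the java.util.concurrent.Flow interfaces, which the JDK (version 9 and later, an assumption of this example) provides as its counterpart to the Reactive Streams contracts. It does not use Project Reactor, and the class and method names are hypothetical. The subscriber requests one item at a time, so the publisher cannot deliver items faster than the subscriber handles them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; uses the JDK Flow interfaces, not Reactor.
final class BackpressureSketch {

    /** Subscriber that signals demand for one item at a time. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for exactly one item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the integers 1..count and returns what the subscriber received. */
    static List<Integer> publishAndCollect(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks when the subscriber's buffer is full
            }
        } // closing the publisher signals onComplete after buffered items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

In Reactor, Flux and Mono follow the same request-driven contract, but operators usually manage the demand signals for you.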

On top of Project Reactor, Spring provides WebFlux, an asynchronous event-driven web application framework. While WebFlux is designed to work primarily with Reactor Netty, it can also operate with other reactive HTTP servers, such as Eclipse Vert.x.

Spring WebFlux and Reactor enable you to create applications that are:

  • Non-blocking: The application continues to handle further requests when waiting for a response from a remote component or service that is required to complete the current request.
  • Asynchronous: The application responds to events from an event stream by generating response events and publishing them back to the event stream where they can be picked up by other clients in the application.
  • Event-driven: The application responds to events generated by the user or by another service, such as mouse clicks, HTTP requests, or new files being added to storage.
  • Scalable: Increasing the number of Publishers or Subscribers to match the required event processing capacity of an application only results in a slight increase in the complexity of routing requests between individual clients in the application. Reactive applications can handle large numbers of events using fewer computing and networking resources as compared to other application programming models.
  • Resilient: The application can handle failure of services it depends on without a negative impact on its overall quality of service.

The Spring Reactive offering by Red Hat brings the benefits of Reactor and WebFlux to OpenShift and stand-alone RHEL, and introduces a set of Eclipse Vert.x extensions for the WebFlux framework. This allows you to retain the level of abstraction and rapid prototyping capabilities of Spring Boot, and provides an asynchronous IO API that handles the network communications between the services in your application in a fully reactive manner.

Additional advantages of using Spring WebFlux include:

Similarity with Spring MVC
The Spring MVC API types and WebFlux API types are similar, and it is easy for developers to apply knowledge of Spring MVC to programming applications with WebFlux.
Annotated controllers support
WebFlux retains the endpoint controller annotations introduced by Spring MVC. Both Spring MVC and WebFlux support reactive RxJava2 and Reactor return types.
Functional programming support
Reactor interacts with the Java 8 functional API, as well as the CompletableFuture and Stream APIs. In addition to annotation-based endpoints, WebFlux also supports functional endpoints.
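
The functional programming support mentioned above builds on standard JDK types. As a minimal sketch that uses only the JDK (not Reactor or WebFlux), the following hypothetical class composes asynchronous results with CompletableFuture, lambda expressions, and the Stream API. The fetchName method is a stand-in for a remote call and is an assumption of this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class; JDK only.
final class FunctionalCompositionSketch {

    /** Simulates a remote lookup that completes on another thread. */
    static CompletableFuture<String> fetchName(int userId) {
        return CompletableFuture.supplyAsync(() -> "user-" + userId);
    }

    /** Transforms the asynchronous result with a lambda without blocking the caller. */
    static CompletableFuture<String> greet(int userId) {
        return fetchName(userId).thenApply(name -> "Hello, " + name + "!");
    }

    /** Starts all lookups first, then collects the results in the original order. */
    static List<String> greetAll(int... userIds) {
        List<CompletableFuture<String>> pending = Arrays.stream(userIds)
            .mapToObj(FunctionalCompositionSketch::greet)
            .collect(Collectors.toList());
        return pending.stream()
            .map(CompletableFuture::join) // waits only here, after all lookups have started
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(greetAll(1, 2, 3));
    }
}
```

A Mono plays a similar role to a single CompletableFuture, with the difference that a Mono typically does no work until something subscribes to it.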

Additional resources

See the following resources for additional in-depth information on the implementation details of technologies that are part of the Spring Reactive stack:

5.2. Reactive Spring Web

The spring-web module provides the foundational elements of the reactive capabilities of Spring WebFlux, including:

  • HTTP abstractions provided by the HttpHandler API
  • Reactive Streams adapters for supported servers (Eclipse Vert.x, Undertow and others)
  • Codecs for encoding and decoding event stream data. This includes:

    • DataBuffer, an abstraction for different types of byte buffer representations (Netty ByteBuf, java.nio.ByteBuffer, as well as others)
    • Low-level contracts to encode and decode content independent of HTTP
    • HttpMessageReader and HttpMessageWriter contracts to encode and decode HTTP message content
  • The WebHandler API (a counterpart to the Servlet 3.1 I/O API that uses non-blocking contracts).

When designing your web application, you can choose between two programming models that Spring WebFlux provides:

Annotated Controllers
Annotated controllers in Spring WebFlux are consistent with Spring MVC, and are based on the same annotations from the spring-web module. In addition to the functionality that the spring-web module provides in Spring MVC, its WebFlux counterpart also supports reactive @RequestBody arguments.
Functional Endpoints
Functional endpoints provided by Spring WebFlux are based on Java 8 lambda expressions and functional APIs. This programming model relies on a dedicated library (Reactor, in this case) that routes and handles requests. As opposed to annotation-based endpoint controllers that rely on declaring intent and using callbacks to complete an activity, the reactive model based on functional endpoints allows request handling to be fully controlled by the application.

5.3. Creating a reactive Spring Boot HTTP service with WebFlux

Create a basic reactive Hello World HTTP web service using Spring Boot and WebFlux.

Prerequisites

Procedure

  1. Add vertx-spring-boot-starter-http as a dependency in the pom.xml file of your project.

    pom.xml

    <project>
    ...
      <dependencies>
      ...
        <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-http</artifactId>
        </dependency>
      ...
      </dependencies>
    ...
    </project>

  2. Create a main class for your application and define the router and handler methods.

    HttpSampleApplication.java

    package dev.snowdrop.vertx.sample.http;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import reactor.core.publisher.Mono;
    
    import static org.springframework.web.reactive.function.BodyInserters.fromObject;
    import static org.springframework.web.reactive.function.server.RouterFunctions.route;
    import static org.springframework.web.reactive.function.server.ServerResponse.ok;
    
    @SpringBootApplication
    public class HttpSampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(HttpSampleApplication.class, args);
        }
    
        @Bean
        public RouterFunction<ServerResponse> helloRouter() {
            return route()
                .GET("/hello", this::helloHandler)
                .build();
        }
    
        private Mono<ServerResponse> helloHandler(ServerRequest request) {
            String name = request
                .queryParam("name")
                .orElse("World");
            String message = String.format("Hello, %s!", name);
    
            return ok()
                .body(fromObject(message));
        }
    }

  3. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd myApp
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line:

      $ java -jar target/vertx-spring-boot-sample-http.jar
    4. In a new terminal window, issue an HTTP request on the /hello endpoint:

      $ curl localhost:8080/hello
      Hello, World!
    5. Provide a custom name with your request to get a personalized response:

      $ curl http://localhost:8080/hello?name=John
      Hello, John!

Additional resources

5.4. Using basic authentication in a reactive Spring Boot WebFlux application

Create a reactive Hello World HTTP web service with basic form-based authentication using Spring Security and WebFlux starters.

Prerequisites

Procedure

  1. Add vertx-spring-boot-starter-http and spring-boot-starter-security as dependencies in the pom.xml file of your project.

    pom.xml

    <project>
    ...
      <dependencies>
      ...
        <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-http</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-security</artifactId>
        </dependency>
      ...
      </dependencies>
    ...
    </project>

  2. Create an endpoint controller class for your application:

    HelloController.java

    package dev.snowdrop.vertx.sample.http.security;
    
    import java.security.Principal;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Mono;
    
    @RestController
    public class HelloController {
    
        @GetMapping("/")
        public Mono<String> hello(Mono<Principal> principal) {
            return principal
                .map(Principal::getName)
                .map(this::helloMessage);
        }
    
        private String helloMessage(String username) {
            return "Hello, " + username + "!";
        }
    }

  3. Create the main class of your application:

    HttpSecuritySampleApplication.java

    package dev.snowdrop.vertx.sample.http.security;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class HttpSecuritySampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(HttpSecuritySampleApplication.class, args);
        }
    }

  4. Create a SecurityConfiguration class that stores the user credentials for accessing the endpoint exposed by HelloController.

    SecurityConfiguration.java

    package dev.snowdrop.vertx.sample.http.security;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
    import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
    import org.springframework.security.core.userdetails.User;
    import org.springframework.security.core.userdetails.UserDetails;
    
    @EnableWebFluxSecurity
    public class SecurityConfiguration {
    
        @Bean
        public MapReactiveUserDetailsService userDetailsService() {
            UserDetails user = User.withDefaultPasswordEncoder()
                .username("user")
                .password("user")
                .roles("USER")
                .build();
    
            return new MapReactiveUserDetailsService(user);
        }
    }

  5. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd myApp
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line:

      $ java -jar target/vertx-spring-boot-sample-http-security.jar
    4. Navigate to http://localhost:8080 using a browser to access the login screen.
    5. Log in using the credentials below:

      • username: user
      • password: user

      You receive a customized greeting when you are logged in:

      Hello, user!
    6. Navigate to http://localhost:8080/logout using a web browser and use the Log out button to log out of your application.
    7. Alternatively, use a terminal to make an unauthenticated HTTP request on localhost:8080. You receive an HTTP 401 Unauthorized response from your application.

      $ curl -I http://localhost:8080
      HTTP/1.1 401 Unauthorized
      WWW-Authenticate: Basic realm="Realm"
      Cache-Control: no-cache, no-store, max-age=0, must-revalidate
      Pragma: no-cache
      Expires: 0
      X-Content-Type-Options: nosniff
      X-Frame-Options: DENY
      X-XSS-Protection: 1 ; mode=block
      Referrer-Policy: no-referrer
    8. Issue an authenticated request using the example user credentials. You receive a personalized response.

      $ curl -u user:user http://localhost:8080
      Hello, user!
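
The -u option in the curl command above sends the credentials in an Authorization header that uses the HTTP Basic scheme. As a minimal sketch (JDK only, with a hypothetical class name), the following code shows how that header value is derived from the example user name and password.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical demo class; JDK only.
final class BasicAuthHeaderSketch {

    /** Builds the Authorization header value for HTTP Basic authentication. */
    static String basicAuthHeader(String username, String password) {
        String credentials = username + ":" + password;
        String encoded = Base64.getEncoder()
            .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Same example credentials as in the procedure above
        System.out.println("Authorization: " + basicAuthHeader("user", "user"));
        // prints: Authorization: Basic dXNlcjp1c2Vy
    }
}
```

Because Base64 is an encoding and not encryption, Basic authentication is normally combined with TLS outside of local testing.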

Additional resources

5.5. Using OAuth2 authentication in a reactive Spring Boot application

Set up OAuth2 authentication for your reactive Spring Boot application and authenticate using your client ID and client secret.

Prerequisites

Procedure

  1. Register a new OAuth 2 application on your GitHub account. Ensure that you provide the following values in the registration form:

  2. Add the following dependencies in the pom.xml file of your project:

    • vertx-spring-boot-starter-http
    • spring-boot-starter-security
    • spring-boot-starter-oauth2-client
    • reactor-netty

      Note that the reactor-netty client is required to ensure that spring-boot-starter-oauth2-client works properly.

      pom.xml

      <project>
      ...
        <dependencies>
        ...
          <dependency>
            <groupId>dev.snowdrop</groupId>
            <artifactId>vertx-spring-boot-starter-http</artifactId>
          </dependency>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
          </dependency>
          <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-oauth2-client</artifactId>
          </dependency>
          <!-- Spring OAuth2 client only works with Reactor Netty client -->
          <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
          </dependency>
        ...
        </dependencies>
      ...
      </project>

  3. Create an endpoint controller class for your application:

    HelloController.java

    package dev.snowdrop.vertx.sample.http.oauth;
    
    import org.springframework.security.core.annotation.AuthenticationPrincipal;
    import org.springframework.security.oauth2.core.user.OAuth2User;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Mono;
    
    @RestController
    public class HelloController {
    
        @GetMapping
        public Mono<String> hello(@AuthenticationPrincipal OAuth2User oauth2User) {
            return Mono.just("Hello, " + oauth2User.getAttributes().get("name") + "!");
        }
    }

  4. Create the main class of your application:

    OAuthSampleApplication.java

    package dev.snowdrop.vertx.sample.http.oauth;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class OAuthSampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(OAuthSampleApplication.class, args);
        }
    }

  5. Create a YAML configuration file to store the OAuth2 client ID and client secret you received from GitHub upon registering your application.

    src/main/resources/application.yml

    spring:
      security:
        oauth2:
          client:
            registration:
              github:
                client-id: YOUR_GITHUB_CLIENT_ID
                client-secret: YOUR_GITHUB_CLIENT_SECRET

  6. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd myApp
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line:

      $ java -jar target/vertx-spring-boot-sample-http-oauth.jar
    4. Navigate to http://localhost:8080 using a web browser. You are redirected to an OAuth2 application authorization screen on GitHub. If prompted, log in using your GitHub account credentials.
    5. Click Authorize to confirm. You are redirected to a screen showing a personalized greeting message.

Additional resources

5.6. Creating a reactive Spring Boot SMTP mail application

Create a reactive SMTP email service using Spring Boot with Eclipse Vert.x.

Prerequisites

  • JDK 8 or JDK 11 installed
  • Maven installed
  • A Maven-based application project configured to use Spring Boot
  • An SMTP mail server configured on your machine

Procedure

  1. Add vertx-spring-boot-starter-http and vertx-spring-boot-starter-mail as dependencies in the pom.xml file of your project.

    pom.xml

    <project>
    ...
      <dependencies>
      ...
        <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-http</artifactId>
        </dependency>
        <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-mail</artifactId>
        </dependency>
      ...
      </dependencies>
    ...
    </project>

  2. Create a mail handler class for your application:

    MailHandler.java

    package dev.snowdrop.vertx.sample.mail;
    
    import dev.snowdrop.vertx.mail.MailClient;
    import dev.snowdrop.vertx.mail.MailMessage;
    import dev.snowdrop.vertx.mail.SimpleMailMessage;
    import org.springframework.stereotype.Component;
    import org.springframework.util.MultiValueMap;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import reactor.core.publisher.Mono;
    
    import static org.springframework.web.reactive.function.server.ServerResponse.noContent;
    
    @Component
    public class MailHandler {
    
        private final MailClient mailClient;
    
        public MailHandler(MailClient mailClient) {
            this.mailClient = mailClient;
        }
    
        public Mono<ServerResponse> send(ServerRequest request) {
            return request.formData()
                .log()
                .map(this::formToMessage)
                .flatMap(mailClient::send)
                .flatMap(result -> noContent().build());
        }
    
        private MailMessage formToMessage(MultiValueMap<String, String> form) {
            return new SimpleMailMessage()
                .setFrom(form.getFirst("from"))
                .setTo(form.get("to"))
                .setSubject(form.getFirst("subject"))
                .setText(form.getFirst("text"));
        }
    
    }

  3. Create the main class of your application:

    MailSampleApplication.java

    package dev.snowdrop.vertx.sample.mail;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.ServerResponse;
    
    import static org.springframework.http.MediaType.APPLICATION_FORM_URLENCODED;
    import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
    import static org.springframework.web.reactive.function.server.RouterFunctions.resources;
    import static org.springframework.web.reactive.function.server.RouterFunctions.route;
    
    @SpringBootApplication
    public class MailSampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(MailSampleApplication.class, args);
        }
    
        @Bean
        public RouterFunction<ServerResponse> mailRouter(MailHandler mailHandler) {
            return route()
                    .POST("/mail", accept(APPLICATION_FORM_URLENCODED), mailHandler::send)
                    .build();
        }
    
        @Bean
        public RouterFunction<ServerResponse> staticResourceRouter() {
            return resources("/**", new ClassPathResource("static/"));
        }
    
    }

  4. Create an application.properties file to store your SMTP server credentials:

    application.properties

    vertx.mail.host=YOUR_SMTP_SERVER_HOSTNAME
    vertx.mail.username=YOUR_SMTP_SERVER_USERNAME
    vertx.mail.password=YOUR_SMTP_SERVER_PASSWORD

  5. Create a src/main/resources/static/index.html file that serves as the frontend of your application. Alternatively, use the example HTML email form available for this procedure.
  6. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd myApp
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line.

      $ java -jar target/vertx-spring-boot-sample-mail.jar
    4. Navigate to http://localhost:8080/index.html using a web browser to access the email form.
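
The /mail route in this procedure accepts application/x-www-form-urlencoded requests, and the handler reads the from, to, subject, and text fields. As a minimal sketch of what a client sends to that route, the following hypothetical class encodes those fields into a request body using only the JDK. The addresses are placeholders, and the Charset overload of URLEncoder.encode assumes JDK 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical demo class; JDK only.
final class MailFormBodySketch {

    /** Encodes form fields as an application/x-www-form-urlencoded request body. */
    static String encodeForm(Map<String, String> fields) {
        return fields.entrySet().stream()
            .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
            .collect(Collectors.joining("&"));
    }

    private static String encode(String value) {
        // Percent-encodes reserved characters and turns spaces into '+'
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Field names match those read by MailHandler; values are placeholders
        Map<String, String> form = new LinkedHashMap<>();
        form.put("from", "sender@example.com");
        form.put("to", "recipient@example.com");
        form.put("subject", "Hello");
        form.put("text", "Sent from a reactive service");
        System.out.println(encodeForm(form));
    }
}
```

You can pass the resulting string as the body of a POST request to http://localhost:8080/mail with the Content-Type header set to application/x-www-form-urlencoded.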

Additional resources

5.7. Server-sent events

Server-sent events (SSE) is a push technology that allows an HTTP server to send unidirectional updates to the client. SSE works by establishing a connection between the event source and the client. The event source uses this connection to push events to the client side. After the server pushes the events, the connection remains open and can be used to push subsequent events. When the client terminates the request on the server, the connection is closed. SSE represents a more resource-efficient alternative to polling, where a new connection must be established each time the client polls the event source for updates. As opposed to WebSockets, SSE pushes events in one direction only (that is, from the source to the client). It does not handle bidirectional communication between the event source and the client.

The specification for SSE is incorporated into HTML5, and is widely supported by web browsers, including their legacy versions. SSE can be used from the command line, and is relatively simple to set up compared to other protocols.

SSE is suitable for use cases that require frequent updates from the server to the client, while updates from the client side to the server are expected to be less frequent. Updates from the client side to the server can then be handled over a different protocol, such as REST. Examples of such use cases include social media feed updates or notifications sent to a client when new files are uploaded to a file server.
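
On the wire, an event stream is plain text: each event consists of one or more field lines such as data:, and a blank line marks the end of the event. The following minimal sketch (JDK only, hypothetical class and method names) formats and parses the data field. It ignores the other fields that the SSE specification defines, such as event, id, and retry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; handles only the "data" field of the event stream format.
final class SseWireFormatSketch {

    /** Formats a payload as one event: a "data:" line per payload line, then a blank line. */
    static String formatEvent(String payload) {
        StringBuilder event = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            event.append("data:").append(line).append('\n');
        }
        return event.append('\n').toString();
    }

    /** Extracts the data payloads of all complete events in a stream. */
    static List<String> parseData(String stream) {
        List<String> events = new ArrayList<>();
        List<String> dataLines = new ArrayList<>();
        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {
                // A blank line terminates the current event
                if (!dataLines.isEmpty()) {
                    events.add(String.join("\n", dataLines));
                    dataLines.clear();
                }
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                // A single space after the colon is not part of the value
                dataLines.add(value.startsWith(" ") ? value.substring(1) : value);
            }
        }
        return events; // an event without a terminating blank line is dropped
    }

    public static void main(String[] args) {
        String stream = formatEvent("-2126721954") + formatEvent("-573499422");
        System.out.print(stream);
        System.out.println(parseData(stream)); // prints [-2126721954, -573499422]
    }
}
```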

5.8. Using server-sent events in a reactive Spring Boot application

Create a simple service that accepts HTTP requests and returns a stream of server-sent events (SSE). When the client establishes a connection to the server and the streaming starts, the connection remains open. The server reuses the connection to continuously push new events to the client. Canceling the request closes the connection and stops the stream, causing the client to stop receiving updates from the server.

Prerequisites

Procedure

  1. Add vertx-spring-boot-starter-http as a dependency in the pom.xml file of your project.

    pom.xml

    <project>
    ...
      <dependencies>
      ...
        <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-http</artifactId>
        </dependency>
      ...
      </dependencies>
    ...
    </project>

  2. Create the main class of your application:

    SseSampleApplication.java

    package dev.snowdrop.vertx.sample.sse;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class SseSampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SseSampleApplication.class, args);
        }
    }

  3. Create a server-sent event controller class for your application. In this example, the class generates a stream of random integers that the client, such as a terminal application, can print.

    SseController.java

    package dev.snowdrop.vertx.sample.sse;
    
    import java.time.Duration;
    import java.util.Random;
    
    import org.springframework.http.MediaType;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;
    
    @RestController
    public class SseController {
    
        @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<Integer> getRandomNumberStream() {
            Random random = new Random();
    
            return Flux.interval(Duration.ofSeconds(1))
                .map(i -> random.nextInt())
                .log();
        }
    }

  4. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd myApp
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line:

      $ java -jar target/vertx-spring-boot-sample-sse.jar
    4. In a new terminal window, issue an HTTP request to localhost. You start receiving a continuous stream of random integers from the server-sent event controller:

      $ curl localhost:8080
      data:-2126721954
      
      data:-573499422
      
      data:1404187823
      
      data:1338766210
      
      data:-666543077
      ...

      Press Ctrl+C to cancel your HTTP request and terminate the stream of responses.

Additional resources

5.9. WebSocket Protocol

The WebSocket protocol upgrades a standard HTTP connection to make it persistent and subsequently uses that connection to pass specially formatted messages between the client and server of your application. While the protocol relies on HTTP-like handshakes to establish the initial connection between client and server over TCP, it uses a special message format for communication between client and server.

Unlike a standard HTTP connection, a WebSocket connection:

  • can be used to send messages in both directions,
  • remains open after the initial request is completed,
  • uses special framing headers in messages, which allows you to send non-HTTP-formatted message payloads (for example, control data) inside an HTTP request.

As a result, the WebSocket protocol extends the possibilities of a standard HTTP connection while requiring fewer networking resources and decreasing the risk of services failing due to network timeouts (compared to alternative methods of providing real-time messaging functionality, such as HTTP long polling).

WebSocket connections are supported by default in most currently available web browsers across different operating systems and hardware architectures, which makes WebSockets a suitable choice for writing cross-platform web-based applications that you can connect to using only a web browser.
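
To illustrate the framing headers mentioned above, the following minimal sketch encodes an unmasked text frame in the layout that RFC 6455 (section 5.2) defines. It uses only the JDK, the class name is hypothetical, and it covers server-to-client frames only, because frames sent by a client must additionally be masked. In practice, the WebSocket implementation of your server or browser performs this step for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; encodes unmasked (server-to-client) text frames only.
final class WebSocketFrameSketch {

    private static final int FIN_BIT = 0x80;    // marks the final fragment of a message
    private static final int OPCODE_TEXT = 0x1; // payload is UTF-8 text

    /** Encodes a single-fragment, unmasked text frame. */
    static byte[] encodeTextFrame(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream frame = new ByteArrayOutputStream();
        frame.write(FIN_BIT | OPCODE_TEXT);
        if (payload.length <= 125) {
            frame.write(payload.length);        // length fits in 7 bits
        } else if (payload.length <= 0xFFFF) {
            frame.write(126);                   // 16-bit extended length follows
            frame.write(payload.length >>> 8);
            frame.write(payload.length);
        } else {
            frame.write(127);                   // 64-bit extended length follows
            for (int shift = 56; shift >= 0; shift -= 8) {
                frame.write((int) (((long) payload.length) >>> shift));
            }
        }
        frame.write(payload, 0, payload.length);
        return frame.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : encodeTextFrame("HELLO")) {
            hex.append(String.format("%02x ", b & 0xFF));
        }
        System.out.println(hex.toString().trim()); // prints 81 05 48 45 4c 4c 4f
    }
}
```

The two-byte header in front of the five payload bytes shows how little overhead a small WebSocket message carries compared to a full set of HTTP headers.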

5.10. Using WebSockets in a reactive application based on WebFlux

The following example demonstrates how you can use the WebSocket protocol in an application that provides a back-end service that you can connect to using a web browser. When you access the web front-end URL of your application using a web browser, the front end initiates a WebSocket connection to the back-end service. You can use the web form available on the website to send values formatted as text strings to the back-end service using the WebSocket connection. The application processes the received value by converting all characters to uppercase and sends the result to the front end using the same WebSocket connection.

Create an application using Spring on Reactive Stack that consists of:

  • a Java-based back-end service with a WebSocket handler,
  • a web front end based on HTML and JavaScript.

Prerequisites

  • A Maven-based Java application project that uses Spring Boot
  • JDK 8 or JDK 11 installed
  • Maven installed

Procedure

  1. Add the vertx-spring-boot-starter-http as a dependency in the pom.xml file of your application project:

    pom.xml

    ...
    <dependencies>
      ...
       <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-http</artifactId>
       </dependency>
      ...
    </dependencies>
    ...

  2. Create the class file containing the back-end application code:

    /src/main/java/WebSocketSampleApplication.java

    package dev.snowdrop.WebSocketSampleApplication;
    
    import java.util.Collections;
    import java.util.Map;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.reactive.HandlerMapping;
    import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
    import org.springframework.web.reactive.socket.WebSocketHandler;
    import org.springframework.web.reactive.socket.WebSocketMessage;
    import org.springframework.web.reactive.socket.WebSocketSession;
    
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    @SpringBootApplication
    public class WebSocketSampleApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(WebSocketSampleApplication.class, args);
        }
    
        @Bean
        public HandlerMapping handlerMapping() {
            // Define URL mapping for the socket handlers
            Map<String, WebSocketHandler> handlers = Collections.singletonMap("/echo-upper", this::toUppercaseHandler);
    
            SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
            handlerMapping.setUrlMap(handlers);
            // Set a higher precedence than annotated controllers (smaller value means higher precedence)
            handlerMapping.setOrder(-1);
    
            return handlerMapping;
        }
    
        private Mono<Void> toUppercaseHandler(WebSocketSession session) {
            Flux<WebSocketMessage> messages = session.receive() // Get incoming messages stream
                .filter(message -> message.getType() == WebSocketMessage.Type.TEXT) // Filter out non-text messages
                .map(message -> message.getPayloadAsText().toUpperCase()) // Execute service logic
                .map(session::textMessage); // Create a response message
    
            return session.send(messages); // Send response messages
        }
    }

  3. Create the HTML document that serves as a front end for the application. Note that in the following example, the <script> element contains the JavaScript code that handles the communication with the back end of your application:

    /src/main/resources/static/index.html

    <!doctype html>
    <html>
    <head>
      <meta charset="utf-8"/>
      <title>WebSocket Example</title>
      <script>
      const socket = new WebSocket("ws://localhost:8080/echo-upper");
    
      socket.onmessage = function(e) {
        console.log("Received a value: " + e.data);
        const messages = document.getElementById("messages");
        const message = document.createElement("li");
        message.textContent = e.data; // render the echoed value as text, not as HTML
        messages.append(message);
      }
    
      window.onbeforeunload = function(e) {
        console.log("Closing socket");
        socket.close();
      }
    
      function send(event) {
        event.preventDefault();
    
        const value = document.getElementById("value-to-send").value.trim();
        if (value.length > 0) {
          console.log("Sending value to socket: " + value);
          socket.send(value);
        }
      }
      </script>
    </head>
    <body>
    <div>
      <h1>Vert.x Spring Boot WebSocket example</h1>
      <p>
        Enter a value to the form below and click submit. The value will be sent via socket to a backend service.
        The service will then uppercase the value and send it back via the same socket.
      </p>
    </div>
    <div>
      <form onsubmit="send(event)">
        <input type="text" id="value-to-send" placeholder="A value to be sent"/>
        <input type="submit"/>
      </form>
    </div>
    <div>
      <ol id="messages"></ol>
    </div>
    </body>
    </html>

  4. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd myApp
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line:

      $ java -jar target/vertx-spring-boot-sample-websocket.jar
    4. Navigate to http://localhost:8080/index.html using a web browser. The website shows a web interface that contains:

      • an input text box,
      • a list of processed results,
      • a Submit button.
  5. Enter a string value into the text box and select Submit.
  6. View the resulting value rendered in uppercase in the list below the input text box.
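
As an alternative to the browser front end, the endpoint can also be exercised from Java. The following is a minimal sketch, not part of the example application: it assumes JDK 11 or later (for the `java.net.http.WebSocket` client) and that the application is listening on ws://localhost:8080/echo-upper. The class name `EchoUpperClient` and its methods are hypothetical names chosen for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: collects text messages received over a WebSocket.
final class EchoUpperClient implements WebSocket.Listener {

    private final List<String> received = new CopyOnWriteArrayList<>();
    private final StringBuilder partial = new StringBuilder();
    private final CountDownLatch firstReply = new CountDownLatch(1);

    // A text message can arrive in several fragments; store it once the last one arrives.
    void accept(CharSequence data, boolean last) {
        partial.append(data);
        if (last) {
            received.add(partial.toString());
            partial.setLength(0);
            firstReply.countDown();
        }
    }

    List<String> received() {
        return received;
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        accept(data, last);
        webSocket.request(1); // Ask for the next message
        return null;
    }

    // Send one value, wait up to 5 seconds for the first reply, then close the socket.
    static List<String> echo(String uri, String text) throws InterruptedException {
        EchoUpperClient listener = new EchoUpperClient();
        WebSocket socket = HttpClient.newHttpClient()
            .newWebSocketBuilder()
            .buildAsync(URI.create(uri), listener)
            .join();
        socket.sendText(text, true).join();
        listener.firstReply.await(5, TimeUnit.SECONDS);
        socket.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
        return listener.received();
    }
}
```

With the application running, calling `EchoUpperClient.echo("ws://localhost:8080/echo-upper", "hello")` should return a list that contains the reply sent back by the service.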

5.11. Advanced Message Queuing Protocol

The Advanced Message Queuing Protocol (AMQP) is a communication protocol designed to move messages between applications in a non-blocking way. Standardized as AMQP 1.0, the protocol provides interoperability and messaging integration between new and legacy applications across different network topologies and environments. AMQP works with multiple broker architectures and provides a range of ways to deliver, receive, queue, and route messages. AMQP can also work peer-to-peer when you are not using a broker. In a hybrid cloud environment, you can use AMQP to integrate your services with legacy applications without having to deal with processing a variety of different message formats. AMQP supports real-time asynchronous message processing and is therefore suitable for use in reactive applications.

5.12. How the AMQP reactive example works

The messaging integration pattern that this example features is a Publisher-Subscriber pattern that uses 2 queues and a broker.

  • The Request queue stores HTTP requests containing strings that you enter using the web interface to be processed by the text string processor.
  • The Result queue stores responses containing the strings that have been converted to uppercase and are ready to be displayed.

The application consists of the following components:

  • A front-end service that you can use to submit a text string to the application.
  • A back-end service that converts the string to uppercase characters.
  • An HTTP controller that is configured and provided by the Spring Boot HTTP Starter.
  • An embedded Artemis AMQP Broker instance that routes messages between the 2 messaging queues.

The request queue passes messages containing text strings from the front end to the text string processor service. When you submit a string for processing:

  1. The front end service sends an HTTP POST request containing your string as the payload of the request to the HTTP controller.
  2. The request is picked up by the messaging manager that routes the message to the AMQP Broker.
  3. The broker routes the message to the text string processor service. If the text processor service is unavailable to pick up the request, the broker routes the message to the next available processor instance, if such an instance is available. Alternatively, the broker waits and resends the request to the same instance when that instance becomes available again.
  4. The text string processor service picks up the message and converts the characters in the string to uppercase. The processor service sends a request with the processed result in uppercase to the AMQP Broker.
  5. The AMQP broker routes the request with the processed results to the messaging manager.
  6. The messaging manager stores the request with the processed results in the outgoing queue where it can be accessed by the front end service.

The response queue stores HTTP responses that contain results processed by the string processor service. The front end application polls this queue at regular intervals to retrieve the results. When the processed result is ready to be displayed:

  1. The front end service sends an HTTP GET request to the HTTP controller provided by the Spring Boot HTTP Starter.
  2. The HTTP controller routes the request to the messaging manager.
  3. When a request previously submitted by the front end for processing is ready and available in the outgoing queue, the messaging manager sends the result as a response to the HTTP GET request back to the HTTP controller.
  4. The HTTP controller routes the response back to the front end service that displays the result.
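
The flow described above can be modelled in a few lines of plain Java. This is only an in-memory sketch under stated assumptions: two `java.util.concurrent` queues stand in for the request and result queues, no AMQP broker or network is involved, and the class and method names (`TwoQueueFlowSketch`, `submit`, `processPending`, `fetchResults`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory stand-in for the request queue, the uppercase processor, and the result queue.
final class TwoQueueFlowSketch {

    private final Queue<String> requests = new ConcurrentLinkedQueue<>();
    private final Queue<String> results = new ConcurrentLinkedQueue<>();

    // Corresponds to the HTTP POST: place a string on the request queue.
    void submit(String text) {
        requests.add(text);
    }

    // Corresponds to the back-end service: drain requests and publish uppercase results.
    void processPending() {
        String next;
        while ((next = requests.poll()) != null) {
            results.add(next.toUpperCase(Locale.ROOT));
        }
    }

    // Corresponds to the HTTP GET: return a copy of the results processed so far.
    List<String> fetchResults() {
        return new ArrayList<>(results);
    }
}
```

In the real application the two services run independently and the broker delivers messages between them; here `processPending()` must be called explicitly to move data from one queue to the other.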

5.13. Using AMQP in a reactive application

Develop a simple messaging reactive application using the AMQP Client Starter with a Spring Boot HTTP controller. This example application integrates 2 services in a Publisher-Subscriber messaging integration pattern that uses 2 messaging queues and a broker.

This example shows how you can create a basic application with Spring Boot and Eclipse Vert.x on Reactor Netty that consists of 2 services integrated using AMQP messaging. The application consists of the following components:

  • A front-end service that you can use to submit text strings to the application.
  • A back-end service that converts strings to uppercase characters.
  • An Artemis AMQP broker that routes messages between the services and manages the request queue and response queue.
  • An HTTP controller provided by the Spring Boot HTTP Starter.

Prerequisites

  • A Maven-based Java application project configured to use Spring Boot
  • JDK 8 or JDK 11 installed
  • Maven installed

Procedure

  1. Add the following dependencies to the pom.xml file of your application project:

    pom.xml

    ...
    <dependencies>
      ...
       <dependency>
          <groupId>dev.snowdrop</groupId>
          <artifactId>vertx-spring-boot-starter-http</artifactId>
       </dependency>
       <dependency>
           <groupId>dev.snowdrop</groupId>
           <artifactId>vertx-spring-boot-starter-amqp</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-artemis</artifactId>
       </dependency>
       <dependency>
           <groupId>org.apache.activemq</groupId>
           <artifactId>artemis-jms-server</artifactId>
       </dependency>
       <dependency>
           <groupId>org.apache.activemq</groupId>
           <artifactId>artemis-amqp-protocol</artifactId>
         <exclusions>
           <exclusion>
             <groupId>org.apache.qpid</groupId>
             <artifactId>proton-j</artifactId>
           </exclusion>
         </exclusions>
       </dependency>
      ...
    </dependencies>
    ...

  2. Create the main class file of the example application. This class contains methods that define the respective processing queues for requests and results:

    /src/main/java/AmqpExampleApplication.java

    package dev.snowdrop.vertx.sample.amqp;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import dev.snowdrop.vertx.amqp.AmqpProperties;
    import org.apache.activemq.artemis.api.core.TransportConfiguration;
    import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class AmqpExampleApplication {
    
        final static String PROCESSING_REQUESTS_QUEUE = "processing-requests";
    
        final static String PROCESSING_RESULTS_QUEUE = "processing-results";
    
        public static void main(String[] args) {
            SpringApplication.run(AmqpExampleApplication.class, args);
        }
    
        /**
         * Add Netty acceptor to the embedded Artemis server.
         */
        @Bean
        public ArtemisConfigurationCustomizer artemisConfigurationCustomizer(AmqpProperties properties) {
            Map<String, Object> params = new HashMap<>();
            params.put("host", properties.getHost());
            params.put("port", properties.getPort());
    
            return configuration -> configuration
                .addAcceptorConfiguration(new TransportConfiguration(NettyAcceptorFactory.class.getName(), params));
        }
    }

  3. Create the class file containing the code for the HTTP REST controller that manages the request queue and the response queue by exposing REST endpoints that handle your GET and POST requests:

    /src/main/java/Controller.java

    package dev.snowdrop.vertx.sample.amqp;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;
    
    /**
     * Rest controller exposing GET and POST resources to receive processed messages and submit messages for processing.
     */
    @RestController
    public class Controller {
    
        private final MessagesManager messagesManager;
    
        public Controller(MessagesManager messagesManager) {
            this.messagesManager = messagesManager;
        }
    
        /**
         * Get a flux of messages processed up to this point.
         */
        @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
        public Flux<String> getProcessedMessages() {
            return Flux.fromIterable(messagesManager.getProcessedMessages());
        }
    
        /**
         * Submit a message for processing by publishing it to a processing requests queue.
         */
        @PostMapping
        public Mono<Void> submitMessageForProcessing(@RequestBody String body) {
            return messagesManager.processMessage(body.trim());
        }
    }

  4. Create the class file containing the messaging manager. The manager controls how application components publish requests to the request queue and subsequently subscribe to the response queue to obtain processed results:

    /src/main/java/MessagesManager.java

    package dev.snowdrop.vertx.sample.amqp;
    
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    import dev.snowdrop.vertx.amqp.AmqpClient;
    import dev.snowdrop.vertx.amqp.AmqpMessage;
    import dev.snowdrop.vertx.amqp.AmqpSender;
    import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;
    import reactor.core.Disposable;
    import reactor.core.publisher.Mono;
    
    import static dev.snowdrop.vertx.sample.amqp.AmqpExampleApplication.PROCESSING_REQUESTS_QUEUE;
    import static dev.snowdrop.vertx.sample.amqp.AmqpExampleApplication.PROCESSING_RESULTS_QUEUE;
    
    /**
     * Processor client submits messages to the requests queue and subscribes to the results queue for processed messages.
     */
    @Component
    public class MessagesManager implements InitializingBean, DisposableBean {
    
        private final Logger logger = LoggerFactory.getLogger(MessagesManager.class);
    
        private final List<String> processedMessages = new CopyOnWriteArrayList<>();
    
        private final AmqpClient client;
    
        private Disposable receiverDisposer;
    
        // Injecting EmbeddedActiveMQ to make sure it has started before creating this component.
        public MessagesManager(AmqpClient client, EmbeddedActiveMQ server) {
            this.client = client;
        }
    
        /**
         * Create a processed messages receiver and subscribe to its messages publisher.
         */
        @Override
        public void afterPropertiesSet() {
            receiverDisposer = client.createReceiver(PROCESSING_RESULTS_QUEUE)
                .flatMapMany(receiver -> receiver.flux()
                    .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed
                .subscribe(this::handleMessage);
        }
    
        /**
         * Cancel processed messages publisher subscription.
         */
        @Override
        public void destroy() {
            if (receiverDisposer != null) {
                receiverDisposer.dispose();
            }
        }
    
        /**
         * Get messages which were processed up to this moment.
         *
         * @return List of processed messages.
         */
        public List<String> getProcessedMessages() {
            return processedMessages;
        }
    
        /**
         * Submit a message for processing by publishing it to a processing requests queue.
         *
         * @param body Message body to be processed.
         * @return Mono which is completed once the message is sent.
         */
        public Mono<Void> processMessage(String body) {
            logger.info("Sending message '{}' for processing", body);
    
            AmqpMessage message = AmqpMessage.create()
                .withBody(body)
                .build();
    
            return client.createSender(PROCESSING_REQUESTS_QUEUE)
                .map(sender -> sender.send(message))
                .flatMap(AmqpSender::close);
        }
    
        private void handleMessage(AmqpMessage message) {
            String body = message.bodyAsString();
    
            logger.info("Received processed message '{}'", body);
            processedMessages.add(body);
        }
    }

  5. Create the class file containing the uppercase processor that receives text strings from the request queue and converts them to uppercase characters. The processor subsequently publishes the results to the response queue:

    /src/main/java/UppercaseProcessor.java

    package dev.snowdrop.vertx.sample.amqp;
    
    import dev.snowdrop.vertx.amqp.AmqpClient;
    import dev.snowdrop.vertx.amqp.AmqpMessage;
    import dev.snowdrop.vertx.amqp.AmqpSender;
    import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;
    import reactor.core.Disposable;
    import reactor.core.publisher.Mono;
    
    import static dev.snowdrop.vertx.sample.amqp.AmqpExampleApplication.PROCESSING_REQUESTS_QUEUE;
    import static dev.snowdrop.vertx.sample.amqp.AmqpExampleApplication.PROCESSING_RESULTS_QUEUE;
    
    /**
     * Uppercase processor subscribes to the requests queue, converts each received message to uppercase and sends it to the
     * results queue.
     */
    @Component
    public class UppercaseProcessor implements InitializingBean, DisposableBean {
    
        private final Logger logger = LoggerFactory.getLogger(UppercaseProcessor.class);
    
        private final AmqpClient client;
    
        private Disposable receiverDisposer;
    
        // Injecting EmbeddedActiveMQ to make sure it has started before creating this component.
        public UppercaseProcessor(AmqpClient client, EmbeddedActiveMQ server) {
            this.client = client;
        }
    
        /**
         * Create a processing requests receiver and subscribe to its messages publisher.
         */
        @Override
        public void afterPropertiesSet() {
            receiverDisposer = client.createReceiver(PROCESSING_REQUESTS_QUEUE)
                .flatMapMany(receiver -> receiver.flux()
                    .doOnCancel(() -> receiver.close().block())) // Close the receiver once subscription is disposed
                .flatMap(this::handleMessage)
                .subscribe();
        }
    
        /**
         * Cancel processing requests publisher subscription.
         */
        @Override
        public void destroy() {
            if (receiverDisposer != null) {
                receiverDisposer.dispose();
            }
        }
    
        /**
         * Convert the message body to uppercase and send it to the results queue.
         */
        private Mono<Void> handleMessage(AmqpMessage originalMessage) {
            logger.info("Processing '{}'", originalMessage.bodyAsString());
    
            AmqpMessage processedMessage = AmqpMessage.create()
                .withBody(originalMessage.bodyAsString().toUpperCase())
                .build();
    
            return client.createSender(PROCESSING_RESULTS_QUEUE)
                .map(sender -> sender.send(processedMessage))
                .flatMap(AmqpSender::close);
        }
    }

  6. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd myApp
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line:

      $ java -jar target/vertx-spring-boot-sample-amqp.jar
    4. In a new terminal window, send a number of HTTP POST requests that contain text strings to be processed to localhost:

      $ curl -H "Content-Type: text/plain" -d 'Hello, World' -X POST http://localhost:8080
      $ curl -H "Content-Type: text/plain" -d 'Hello again' -X POST http://localhost:8080
    5. Send an HTTP GET request to localhost. You receive an HTTP response with the strings in uppercase:

      $ curl -i http://localhost:8080
      HTTP/1.1 200 OK
      Content-Type: text/event-stream;charset=UTF-8
      transfer-encoding: chunked
      
      data:HELLO, WORLD
      
      data:HELLO AGAIN
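
The same requests can be issued from Java instead of curl. The following is a minimal sketch, assuming JDK 11 or later (for `java.net.http.HttpClient`) and the application listening on http://localhost:8080. The class `ProcessedMessagesClient` and its methods are hypothetical. The parser is deliberately simplified: it only extracts the text after each `data:` prefix, which matches the response shown above, and does not implement the full event stream format.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;

// Hypothetical client for the POST and GET resources exposed by the controller.
final class ProcessedMessagesClient {

    // Extract the payload of every "data:" line from a text/event-stream body.
    static List<String> parseEventStream(String body) {
        List<String> values = new ArrayList<>();
        for (String line : body.split("\\R")) {
            if (line.startsWith("data:")) {
                values.add(line.substring("data:".length()));
            }
        }
        return values;
    }

    // Submit a string for processing (equivalent to the curl POST commands).
    static void submit(HttpClient client, String baseUrl, String text) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl))
            .header("Content-Type", "text/plain")
            .POST(HttpRequest.BodyPublishers.ofString(text))
            .build();
        client.send(request, HttpResponse.BodyHandlers.discarding());
    }

    // Fetch the messages processed so far (equivalent to the curl GET command).
    static List<String> fetch(HttpClient client, String baseUrl) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl)).GET().build();
        String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body();
        return parseEventStream(body);
    }
}
```

For example, after calling `submit(HttpClient.newHttpClient(), "http://localhost:8080", "Hello, World")`, a later call to `fetch` with the same base URL should include the processed string once the back-end service has handled it.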

5.14. Apache Kafka

Apache Kafka is a scalable messaging integration system that is designed to exchange messages between processes, applications, and services. Kafka is based on clusters with one or more brokers that maintain a set of topics. Essentially, topics are categories that can be defined for every cluster using topic IDs. Each topic contains pieces of data called records that contain information about the events taking place in your application. Applications connected to the system can add records to these topics, or process and reprocess messages added earlier.

The broker is responsible for handling the communication with client applications and for managing the records in the topic. To ensure that no records are lost, the broker tracks all records in a commit log and keeps track of an offset value for each application. The offset is similar to a pointer that indicates the most recently added record.

Applications can pull the latest records from the topic, or they can change the offset to read records that were added earlier. This functionality prevents client applications from becoming overwhelmed with incoming requests when they cannot process them in real time. When this happens, Kafka prevents loss of data by storing the records that cannot be processed in real time in the commit log. When the client application is able to catch up with the incoming requests, it resumes processing records in real time.
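
The relationship between the commit log and per-application offsets can be illustrated with a small in-memory model. This is a sketch for intuition only, not Kafka's implementation: the class `CommitLogSketch` and its methods are hypothetical, and the offset here is simplified to mean the position of the next record that a given consumer has not yet read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of an append-only log with one read offset per consumer.
final class CommitLogSketch {

    private final List<String> records = new ArrayList<>();
    private final Map<String, Integer> offsets = new HashMap<>();

    // Producers only ever append; existing records are never modified or removed.
    synchronized void append(String record) {
        records.add(record);
    }

    // Return the records this consumer has not read yet and advance its offset.
    synchronized List<String> poll(String consumer) {
        int from = offsets.getOrDefault(consumer, 0);
        List<String> unread = new ArrayList<>(records.subList(from, records.size()));
        offsets.put(consumer, records.size());
        return unread;
    }

    // Move the consumer's offset, for example backwards to re-read earlier records.
    synchronized void seek(String consumer, int offset) {
        if (offset < 0 || offset > records.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        offsets.put(consumer, offset);
    }
}
```

Because records stay in the log, a slow consumer loses nothing: its next `poll` simply returns everything appended since its last offset, and `seek` lets it reprocess older records.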

A broker can manage records in multiple topics by sorting them into topic partitions. Apache Kafka replicates these partitions to allow records from a single topic to be handled by multiple brokers in parallel, allowing you to scale the rate at which your applications process records in a topic. The replicated topic partitions (also called followers) are synchronized with the original topic partition (also called the leader) to avoid redundancy in processing records. New records are committed to the leader partition; followers only replicate the changes made to the leader.
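
Partitioning and leader-to-follower replication can be sketched in the same spirit. The model below is an illustration under loud assumptions: it runs in a single process, it replicates synchronously, and it picks a partition from `String.hashCode()`, which is a stand-in and not the hash function that Kafka clients use. All names (`PartitionedTopicSketch`, `Partition`, `publish`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of a topic split into partitions, each with a leader and followers.
final class PartitionedTopicSketch {

    // One partition: records are written to the leader and copied to every follower.
    static final class Partition {
        final List<String> leader = new ArrayList<>();
        final List<List<String>> followers = new ArrayList<>();

        Partition(int followerCount) {
            for (int i = 0; i < followerCount; i++) {
                followers.add(new ArrayList<>());
            }
        }

        void append(String record) {
            leader.add(record);                 // Commit to the leader first
            for (List<String> follower : followers) {
                follower.add(record);           // Followers only mirror the leader
            }
        }
    }

    private final List<Partition> partitions = new ArrayList<>();

    PartitionedTopicSketch(int partitionCount, int followersPerPartition) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        for (int i = 0; i < partitionCount; i++) {
            partitions.add(new Partition(followersPerPartition));
        }
    }

    // Records with the same key always map to the same partition (illustrative hash only).
    int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // Append a record to the partition selected by its key and return the partition index.
    int publish(String key, String value) {
        int index = partitionFor(key);
        partitions.get(index).append(value);
        return index;
    }

    Partition partition(int index) {
        return partitions.get(index);
    }
}
```

Keeping all writes on the leader means the followers never diverge from it, which is the property the paragraph above describes.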

5.15. How the Apache Kafka reactive example works

This example application is based on a Publisher-Subscriber message streaming pattern implemented using Apache Kafka. The application consists of the following components:

  • The KafkaExampleApplication class that instantiates the log message producer and consumer.
  • A WebFlux HTTP controller that is configured and provided by the Spring Boot HTTP Starter. The controller provides REST resources used to publish and read messages.
  • A KafkaLogger class that defines how the producer publishes messages to the log topic on Kafka.
  • A KafkaLog class that displays messages that the example application receives from the log topic on Kafka.

Publishing messages:

  1. You make an HTTP POST request to the example application with the log message as the payload.
  2. The HTTP controller routes the message to the REST endpoint used for publishing messages, and passes the message to the logger instance.
  3. The logger instance publishes the received message to the log topic on Kafka.
  4. The KafkaLog instance receives the log message from the Kafka topic.

Reading messages:

  1. You send an HTTP GET request to the example application URL.
  2. The controller gets the messages from the KafkaLog instance and returns them as the body of the HTTP response.

5.16. Using Kafka in a reactive application

This example shows how you can create an example messaging application that uses Apache Kafka with Spring Boot and Eclipse Vert.x on Reactor Netty. The application publishes messages to a Kafka topic and then retrieves them and displays them when you send a request.

The Kafka configuration properties for message topics, URLs, and metadata used by the Kafka cluster are stored in src/main/resources/application.yml.

Prerequisites

  • A Maven-based Java application project configured to use Spring Boot
  • JDK 8 or JDK 11 installed
  • Maven installed

Procedure

  1. Add the WebFlux HTTP Starter and the Apache Kafka Starter as dependencies in the pom.xml file of your application project:

    pom.xml

    ...
    <dependencies>
      ...
      <!-- Vert.x WebFlux starter used to handle HTTP requests -->
      <dependency>
        <groupId>dev.snowdrop</groupId>
        <artifactId>vertx-spring-boot-starter-http</artifactId>
      </dependency>
      <!-- Vert.x Kafka starter used to send and receive messages to/from Kafka cluster -->
      <dependency>
        <groupId>dev.snowdrop</groupId>
        <artifactId>vertx-spring-boot-starter-kafka</artifactId>
      </dependency>
      ...
    </dependencies>
    ...

  1. Create the KafkaLogger class. This class functions as a producer and defines how the producer publishes messages (also called records) to the topic:

    /src/main/java/KafkaLogger.java

    ...
    final class KafkaLogger {
    
        private final KafkaProducer<String, String> producer;
    
        KafkaLogger(KafkaProducer<String, String> producer) {
            this.producer = producer;
        }
    
        public Mono<Void> logMessage(String body) {
            // Generic key and value types can be inferred if both key and value are used to create a builder
            ProducerRecord<String, String> record = ProducerRecord.<String, String>builder(LOG_TOPIC, body).build();
    
            return producer.send(record)
                .log("Kafka logger producer")
                .then();
        }
    }
    ...

  2. Create the KafkaLog class. This class functions as the consumer of Kafka messages. KafkaLog retrieves messages from the topic and displays them in your terminal:

    /src/main/java/KafkaLog.java

    ...
    final class KafkaLog implements InitializingBean, DisposableBean {
    
        private final List<String> messages = new CopyOnWriteArrayList<>();
    
        private final KafkaConsumer<String, String> consumer;
    
        private Disposable consumerDisposer;
    
        KafkaLog(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }
    
        @Override
        public void afterPropertiesSet() {
            consumerDisposer = consumer.subscribe(LOG_TOPIC)
                .thenMany(consumer.flux())
                .log("Kafka log consumer")
                .map(ConsumerRecord::value)
                .subscribe(messages::add);
        }
    
        @Override
        public void destroy() {
            if (consumerDisposer != null) {
                consumerDisposer.dispose();
            }
            consumer.unsubscribe()
                .block(Duration.ofSeconds(2));
        }
    
        public List<String> getMessages() {
            return messages;
        }
    }
    ...

  3. Create the class file that contains the HTTP REST controller. The controller exposes REST resources that your application uses to handle the logging and reading of messages:

    /src/main/java/Controller.java

    package dev.snowdrop.vertx.sample.kafka;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    
    import static org.springframework.http.MediaType.TEXT_EVENT_STREAM_VALUE;
    
    /**
     * HTTP controller exposes GET and POST resources to log messages and to receive the previously logged ones.
     */
    @RestController
    public class Controller {
    
        private final KafkaLogger logger;
    
        private final KafkaLog log;
    
        public Controller(KafkaLogger logger, KafkaLog log) {
            this.logger = logger;
            this.log = log;
        }
    
        /**
         * Get a Flux of previously logged messages.
         */
        @GetMapping(produces = TEXT_EVENT_STREAM_VALUE)
        public Flux<String> getMessages() {
            return Flux.fromIterable(log.getMessages());
        }
    
        /**
         * Log a message.
         */
        @PostMapping
        public Mono<Void> logMessage(@RequestBody String body) {
            return logger.logMessage(body.trim());
        }
    }

  4. Create the YAML template that contains the URLs that producers and consumers in your Apache Kafka Cluster use to log and read messages. In this example, the consumer and producer on your Apache Kafka Cluster communicate using port 9092 on localhost by default. Note that you must configure the producers and consumers separately, as the following example shows:

    /src/main/resources/application.yml

    vertx:
      kafka:
        producer:
          bootstrap:
            # The producer in your cluster uses this URL to publish messages to the log.
            servers: localhost:9092
          key:
            # This class serializes the mandatory key attribute that is assigned to each message.
            serializer: org.apache.kafka.common.serialization.StringSerializer
          value:
            # This class serializes the mandatory value attribute that is assigned to each message.
            serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          bootstrap:
            servers: localhost:9092 # The consumer in your cluster uses this URL to read messages from the log.
          group:
            id: log # The consumer group ID that defines a group of consumers that subscribe to the same topic. In this example, all consumers belong to the same consumer group.
          key:
            deserializer: org.apache.kafka.common.serialization.StringDeserializer # This class deserializes the mandatory key attribute that is assigned to each message.
          value:
            deserializer: org.apache.kafka.common.serialization.StringDeserializer # This class deserializes the mandatory value attribute that is assigned to each message.

  5. OPTIONAL: Run and test your application locally:

    1. Navigate to the root directory of your Maven project:

      $ cd vertx-spring-boot-sample-kafka
    2. Package your application:

      $ mvn clean package
    3. Start your application from the command line:

      $ java -jar target/vertx-spring-boot-sample-kafka.jar
    4. In a new terminal window, send a number of HTTP POST requests that contain messages formatted as text strings to localhost. The messages are all published to the log topic.

      $ curl -H "Content-Type: text/plain" -d 'Hello, World' -X POST http://localhost:8080
      $ curl -H "Content-Type: text/plain" -d 'Hello again' -X POST http://localhost:8080
      ...
    5. Send an HTTP GET request to localhost. You receive an HTTP response that contains all the messages in the topic that your consumers subscribe to.

      $ curl -i http://localhost:8080
      HTTP/1.1 200 OK
      Content-Type: text/event-stream;charset=UTF-8
      transfer-encoding: chunked
      
      data:Hello, World
      
      data:Hello again
      ...
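
Both MessagesManager and KafkaLog keep received messages in a CopyOnWriteArrayList, and the controllers read that list while a subscription may still be appending to it. The source does not state why this list type was chosen; the sketch below shows one property that is presumably relevant, namely that an iterator over a CopyOnWriteArrayList works on a snapshot and is unaffected by later additions. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Demonstrates snapshot iteration: a reader is not disturbed by a concurrent append.
final class SnapshotIterationSketch {

    static List<String> readWhileAppending() {
        List<String> messages = new CopyOnWriteArrayList<>();
        messages.add("Hello, World");

        // The iterator captures the list contents at the moment it is created.
        Iterator<String> reader = messages.iterator();

        // An append after the iterator was created does not affect the snapshot.
        // With a plain ArrayList, continuing to iterate would throw ConcurrentModificationException.
        messages.add("Hello again");

        List<String> seen = new ArrayList<>();
        reader.forEachRemaining(seen::add);
        return seen; // Contains only the element present when iteration started
    }
}
```

A consequence of this design is that a GET request returns the messages received up to the moment the response starts; messages that arrive afterwards appear in the next request.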

Additional resources

In addition to using an example, you can also use Spring Boot with Eclipse Vert.x to create new Spring Boot applications from scratch and deploy them to OpenShift.