47.5. Asynchronous Response

47.5.1. Asynchronous Processing on the Server

Overview

The purpose of asynchronous processing of invocations on the server side is to enable more efficient use of threads and, ultimately, to avoid the scenario where client connection attempts are refused because all of the server's request threads are blocked. When an invocation is processed asynchronously, the request thread is freed up almost immediately.
Note
Note that even when asynchronous processing is enabled on the server side, a client will still remain blocked until it receives a response from the server. If you want to see asynchronous behaviour on the client side, you must implement client-side asynchronous processing. See ???.

Basic model for asynchronous processing

Figure 47.1, “Threading Model for Asynchronous Processing” shows an overview of the basic model for asynchronous processing on the server side.

Figure 47.1. Threading Model for Asynchronous Processing

In outline, a request is processed as follows in the asynchronous model:
  1. An asynchronous resource method is invoked within a request thread (and receives a reference to an AsyncResponse object, which will be needed later to send back the response).
  2. The resource method encapsulates the suspended request in a Runnable object, which contains all of the information and processing logic required to process the request.
  3. The resource method pushes the Runnable object onto the blocking queue of the executor thread pool.
  4. The resource method can now return, thus freeing up the request thread.
  5. When the Runnable object gets to the top of the queue, it is processed by one of the threads in the executor thread pool. The encapsulated AsyncResponse object is then used to send the response back to the client.
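The steps above can be sketched using only the java.util.concurrent API. This is a minimal simulation under stated assumptions, not JAX-RS code: the class HandOffSketch and its methods are hypothetical, and a CompletableFuture stands in for the AsyncResponse handle so that the sketch runs without a JAX-RS runtime.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {

    // Steps 1-4: runs on the caller's ("request") thread and returns at once.
    // 'reply' is a hypothetical stand-in for the AsyncResponse handle.
    static void handleRequest(final String id,
                              final CompletableFuture<String> reply,
                              Executor pool) {
        pool.execute(new Runnable() {
            public void run() {
                // Step 5: runs later on a pool thread and sends the response.
                reply.complete("book " + id + " from "
                               + Thread.currentThread().getName());
            }
        });
    }

    // Drives one simulated request and returns the response text.
    static String simulate(String id) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> reply = new CompletableFuture<String>();
            handleRequest(id, reply, pool);
            // Only the simulated client waits here; handleRequest has
            // already returned.
            return reply.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate("123"));
    }
}
```

The thread name in the output belongs to a pool thread rather than to the calling thread, which is the point of the model: the slow work and the response are handled away from the request thread.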

Thread pool implementation with Java executor

The java.util.concurrent API is a powerful API that enables you to create a complete thread pool implementation very easily. In the terminology of the Java concurrency API, a thread pool is called an executor. It requires only a single line of code to create a complete working thread pool, including the working threads and the blocking queue that feeds them.
For example, to create a complete working thread pool like the Executor Thread Pool shown in Figure 47.1, “Threading Model for Asynchronous Processing”, create a java.util.concurrent.Executor instance, as follows:
Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);
This constructor creates a new thread pool with five threads, fed by a single blocking queue that can hold up to 10 Runnable objects. To submit a task to the thread pool, call the executor.execute method, passing in a reference to a Runnable object (which encapsulates the asynchronous task).
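To illustrate what these bounds mean in practice, the following sketch submits tasks that block until they are released and counts how many the pool accepts. The class PoolCapacitySketch and the method countAcceptedTasks are hypothetical names. The sketch relies on the default rejection policy of ThreadPoolExecutor, which throws a RejectedExecutionException when all threads are busy and the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolCapacitySketch {

    // Submits blocked tasks until the pool refuses one and
    // returns the number of tasks that were accepted.
    static int countAcceptedTasks() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        final CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (accepted < 100) {   // safety bound for the sketch
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            release.await();   // simulate a slow operation
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Thrown by the default policy once threads and queue are full.
        } finally {
            release.countDown();   // let the blocked tasks finish
            executor.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(countAcceptedTasks());   // prints 15
    }
}
```

Five tasks occupy the threads and ten wait in the queue, so the sixteenth call to execute is rejected. In a resource method that exception would surface in the request thread, so you might want to catch it and send back an error response instead.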

Defining an asynchronous resource method

To define a resource method that is asynchronous, inject an argument of type javax.ws.rs.container.AsyncResponse using the @Suspended annotation and make sure that the method returns void. For example:
// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...    
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id, 
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}
Note that the resource method must return void, because the injected AsyncResponse object will be used to return the response at a later time.

AsyncResponse interface

The javax.ws.rs.container.AsyncResponse interface provides an abstract handle on an incoming client connection. When an AsyncResponse object is injected into a resource method, the underlying TCP client connection is initially in a suspended state. At a later time, when you are ready to return the response, you can re-activate the underlying TCP client connection and pass back the response by calling resume on the AsyncResponse instance. Alternatively, if you need to abort the invocation, you can call cancel on the AsyncResponse instance.

Encapsulating a suspended request as a Runnable

In the asynchronous processing scenario shown in Figure 47.1, “Threading Model for Asynchronous Processing”, you push the suspended request onto a queue, from where it can be processed at a later time by a dedicated thread pool. In order for this approach to work, however, you need to have some way of encapsulating the suspended request in an object. The suspended request object needs to encapsulate the following things:
  • Parameters from the incoming request (if any).
  • The AsyncResponse object, which provides a handle on the incoming client connection and a way of sending back the response.
  • The logic of the invocation.
A convenient way to encapsulate these things is to define a Runnable class to represent the suspended request, where the Runnable.run() method encapsulates the logic of the invocation. The most compact way to do this is to implement the Runnable as an anonymous inner class, as shown in the following example.

Example of asynchronous processing

To implement the asynchronous processing scenario, the implementation of the resource method must pass a Runnable object (representing the suspended request) to the executor thread pool. A compact way to do this is to define the Runnable as an anonymous inner class directly inside the resource method, as shown in the following example:
// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));
    
    public BookContinuationStore() {
        init();
    }    
    ...    
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id, 
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}
Note how the resource method arguments, id and response, are used directly inside the anonymous Runnable class. Because an anonymous class can capture the parameters of its enclosing method, you can use them in the Runnable.run() method without having to define corresponding fields in the class.
Important
For the anonymous class to capture the resource method parameters, they must not be reassigned in the method implementation. Before Java 8, the parameters must be declared explicitly as final. From Java 8 onwards, it is sufficient for them to be effectively final, although declaring them as final (as in this example) makes the intent explicit.

47.5.2. Timeouts and Timeout Handlers

Overview

The asynchronous processing model also provides support for imposing timeouts on REST invocations. By default, a timeout results in an HTTP error response being sent back to the client. You also have the option of registering a timeout handler callback, which enables you to customize the response to a timeout event.

Example of setting a timeout without a handler

To define a simple invocation timeout, without specifying a timeout handler, call the setTimeout method on the AsyncResponse object, as shown in the following example:
// Java
...
import java.util.concurrent.TimeUnit;
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/defaulttimeout")
    public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}
Note that you can specify the timeout value using any time unit from the java.util.concurrent.TimeUnit class. The preceding example does not show the code for sending the request to the executor thread pool. If you only want to test the timeout behaviour, you can include just the call to async.setTimeout in the resource method body, and the timeout is then triggered on every invocation.
Tip
The AsyncResponse.NO_TIMEOUT value represents an infinite timeout.

Default timeout behaviour

By default, if the invocation timeout is triggered, the JAX-RS runtime raises a ServiceUnavailableException and sends back an HTTP error response with the status 503.

TimeoutHandler interface

If you want to customize the timeout behaviour, you must define a timeout handler, by implementing the TimeoutHandler interface:
// Java
package javax.ws.rs.container;

public interface TimeoutHandler {
    public void handleTimeout(AsyncResponse asyncResponse);
}
When you override the handleTimeout method in your implementation class, you can choose between the following approaches to dealing with the timeout:
  • Cancel the response, by calling the asyncResponse.cancel method.
  • Send a response, by calling the asyncResponse.resume method with the response value.
  • Extend the waiting period, by calling the asyncResponse.setTimeout method. (For example, to wait for a further 10 seconds, you could call asyncResponse.setTimeout(10, TimeUnit.SECONDS)).
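As an illustration of the second and third approaches, the following sketch shows a handler that extends the waiting period once and then sends a fallback response. It is a sketch under stated assumptions: ExtendOnceTimeoutHandler, RecordingResponse, and simulateTwoTimeouts are hypothetical names, and the nested AsyncResponse and TimeoutHandler interfaces are local stand-ins that mirror only the methods used here, so that the code compiles without the JAX-RS API on the classpath. In a real resource class you would import the javax.ws.rs.container types instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TimeoutHandlerSketch {

    // Local stand-ins (assumption): replace with the real
    // javax.ws.rs.container interfaces in a JAX-RS application.
    interface AsyncResponse {
        boolean resume(Object response);
        boolean cancel();
        boolean setTimeout(long time, TimeUnit unit);
    }

    interface TimeoutHandler {
        void handleTimeout(AsyncResponse asyncResponse);
    }

    // Hypothetical handler: grants one 10-second extension, then gives up
    // and sends a fallback message. Use one instance per suspended request.
    static class ExtendOnceTimeoutHandler implements TimeoutHandler {
        private final AtomicBoolean extended = new AtomicBoolean(false);

        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            if (extended.compareAndSet(false, true)) {
                asyncResponse.setTimeout(10, TimeUnit.SECONDS);
            } else {
                asyncResponse.resume("Timed out; please retry later");
            }
        }
    }

    // Test double that records which calls the handler makes.
    static class RecordingResponse implements AsyncResponse {
        final List<String> calls = new ArrayList<String>();

        public boolean resume(Object response) {
            calls.add("resume:" + response);
            return true;
        }

        public boolean cancel() {
            calls.add("cancel");
            return true;
        }

        public boolean setTimeout(long time, TimeUnit unit) {
            calls.add("setTimeout:" + time + " " + unit);
            return true;
        }
    }

    // Simulates the runtime firing the timeout twice for one request.
    static List<String> simulateTwoTimeouts() {
        RecordingResponse response = new RecordingResponse();
        TimeoutHandler handler = new ExtendOnceTimeoutHandler();
        handler.handleTimeout(response);
        handler.handleTimeout(response);
        return response.calls;
    }

    public static void main(String[] args) {
        System.out.println(simulateTwoTimeouts());
    }
}
```

The first simulated timeout results in a call to setTimeout and the second in a call to resume. In a real handler you would probably pass a javax.ws.rs.core.Response with an explicit status code to resume, rather than a plain string.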

Example of setting a timeout with a handler

To define an invocation timeout with a timeout handler, call both the setTimeout method and the setTimeoutHandler method on the AsyncResponse object, as shown in the following example:
// Java
...
import java.util.concurrent.TimeUnit;
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/cancel")
    public void getBookDescriptionWithCancel(@PathParam("id") String id, 
                                             @Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}
This example registers an instance of the CancelTimeoutHandlerImpl timeout handler to handle the invocation timeout.

Using a timeout handler to cancel the response

The CancelTimeoutHandlerImpl timeout handler is defined as follows:
// Java
...
import javax.ws.rs.container.AsyncResponse;
...
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    private class CancelTimeoutHandlerImpl implements TimeoutHandler {

        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            asyncResponse.cancel();
        }
        
    }
    ...
}
The effect of calling cancel on the AsyncResponse object is to send an HTTP 503 (Service Unavailable) error response to the client. You can optionally specify an argument to the cancel method (either an int or a java.util.Date value), which is used to set the Retry-After HTTP header in the response message. Clients often ignore the Retry-After header, however.

Dealing with a cancelled response in the Runnable instance

If you have encapsulated a suspended request as a Runnable instance, which is queued for processing in an executor thread pool, you might find that the AsyncResponse has been cancelled by the time the thread pool gets around to processing the request. For this reason, you ought to add some code to your Runnable instance, which enables it to cope with a cancelled AsyncResponse object. For example:
// Java
...
@Path("/bookstore")
public class BookContinuationStore {
    ...
    private void sendRequestToThreadPool(final String id, final AsyncResponse response) {
        
        executor.execute(new Runnable() {
            public void run() {
                if ( !response.isCancelled() ) {
                    // Process the suspended request ...
                    // ...
                }
            }
        });
        
    }
    ...
}

47.5.3. Handling Dropped Connections

Overview

It is possible to add a callback to deal with the case where the client connection is lost.

ConnectionCallback interface

To add a callback for dropped connections, you must implement the javax.ws.rs.container.ConnectionCallback interface, which is defined as follows:
// Java
package javax.ws.rs.container;

public interface ConnectionCallback {
    public void onDisconnect(AsyncResponse disconnected);
}

Registering a connection callback

After implementing a connection callback, you must register it with the current AsyncResponse object by calling one of the register methods. For example, to register a connection callback of type MyConnectionCallback:
asyncResponse.register(new MyConnectionCallback());

Typical scenario for connection callback

Typically, the main reason for implementing a connection callback would be to free up resources associated with the dropped client connection (where you could use the AsyncResponse instance as the key to identify the resources that need to be freed).
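The following sketch shows one way such a callback might be written. It is an illustration under stated assumptions: the track and trackedCount methods and the demo method are hypothetical, the resources are modelled as AutoCloseable objects, and the nested AsyncResponse and ConnectionCallback interfaces are local stand-ins so that the code compiles without the JAX-RS API on the classpath. In a real application the callback would implement javax.ws.rs.container.ConnectionCallback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConnectionCleanupSketch {

    // Local stand-ins (assumption): replace with the real
    // javax.ws.rs.container interfaces in a JAX-RS application.
    interface AsyncResponse { }   // placeholder, used only as a map key here

    interface ConnectionCallback {
        void onDisconnect(AsyncResponse disconnected);
    }

    // Hypothetical callback that owns a registry of per-request resources,
    // keyed by the suspended response.
    static class MyConnectionCallback implements ConnectionCallback {
        private final Map<AsyncResponse, AutoCloseable> resources =
            new ConcurrentHashMap<AsyncResponse, AutoCloseable>();

        // Remember a resource that belongs to a suspended request.
        void track(AsyncResponse response, AutoCloseable resource) {
            resources.put(response, resource);
        }

        int trackedCount() {
            return resources.size();
        }

        @Override
        public void onDisconnect(AsyncResponse disconnected) {
            // Free whatever was allocated for the dropped connection.
            AutoCloseable resource = resources.remove(disconnected);
            if (resource != null) {
                try {
                    resource.close();
                } catch (Exception e) {
                    // A real implementation would log the failure.
                }
            }
        }
    }

    // Simulates a dropped connection; returns true if the resource
    // was closed and removed from the registry.
    static boolean demo() {
        final boolean[] closed = { false };
        AsyncResponse response = new AsyncResponse() { };
        MyConnectionCallback callback = new MyConnectionCallback();
        callback.track(response, new AutoCloseable() {
            public void close() {
                closed[0] = true;
            }
        });
        callback.onDisconnect(response);
        return closed[0] && callback.trackedCount() == 0;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the registry is shared between request threads and the thread that delivers the disconnect event, the sketch uses a ConcurrentHashMap rather than a plain HashMap.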

47.5.4. Registering Callbacks

Overview

You can optionally add a callback to an AsyncResponse instance, in order to be notified when the invocation has completed. There are two alternative points in the processing when this callback can be invoked, either:
  • After the request processing is finished and the response has already been sent back to the client, or
  • After the request processing is finished and an unmapped Throwable has been propagated to the hosting I/O container.

CompletionCallback interface

To add a completion callback, you must implement the javax.ws.rs.container.CompletionCallback interface, which is defined as follows:
// Java
package javax.ws.rs.container;

public interface CompletionCallback {
    public void onComplete(Throwable throwable);
}
Usually, the throwable argument is null. However, if the request processing resulted in an unmapped exception, throwable contains the unmapped exception instance.

Registering a completion callback

After implementing a completion callback, you must register it with the current AsyncResponse object by calling one of the register methods. For example, to register a completion callback of type MyCompletionCallback:
asyncResponse.register(new MyCompletionCallback());
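A completion callback such as MyCompletionCallback might look like the following sketch, which counts normal and failed completions. The counters and the demo method are hypothetical, and the nested CompletionCallback interface is a local stand-in that mirrors the definition shown earlier, so that the code compiles without the JAX-RS API on the classpath. In a real application the class would implement javax.ws.rs.container.CompletionCallback.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionCallbackSketch {

    // Local stand-in (assumption): replace with the real
    // javax.ws.rs.container.CompletionCallback in a JAX-RS application.
    interface CompletionCallback {
        void onComplete(Throwable throwable);
    }

    // Hypothetical callback that keeps simple completion statistics.
    static class MyCompletionCallback implements CompletionCallback {
        final AtomicInteger succeeded = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();

        @Override
        public void onComplete(Throwable throwable) {
            if (throwable == null) {
                // Normal case: the response was sent to the client.
                succeeded.incrementAndGet();
            } else {
                // An unmapped exception reached the container.
                failed.incrementAndGet();
            }
        }
    }

    // Simulates one normal and one failed completion.
    static String demo() {
        MyCompletionCallback callback = new MyCompletionCallback();
        callback.onComplete(null);
        callback.onComplete(new IllegalStateException("unmapped"));
        return callback.succeeded.get() + " ok, "
             + callback.failed.get() + " failed";
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints "1 ok, 1 failed"
    }
}
```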