48.5. 异步响应

48.5.1. 服务器上的异步处理

概述

服务器端异步调用的目的是实现更有效的线程使用,并最终避免因为所有服务器的请求线程都被阻断而拒绝客户端连接尝试的情况。当异步处理调用时,请求线程会立即释放。

注意

请注意,即使服务器端启用了异步处理,客户端 仍会被阻断,直到服务器收到响应为止。如果要在客户端中看到异步行为,则必须实现客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”

用于异步处理的基本模型

图 48.1 “同步处理的线程模型” 展示了在服务器端异步处理的基本模型概述。

图 48.1. 同步处理的线程模型

asyncresponse 01

在概述中,请求按照异步模型中的如下处理:

  1. 异步资源方法在请求线程内调用(并接收对 AsyncResponse 对象的引用,稍后需要它来发回响应)。
  2. 资源方法将暂停的请求封装在可运行对象中,其中包含处理请求所需的所有信息和处理逻辑。
  3. 资源方法将 Runnable 对象推送到 executor 线程池的块队列中。
  4. 资源方法现在可以返回,从而释放请求线程。
  5. Runnable 对象进入队列的顶部时,它将由 executor 线程池中的其中一个线程处理。封装的 AsyncResponse 对象随后用于向客户端发送响应。

使用 Java 执行程序进行线程池实施

java.util.concurrent API 是一个强大的 API,可让您轻松创建完整的线程池实施。在 Java 并发 API 术语中,线程池称为 executor。它只需要一行代码来创建完整的工作线程池,包括工作线程和馈送的队列。

例如,要创建一个完整的工作线程池,如 图 48.1 “同步处理的线程模型” 中显示的 Executor Thread Pool,创建一个 java.util.concurrent.Executor 实例,如下所示:

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

此构造器创建一个新的线程池,其中包含五个线程,由单一块队列 fed,其中可容纳最多 10 个 可运行 对象。要向线程池提交任务,请调用 executor.execute 方法,传递对 可运行的 对象的引用(封装异步任务)。

定义异步资源方法

要定义异步的资源方法,请使用 @Suspended 注释,注入类型为 javax.ws.rs.container.AsyncResponse 的参数,并确保方法返回了。例如:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

请注意,资源方法必须返回 void,因为稍后将使用注入的 AsyncResponse 对象来返回响应。

AsyncResponse 类

javax.ws.rs.container.AsyncResponse 类在传入的客户端连接上提供了一个抽象处理。当 AsyncResponse 对象注入资源方法时,底层 TCP 客户端连接最初 处于挂起状态。稍后,当您准备好返回响应时,可以通过调用 AsyncResponse 实例来重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,可以在 AsyncResponse 实例中调用 cancel

封装一个挂起的请求作为可运行

图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您要将挂起的请求推送到队列,以便稍后被专用线程池处理。为使此方法正常工作,您需要有某种方式在对象中 封装 已暂停的请求。暂停的请求对象需要封装以下操作:

  • 来自传入请求的参数(若有)。
  • AsyncResponse 对象在传入的客户端连接上提供句柄,以及发回响应的方法。
  • 调用的逻辑。

封装这些问题的便捷方法是定义一个可运行的类来代表暂停的请求,其中 Runnable .run() 方法封装调用的逻辑。执行此操作的最好方法是将 Runnable 作为本地类实施,如下例所示。

异步处理示例

要实施异步处理场景,资源方法的实施必须将可运行的对象(代表暂停的请求)传递到 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将可运行类定义为一个本地类,如下例所示:

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

注意资源方法参数、ID响应 方式如何直接传递到可运行本地类的定义中。这个特殊语法可让您直接在 Runnable.run() 方法中使用资源方法参数,而无需在本地类中定义对应的字段。

重要

为了实现这种特殊语法,资源方法参数 必须声明最终 (这意味着在方法实施中不得更改它们)。

48.5.2. 超时和超时处理程序

概述

异步处理模型还支持在 REST 调用中设定超时。默认情况下,超时会导致将 HTTP 错误响应发回到客户端。但是,您还可以选择注册超时处理程序回调,它可让您自定义对超时事件的响应。

在没有处理程序的情况下设置超时示例

要定义简单的调用超时,但不指定超时处理程序,调用 AsyncResponse 对象上的 setTimeout 方法,如下例所示:

// Java
// Java
...
import java.util.concurrent.TimeUnit;
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/defaulttimeout")
    public void getBookDescriptionWithTimeout(@Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

请注意,您可以使用 java.util.concurrent.TimeUnit 类中的任何时间单位来指定超时值。前面的示例没有显示将请求发送到 executor 线程池的代码。如果您只想测试超时,您可以在资源方法正文中包含对 async.SetTimeout 的调用,并且每次调用时将触发超时。

AsyncResponse.NO_TIMEOUT 值代表无限超时。

默认超时(haviour)

默认情况下,如果触发调用超时,JAX-RS 运行时会引发 ServiceUnavailableException 异常,并发回一个 HTTP 错误响应,其状态为 503

TimeoutHandler 接口

如果要自定义超时,您必须通过实施 TimeoutHandler 接口来定义超时处理程序:

// Java
package javax.ws.rs.container;

public interface TimeoutHandler {
    public void handleTimeout(AsyncResponse asyncResponse);
}

当您覆盖实现类中的 handleTimeout 方法时,您可以选择以下方法来处理超时:

  • 通过调用 asyncResponse.cancel 方法来取消响应。
  • 通过调用 asyncResponse.resume 方法以及响应值来发送响应。
  • 通过调用 asyncResponse.setTimeout 方法来扩展等待的周期。(例如,要等待更多 10 秒,您可以调用 asyncResponse.setTimeout(10, TimeUnit.SECONDS))。

使用处理程序设置超时示例

要使用超时处理程序定义调用超时,在 AsyncResponse 对象上调用 setTimeout 方法和 setTimeoutHandler 方法,如下例所示:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("/books/cancel")
    public void getBookDescriptionWithCancel(@PathParam("id") String id,
                                             @Suspended AsyncResponse async) {
        async.setTimeout(2000, TimeUnit.MILLISECONDS);
        async.setTimeoutHandler(new CancelTimeoutHandlerImpl());
        // Optionally, send request to executor queue for processing
        // ...
    }
    ...
}

在本例中,注册了 CancelTimeoutHandlerImpl 超时处理程序的实例,以处理调用超时。

使用超时处理程序取消响应

CancelTimeoutHandlerImpl 超时处理程序定义如下:

// Java
...
import javax.ws.rs.container.AsyncResponse;
...
import javax.ws.rs.container.TimeoutHandler;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    private class CancelTimeoutHandlerImpl implements TimeoutHandler {

        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            asyncResponse.cancel();
        }

    }
    ...
}

AsyncResponse 对象上调用 cancel 的效果是向客户端发送 HTTP 503(服务不可用)错误响应。您可以选择为 cancel 方法指定参数(即 intjava.util.Date 值),用于在响应消息中设置 Retry-After: HTTP 标头。客户端通常会忽略 Retry-After: 标头。

在可运行的实例中处理已取消响应

如果您封装了作为可运行的实例暂停的请求(在 executor 线程池中处理),您可能会发现,线程池开始处理的时间可能会取消。 因此,您很难在可运行的实例中添加一些代码,这使它能够应对取消的 AsyncResponse 对象。例如:

// Java
...
@Path("/bookstore")
public class BookContinuationStore {
    ...
    private void sendRequestToThreadPool(final String id, final AsyncResponse response) {

        executor.execute(new Runnable() {
            public void run() {
                if ( !response.isCancelled() ) {
                    // Process the suspended request ...
                    // ...
                }
            }
        });

    }
    ...
}

48.5.3. 处理丢弃的连接

概述

可以添加一个回调来处理客户端连接丢失的情况。

ConnectionCallback 接口

要为丢弃的连接添加回调,您必须实现 javax.ws.rs.container.ConnectionCallback 接口,该接口定义如下:

// Java
package javax.ws.rs.container;

public interface ConnectionCallback {
    public void onDisconnect(AsyncResponse disconnected);
}

注册连接回调

在实施连接回调后,您必须调用其中一个 寄存器 方法将它注册到当前的 AsyncResponse 对象。例如,若要注册类型为 MyConnectionCallback 的连接回调:

asyncResponse.register(new MyConnectionCallback());

连接回调的典型场景

通常,实施连接回调的主要原因是,释放与丢弃客户端连接关联的资源(您可以使用 AsyncResponse 实例作为键,以标识需要释放的资源)。

48.5.4. 注册回调

概述

您可以选择将回调添加到 AsyncResponse 实例,以便在调用完成时获得通知。当可以调用此回调时,处理中有两个替代点,分别是:

  • 请求处理完成后,响应已被发送回客户端,或者
  • 请求处理完成后,向托管 I/O 容器传播一个未映射的 Throwable

CompletionCallback 接口

要添加完成回调,您必须实施 javax.ws.rs.container.CompletionCallback 接口,其定义如下:

// Java
package javax.ws.rs.container;

public interface CompletionCallback {
    public void onComplete(Throwable throwable);
}

通常,可迭代 的参数为 null。但是,如果请求处理导致了未映射异常,则可 抛出 包含未映射异常实例。

注册完成回调

在实施完回调后,您必须调用其中一个 寄存器 方法将它注册到当前的 AsyncResponse 对象。例如,若要注册类型为 MyCompletionCallback 的完成回调:

asyncResponse.register(new MyCompletionCallback());