48.5. 异步响应

48.5.1. 服务器上的异步处理

概述

服务器端异步调用的目的是实现更有效的线程使用,并最终避免因为所有服务器的请求线程都被阻断而拒绝客户端连接尝试的情况。当异步处理调用时,请求线程会立即释放。

注意

请注意,即使服务器端启用了异步处理,客户端 仍会被阻断,直到服务器收到响应为止。如果要在客户端中看到异步行为,则必须实现客户端异步处理。请参阅 第 49.6 节 “客户端上的异步处理”

用于异步处理的基本模型

图 48.1 “同步处理的线程模型” 展示了在服务器端异步处理的基本模型概述。

图 48.1. 同步处理的线程模型

asyncresponse 01

在概述中,请求按照异步模型中的如下处理:

  1. 异步资源方法在请求线程内调用(并接收对 AsyncResponse 对象的引用,稍后需要它来发回响应)。
  2. 资源方法将暂停的请求封装在可运行对象中,其中包含处理请求所需的所有信息和处理逻辑。
  3. 资源方法将 Runnable 对象推送到 executor 线程池的块队列中。
  4. 资源方法现在可以返回,从而释放请求线程。
  5. Runnable 对象进入队列的顶部时,它将由 executor 线程池中的其中一个线程处理。封装的 AsyncResponse 对象随后用于向客户端发送响应。

使用 Java 执行程序进行线程池实施

java.util.concurrent API 是一个强大的 API,可让您轻松创建完整的线程池实施。在 Java 并发 API 术语中,线程池称为 executor。它只需要一行代码来创建完整的工作线程池,包括工作线程和馈送的队列。

例如,要创建一个完整的工作线程池,如 图 48.1 “同步处理的线程模型” 中显示的 Executor Thread Pool,创建一个 java.util.concurrent.Executor 实例,如下所示:

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

此构造器创建一个新的线程池,其中包含五个线程,由单一块队列 fed,其中可容纳最多 10 个 可运行 对象。要向线程池提交任务,请调用 executor.execute 方法,传递对 可运行的 对象的引用(封装异步任务)。

定义异步资源方法

要定义异步的资源方法,请使用 @Suspended 注释,注入类型为 javax.ws.rs.container.AsyncResponse 的参数,并确保方法返回了。例如:

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

请注意,资源方法必须返回 void,因为稍后将使用注入的 AsyncResponse 对象来返回响应。

AsyncResponse 类

javax.ws.rs.container.AsyncResponse 类在传入的客户端连接上提供了一个抽象处理。当 AsyncResponse 对象注入资源方法时,底层 TCP 客户端连接最初 处于挂起状态。稍后,当您准备好返回响应时,可以通过调用 AsyncResponse 实例来重新激活底层 TCP 客户端连接并返回响应。或者,如果您需要中止调用,可以在 AsyncResponse 实例中调用 cancel

封装一个挂起的请求作为可运行

图 48.1 “同步处理的线程模型” 中显示的异步处理场景中,您要将挂起的请求推送到队列,以便稍后被专用线程池处理。为使此方法正常工作,您需要有某种方式在对象中 封装 已暂停的请求。暂停的请求对象需要封装以下操作:

  • 来自传入请求的参数(若有)。
  • AsyncResponse 对象在传入的客户端连接上提供句柄,以及发回响应的方法。
  • 调用的逻辑。

封装这些问题的便捷方法是定义一个可运行的类来代表暂停的请求,其中 Runnable .run() 方法封装调用的逻辑。执行此操作的最好方法是将 Runnable 作为本地类实施,如下例所示。

异步处理示例

要实施异步处理场景,资源方法的实施必须将可运行的对象(代表暂停的请求)传递到 executor 线程池。在 Java 7 和 8 中,您可以利用一些 novel 语法将可运行类定义为一个本地类,如下例所示:

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

注意资源方法参数、ID响应 方式如何直接传递到可运行本地类的定义中。这个特殊语法可让您直接在 Runnable.run() 方法中使用资源方法参数,而无需在本地类中定义对应的字段。

重要

为了实现这种特殊语法,资源方法参数 必须声明最终 (这意味着在方法实施中不得更改它们)。