48.5. 非同期応答

48.5.1. サーバー上の非同期処理

概要

サーバー側での呼び出しの非同期処理の目的は、スレッドをより効率的に使用できるようにすることで、最終的に、サーバーの要求スレッドがブロックされていることが原因でクライアントの接続試行が拒否されるというシナリオを回避することです。呼び出しが非同期的に処理されると、要求スレッドはほぼすぐに解放されます。

注記

サーバー側で非同期処理が有効になっている場合でも、サーバーから応答を受信するまでクライアントは、ブロックされたまま になることに注意してください。クライアント側で非同期動作を実行するには、クライアント側の非同期処理を実装する必要があります。「クライアントの非同期処理」を参照してください。

非同期処理の基本モデル

図48.1「非同期処理のスレッドモデル」 は、サーバー側での非同期処理の基本モデルの概要を示しています。

図48.1 非同期処理のスレッドモデル

asyncresponse 01

つまり、リクエストは非同期モデルで以下のように処理されます。

  1. 非同期リソースメソッドはリクエストスレッド内で呼び出されます (その後応答の送信に必要な AsyncResponse オブジェクトへの参照を受け取ります) 。
  2. リソースメソッドは、Runnable オブジェクトで一時停止されたリクエストをカプセル化します。これには、リクエストを処理するのに必要なすべての情報および処理ロジックが含まれます。
  3. リソースメソッドは、Runnable オブジェクトをエグゼキュータースレッドプールのブロッキングキューにプッシュします。
  4. リソースメソッドが返されるようになったため、要求スレッドが解放されます。
  5. Runnable オブジェクトがキューの先頭に到達すると、エグゼキュータースレッドプールのスレッドの 1 つによって処理されます。カプセル化された AsyncResponse オブジェクトは、応答をクライアントに送り返すために使用されます。

Java エグゼキューターを使用したスレッドプールの実装

java.util.concurrent API は、完全なスレッドプールの実装を非常に簡単に作成できる強力な API です。Java 同時実行 API の用語では、スレッドプールは エグゼキューター と呼ばれます。作業スレッドやそれを提供するブロッキングキューなど、完全な作業スレッドプールを作成するには、コード 1 行のみが必要です。

たとえば、図48.1「非同期処理のスレッドモデル」 に記載されている Executor Thread Pool のような完全な作業スレッドプールを作成するには、以下のように java.util.concurrent.Executor インスタンスを作成します。

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

このコンストラクタは、5 つのスレッドを持つ新しいスレッドプールを作成し、最大 10 個の Runnable オブジェクトを保持できる単一のブロッキングキューによって供給されます。スレッドプールにタスクを送信するには、executor.execute メソッドを呼び出して、Runnable オブジェクト (非同期タスクをカプセル化する) への参照を渡します。

非同期リソースメソッドの定義

非同期であるリソースメソッドを定義するには、@Suspended アノテーションを使用して型 javax.ws.rs.container.AsyncResponse の引数を注入し、メソッドが void を返すことを確認します。以下に例を示します。

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

注入された AsyncResponse オブジェクトは後でレスポンスを返すために使用されるため、リソースメソッドは void を返す必要があることに注意してください。

AsyncResponse クラス

javax.ws.rs.container.AsyncResponse クラスは、受信クライアントコネクションにおける抽象ハンドルを提供します。AsyncResponse オブジェクトがリソースメソッドに注入されると、ベースとなる TCP クライアントコネクションは最初は 一時停止 状態になります。後で応答を返す準備ができたら、AsyncResponse インスタンスで resume を呼び出すことで、基礎となる TCP クライアントコネクションを再度アクティブにし、レスポンスを返すことができます。または、呼び出しを中止する必要がある場合は、AsyncResponse インスタンスで cancel を呼び出すことができます。

一時停止した要求の Runnable としてのカプセル化

図48.1「非同期処理のスレッドモデル」 の非同期処理シナリオでは、一時停止された要求をキューにプッシュします。そのキューから、後で専用のスレッドプールで処理できます。ただし、このアプローチを機能させるには、オブジェクトで一時停止された要求を カプセル化 する方法が必要です。一時停止された要求オブジェクトは以下をカプセル化する必要があります。

  • 受信要求からのパラメーター (存在する場合)。
  • AsyncResponse オブジェクト。受信クライアントコネクションのハンドルと、レスポンスの返信方法を提供します。
  • 呼び出しのロジック。

これらをカプセル化する便利な方法は、Runnable クラスを定義して一時停止されたリクエストを表します。ここで、Runnable.run() メソッドは呼び出しのロジックをカプセル化します。これを実行する最も一般的な方法は、以下の例のように Runnable をローカルクラスとして実装することです。

非同期処理の例

非同期処理シナリオを実装するには、リソースメソッドの実装は Runnable オブジェクト (一時停止されたリクエストを表します) をエグゼキュータースレッドプールに渡す必要があります。Java 7 および 8 では、次の例に示すように、いくつかの新しい構文を利用して、Runnable クラスをローカルクラスとして定義できます。

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

リソースメソッド引数 id および response が、Runnable ローカルクラスの定義に直接渡される方法に注意してください。この特別な構文を使用すると、ローカルクラスの対応するフィールドを定義しなくても、Runnable.run() メソッドでリソースメソッドの引数を直接使用できます。

重要

この特別な構文を機能させるには、リソースメソッドパラメーターを final として宣言する 必要 があります (メソッド実装で変更できないことを意味します)。