Menu Close

48.5. 非同期応答

48.5.1. サーバーの非同期処理

概要

サーバー側での呼び出しの非同期処理の目的は、スレッドの効率的な使用を可能にし、最終的には、サーバーのすべての要求スレッドがブロックされてクライアント接続の試行が拒否されるシナリオを回避することです。呼び出しが非同期的に処理されると、リクエストスレッドはほぼすぐに解放されます。

注記

非同期処理がサーバー側で有効になっている場合でも、クライアントはサーバーから応答を受け取るまでブロックされたままであることに注意してください。。クライアント側で非同期動作を確認したい場合は、クライアント側の非同期処理を実装する必要があります。「クライアントでの非同期処理」 を参照してください。

非同期処理の基本モデル

図48.1「非同期処理のスレッドモデル」 は、サーバー側の非同期処理に対する基本モデルの概要を示しています。

図48.1 非同期処理のスレッドモデル

asyncresponse 01

簡単に説明すると、要求は非同期モデルで以下のように処理されます。

  1. 非同期リソースメソッドはリクエストスレッド内で呼び出されます (その後応答の送信に必要な AsyncResponse オブジェクトへの参照を受け取ります) 。
  2. リソースメソッドは、Runnable オブジェクトで一時停止されたリクエストをカプセル化します。これには、リクエストを処理するのに必要なすべての情報および処理ロジックが含まれます。
  3. リソースメソッドは、Runnable オブジェクトをエグゼキュータースレッドプールのブロッキングキューにプッシュします。
  4. リソースメソッドを返すことができ、要求スレッドが解放されます。
  5. Runnableオブジェクトがキューの先頭に到達すると、エグゼキュータースレッドプールのスレッドの 1 つによって処理されます。カプセル化された AsyncResponse オブジェクトは、応答をクライアントに送り返すために使用されます。

Java エグゼキューターでのスレッドプールの実装

java.util.concurrent API は、完全なスレッドプールの実装を非常に簡単に作成できる強力な API です。Java Concurrency API の用語では、スレッドプールは エグゼキューター と呼ばれます。完全な作業スレッドプールを作成するには、1 つの行のコードのみが必要になります。これには、作業スレッドやフィードするブロックキューが含まれます。

たとえば、図48.1「非同期処理のスレッドモデル」 に記載されている Executor Thread Pool のような完全な作業スレッドプールを作成するには、以下のように java.util.concurrent.Executor インスタンスを作成します。

Executor executor = new ThreadPoolExecutor(
    5,                                    // Core pool size
    5,                                    // Maximum pool size
    0,                                    // Keep-alive time
    TimeUnit.SECONDS,                     // Time unit
    new ArrayBlockingQueue<Runnable>(10)  // Blocking queue
);

このコンストラクタは、5 つのスレッドを持つ新しいスレッドプールを作成し、最大 10 個の Runnable オブジェクトを保持できる単一のブロッキングキューによって供給されます。スレッドプールにタスクを送信するには、executor.execute メソッドを呼び出して、Runnable オブジェクト (非同期タスクをカプセル化する) への参照を渡します。

非同期リソースメソッドの定義

非同期であるリソースメソッドを定義するには、@Suspended アノテーションを使用して型 javax.ws.rs.container.AsyncResponse の引数を注入し、メソッドが void を返すことを確認します。以下に例を示します。

// Java
...
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;

@Path("/bookstore")
public class BookContinuationStore {
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(@PathParam("id") String id,
                                    @Suspended AsyncResponse response) {
        ...
    }
    ...
}

注入された AsyncResponse オブジェクトは後でレスポンスを返すために使用されるため、リソースメソッドは void を返す必要があることに注意してください。

AsyncResponse クラス

javax.ws.rs.container.AsyncResponse クラスは、受信クライアントコネクションにおける抽象ハンドルを提供します。AsyncResponse オブジェクトがリソースメソッドに注入されると、ベースとなる TCP クライアントコネクションは最初は 一時停止 状態になります。後で応答を返す準備ができたら、AsyncResponse インスタンスで resume を呼び出すことで、基礎となる TCP クライアントコネクションを再度アクティブにし、レスポンスを返すことができます。または、呼び出しを中止する必要がある場合は、AsyncResponse インスタンスで cancel を呼び出すことができます。

一時停止したリクエストを Runnable としてカプセル化

図48.1「非同期処理のスレッドモデル」 にある非同期処理のシナリオでは、一時停止されたリクエストをキューにプッシュし、そのリクエストは後で専用のスレッドプールによって処理できます。ただし、このやり方が機能するには、オブジェクトで一時停止されたリクエストを カプセル 化する方法が必要です。一時停止されたリクエストオブジェクトは、以下をカプセル化する必要があります。

  • 受信リクエストからのパラメーター (ある場合)。
  • AsyncResponse オブジェクト。受信クライアントコネクションのハンドルと、レスポンスの返信方法を提供します。
  • 呼び出しのロジック。

これらをカプセル化する便利な方法は、Runnable クラスを定義して一時停止されたリクエストを表します。ここで、Runnable.run() メソッドは呼び出しのロジックをカプセル化します。これを実行する最も一般的な方法は、以下の例のように Runnable をローカルクラスとして実装することです。

非同期処理の例

非同期処理シナリオを実装するには、リソースメソッドの実装は Runnable オブジェクト (一時停止されたリクエストを表します) をエグゼキュータースレッドプールに渡す必要があります。Java 7 および 8 では、次の例に示すように、いくつかの新しい構文を利用して、Runnable クラスをローカルクラスとして定義できます。

// Java
package org.apache.cxf.systest.jaxrs;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.container.TimeoutHandler;

import org.apache.cxf.phase.PhaseInterceptorChain;

@Path("/bookstore")
public class BookContinuationStore {

    private Map<String, String> books = new HashMap<String, String>();
    private Executor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS,
                                        new ArrayBlockingQueue<Runnable>(10));

    public BookContinuationStore() {
        init();
    }
    ...
    @GET
    @Path("{id}")
    public void handleRequestInPool(final @PathParam("id") String id,
                                    final @Suspended AsyncResponse response) {
        executor.execute(new Runnable() {
            public void run() {
                // Retrieve the book data for 'id'
                // which is presumed to be a very slow, blocking operation
                // ...
                bookdata = ...
                // Re-activate the client connection with 'resume'
                // and send the 'bookdata' object as the response
                response.resume(bookdata);
            }
        });
    }
    ...
}

リソースメソッド引数 id および response が、Runnable ローカルクラスの定義に直接渡される方法に注意してください。この特別な構文を使用すると、ローカルクラスの対応するフィールドを定義しなくても、Runnable.run() メソッドでリソースメソッドの引数を直接使用できます。

重要

この特別な構文を機能させるには、リソースメソッドパラメーターを final として宣言する 必要 があります (メソッド実装で変更できないことを意味します)。