2.21. 非同期リクエスト処理および反応的戻り値の型の RESTEasy サポートの拡張

重要

RESTEasy サポートの拡張はテクノロジープレビューとしてのみ提供されます。テクノロジープレビューの機能は、Red Hat の本番環境のサービスレベルアグリーメント (SLA) ではサポートされず、機能的に完全ではないことがあるため、Red Hat は本番環境での使用は推奨しません。テクノロジープレビューの機能は、最新の製品機能をいち早く提供して、開発段階で機能のテストを行いフィードバックを提供していただくことを目的としています。

テクノロジープレビュー機能のサポート範囲については、Red Hat カスタマーポータルの テクノロジープレビュー機能のサポート範囲 を参照してください。

2.21.1. プラグ可能なリアクティブタイプ

JAX-RS 2.1 は、さまざまなリアクティブライブラリーをサポートする拡張性があります。RESTEasy のオプションのモジュール resteasy-rxJava2 は、以下のリアクティブタイプをサポートします。

  • IO.reactivex.Single: 最も大きな値を保持するため CompletionStage に類似しています。
  • io.reactivex.Flowable: io.reactivex.Publisher を実装します。
  • io.reactivex.Observable: Flowable に似ていますが、バックプレッシャーに対応していないことを除き、ロードを制御するサブスクライバーの機能は、Subscription.request() を呼び出すことでプロデューサーから受信します。

Resteasy-RxJava2 をインポートする場合は、サーバー側でリソースメソッドからこれらのリアクティブタイプを返し、クライアント側でそれを受け取ります。

Resteasy-RxJava2 モジュールは、クライアント側で Singles、Observables、および Flowables にそれぞれアクセスするための以下のクラスをサポートします。

  • org.jboss.resteasy.rxjava2.SingleRxInvoker
  • org.jboss.resteasy.rxjava2.FlowableRxInvoker
  • org.jboss.resteasy.rxjava2.ObservableRxInvoker

2.21.2. 追加のリアクティブクラスの拡張

RESTEasy は、追加のリアクティブクラスの拡張をサポートするフレームワークを実装します。サーバー側では、リソースメソッドが CompletionStage タイプを返すと、RESTEasy は org.jboss.resteasy.core.AsyncResponseConsumer.CompletionStageResponseConsumer クラスを使用してそれにサブスクライブします。CompletionStage が完了すると、CompletionStageResponseConsumer.accept() が呼び出され、結果がクライアントに返されます。

CompletionStage のサポートは RESTEasy に組み込まれています。Single などのクラスにこのサポートを拡張するには、SingleCompletionStage に変換するメカニズムを提供します。resteasy-rxjava2 モジュールでは、org.jboss.resteasy.spi.AsyncResponseProvider<Single<?>> インターフェイスを実装する org.jboss.resteasy.rxjava2.SingleProvider がこのメカニズムを提供します。

public interface AsyncResponseProvider<T> {
   public CompletionStage toCompletionStage(T asyncResponse);
}

SingleProvider では、RESTEasy は Single を取得し CompletionStage に変換して CompletionStageResponseConsumer を使用し、Single の最終的な値を処理できます。同様に、リソースメソッドが Flowable などのストリーミングの反応的なクラスを返す場合、RESTEasy はそのクラスをサブスクライブし、データ要素のストリームを受け取り、クライアントに送信します。AsyncResponseConsumer には複数のサポートクラスがあり、それぞれは異なるストリーミングモードを実装します。

たとえば、AsyncResponseConsumer.AsyncGeneralStreamingSseResponseConsumer は一般的なストリーミングおよび SSE ストリーミングを処理します。サブスクリプションは、org.reactivestreams.Publisher.subscribe() を呼び出すことで行われるため、たとえば FlowablePublisher に変えるメカニズが必要です。つまり、org.jboss.resteasy.spi.AsyncStreamProvider<Flowable> の実装が呼び出されます。これは、AsyncStreamProvider を定義します。

public interface AsyncStreamProvider<T> {
   public Publisher toAsyncStream(T asyncResponse);
}

resteasy-rxjava2 モジュールでは、org.jboss.resteasy.FlowableProvider は、Flowable のメカニズムを提供します。

つまり、サーバー側では、ストリームの AsyncStreamProvider インターフェイスの @Provider アノテーションを宣言するか、単一値の AsyncResponseProvider インターフェイスを宣言して、他の反応的なタイプのサポートを追加できます。これらのインターフェイスには、ストリームの場合には、新しいリアクティブタイプを Publisher に、単一の値の場合には CompletionStage に変換する単一のメソッドがあります。

クライアント側では、JAX-RS 2.1 でリアクティブクラスのサポート要件が 2 つ課せられています。

  • javax.ws.rs.client.CompletionStageRxInvoker のインターフェイスの実装としての CompletionStage のサポート。
  • 実装するプロバイダーの登録をサポートしていることによる拡張性
public interface RxInvokerProvider<T extends RxInvoker> {
    public boolean isProviderFor(Class<T> clazz);
    public T getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService);
}

RxInvokerProvider が登録されれば javax.ws.rs.client.Invocation.Builder メソッドを呼び出して RxInvoker をリクエストできます。

public <T extends RxInvoker> T rx(Class<T> clazz);

RxInvoker を使用して、適切な反応的なクラスを返す呼び出しを作成できます。例を以下に示します。

FlowableRxInvoker invoker = client.target(generateURL("/get/string")).request().rx(FlowableRxInvoker.class);
Flowable<String> flowable = (Flowable<String>) invoker.get();

RESTEasy は、RxInvokers の実装に部分的なサポートを提供します。たとえば、上記の SingleProviderorg.jboss.resteasy.spi.AsyncClientResponseProvider<Single<?>> も実装します。ここでは AsyncClientResponseProvider は以下のように定義されます。

public interface AsyncClientResponseProvider<T> {
   public T fromCompletionStage(CompletionStage<?> completionStage);
}

2.21.3. Reactive Clients API

RESTEasy は RxInvoker という新型のインボーカーと、CompletionStageRxInvoker というこの型のデフォルト実装を定義します。CompletionStageRxInvoker は、Java 8 のインターフェイス CompletionStage を実装します。このインターフェイスは、非同期計算の管理専用のメソッドを多数宣言します。

2.21.4. 非同期フィルター

特定のリソースが利用可能になるまでフィルターの実行を一時停止する必要がある場合は、非同期フィルターに変換できます。リクエストを非同期にしても、リソースメソッド宣言や追加のフィルター宣言を変更する必要はありません。

フィルターの実行を非同期にするには、以下をキャストする必要があります。

  • 事前およびポスト要求フィルターの SuspendableContainerRequestContext への ContainerRequestContext
  • 応答フィルターの SuspendableContainerResponseContext への ContainerResponseContext

これらのコンテキストオブジェクトは、suspend() メソッドを呼び出して、現在のフィルターの実行を非同期にすることができます。非同期の場合には、フィルターチェーンは中断され、以下のメソッドのいずれかがコンテキストオブジェクトで呼び出された後にのみ再開されます。

  • abortWith(Response): フィルターチェーンを終了し、指定の Response をクライアントに返します。これは ContainerRequestFilter にのみ適用されます。
  • resume(): 次のフィルターを呼び出して、フィルターチェーンの実行を切り替えます。
  • resume(Throwable): 指定された例外を出力して、フィルターチェーンの実行を中止します。これは、フィルターが同期され、指定の例外が発生したかのように動作します。

2.21.5. プロキシー

プロキシーは、汎用 JAX-RS 呼び出しをアプリケーション固有のインターフェイス呼び出しに置き換える、直感的なプログラミングスタイルをサポートする RESTEasy 拡張です。プロキシーフレームワークは CompletionStage と RxJava2 タイプの両方、SingleObservableFlowable を含めるように拡張されています。以下の例は、RESTEasy プロキシーがどのように機能するかを表しています。

例 1:

@Path("")
public interface RxCompletionStageResource {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   public CompletionStage<String> getString();
}

@Path("")
public class RxCompletionStageResourceImpl {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   public CompletionStage<String> getString() { .... }
}

public class RxCompletionStageProxyTest {

   private static ResteasyClient client;
   private static RxCompletionStageResource proxy;

   static {
      client = new ResteasyClientBuilder().build();
      proxy = client.target(generateURL("/")).proxy(RxCompletionStageResource.class);
   }

   @Test
   public void testGet() throws Exception {
      CompletionStage<String> completionStage = proxy.getString();
      Assert.assertEquals("x", completionStage.toCompletableFuture().get());
   }
}

例 2:

public interface Rx2FlowableResource {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   @Stream
   public Flowable<String> getFlowable();
}

@Path("")
public class Rx2FlowableResourceImpl {

   @GET
   @Path("get/string")
   @Produces(MediaType.TEXT_PLAIN)
   @Stream
   public Flowable<String> getFlowable() { ... }
}

public class Rx2FlowableProxyTest {

   private static ResteasyClient client;
   private static Rx2FlowableResource proxy;

   static {
      client = new ResteasyClientBuilder().build();
      proxy = client.target(generateURL("/")).proxy(Rx2FlowableResource.class);
   }

   @Test
   public void testGet() throws Exception {
      Flowable<String> flowable = proxy.getFlowable();
      flowable.subscribe(
         (String o) -> stringList.add(o),
         (Throwable t) -> errors.incrementAndGet(),
         () -> latch.countDown());
      boolean waitResult = latch.await(30, TimeUnit.SECONDS);
      Assert.assertTrue("Waiting for event to be delivered has timed out.", waitResult);
      Assert.assertEquals(0, errors.get());
      Assert.assertEquals(xStringList, stringList);
   }
}