Red Hat Training

A Red Hat training course is available for Red Hat Fuse

229.7. カスタムパイプライン

カスタムチャンネルパイプラインは、カスタムハンドラー、エンコーダー(s)、デコーダーを挿入することで、ハンドラー/インターセプターチェーン上で完全な制御を提供します。

カスタムパイプラインを追加するには、コンテキストレジストリー(JNDIRegistry、または camel-spring ApplicationContextRegistry など)を使用してコンテキストで、カスタムチャネルパイプラインファクトリーを作成して登録する必要があります。

カスタムパイプラインファクトリーは、以下のように構築する必要があります。

  • プロデューサーにリンクされたチャネルパイプラインファクトリーは、抽象クラス ClientPipelineFactory を拡張する必要があります。
  • コンシューマーにリンクされたチャネルパイプラインファクトリーは、抽象クラス ServerInitializerFactory を拡張する必要があります。
  • クラスは、カスタムハンドラー、エンコーダー、およびデコーダーを挿入するために、initChannel()メソッドをオーバーライドする必要があります。initChannel() メソッドを上書きすると、パイプラインに接続しているハンドラー、エンコーダー、またはデコーダーのないパイプラインが作成されます。

以下の例は、ServerInitializerFactory ファクトリーの作成方法を示しています。

229.7.1. カスタムパイプラインファクトリーの使用

public class SampleServerInitializerFactory extends ServerInitializerFactory {
    private int maxLineSize = 1024;

    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline channelPipeline = ch.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        // here we add the default Camel ServerChannelHandler for the consumer, to allow Camel to route the message etc.
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
    }
}

その後、カスタムチャネルパイプラインファクトリーをレジストリーに追加し、以下の方法で camel ルートでインスタンス化/使用率を使用できるようにします。

Registry registry = camelContext.getRegistry();
ServerInitializerFactory factory = new TestServerInitializerFactory();
registry.bind("spf", factory);
context.addRoutes(new RouteBuilder() {
  public void configure() {
      String netty_ssl_endpoint =
         "netty4:tcp://localhost:5150?serverInitializerFactory=#spf"
      String return_string =
         "When You Go Home, Tell Them Of Us And Say,"
         + "For Your Tomorrow, We Gave Our Today.";

      from(netty_ssl_endpoint)
       .process(new Processor() {
          public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody(return_string);
          }
       }
  }
});