239.9. カスタムチャネルパイプラインファクトリーを追加して、作成されたパイプラインを完全に制御します

Camel 2.5 で利用可能

カスタムチャネルパイプラインは、Netty エンドポイント URL で非常に簡単な方法で指定することなく、カスタムハンドラー、エンコーダー、デコーダーを挿入することで、ハンドラー/インターセプターチェーンを完全に制御します。

カスタムパイプラインを追加するには、カスタムチャネルパイプラインファクトリーを作成し、コンテキストレジストリー (JNDIRegistry、または camel-spring ApplicationContextRegistry など) を介してコンテキストに登録する必要があります。

カスタムパイプラインファクトリーは、次のように構築する必要があります。

  • プロデューサーにリンクされたチャネルパイプラインファクトリーは、抽象クラス ClientPipelineFactory を拡張する必要があります。
  • コンシューマーリンクチャネルパイプラインファクトリーは、抽象クラス ServerPipelineFactory を拡張する必要があります。
  • カスタムハンドラー、エンコーダ、およびデコーダを挿入するには、クラスで getPipeline() メソッドをオーバーライドする必要があります。getPipeline() メソッドをオーバーライドしないと、パイプラインに接続されたハンドラー、エンコーダー、またはデコーダーのないパイプラインが作成されます。

以下の例は、ServerChannel パイプラインファクトリーを作成する方法を示しています。

カスタムパイプラインファクトリーの使用

public class SampleServerChannelPipelineFactory extends ServerPipelineFactory {
    private int maxLineSize = 1024;

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline channelPipeline = Channels.pipeline();

        channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
        channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
        channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
        // here we add the default Camel ServerChannelHandler for the consumer, to allow Camel to route the message etc.
        channelPipeline.addLast("handler", new ServerChannelHandler(consumer));

        return channelPipeline;
    }
}

次に、カスタムチャネルパイプラインファクトリーをレジストリーに追加し、次の方法でキャメルルートでインスタンス化/利用できます。

Registry registry = camelContext.getRegistry();
serverPipelineFactory = new TestServerChannelPipelineFactory();
registry.bind("spf", serverPipelineFactory);
context.addRoutes(new RouteBuilder() {
  public void configure() {
      String netty_ssl_endpoint =
         "netty:tcp://0.0.0.0:5150?serverPipelineFactory=#spf"
      String return_string =
         "When You Go Home, Tell Them Of Us And Say,"
         + "For Your Tomorrow, We Gave Our Today.";

      from(netty_ssl_endpoint)
       .process(new Processor() {
          public void process(Exchange exchange) throws Exception {
            exchange.getOut().setBody(return_string);
          }
       }
  }
});