241.8. 自定义管道
自定义频道管道通过插入自定义处理程序和解码器(而无需以非常简单的方式在 Netty Endpoint URL 中指定它们)来提供对处理器/interceptor 链的完整控制。
要添加自定义管道,必须通过上下文 registry (JNDIRegistry 或 camel-spring ApplicationContextRegistry 等)使用上下文创建并注册自定义频道管道工厂。
必须按照以下方式构建自定义管道工厂:
-
与 Producer 链接的频道工厂必须扩展抽象类
ClientPipelineFactory。 -
Consumer 链接的频道管道工厂必须扩展抽象类
ServerInitializerFactory。 -
类应该覆盖 initChannel()方法,以插入自定义处理程序、编码器和解码器。没有覆盖
initChannel()方法,创建没有处理程序、编码器或解码器到管道的管道。
以下示例显示如何创建 ServerInitializer factory 工厂
241.8.1. 使用自定义管道工厂
public class SampleServerInitializerFactory extends ServerInitializerFactory {
private int maxLineSize = 1024;
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast("encoder-SD", new StringEncoder(CharsetUtil.UTF_8));
channelPipeline.addLast("decoder-DELIM", new DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
channelPipeline.addLast("decoder-SD", new StringDecoder(CharsetUtil.UTF_8));
// here we add the default Camel ServerChannelHandler for the consumer, to allow Camel to route the message etc.
channelPipeline.addLast("handler", new ServerChannelHandler(consumer));
}
}然后,自定义频道管道工厂可以添加到 registry 中,并按照以下方式在 camel 路由中实例化/使用
Registry registry = camelContext.getRegistry();
ServerInitializerFactory factory = new TestServerInitializerFactory();
registry.bind("spf", factory);
context.addRoutes(new RouteBuilder() {
public void configure() {
String netty_ssl_endpoint =
"netty4:tcp://0.0.0.0:5150?serverInitializerFactory=#spf"
String return_string =
"When You Go Home, Tell Them Of Us And Say,"
+ "For Your Tomorrow, We Gave Our Today.";
from(netty_ssl_endpoint)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getOut().setBody(return_string);
}
}
}
});