第 41 章 消费者接口
摘要
本章论述了如何实施 Apache Camel 组件,这是实施 Apache Camel 组件的基本步骤。
41.1. 消费者接口
概述
org.apache.camel.Consumer 类型的实例代表路由中的源端点。实施消费者的方法有几种不同(请参阅 第 38.1.3 节 “消费者模式和线程”),这种灵活性反映了在继承层次(请参阅 图 41.1 “消费者继承层次结构”)中,其中包括用于实施消费者的多个不同基础类。
图 41.1. 消费者继承层次结构

consumer 参数注入
对于遵循调度的轮询模式的消费者(请参阅 “调度的轮询模式”一节),Apache Camel 提供将参数注入消费者实例中的支持。例如,考虑 自定义 前缀标识的组件以下端点 URI:
custom:destination?consumer.myConsumerParam
Apache Camel 支持以 使用者格式自动注入查询选项。\*.对于 consumer.myConsumerParam 参数,您需要在 Consumer 实现类上定义对应的 setter 和 getter 方法,如下所示:
public class CustomConsumer extends ScheduledPollConsumer {
...
String getMyConsumerParam() { ... }
void setMyConsumerParam(String s) { ... }
...
}在 getter 和 setter 方法遵循常规的 Java bean 惯例(包括大写属性名称的第一个字母)。
除了在消费者实现中定义 bean 方法,您还必须记得在 Endpoint.createConsumer () 的实现中调用 configureConsumer () 方法(请参阅 “调度的轮询端点实现”一节)。
例 41.1 “FileEndpoint createConsumer() Implementation” 显示来自文件组件的 FileEndpoint 类的 createConsumer () 方法实现的示例:
例 41.1. FileEndpoint createConsumer() Implementation
...
public class FileEndpoint extends ScheduledPollEndpoint {
...
public Consumer createConsumer(Processor processor) throws Exception {
Consumer result = new FileConsumer(this, processor);
configureConsumer(result);
return result;
}
...
}在运行时,消费者参数注入的工作方式如下:
-
创建端点时,默认的
DefaultComponent.createEndpoint (String uri)会解析 URI 以提取消费者参数,并通过调用ScheduledPollEndpoint.configureProperties ()将端点存储在端点实例中。 -
当调用
createConsumer ()时,方法实现调用configureConsumer ()来注入使用者参数(请参阅 例 41.1 “FileEndpoint createConsumer() Implementation”)。 -
configureConsumer ()方法使用 Java reflection 调用与使用者前缀之后匹配的集合方法。前缀被剥离。
调度的轮询参数
遵循调度的轮询模式的使用者会自动支持 表 41.1 “调度的轮询参数” 中显示的使用者参数(它可以显示为端点 URI 中的查询选项)。
表 41.1. 调度的轮询参数
| Name | default | 描述 |
|---|---|---|
|
|
| 在第一次轮询前以毫秒为单位。 |
|
|
|
取决于 |
|
|
|
如果为
如果为 |
在事件驱动的和轮询消费者间转换
Apache Camel 提供了两种特殊的使用者实施,可用于在事件驱动的消费者和轮询消费者之间进行转换。提供了以下转换类:
-
org.apache.camel.impl.EventDrivenPollingConsumer记录将事件驱动的消费者引入轮询消费者实例中。 -
org.apache.camel.impl.DefaultScheduledPollConsumerfde-scannerConverer 轮询消费者到一个事件驱动的消费者实例中。
在实践中,这些类用于简化实施端点类型的任务。Endpoint 接口定义了以下两种方法来创建使用者实例:
package org.apache.camel;
public interface Endpoint {
...
Consumer createConsumer(Processor processor) throws Exception;
PollingConsumer createPollingConsumer() throws Exception;
}
createConsumer () 返回事件驱动的使用者,并创建PollingConsumer () 返回轮询使用者。您只需实施其中一种方法。例如,如果您遵循消费者的事件驱动模式,您要实施 createConsumer () 方法来提供方法实施,以创建仅引发异常的轮询Consumer ()。但是,在转换类的帮助下,Apache Camel 能够提供更有用的默认实施。
例如,如果要根据事件驱动的模式实施消费者,您可以通过扩展 DefaultEndpoint 并实施 createConsumer () 方法来实施端点。创建PollingConsumer () 的实现继承自 DefaultEndpoint,其定义如下:
public PollingConsumer<E> createPollingConsumer() throws Exception {
return new EventDrivenPollingConsumer<E>(this);
}
EventDrivenPolsum er 构造器使用对事件驱动的消费者的引用,此 有效地换行并将其转换为轮询消费者。要实施转换,EventDrivenPollingConsumer 实例缓冲传入的事件,并通过 receive ()、接收 (长超时) 以及 receiveNoWait () 方法按需提供。
类似地,如果您根据轮询模式实施您的使用者,您通过扩展 DefaultPollingEndpoint 并实施 creation PollingConsumer () 方法来实施端点。在本例中,createConsumer () 方法的实现从 DefaultPollingEndpoint 继承,默认的实施会返回 DefaultScheduledPollConsumer 实例(它将轮询使用者转换为事件驱动的使用者)。
ShutdownPrepared 接口
消费者类可以选择实施 org.apache.camel.spi.ShutdownPrepared 接口,使您的自定义消费者端点能够接收关闭通知。
例 41.2 “ShutdownPrepared Interface” 显示 ShutdownPrepared 接口的定义。
例 41.2. ShutdownPrepared Interface
package org.apache.camel.spi;
public interface ShutdownPrepared {
void prepareShutdown(boolean forced);
}
ShutdownPrepared 接口定义了以下方法:
prepareShutdown接收在一个或多个阶段中关闭消费者端点的通知,如下所示:
-
正常关闭 abrt-where,
强制参数的值为false。尝试正常清理资源。例如,通过正常停止线程。 -
强制关闭 TOKEN -where
强制参数的值为true。这意味着关闭已超时,因此您必须更加积极地清理资源。这是在进程退出前清理资源的最后几率。
-
正常关闭 abrt-where,
ShutdownAware Interface
消费者类可以选择实施 org.apache.camel.spi.ShutdownAware 接口,该界面与安全关闭机制交互,使消费者能够寻求额外的时间来关闭。这通常需要组件(如 SEDA),这样可将待处理的交换存储在内部队列中。通常,您希望在关闭 SEDA 消费者之前处理队列中的所有交换。
例 41.3 “ShutdownAware Interface” 显示 ShutdownAware 接口的定义。
例 41.3. ShutdownAware Interface
// Java
package org.apache.camel.spi;
import org.apache.camel.ShutdownRunningTask;
public interface ShutdownAware extends ShutdownPrepared {
boolean deferShutdown(ShutdownRunningTask shutdownRunningTask);
int getPendingExchangesSize();
}
ShutdownAware 接口定义了以下方法:
deferShutdown如果要延迟关闭消费者,则从此方法返回
true。shutdownRunningTask参数是一个enum,可采用以下值之一:-
ShutdownRunningTask.CompleteCurrentTaskOnly的 方式运行处理当前由消费者的线程池处理的交换,但不会尝试处理比这更多的交换。 -
ShutdownRunningTask.CompleteAllTasks例如,在 SEDA 组件的情况下,使用者将处理来自其传入队列的所有交换。
-
getPendingExchangesSize- 表示使用者仍然处理多少个交换。零值表示处理已完成,消费者可以关闭。
有关如何定义 ShutdownAware 方法的示例,请参考 例 41.7 “自定义线程实现”。