230.7. 消费者

有几种类型的用户:

230.7.1. Tailable Cursor Consumer

MongoDB 提供了一种机制来即时消耗来自集合的持续数据,方法是使光标像对 *nix 系统的 tail -f 命令一样打开。这种机制比调度的轮询效率要高得多,因为服务器在客户端上推送新数据就会可用,而不是让客户端按计划的间隔返回来获取新数据。它还减少了其他冗余网络流量。

只需要使用尾部的光标,即集合必须是 "capped collection",这意味着它仅拥有 N 对象,并在达到限制时,MongoDB 清除旧对象与其最初插入的顺序相同。如需更多信息,请参阅 :http://www.mongodb.org/display/DOCS/Tailable+Cursors。

Camel MongoDB 组件实施尾部的光标消费者,使此功能可用于您的 Camel 路由。在插入新对象后,MongoDB 会将它们作为文档 推送到您的尾部光标消费者,这将将其转换为 Exchange,并触发您的路由逻辑。

230.7.1.1. 尾部的光标消费者的工作方式

要将光标置于可尾部的光标中,在第一次生成光标时,会将几个特殊标记信号分配给 MongoDB。创建后,光标将保持打开状态,并将在调用 MongoCursor.next() 方法时阻止,直到新数据到达。但是,如果新数据在非确定周期后没有出现,MongoDB 服务器保留自己要终止您的光标。如果您有兴趣继续使用新数据,则必须重新生成光标。为此,您必须记住离开的位置,或者您将再次占用率的其他位置。

Camel MongoDB tail 可移动的光标消费者为您处理所有这些任务。您只是向数据中增加性质的部分字段提供密钥,这样每次重新生成时,都将作为您的光标定位的标记,例如时间戳、后续 ID 等。它可以是 MongoDB 支持的任何数据类型。日期、字符串和 Integers 可以正常工作。我们在这个组件上下文中调用这种机制"tail 跟踪"。

使用者将记住此字段的最后值,以及每当要重新生成光标时,将通过一个过滤器(如 increase Field > lastValue )运行查询,以便只消耗非读数据。

设置 increasing 字段: 在端点 URI tailTrackingIncreasingField 选项上设置 increase 字段的键。在 Camel 2.10 中,它必须是数据中的顶层字段,目前还不支持此字段的嵌套导航。也就是说,"timestamp"字段是 okay,但"nested.timestamp"将无法工作。如果您需要支持嵌套增加的字段,请在 Camel JIRA 中创建一个 ticket。

光标重新生成延迟: 一个需要注意的是,如果新数据在初始发生时还没有提供,MongoDB 将立即终止光标。由于我们不想超过服务器,因此引进了 cursorRegenerationDelay 选项(默认值为 1000ms)。

例如:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
    .id("tailableCursorConsumer1")
    .autoStartup(false)
    .to("mock:test");

以上路由将消耗来自"flights.cancellations"功能的集合,使用 "departureTime" 作为 increasing 字段,默认的重新生成光标延迟为 1000ms。

230.7.1.2. 持久跟踪

标准尾部跟踪是易失性,最后一个值仅保存在内存中。但是,在实践中,您需要现在每一次重启 Camel 容器,但最后的值将会丢失,并且您的尾部光标消费者再次使用自上而来,很可能将重复的记录发送到您的路由。

要克服这种情况,您可以启用 持久的尾部跟踪 功能,以便在 MongoDB 数据库的特殊集合中跟踪最后一次消耗的值。当消费者再次指出时,它将恢复最后的跟踪值,并在没有任何情况时继续操作。

最后的 read 值会保留两个结果:每次重新生成光标以及消费者关闭的时间。在未来 5 秒内,我们可能会考虑定期持续保留时间(每 5 秒清空一次),以便在需求时增加稳健性。要请求此功能,请在 Camel JIRA 中创建一个 ticket。

230.7.1.3. 启用持久跟踪

要启用此功能,在端点 URI 中至少设置以下选项:

  • persistentTailTracking 选项为 true
  • persistentId 选项针对这个消费者的唯一标识符,以便可以在许多用户间重复使用相同的集合

另外,您还可以将 tailTrackDbtailTrackCollectiontailTrackField 选项设置为保存运行时信息的自定义。有关每个选项的描述,请参考本页顶部的端点选项表。

例如,以下路由会使用 "flights.cancellations" 容量收集,使用 "departureTime" 作为 increasing 字段,默认的重新生成光标延迟为 1000ms,并打开持久跟踪,并保留在 "flights.camelTailTracking" 的"cancellationsTracker" id 下。 在 "lastTrackingValue" 字段中存储最后处理的值(camelTailTrackinglastTrackingValue 都是默认值)。

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");

以下是与以上地址相同的另一个示例,但永久跟踪运行时信息将存储在 "trackers.camelTrackers" 集合中,在"lastProcessedDepartureTime"字段中:

from("mongodb3:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true" +
     "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers" +
     "&tailTrackField=lastProcessedDepartureTime")
    .id("tailableCursorConsumer3")
    .autoStartup(false)
    .to("mock:test");