第 38 章 实施组件

摘要

本章一般概述可用于实施 Apache Camel 组件。

38.1. 组件架构

38.1.1. 组件的工厂模式

概述

Apache Camel 组件由一组通过工厂模式相互关联的类组成。组件的主要入口点是 Component 对象本身( org.apache.camel.Component 类型的实例)。您可以使用 component 对象作为工厂来创建 Endpoint 对象,后者又是创建 使用者ProducerExchange 对象的工厂。这些关系在 中进行了概述 图 38.1 “组件 onnectionFactoryy Patterns”

图 38.1. 组件 onnectionFactoryy Patterns

组件工厂模式

组件

组件实施是一个端点工厂。组件实施器的主要任务是实施 component .createEndpoint() 方法,它负责根据需要创建新端点。

每种组件必须与出现在端点 URI 中的组件前缀 关联。例如,文件组件通常与 文件 前缀关联,该前缀可以在 file://tmp/messages/input 等端点 URI 中使用。当您在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实现组件的类名称之间的关联。

端点

每个端点实例封装了特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它都会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。

端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义了以下工厂方法:

  • createConsumer()createPollingConsumer() abrt-活动Creates 是一个消费者端点,代表路由开头的源端点。
  • createProducer() >_<-jaxbCreates a producer 端点,它代表路由末尾的目标端点。
  • createExchange() InventoryService-jaxbCreates a exchange 对象,用于封装通过和关闭路由的消息。

消费者

消费者端点 会消耗 请求。它们始终显示在路由开始时,并封装负责接收传入请求和分配传出回复的代码。从面向服务的视角来看,使用者代表 服务

消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,可以遵循多种不同模式。这些模式在 第 38.1.3 节 “消费者模式和线程” 中所述。

producer

生产者端点 生成 请求。它们始终出现在路由的末尾,并封装负责分配传出请求和接收传入的回复的代码。从面向服务的视角来看,生产者代表服务消费者

制作者必须实施 org.apache.camel.Producer 接口。您可以选择实施制作者来支持异步处理方式。详情请查看 第 38.1.4 节 “异步处理”

Exchange

Exchange 对象封装一组相关的消息。例如,一种消息交换是同步调用,它由请求消息和相关回复组成。

Exchanges 必须实施 org.apache.camel.Exchange 接口。默认的实现( DefaultExchange )足以满足许多组件实施。但是,如果您要将额外的数据与交换相关联,或者交换了额外处理,那么自定义交换实施会很有用。

消息

Exchange 对象中有两个不同的消息插槽:

  • message 这个过程中保留当前消息。
  • out message>_<-abrttemporarily 包含回复信息。

所有消息类型都由同一 Java 对象 org.apache.camel.Message 来表示。并非始终需要自定义消息的实施,即 default Message 通常会足够。

38.1.2. 在路由中使用组件

概述

Apache Camel 路由基本上是一个处理器的管道,即 org.apache.camel.Processor 类型。消息封装在交换对象 E 中,通过调用 process() 方法从节点传递到节点。处理器管道的架构在 图 38.2 “Route 中的使用者和 Producer 实例” 中进行了说明。

图 38.2. Route 中的使用者和 Producer 实例

路由中的使用者和 Producer 实例

源端点

在路由开始时,您有源端点,由 org.apache.camel.Consumer 对象表示。源端点负责接受传入的请求消息并分配回复。在构建路由时,Apache Camel 根据端点 URI 的组件前缀创建适当的 Consumer 类型,如 第 38.1.1 节 “组件的工厂模式” 所述。

处理器

管道中的每个中间节点由处理器对象(实施 org.apache.camel.Processor 接口)表示。您可以插入标准处理器(例如,过滤throttlerdelayer)或者插入您自己的自定义处理器实施。

目标端点

路由的末尾是目标端点,由 org.apache.camel.Producer 对象代表。由于它源自处理器管道,因此制作者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据来自端点 URI 的组件前缀来创建适当的 Producer 类型。

38.1.3. 消费者模式和线程

概述

用于实施使用者的模式决定了处理传入交换时使用的线程模型。消费者可使用以下模式之一实施:

事件驱动的模式

在事件驱动的模式中,当应用程序的另一个部分(通常是第三方库)调用消费者实施的方法时,发起传入请求的处理。事件驱动的消费者是一个很好的例子,即 Apache Camel JMX 组件,其中的事件由 JMX 库启动。JMX 库调用 handleNotification() 方法,以启动 requests processing>_<-abrt,有关详细信息,请参阅 例 41.4 “JMXConsumer 实施”

图 38.3 “event-Driven Consumer” 显示了事件驱动的消费者模式的概要。在本例中,假定调用 notify() 方法触发了处理。

图 38.3. event-Driven Consumer

使用事件驱动的消费者进行消息链

事件驱动的消费者按如下方式处理传入请求:

  1. 使用者必须实施一种方法才能接收传入事件(在 图 38.3 “event-Driven Consumer” 中,这由 notify() 方法表示)。调用 notify() 的线程通常是应用的独立部分,因此消费者的线程策略是外部的。

    例如,在 JMX 消费者实施的情况下,使用者实施 NotificationListener.handleNotification() 方法,以接收来自 JMX 的通知。驱动消费者处理的线程在 JMX 层内创建。

  2. notify() 方法的正文中,使用者首先将传入的事件转换为交换对象 E,然后在路由中的下一个处理器调用 process(),并将交换对象作为参数传递。

调度的轮询模式

在计划的轮询模式中,使用者通过定期检查间隔来检索传入的请求,无论请求是否到达。内置的计时器类( 调度的 executor 服务 )会自动检查请求,这是 java.util.concurrent 库提供的标准模式。调度的 executor 服务在固定间隔时执行特定的任务,还可管理一个线程池,用于运行任务实例。

图 38.4 “调度的 Poll Consumer” 显示计划的轮询消费者模式的概要。

图 38.4. 调度的 Poll Consumer

调度的 Poll Consumer

调度的轮询消费者处理传入请求,如下所示:

  1. 调度的 executor 服务有一个线程池,可用于启动消费者处理。在各个调度的时间间隔过后,调度的 executor 服务会尝试从池中获取可用线程(默认情况下,池中有五个线程)。如果可用线程可用,它将使用该线程在消费者上调用 poll() 方法。
  2. 使用者的 poll() 方法用于触发传入请求的处理。在 poll() 方法的正文中,使用者会尝试检索传入的消息。如果没有请求可用,则 poll() 方法会立即返回。
  3. 如果请求消息可用,则使用者将其插入到交换对象中,然后在路由中的下一个处理器调用 process(),并将交换对象传递至其参数。

轮询模式

在轮询模式中,当第三方调用一个消费者轮询方法时,启动传入请求的处理:

  • receive()
  • receiveNoWait()
  • 接收(长超时)

组件实施过程由组件实施来定义精确的机制,以发起对轮询方法的调用。这种机制在轮询模式中没有指定。

图 38.5 “polling Consumer” 显示了轮询消费者模式的概要。

图 38.5. polling Consumer

polling Consumer

轮询消费者按如下方式处理传入请求:

  1. 每当称为消费者的轮询方法之一时,启动传入请求的处理。调用这些轮询方法的机制由组件实施定义。
  2. receive() 方法的正文中,使用者会尝试检索传入的请求消息。如果目前没有可用的消息,则行为取决于调用哪个接收方法。

    • receiveNoWait() returns immediately
    • 接收(长超时) 等待指定的超时间隔[2] 在返回前
    • receive() 等待直到收到消息
  3. 如果请求消息可用,则使用者将其插入到交换对象中,然后在路由中的下一个处理器调用 process(),并将交换对象传递至其参数。

38.1.4. 异步处理

概述

在处理交换时,制作者端点通常遵循 同步 模式。当制作者上的管道调用 process() 中的前面的处理器时,process() 方法将阻止,直到收到回复。在这种情况下,处理器的线程将会被阻止,直到制作者完成发送请求并接收回复的生命周期。

但有时候,您可能更喜欢将前面的处理器与制作者分离,从而使处理器的线程会立即释放,并且 process() 调用 不会阻止。在这种情况下,您应该使用 异步 模式实施制作者,为前面的处理器提供调用 process() 方法的非阻塞版本的选项。

为了让您了解不同的实现选项,本节描述了实施制作者端点的同步和异步模式。

同步制作者

图 38.6 “synchronous Producer” 显示同步制作者的概要,其中前面的处理器块直到制作者完成交换过程。

图 38.6. synchronous Producer

synchronous Producer

同步制作者会按如下方式处理交换:

  1. 管道中的前面的处理器调用制作者上的 synchronous process() 方法,以启动同步处理。同步 process() 方法采用单一交换参数。
  2. process() 方法的正文中,生产者将请求(在消息中 )发送到端点。
  3. 如果交换模式需要,生产者会等待回复( 消息)从端点到达。此步骤可能会导致 process() 方法无限期阻止。但是,如果交换模式没有强制回复,process() 方法可以在发送请求后立即返回。
  4. process() 方法返回时,exchange 对象包含来自同步调用( Out 消息消息)的回复。

异步制作者

图 38.7 “asynchronous Producer” 展示了异步制作者的概要,其中生产者在子线程中处理交换,而前面的处理器在一定时间内不会被阻止。

图 38.7. asynchronous Producer

asynchronous Producer

异步制作者以如下方式处理交换:

  1. 在处理器可以调用异步 process() 方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。
  2. 处理器调用制作者上的异步 process() 方法,以启动异步处理。异步 process() 方法采用两个参数:

    • Exchange 对象
    • 同步回调对象
  3. process() 方法的正文中,制作者会创建一个 可运行的 对象来封装处理代码。然后,制作者将这个 Runnable 对象的 执行委托给子线程。
  4. 异步 process() 方法返回,从而释放处理器线程。交换处理将继续处于单独的子线程中。
  5. Runnable 对象将 In 消息发送到端点。
  6. 如果交换模式需要,Run nable 对象会等待回复(OutFailure 消息)来到达端点。Runnable 对象 会一直被阻止,直到收到回复为止。
  7. 在回复到达后,Run nable 对象会将回复(传出消息)插入到交换对象中,然后在异步回调对象上调用 done()然后,异步回调会负责处理回复消息(在子线程中执行)。


[2] 超时间隔通常以毫秒为单位指定。