第 38 章 实施组件

摘要

本章概述了可用于实施 Apache Camel 组件的方法。

38.1. 组件架构

38.1.1. 组件的工厂模式

概述

Apache Camel 组件由一组通过工厂模式相互相关的类组成。组件的主要入口点是 components 对象本身(一个 org.apache.camel.Component 类型)实例。您可以将 Component 对象用作工厂创建 Endpoint 对象,后者又充当创建 消费者生产者Exchange Exchange 对象的工厂。这些关系在 中进行了概述 图 38.1 “组件因素模式”

图 38.1. 组件因素模式

组件工厂模式

组件

组件实现是端点工厂。组件实现器的主要任务是实施 component .createEndpoint () 方法,它负责按需创建新端点。

每种组件必须与端点 URI 中显示的组件前缀 关联。例如,文件组件通常与 文件 前缀相关联,前缀可在类似 file://tmp/messages/input 的端点 URI 中使用。在 Apache Camel 中安装新组件时,您必须定义特定组件前缀和实施组件的类名称之间的关联。

端点

每个端点实例封装了一个特定的端点 URI。每次 Apache Camel 遇到新端点 URI 时,它都会创建一个新的端点实例。端点对象也是创建消费者端点和制作者端点的工厂。

端点必须实施 org.apache.camel.Endpoint 接口。Endpoint 接口定义了以下工厂方法:

  • createConsumer ()createPollingConsumer () Memcached-»Creates consumer 端点,它代表路由开头的源端点。
  • createProducer () 方式为 producer 端点,它代表路由末尾的目标端点。
  • createExchange () CURRENTCreates 是一个交换对象,它封装了传递和关闭路由的消息。

消费者

消费者端点 使用 请求。它们始终显示在路由开始时,它们封装了负责接收传入请求并发送传出回复的代码。从面向服务的视角来看,消费者表示 服务

消费者必须实施 org.apache.camel.Consumer 接口。在实施消费者时,您可以遵循许多不同的模式。第 38.1.3 节 “消费者模式和线程” 描述了这些模式。

制作者

制作者端点 生成 请求。它们始终出现在路由的末尾,它们封装代码负责发送传出请求并接收传入的回复。从面向服务的视角来看,生产者代表 服务使用者

生产者必须实施 org.apache.camel.Producer 接口。您可以选择实施制作者以支持异步处理方式。详情请查看 第 38.1.4 节 “异步处理”

Exchange

交换对象封装一组相关的消息。例如,一种消息交换是同步调用,它由请求消息及其相关回复组成。

Exchanges 必须实施 org.apache.camel.Exchange 接口。默认实施 DefaultExchange 足以满足许多组件实现。但是,如果要与交换关联额外数据,或者交换了额外的处理,那么自定义交换实施会很有用。

消息

Exchange 对象中有两个不同的消息插槽:

  • messagetimer-sandboxed 中保留当前的消息。
  • out message-2021-33-的temporaritaly 包含回复消息。

所有消息类型都由同一 Java 对象 org.apache.camel.Message 表示。自定义消息 implementation将默认实现 DefaultMessage 认为是不够的,并非始终是必要的。

38.1.2. 在 Route 中使用组件

概述

Apache Camel 路由基本上是一个处理器管道,即 org.apache.camel.Processor 类型。消息封装在一个交换对象 E 中,通过调用 process () 方法从节点传递到节点。处理器管道的架构在 图 38.2 “路由中的使用者和生产者实例” 中进行了说明。

图 38.2. 路由中的使用者和生产者实例

路由中的使用者和 Producer 实例

源端点

在路由开始时,您具有源端点,该端点由 org.apache.camel.Consumer 对象表示。源端点负责接受传入请求消息并分配回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Consumer 类型,如 第 38.1.1 节 “组件的工厂模式” 所述。

处理器

管道中的每个中间节点都由一个处理器对象(实施 org.apache.camel.Processor 接口)表示。您可以插入标准处理器(例如,过滤throttlerdelayer)或插入您自己的自定义处理器实现。

目标端点

路由末尾是目标端点,它由 org.apache.camel.Producer 对象表示。由于它位于处理器管道的末尾,因此制作者也是处理器对象(实施 org.apache.camel.Processor 接口)。目标端点负责发送传出请求消息并接收传入的回复。在构建路由时,Apache Camel 根据端点 URI 中的组件前缀创建适当的 Producer 类型。

38.1.3. 消费者模式和线程

概述

用于实施使用者的模式决定了处理传入交换时使用的线程模型。消费者可使用以下模式之一实现:

事件驱动的模式

在事件驱动模式中,当应用的另一部分(通常是第三方库)的另一部分时,会启动传入请求的处理,调用使用者实施的方法。事件驱动使用者是一个很好的示例,即 Apache Camel JMX 组件,其中事件由 JMX 库启动。JMX 库调用 handleNotification () 方法来发起请求处理将 processing processing-clusterSelector 查看详细信息,请参阅 例 41.4 “JMXConsumer 实施”

图 38.3 “event-Driven Consumer” 显示事件驱动的消费者模式的概述。在本例中,假定处理是由 notify() 方法的调用触发。

图 38.3. event-Driven Consumer

使用事件驱动的消费者消息链

事件驱动的消费者处理传入的请求,如下所示:

  1. 消费者必须实施一种方法来接收传入的事件(在 图 38.3 “event-Driven Consumer” 中,这由 notify() 方法表示)。调用 notify() 的线程通常是应用的一个单独部分,因此消费者的线程是外部驱动的。

    例如,在 JMX 消费者实施时,消费者实施 NotificationListener.handleNotification () 方法,从 JMX 接收通知。驱动消费者处理的线程在 JMX 层内创建。

  2. notify() 方法的正文中,使用者首先将传入的事件转换为交换对象 E,然后在路由中的下一个处理器上调用 process (),并将 Exchange 对象作为其参数。

调度的轮询模式

在调度的轮询模式中,消费者通过定期检查请求是否到达,从而检索传入请求。检查请求由内置计时器类、计划的 executor 服务自动调度,这是 java.util.concurrent 库提供的标准模式。调度的 executor 服务以定时间隔执行特定的任务,还管理一组用于运行任务实例的线程池。

图 38.4 “调度的轮询消费者” 显示调度的轮询消费者模式的概要。

图 38.4. 调度的轮询消费者

调度的轮询消费者

调度的轮询消费者处理传入请求,如下所示:

  1. 调度的 executor 服务有一组线程池,可用于启动消费者处理。在生成了每个调度的时间间隔后,调度的 executor 服务会尝试从池中获取空闲线程(默认为五个线程)。如果有可用线程,它将使用该线程来调用使用者上的 poll () 方法。
  2. 消费者的 poll () 方法旨在触发传入请求的处理。在 poll () 方法的正文中,使用者会尝试检索传入的消息。如果没有请求可用,则 poll () 方法会立即返回。
  3. 如果请求消息可用,使用者将其插入到交换对象中,然后在路由中的下一个处理器上调用 process (),将 Exchange 对象作为参数传递。

轮询模式

在轮询模式中,当第三方调用其中一个消费者轮询方法时,启动传入请求的处理:

  • receive()
  • receiveNoWait()
  • receive(long timeout)

这些组件的实施是用来定义启动轮询方法调用的精确机制。这种机制在轮询模式中没有指定。

图 38.5 “轮询消费者” 显示轮询消费者模式的概述。

图 38.5. 轮询消费者

轮询消费者

轮询使用者处理传入的请求,如下所示:

  1. 每当调用其中一个消费者的轮询方法时,都会启动传入请求的处理。调用这些轮询方法的机制由组件实施定义。
  2. receive () 方法的正文中,使用者会尝试检索传入请求消息。如果目前没有消息可用,则行为将取决于调用哪个接收方法。

    • receiveNoWait() returns immediately
    • receive (长超时) 等待指定的超时间隔[2] 在返回前
    • receive () 等待收到消息
  3. 如果请求消息可用,使用者将其插入到交换对象中,然后在路由中的下一个处理器上调用 process (),将 Exchange 对象作为参数传递。

38.1.4. 异步处理

概述

制作者端点在处理交换时通常遵循 同步 模式。当管道中的上述处理器在制作者上调用 process () 时,process () 方法会阻止,直到收到回复为止。在这种情况下,处理器的线程会保留阻止,直到制作者完成发送请求并接收回复的周期。

但有时候,您可能更倾向于将上述处理器与生产者分离,以便立即释放处理器的线程,而 process () 调用也 不会阻止。在这种情况下,您应该使用 异步 模式实施制作者,这为前一个处理器提供了调用 进程() 方法的非阻塞版本的选项。

为了让您了解不同的实施选项,本节将同时描述实施制作者端点的同步和异步模式。

同步制作者

图 38.6 “同步生产者” 显示同步制作者的概述,其中前面的处理器块直到生产者完成交换完成。

图 38.6. 同步生产者

同步生产者

同步制作者会按如下方式处理交换:

  1. 管道中的前面的处理器调用制作者上的 synchronous process () 方法来启动同步处理。同步 process () 方法采用单一交换参数。
  2. process () 方法的正文中,生产者将请求(消息)发送到端点。
  3. 如果交换模式要求,则制作者会等待回复(Out message)从端点到达。此步骤可能会导致 process () 方法无限期阻止。但是,如果交换模式不强制回复,则 process () 方法可以在发送请求后立即返回。
  4. process () 方法返回时,交换对象包含来自同步调用(an Out message)的回复。

异步制作者

图 38.7 “异步 Producer” 显示异步制作者的概述,生产者在子线程中处理交换,并且上述处理器不会因任何大量时间而阻止。

图 38.7. 异步 Producer

异步 Producer

异步制作者处理交换,如下所示:

  1. 在处理器可以调用异步 process () 方法之前,它必须创建一个 异步回调 对象,该对象负责处理路由返回部分的交换。对于异步回调,处理器必须实施从 AsyncCallback 接口继承的类。
  2. 处理器调用制作者上的异步 process () 方法来启动异步处理。异步 process () 方法采用两个参数:

    • Exchange 对象
    • 同步回调对象
  3. process () 方法的正文中,生产者会创建一个可运行的对象,用于封装处理代码。然后,生产者会将此 Runnable 对象的执行委托给一个子线程。
  4. 异步 process () 方法返回,从而释放处理器的线程。交换处理将继续处于单独的子线程。
  5. Runnable 对象将 In 消息发送到端点。
  6. 如果交换模式需要,可运行的 对象会等待回复(OutFault 消息)到达端点。在收到回复前,可运行的对象会保持阻止。
  7. 在回复到达后,可运行的 对象将回复(Out 消息)插入到交换对象中,然后在异步回调对象上调用 done ()。然后,异步回调负责处理回复消息(在子线程中执行)。


[2] 超时间隔通常以毫秒为单位指定。