第 37 章 producer 和 Consumer 模板

摘要

Apache Camel 中的制作者和消费者模板按照 Spring 容器 API 的功能建模,其中可通过简化、易用的 API(称为 模板 )来提供对资源的访问。对于 Apache Camel,生产者模板和使用者模板提供了从生产者端点和消费者端点发送到和接收消息的简化接口。

37.1. 使用 Producer 模板

37.1.1. Producer 模板介绍

概述

producer 模板支持多种不同方法来调用制作者端点。有方法支持请求消息的不同格式(作为 Exchange 对象、作为邮件正文、带有单一标头设置的消息正文,等等)的方法支持同步和异步调用方式。总体而言,制作者模板方法可分为以下类别:

或者,请参阅 第 37.2 节 “使用 Fluent Producer 模板”

同步调用

异步调用端点的方法具有 发送Suffix() 形式的名称 并请求Suffix ()。例如,使用默认消息交换模式(MEP)或明确指定 MEP 的调用端点的方法命名为 send()、 sendBody()sendBodyAndHeader() (其中这些方法分别发送 Exchange 对象、邮件正文或消息正文和标题值)。如果您想要强制 MEP 成为 InOut (请求/回复语义),您可以改为调用 request()、 requestBody()requestBodyAndHeader() 方法。

以下示例演示了如何创建 ProducerTemplate 实例,并使用它来向 activemq:MyQueue 端点发送消息正文。这个示例还演示了如何使用 sendBodyAndHeader() 发送消息正文和标头值。

import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultProducerTemplate
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue
template.sendBody("activemq:MyQueue", "<hello>world!</hello>");

// Send with a body and header
template.sendBodyAndHeader(
    "activemq:MyQueue",
    "<hello>world!</hello>",
    "CustomerRating", "Gold" );

使用处理器同步调用

一个特殊的同步调用,当您通过 Processor 参数(而非 Exchange 参数)提供 send() 方法。在这种情况下,制作者模板隐式询问指定的端点来创建 Exchange 实例(通常通常不会默认具有 InOnly MEP)。然后,此默认交换会传递到处理器,初始化交换对象的内容。

以下示例演示了如何将 MyProcessor 处理器初始化的交换发送到 activemq:MyQueue 端点。

import org.apache.camel.ProducerTemplate
import org.apache.camel.impl.DefaultProducerTemplate
...
ProducerTemplate template = context.createProducerTemplate();

// Send to a specific queue, using a processor to initialize
template.send("activemq:MyQueue", new MyProcessor());

MyProcessor 类按照下例中所示实施。除了设置 In 消息正文(如此处所示),您也可以初始化邮件标题和交换属性。

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
...
public class MyProcessor implements Processor {
    public MyProcessor() { }

    public void process(Exchange ex) {
        ex.getIn().setBody("<hello>world!</hello>");
    }
}

异步调用

异步 调用端点的方法具有格式 asyncSendSuffix()asyncRequestSuffix()。例如,使用默认消息交换模式(MEP)或明确指定 MEP 的调用端点的方法命名为 asyncSend()asyncSendBody() (其中,这两种方法分别发送 Exchange 对象或消息正文)。如果您想要强制 MEP 成为 InOut (请求/reply 语义),您可以调用 asyncRequestBody()、 asyncRequestBodyAndHeader() 以及 asyncRequestBodyAndHeaders() 方法。

以下示例演示了如何异步向 direct:start 端点发送交换。asyncSend() 方法返回 java.util.concurrent.Future 对象,用于稍后检索调用结果。

import java.util.concurrent.Future;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
...
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncSend("direct:start", exchange);

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the resulting exchange from the Future
Exchange result = future.get();

producer 模板还提供了以异步方式发送消息正文的方法(例如,使用 asyncSendBody()asyncRequestBody())。在这种情况下,您可以使用以下帮助程序方法之一从 Future 对象中提取返回的消息正文:

<T> T extractFutureBody(Future future, Class<T> type);
<T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException;

extractFutureBody() 方法的第一个版本将阻止,直到调用完成并且回复消息可用。extractFutureBody() 方法的第二个版本允许您指定超时。两种方法都具有类型参数,键入,它使用内置的类型转换器将返回的消息正文转换为指定的类型。

以下示例演示了如何使用 asyncRequestBody() 方法向 direct:start 端点发送消息正文。然后,使用块 extractFutureBody() 方法从 Future 对象检索回复消息正文。

Future<Object> future = template.asyncRequestBody("direct:start", "Hello");

// You can do other things, whilst waiting for the invocation to complete
...
// Now, retrieve the reply message body as a String type
String result = template.extractFutureBody(future, String.class);

使用回调的异步调用

在前面的异步示例中,请求消息会在子线程中发送,同时由主线程检索和处理回复。制作者模板还为您提供 选项;但是,使用 asyncCallback()asyncCallbackSendBody()asyncCallbackRequestBody() 方法来处理回复。在这种情况下,您可以提供一个回调对象( org.apache.camel.impl.SynchronizationAdapter 类型),它会在回复消息到达后立即在子线程中调用它。

同步 回调接口定义如下:

package org.apache.camel.spi;

import org.apache.camel.Exchange;

public interface Synchronization {
    void onComplete(Exchange exchange);
    void onFailure(Exchange exchange);
}

在收到普通回复时调用 onComplete() 方法,并在收到错误消息回复时调用 onFailure() 方法。只有其中一种方法可以返回,因此您必须覆盖这两者,以确保处理所有类型的答复。

以下示例演示了如何将交换发送到 direct:start 端点,该端点由 SynchronizationAdapter 回调对象在子线程中处理答复信息。

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.SynchronizationAdapter;
...
Exchange exchange = context.getEndpoint("direct:start").createExchange();
exchange.getIn().setBody("Hello");

Future<Exchange> future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() {
    @Override
    public void onComplete(Exchange exchange) {
        assertEquals("Hello World", exchange.getIn().getBody());
    }
});

如果 SynchronizationAdapter 类是 Synchronization 界面的默认实现,您可以覆盖它以提供 Complete()和 onFailure() 回调方法自己的定义。

您仍然可以选择从主线程访问回复,因为 asyncCallback() 方法也返回 Future 对象是:

// Retrieve the reply from the main thread, specifying a timeout
Exchange reply = future.get(10, TimeUnit.SECONDS);

37.1.2. 同步发送

概述

同步发送 方法是您可以用来调用制作者端点的方法集合,即当前线程块,直到方法调用完成并且收到回复(若有)。这些方法与任何消息交换协议兼容。

发送交换

基本 send() 方法是一种通用的方法,它利用交换模式(MEP)将 Exchange 对象的内容发送到端点。返回值是您由制作者端点处理后获得的交换(可能含有 Out 消息,具体取决于 MEP)。

发送交换的 send() 方法有三个变体,可让您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI,或作为 Endpoint 对象。

Exchange send(Exchange exchange);
Exchange send(String endpointUri, Exchange exchange);
Exchange send(Endpoint endpoint, Exchange exchange);

发送由处理器填充的交换

常规 send() 方法的一个简单变体是使用处理器来填充默认交换,而不是显式提供交换对象(详情请参阅 “使用处理器同步调用”一节 )。

发送由处理器填充的交换的 send() 方法可让您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI,或作为 Endpoint 对象。另外,您可以通过提供 模式 参数来指定交换的 MEP,而不必接受默认值。

Exchange send(Processor processor);
Exchange send(String endpointUri, Processor processor);
Exchange send(Endpoint endpoint, Processor processor);
Exchange send(
    String endpointUri,
    ExchangePattern pattern,
    Processor processor
);
Exchange send(
    Endpoint endpoint,
    ExchangePattern pattern,
    Processor processor
);

发送消息正文

如果您只关注要发送的消息正文内容,您可以使用 sendBody() 方法将消息正文作为参数提供,并让制作者模板将正文插入到默认交换对象中。

sendBody() 方法可让您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI,或作为 Endpoint 对象。另外,您可以通过提供 模式 参数来指定交换的 MEP,而不必接受默认值。没有 模式 参数返回 void 的方法(即使调用可能会引发回复);并且带有 pattern 参数的方法返回 Out 消息的正文(如果存在一个)或 In 消息的正文(否则为跳过)。

void sendBody(Object body);
void sendBody(String endpointUri, Object body);
void sendBody(Endpoint endpoint, Object body);
Object sendBody(
    String endpointUri,
    ExchangePattern pattern,
    Object body
);
Object sendBody(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body
);

发送邮件正文和标头。

出于测试目的,尝试 单个 标头设置的影响通常是对于这种标头测试很有用的,而 sendBodyAndHeader() 方法也很有用。您将消息正文和标头设置作为环境变量提供 sendBodyAndHeader(),并让制作者模板负责将 body 和 header 设置插入到默认交换对象中。

sendBodyAndHeader() 方法可让您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI,或作为 Endpoint 对象。另外,您可以通过提供 模式 参数来指定交换的 MEP,而不必接受默认值。没有 模式 参数返回 void 的方法(即使调用可能会引发回复);并且带有 pattern 参数的方法返回 Out 消息的正文(如果存在一个)或 In 消息的正文(否则为跳过)。

void sendBodyAndHeader(
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
void sendBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);
Object sendBodyAndHeader(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String header,
    Object headerValue
);

sendBodyAndHeaders() 方法与 sendBodyAndHeader() 方法类似,除了除了仅提供单个标头设置外,这两种方法都允许您指定标头设置的完整散列映射。

void sendBodyAndHeaders(
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
void sendBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    String endpointUri,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);
Object sendBodyAndHeaders(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    Map<String, Object> headers
);

发送消息正文和交换属性

您可以使用 sendBodyAndProperty() 方法尝试设置单个交换属性的效果。您将消息正文和属性设置作为环境变量提供 sendBodyAndProperty(),并让制作者模板负责将正文和交换属性插入到默认交换对象中。

sendBodyAndProperty() 方法可让您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI,或作为 Endpoint 对象。另外,您可以通过提供 模式 参数来指定交换的 MEP,而不必接受默认值。没有 模式 参数返回 void 的方法(即使调用可能会引发回复);并且带有 pattern 参数的方法返回 Out 消息的正文(如果存在一个)或 In 消息的正文(否则为跳过)。

void sendBodyAndProperty(
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    String endpointUri,
    Object body,
    String property,
    Object propertyValue
);
void sendBodyAndProperty(
    Endpoint endpoint,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    String endpoint,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);
Object sendBodyAndProperty(
    Endpoint endpoint,
    ExchangePattern pattern,
    Object body,
    String property,
    Object propertyValue
);

37.1.3. 通过 InOut Pattern 进行同步请求

概述

同步请求 方法与同步发送方法类似,除了请求方法强制消息交换模式强制为 InOut (与 request/reply 语义一致)。因此,如果您期望从制作者端点接收回复,通常使用同步请求方法。

请求一个由处理器填充的交换

basic request() 方法是一个通用的通用方法,它使用处理器填充默认交换方法,并强制消息交换模式是 InOut (因此调用 obeys 请求/reply语义)。return 值是您由制作者端点处理后获取的交换,其中 Out 消息包含回复消息。

用于发送交换的 request() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。

Exchange request(String endpointUri, Processor processor);
Exchange request(Endpoint endpoint, Processor processor);

请求消息正文

如果您只关注请求和回复中消息正文的内容,您可以使用 requestBody() 方法以参数形式提供请求消息正文,并让制作者模板负责将正文插入到默认交换对象中。

requestBody() 方法可让您使用以下方法之一指定目标端点: 作为默认端点,作为端点 URI,或作为 Endpoint 对象。返回值是回复消息的正文(Out message body),它可以作为普通 对象 返回或转换为特定类型的 T,使用内置的类型转换器(请参阅 第 34.3 节 “内置(In Type Converters)”)。

Object requestBody(Object body);
<T> T requestBody(Object body, Class<T> type);
Object requestBody(
    String endpointUri,
    Object body
);
<T> T requestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Object requestBody(
    Endpoint endpoint,
    Object body
);
<T> T requestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

请求消息正文和标头。

您可以使用 requestBodyAndHeader() 方法尝试设置单个标头值的效果。您将消息正文和标头设置作为环境变量提供 requestBodyAndHeader(),并让制作者模板负责将正文和交换属性插入到默认交换对象中。

requestBodyAndHeader() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。返回值是回复消息的正文(Out message body),它可以作为普通 对象 返回或转换为特定类型的 T,使用内置的类型转换器(请参阅 第 34.3 节 “内置(In Type Converters)”)。

Object requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Object requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> T requestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

requestBodyAndHeaders() 方法与 requestBodyAndHeader() 方法类似,除了除了仅提供一个标头设置外,这两种方法都允许您指定标头设置的完整散列映射。

Object requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Object requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> T requestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

37.1.4. 异步发送

概述

producer 模板提供了各种方法来异步调用制作者端点,因此主线程在等待调用完成并且稍后可以检索回复消息时不会阻断。本节中描述的异步发送方法与任何消息交换协议兼容。

发送交换

基本 asyncSend() 方法采用 Exchange 参数,并异步调用端点,并使用指定交换的消息交换模式(MEP)。返回值是一个 java.util.concurrent.Future 对象,它是一个 ticket,您可以在以后用来收集回复信息。有关如何从 Future 对象获取返回值的详情,请参考 “异步调用”一节

以下 asyncSend() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。

Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);

发送由处理器填充的交换

常规 asyncSend() 方法的简单变体是使用处理器来填充默认交换,而不是显式提供交换对象。

以下 asyncSend() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。

Future<Exchange> asyncSend(String endpointUri, Processor processor);
Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);

发送消息正文

如果您只关注要发送的消息正文内容,您可以使用 asyncSendBody() 方法异步发送消息正文,并让制作者模板将正文插入到默认交换对象中。

asyncSendBody() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。

Future<Object> asyncSendBody(String endpointUri, Object body);
Future<Object> asyncSendBody(Endpoint endpoint, Object body);

37.1.5. 使用 InOut Pattern 的异步请求

概述

异步请求 方法与异步发送方法类似,除了请求方法强制消息交换模式强制为 InOut (显示 request/reply语义)。因此,如果您期望从制作者端点接收回复,通常会方便使用异步请求方法。

请求消息正文

如果您只关注请求和回复中消息正文的内容,您可以使用 requestBody() 方法以参数形式提供请求消息正文,并让制作者模板负责将正文插入到默认交换对象中。

asyncRequestBody() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。从 Future 对象检索的返回值是回复消息的正文(传出消息正文),可以作为纯文本对象或转换为特定类型的 T (使用内置类型转换器)返回(请参见 “异步调用”一节)。

Future<Object> asyncRequestBody(
    String endpointUri,
    Object body
);
<T> Future<T> asyncRequestBody(
    String endpointUri,
    Object body,
    Class<T> type
);
Future<Object> asyncRequestBody(
    Endpoint endpoint,
    Object body
);
<T> Future<T> asyncRequestBody(
    Endpoint endpoint,
    Object body,
    Class<T> type
);

请求消息正文和标头。

您可以使用 asyncRequestBodyAndHeader() 方法尝试设置单个标头值的效果。您将消息正文和标头设置作为环境变量提供给 asyncRequestBodyAndHeader(),并让制作者模板负责将正文和交换属性插入默认交换对象。

asyncRequestBodyAndHeader() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。从 Future 对象检索的返回值是回复消息的正文(传出消息正文),可以作为纯文本对象或转换为特定类型的 T (使用内置类型转换器)返回(请参见 “异步调用”一节)。

Future<Object> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    String endpointUri,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue
);
<T> Future<T> asyncRequestBodyAndHeader(
    Endpoint endpoint,
    Object body,
    String header,
    Object headerValue,
    Class<T> type
);

asyncRequestBodyAndHeaders() 方法与 asyncRequestBodyAndHeader() 方法类似,除了提供单个标头设置外,这两种方法都允许您指定完整的哈希映射。

Future<Object> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    String endpointUri,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);
Future<Object> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers
);
<T> Future<T> asyncRequestBodyAndHeaders(
    Endpoint endpoint,
    Object body,
    Map<String, Object> headers,
    Class<T> type
);

37.1.6. 使用回调进行异步发送

概述

制作者模板也提供用于调用制作者端点的同一子线程中处理回复消息的选项。在这种情况下,您提供一个回调对象,当收到回复消息时,该对象会在子线程中自动调用。换而言之,使用回调方法的异步发送 可让您在主线程中发起调用,然后让制作者端点的所有关联处理在制作器端点的关联处理会,等待回复和处理在子线程中异步处理回复。

发送交换

基本 asyncCallback() 方法采用 交换 参数,并异步调用端点,并使用指定交换的消息交换模式(MEP)。此方法类似于用于交换的 asyncSend() 方法,但它使用额外的 org.apache.camel.spi.Synchronization 参数,它是一个带有以下两个方法的回调接口: 在Complete()onFailure() 上。有关如何使用 Synchronization 回调的详情,请参考 “使用回调的异步调用”一节

以下 asyncCallback() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。

Future<Exchange> asyncCallback(
    String endpointUri,
    Exchange exchange,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Exchange exchange,
    Synchronization onCompletion
);

发送由处理器填充的交换

处理器的 asyncCallback() 方法调用处理器以填充默认交换,并强制消息交换模式为 InOut (因此调用 obeys 请求/reply 语义)。

以下 asyncCallback() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。

Future<Exchange> asyncCallback(
    String endpointUri,
    Processor processor,
    Synchronization onCompletion
);
Future<Exchange> asyncCallback(
    Endpoint endpoint,
    Processor processor,
    Synchronization onCompletion
);

发送消息正文

如果您只关注要发送的消息正文内容,您可以使用 asyncCallbackSendBody() 方法以异步发送消息正文,并让制作者模板负责将正文插入默认交换对象。

asyncCallbackSendBody() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI 或一个 Endpoint 对象。

Future<Object> asyncCallbackSendBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackSendBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);

请求消息正文

如果您只关注请求和回复中消息正文的内容,您可以使用 asyncCallbackRequestBody() 方法将请求消息正文作为参数提供,并让制作者模板将正文插入默认交换对象。

asyncCallbackRequestBody() 方法可让您使用以下方法之一指定目标端点: 作为端点 URI,或作为 Endpoint 对象。

Future<Object> asyncCallbackRequestBody(
    String endpointUri,
    Object body,
    Synchronization onCompletion
);
Future<Object> asyncCallbackRequestBody(
    Endpoint endpoint,
    Object body,
    Synchronization onCompletion
);