304.5.4. transacted Batch Consumers & Producers

SJMS 组件旨在支持对 Producer 和 Consumer 端点上的本地 JMS 事务的批处理。它们的处理方式各不相同。

SJMS 使用者端点是一种简单的实施,可以在将 X 消息提交至关联的会话之前处理 X 消息。要在使用者上启用批处理事务,首先将 transacted 参数设置为 true,然后添加 transactionBatchCount 并将其设置为大于 0 的值。例如,以下配置将每 10 个消息提交 Session:

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10

如果在处理消费者端点上的批处理过程中发生异常,则会调用 Session rollback,从而导致消息重放给下一个可用的消费者。该计数器还被重置为相关会话的 BatchTransactionCommitStrategy 的 0。用户负责确保其批处理消息处理器中的 hook 监视 JMSRedelivered 标头设置为 true 的消息。这是指示消息在某一时间点回滚,并且应进行成功处理验证。

翻译的批处理消费者还附带的内部计时器实例,在消息之间等待默认时间(5000ms),然后提交会话上的开放事务。默认值为 5000ms (最小 1000ms)应该足以满足大多数用例,但是如果需要进一步调整,只需要设置 transactionBatchTimeout 参数。

sjms:queue:transacted.batch.consumer?transacted=true&transactionBatchCount=10&transactionBatchTimeout=2000

可以接受的最小值是 1000ms,因为上下文切换的数量可能会导致不必要的性能影响,而不获得好处。

生产者端点处理方式有不同。在每条消息传送给 Exchange 的目标后,生产者都关闭,不再引用该消息。为了让所有消息可用于重新传送,您只需在发布 BatchMessages 的 Producer Endpoints 上启用事务。事务将在交换结束时提交,其中包含批处理列表中的所有消息。不需要配置任何其他操作。例如:

List<BatchMessage<String>> messages = new ArrayList<BatchMessage<String>>();
for (int i = 1; i <= messageCount; i++) {
    String body = "Hello World " + i;
    BatchMessage<String> message = new BatchMessage<String>(body, null);
    messages.add(message);
}

现在发布启用了事务的列表:

template.sendBody("sjms:queue:batch.queue?transacted=true", messages);