191.9. Kafka Headers propagation
可作为 Camel 2.22 提供
当消耗 Kafka 的消息时,标头会自动传播到 camel Exchange 标头。生成由同一行为支持的流 - 特定交换的 camel 标头将传播到 kafka 消息标头。
由于 kafka 标头仅允许 byte[]
值,因此要传播的 camel exchnage 标头应被序列化为 bytes[]
,否则将跳过标头。支持以下标头值类型: String
,Integer
,Long
, Double,
布尔值
,byte[]
。注: 从 kafka 传播到 camel Exchange 的所有标头都会默认包含 byte[]
值。为了覆盖默认的功能 uri 参数,可以为 kafkaHeaderDeserializer
设置为 来自
route 和 kafkaHeaderSerializer
的 kafkaHeaderSerializer。例如:
from("kafka:my_topic?kafkaHeaderDeserializer=#myDeserializer") ... .to("kafka:my_topic?kafkaHeaderSerializer=#mySerializer")
默认情况下,所有标头都由 KafkaHeaderFilterStrategy
过滤。策略过滤掉以 Camel
或 org.apache.camel
前缀开头的标头。可使用到路由中的 headerFilterStrategy
uri 参数来覆盖默认策略:
from("kafka:my_topic?headerFilterStrategy=#myStrategy") ... .to("kafka:my_topic?headerFilterStrategy=#myStrategy")
myStrategy
对象应该是 HeaderFilterStrategy
的子类,且必须手动放置在 Camel registry 中,或者将其注册为 Spring/Blueprint 中的 bean,因为它是 CamelContext
aware。