1.3. 将操作应用到连接中的数据

如果要在 Kamelet 和 event 频道之间传递数据执行操作,请使用 action Kamelets 作为 Kamelet 中的中间步骤。例如,您可以使用操作 Kamelet 序列化或反序列化数据,过滤数据,或者插入字段或消息标头。

操作操作,如过滤或添加字段,仅适用于 JSON 数据(即,当 Content-Type 标头设置为 application/json时)。如果事件数据使用 JSON 以外的格式(如 Avro 或 Protocol Buffers),您必须通过添加反序列化步骤(例如,引用 protobuf-deserialize-actionavro-deserialize-action Kamelet)来转换数据格式(例如: 它引用了 protobuf-serialize-actionavro-serialize-action Kamelet)。有关转换连接中数据格式的更多信息,请参阅数据转换 Kamelets

action Kamelets 包括:

1.3.1. 在 Kamelet Binding 中添加操作

要实施一个操作 Kamelet,在 Kamelet 文件的 spec 部分,在 source 和 sink 部分之间添加一个 steps 部分。

先决条件

  • 您已创建了一个 Kamelet Binding,如 Kamelet Binding 中的 Connecting source 和 sink 组件 所述。
  • 您知道要添加到 Kamelet 的哪个操作 Kamelet 以及 action Kamelet 的必要参数。

    对于此过程中的示例,predicate-filter-action Kamelet 的参数是一个 字符串类型 expression,它提供了一个 JSON Path Expression,它只过滤 coffee 数据到具有 "deep" taste intensity 的日志中。请注意,predicate-filter-action Kamelet 要求您在 Kamelet Binding 中设置 Builder 特征配置属性。

    这个示例还包括反序列化和序列化操作,在本例中是可选的,因为事件数据格式是 JSON。

流程

  1. 在编辑器中打开 KameletBinding 文件。

    例如,以下是 coffee-to-log.yaml 文件的内容:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log
    spec:
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
  2. source 部分上方添加一个 集成 部分,并提供以下 Builder 特征配置属性(根据 predicate-filter-action Kamelet 要求):

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log
    spec:
      integration:
             traits:
               builder:
                 configuration:
                   properties:
                     - "quarkus.arc.unremovable- types=com.fasterxml.jackson.databind.ObjectMapper"
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
  3. 源和 接收器 部分之间添加 steps 部分并定义 action Kamelet。例如:

    apiVersion: camel.apache.org/v1alpha1
    kind: KameletBinding
    metadata:
      name: coffee-to-log
      spec:
        integration:
               traits:
                 builder:
                   configuration:
                     properties:
                       - "quarkus.arc.unremovable-types=com.fasterxml.jackson.databind.ObjectMapper"
      source:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: coffee-source
        properties:
          period: 5000
      steps:
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: json-deserialize-action
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: predicate-filter-action
        properties:
          expression: "@.intensifier =~ /.*deep/"
      - ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: json-serialize-action
      sink:
        ref:
          kind: Kamelet
          apiVersion: camel.apache.org/v1alpha1
          name: log-sink
  4. 保存您的更改。
  5. 使用 oc apply 命令来更新 KameletBinding 资源,例如:

    oc apply -f coffee-to-log.yaml

    Camel K operator re-generates 并运行基于更新的 KameletBinding 资源生成的 CamelK 集成。

  6. 查看 Kamelet Binding 的状态:

    oc get kameletbindings

  7. 查看其对应的集成状态:

    oc get integrations

  8. 查看集成的日志文件输出:

    kamel logs <integration-name>

    例如,如果集成名称为 coffee-to-log

    kamel logs coffee-to-log

  9. 要停止集成,删除 Kamelet Binding:

    oc delete kameletbindings/<kameletbinding-name>

    例如:

    oc delete kameletbindings/coffee-to-log