5.2. Kafka 통합 실행

생산자 통합 실행

  1. 샘플 생산자 통합을 생성합니다. 이렇게 하면 10초마다 메시지로 주제가 채워집니다.

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.component.kafka.KafkaConstants;
    
    public class SaslSSLKafkaProducer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
      log.info("About to start route: Timer -> Kafka ");
      from("timer:foo")
        .routeId("FromTimer2Kafka")
        .setBody()
          .simple("Message #${exchangeProperty.CamelTimerCounter}")
        .to("kafka:{{producer.topic}}")
        .log("Message correctly sent to the topic!");
      }
    }

  2. 그런 다음 통합 절차를 실행합니다.

    kamel run --secret kafka-props SaslSSLKafkaProducer.java --dev

    생산자는 새 메시지를 생성하고 항목에 푸시하고 일부 정보를 기록합니다.

    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #1 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,854 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #3 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:11,973 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #5 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:12,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #7 - KafkaProducer[test]) Message correctly sent to the topic!
    [2] 2021-05-06 08:48:13,970 INFO  [FromTimer2Kafka] (Camel (camel-1) thread #9 - KafkaProducer[test]) Message correctly sent to the topic!

소비자 통합 실행

  1. 소비자 통합을 생성합니다.

    Sample SaslSSLKafkaProducer.java

    // kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev
    // camel-k: language=java dependency=mvn:org.apache.camel.quarkus:camel-quarkus-kafka dependency=mvn:io.strimzi:kafka-oauth-client:0.7.1.redhat-00003
    
    import org.apache.camel.builder.RouteBuilder;
    
    public class SaslSSLKafkaConsumer extends RouteBuilder {
      @Override
      public void configure() throws Exception {
    	log.info("About to start route: Kafka -> Log ");
    	from("kafka:{{consumer.topic}}")
        .routeId("FromKafka2Log")
        .log("${body}");
      }
    }

  2. 다른 쉘을 열고 다음 명령을 사용하여 소비자 통합을 실행합니다.

    kamel run --secret kafka-props SaslSSLKafkaConsumer.java --dev

    소비자는 주제에 있는 이벤트 로깅을 시작합니다.

    [1] 2021-05-06 08:51:08,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #8
    [1] 2021-05-06 08:51:10,065 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #9
    [1] 2021-05-06 08:51:10,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #10
    [1] 2021-05-06 08:51:11,991 INFO  [FromKafka2Log] (Camel (camel-1) thread #0 - KafkaConsumer[test]) Message #11