Chapter 3. Creating a connector to monitor the inventory database

After starting the Kafka, Debezium, and MySQL services, you are ready to create a connector instance to monitor the inventory database.

In this procedure, you will create the connector instance by creating a KafkaConnector Custom Resource (CR) that defines the connector instance, and then applying it. After applying the CR, the connector instance will start monitoring the inventory database’s binlog. The binlog records all of the database’s transactions (such as changes to individual rows and changes to the schemas). When a row in the database changes, Debezium generates a change event.

Note

Typically, you would likely use the Kafka tools to manually create the necessary topics, including specifying the number of replicas. However, for this tutorial, Kafka is configured to automatically create the topics with just one replica.

Procedure

  1. Open the examples/kafka-connect/kafka-connect-s2i-single-node-kafka.yaml file that you used to deploy Kafka Connect.

    Before you can create the MySQL connector instance, you must first enable connector resources in the KafkaConnectS2I Custom Resource (CR).

  2. In the metadata.annotations section, enable Kafka Connect to use connector resources.

    This example adds an annotation to the examples/kafka-connect/kafka-connect-s2i-single-node-kafka.yaml example file:

    kafka-connect-s2i-single-node-kafka.yaml

    apiVersion: kafka.strimzi.io/v1beta1
    kind: KafkaConnectS2I
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true"
    spec:
       ...

  3. Apply the updated kafka-connect-s2i-single-node-kafka.yaml file to update the KafkaConnectS2I CR.

    $ oc apply -f kafka-connect-s2i-single-node-kafka.yaml
  4. Create a MySQL connector instance to monitor the inventory database.

    This example creates a KafkaConnector CR that defines the MySQL connector instance:

    inventory-connector.yaml

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaConnector
      metadata:
        name: inventory-connector  1
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1  2
        config:  3
          database.hostname: mysql  4
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054  5
          database.server.name: dbserver1  6
          database.whitelist: inventory  7
          database.history.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092  8
          database.history.kafka.topic: schema-changes.inventory  9

    1
    The name of the connector.
    2
    Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
    3
    The connector’s configuration.
    4
    The database host, which is the name of the container running the MySQL server (mysql).
    5 6
    A unique server ID and name. The server name is the logical identifier for the MySQL server or cluster of servers. This name will be used as the prefix for all Kafka topics.
    7
    Only changes in the inventory database will be detected.
    8 9
    The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
  5. Apply the connector instance.

    $ oc apply -f inventory-connector.yaml

    The inventory-connector connector is registered and starts to run against the inventory database.

  6. Verify that inventory-connector was created and has started to monitor the inventory database.

    You can verify the connector instance by watching the Kafka Connect log output as inventory-connector starts.

    1. Display the Kafka Connect log output:

      $ oc logs $(oc get pods -o name -l strimzi.io/name=my-connect-cluster-connect)
    2. Review the log output and verify that the initial snapshot has been executed.

      These lines show that the initial snapshot has started:

      ...
      2020-02-21 17:57:30,801 INFO Starting snapshot for jdbc:mysql://mysql:3306/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL&connectTimeout=30000 with user 'debezium' with locking mode 'minimal' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,805 INFO Snapshot is using user 'debezium' with these MySQL grants: (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      The snapshot involves a number of steps:

      ...
      2020-02-21 17:57:30,822 INFO Step 0: disabling autocommit, enabling repeatable read transactions, and setting lock wait timeout to 10 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,836 INFO Step 1: flush and obtain global read lock to prevent writes to database (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,839 INFO Step 2: start transaction with consistent snapshot (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,840 INFO Step 3: read binlog position of MySQL master (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:30,843 INFO 	 using binlog 'mysql-bin.000003' at position '154' and gtid '' (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...
      2020-02-21 17:57:34,423 INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      2020-02-21 17:57:34,424 INFO Completed snapshot in 00:00:03.632 (io.debezium.connector.mysql.SnapshotReader) [debezium-mysqlconnector-dbserver1-snapshot]
      ...

      After completing the snapshot, Debezium begins monitoring the inventory database’s binlog for change events:

      ...
      2020-02-21 17:57:35,584 INFO Transitioning from the snapshot reader to the binlog reader (io.debezium.connector.mysql.ChainedReader) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,613 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [task-thread-inventory-connector-0]
      2020-02-21 17:57:35,630 INFO Creating thread debezium-mysqlconnector-dbserver1-binlog-client (io.debezium.util.Threads) [blc-mysql:3306]
      Feb 21, 2020 5:57:35 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
      INFO: Connected to mysql:3306 at mysql-bin.000003/154 (sid:184054, cid:5)
      2020-02-21 17:57:35,775 INFO Connected to MySQL binlog at mysql:3306, starting at binlog file 'mysql-bin.000003', pos=154, skipping 0 events plus 0 rows (io.debezium.connector.mysql.BinlogReader) [blc-mysql:3306]
      ...