How to capture data changes with Debezium

This tutorial will teach you how to use Debezium to capture changes from a database. When a change happens in the database, you will see the resulting event stream captured by a MicroProfile-compatible Quarkus application.

Debezium is a set of connectors for Apache Kafka Connect. Each connector ingests changes from a different database by leveraging that database's change data capture features. Database changes are traditionally captured with polling, dual writes, or log-based approaches. Debezium, on the other hand:

  • Ensures that all database changes are captured.
  • Produces change events with a very low delay while avoiding increased CPU usage required for frequent polling.
  • Requires no changes to your data model, such as a “Last Updated” column.
  • Can capture deletes.
  • Can capture old record state and additional metadata such as transaction ID and causing query, depending on the database’s capabilities and configuration.

For an overview of the main components of Debezium, we recommend having a look at this tutorial, which shows step by step how to start all the required components: Getting started with Debezium

To get up to speed quickly, we will use the examples available in the Debezium GitHub repository: https://github.com/debezium/debezium-examples/tree/master/tutorial

Starting Debezium, Kafka Connect and MySQL

There are several Debezium connectors available for the most common databases. In this tutorial we will use the MySQL Debezium connector. There's a handy docker-compose script named docker-compose-mysql.yaml to start the whole environment:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

You need to have Docker and Docker Compose installed in order to run the above script. Check this article to learn how to install Docker Compose: Orchestrate containers using Docker compose

In order to start the Debezium topology, execute the following from the shell:

$ export DEBEZIUM_VERSION=1.3
$ docker-compose -f docker-compose-mysql.yaml up 

It will take a while to download all the container images; once done, you will see the following containers running:

$ docker ps --format '{{.Image}}'

debezium/connect:1.3
debezium/kafka:1.3
debezium/zookeeper:1.3
debezium/example-mysql:1.3

Next, we need to start the MySQL connector. That can be done with a simple REST POST containing the database connection details and the Kafka servers and topic we are using:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

The above file, named register-mysql.json, can be posted as follows:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

HTTP/1.1 201 Created
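
You can check that the connector has been registered correctly by querying the Kafka Connect REST API; the status endpoint should report a RUNNING state:

$ curl -s http://localhost:8083/connectors/
["inventory-connector"]

$ curl -s http://localhost:8083/connectors/inventory-connector/status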

OK, now we have our environment up and running and listening for changes. Let's code a basic Quarkus application that consumes the database change events.

Coding a Kafka Consumer

The simplest way to get started is to create a new Quarkus project with the following command:

mvn io.quarkus:quarkus-maven-plugin:1.9.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-debezium \
    -Dextensions="smallrye-reactive-messaging-kafka"

cd kafka-debezium

We will create a simple Customer model object to store the content of the customers database table:

public class Customer {
    String first_name;
    String last_name;
    String email;

    public Customer(String first_name, String last_name, String email) {
        this.first_name = first_name;
        this.last_name = last_name;
        this.email = email;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "first_name='" + first_name + '\'' +
                ", last_name='" + last_name + '\'' +
                ", email='" + email + '\'' +
                '}';
    }
}

Then, we will add a Bean consuming data from the “debezium-connector” channel. This Bean can be used to apply some conversions if needed. In our case, we just push the captured data to the “my-data-stream” channel, which is an in-memory stream:

@ApplicationScoped
public class CustomerConverter {

    // Consume from the `debezium-connector` channel and produce to the `my-data-stream` channel
    @Incoming("debezium-connector")
    @Outgoing("my-data-stream")
    // Send to all subscribers
    @Broadcast
    @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING)
    public Customer process(Customer data) {

        System.out.println("Got changes from Customer table "+data);
        return data;
    }

}

Then, we will add a simple REST resource retrieving the in-memory “my-data-stream” channel and sending its items as Server-Sent Events:

@Path("/customers")
public class CustomerResource {

    @Inject
    @Channel("my-data-stream")
    Publisher<Customer> customers;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // denotes that server side events (SSE) will be produced
    @SseElementType("text/plain") // denotes that the contained data, within this SSE, is just regular text/plain data
    public Publisher<Customer> stream() {
        return customers;
    }
}

In order to capture the incoming Kafka messages, we need to define our “debezium-connector” channel in the configuration file (application.properties). Also, we will use a deserializer named CustomerDeserializer, which transforms the incoming JSON into a Customer object. Here is our application.properties file:

mp.messaging.incoming.debezium-connector.connector=smallrye-kafka
mp.messaging.incoming.debezium-connector.topic=dbserver1.inventory.customers
mp.messaging.incoming.debezium-connector.value.deserializer=org.acme.kafka.CustomerDeserializer
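
Note that we did not specify the broker address here: the Quarkus Kafka connector defaults to localhost:9092, which is the port exposed by our docker-compose script. If your Kafka broker runs elsewhere, set it explicitly, for example:

kafka.bootstrap.servers=localhost:9092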

Here is our CustomerDeserializer class, which transforms the JSON payload into a Customer object by extending JsonbDeserializer:

@RegisterForReflection
public class CustomerDeserializer extends JsonbDeserializer<Customer> {

    public CustomerDeserializer() {
        super(Customer.class);
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        // Tombstone records (emitted after a delete) carry no payload at all
        if (data == null) {
            return null;
        }
        JsonReader reader = Json.createReader(new StringReader(new String(data)));
        JsonObject jsonObject = reader.readObject();

        JsonObject payload = jsonObject.getJsonObject("payload");
        // For delete events there is no new row state, so "after" is null
        JsonObject after = payload.getJsonObject("after");
        if (after == null) {
            return null;
        }
        String firstName = after.getString("first_name");
        String lastName = after.getString("last_name");
        String email = after.getString("email");
        return new Customer(firstName, lastName, email);
    }
}
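
To see the extraction logic in isolation, here is a minimal sketch that feeds a hand-written sample event through the deserializer (the sample JSON and the demo class are just for illustration):

public class CustomerDeserializerDemo {

    public static void main(String[] args) {
        // A trimmed, hand-written example of a Debezium change event envelope
        String event = "{\"schema\":{},\"payload\":{\"after\":{"
                + "\"first_name\":\"Sally\","
                + "\"last_name\":\"Thomas\","
                + "\"email\":\"sally.thomas@acme.com\"}}}";

        Customer customer = new CustomerDeserializer()
                .deserialize("dbserver1.inventory.customers", event.getBytes());

        // Prints: Customer{first_name='Sally', last_name='Thomas', email='sally.thomas@acme.com'}
        System.out.println(customer);
    }
}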

The logic contained in the deserializer is pretty simple: the JSON change event is composed of the following parts:

{
 "schema": { 
   ...
  },
 "payload": { 
   ...
 }
}

In our case, we are interested in the “payload” section, which contains a nested “after” JsonObject holding the state of the database row after the change.
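
For reference, here is a simplified sketch of the payload of an insert event on the customers table: “before” is null for inserts, “op” marks the operation type (“c” for create, “u” for update, “d” for delete) and “source” carries metadata such as the binlog position (the field values below are illustrative):

"payload": {
  "before": null,
  "after": {
    "id": 1005,
    "first_name": "john",
    "last_name": "smith",
    "email": "john.smith@example.com"
  },
  "source": { ... },
  "op": "c",
  "ts_ms": 1604571943000
}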

To display the streamed data, you can add the following section to an HTML page (the script writes each incoming event into the “content” element):

<div id="content"></div>
<script>
    var source = new EventSource("/customers/stream");
    source.onmessage = function (event) {
        document.getElementById("content").innerHTML = event.data;
    };
</script>

Now run your application as follows:

$ mvn compile quarkus:dev
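
Alternatively, you can watch the stream directly from the shell; the -N option disables curl's output buffering, so events are printed as they arrive:

$ curl -N http://localhost:8080/customers/stream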

Then, open the HTML page (in our case customer.html) in the browser to activate the streaming of events.

Leave the page open, and let's add some data to our database.

Changing the Database

The simplest way to test our application is to run a mysql shell client in the running container. Use the following docker-compose command to reuse docker-compose-mysql.yaml and log into MySQL:

$ docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 6
Server version: 5.7.32-log MySQL Community Server (GPL)

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

List the existing customers, then add one more record to the customers table:

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

mysql> insert into customers (first_name,last_name,email) values ('john','smith','john.smith@example.com');
Query OK, 1 row affected (0.02 sec)

You will see that the HTML page reflects the change, as the event has been captured by the Kafka consumer.

Consuming Kafka messages from the shell

If you want to quickly test how to consume messages from a Debezium topic without writing an application, just use the tooling shipped with Kafka: the kafka-console-consumer.sh script is all you need:

docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

You will see that all database change events are eventually displayed on the console.
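
If you are unsure about the exact topic name, you can list the topics available in the broker with the kafka-topics.sh script shipped in the same image:

docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/kafka-topics.sh \
    --bootstrap-server kafka:9092 \
    --list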

Source code for this tutorial: https://github.com/fmarchioni/mastertheboss/tree/master/quarkus/debezium/kafka-quickstart