How to capture data changes with Debezium

This tutorial will teach you how to use Debezium to capture changes from a database. When a change happens in the database, you will see the resulting event stream consumed by a MicroProfile-compatible Quarkus application.

Debezium is a set of connectors for Apache Kafka Connect. Each connector ingests changes from a specific database by leveraging that database's own change data capture features. Database changes have traditionally been captured with polling or dual writes. Debezium, on the other hand, reads the database's transaction log (log-based change data capture), and therefore:

  • Ensures that all database changes are captured.
  • Produces change events with a very low delay while avoiding increased CPU usage required for frequent polling.
  • Requires no changes to your data model, such as a “Last Updated” column.
  • Can capture deletes.
  • Can capture old record state and additional metadata such as transaction ID and causing query, depending on the database’s capabilities and configuration.

For an overview of Debezium's main components, we recommend the tutorial Getting started with Debezium, which shows step by step how to start all the required components.

To get up to speed quickly, we will use the examples available in the Debezium GitHub repository.

Starting Debezium Kafka Connect and MySQL

Debezium provides connectors for the most common databases. In this tutorial we will use the Debezium MySQL connector. A handy Docker Compose file named docker-compose-mysql.yaml starts the whole environment:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

You need Docker and Docker Compose installed to run the above file. To learn how to install Docker Compose, check this article: Orchestrate containers using Docker compose

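To start the Debezium topology, first set the DEBEZIUM_VERSION environment variable, which the image tags in the file reference, to the Debezium release you want to use. Then execute from the shell: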

$ docker-compose -f docker-compose-mysql.yaml up 

Downloading all the container images will take a while. Once done, you can check that the following containers are running:

$ docker ps --format '{{.Image}}'


Next, we need to start the MySQL connector. This is done with a simple REST POST to Kafka Connect, whose JSON body contains the database connection settings plus the Kafka servers and topic we are using:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "": "184054",
        "": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}

The above file, named register-mysql.json, can be posted as follows:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

HTTP/1.1 201 Created
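If you would rather register the connector from Java than with curl, the following minimal sketch sends the same request with the JDK's built-in HTTP client (Java 11+). It assumes Kafka Connect is reachable at localhost:8083 and that register-mysql.json sits in the working directory. The class name RegisterConnector is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: registers a connector through the Kafka Connect REST API
public class RegisterConnector {

    // Same request as the curl command: POST <connect>/connectors/ with a JSON body
    static HttpRequest buildRequest(String connectUrl, String connectorJson) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/"))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(connectorJson))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumes register-mysql.json is in the current directory
        String json = Files.readString(Path.of("register-mysql.json"));
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(buildRequest("http://localhost:8083", json), HttpResponse.BodyHandlers.ofString());
        // 201 Created on first registration; 409 Conflict if a connector with that name already exists
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```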

OK, now our environment is up and running and listening for changes. Let’s code a basic Quarkus application that listens to database changes.

Coding a Kafka Consumer

The simplest way to get started is to create a new Quarkus project with the following command:

mvn io.quarkus:quarkus-maven-plugin:1.9.0.Final:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-debezium \

cd kafka-debezium

We will create a simple Customer model object to hold the content of the customers database table:

public class Customer {
    String first_name;
    String last_name;
    String email;

    public Customer(String first_name, String last_name, String email) {
        this.first_name = first_name;
        this.last_name = last_name;
        this.email = email;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "first_name='" + first_name + '\'' +
                ", last_name='" + last_name + '\'' +
                ", email='" + email + '\'' +
                '}';
    }
}

Then, we will add a bean that consumes data from the “debezium-connector” channel. This bean can be used to apply some conversions if needed. In our case, we just push the captured data to the “my-data-stream” channel, which is an in-memory stream:

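import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class CustomerConverter {

    // Consume from the `debezium-connector` channel and produce to the `my-data-stream` channel
    @Incoming("debezium-connector")
    @Outgoing("my-data-stream")
    // Send to all subscribers
    @Broadcast
    public Customer process(Customer data) {
        System.out.println("Got changes from Customer table " + data);
        return data;
    }
}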
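As written, process() simply forwards each Customer. As a hypothetical example of a conversion it could apply, the plain-Java helper below normalizes name capitalization, so a row inserted as 'john' would be streamed as 'John'. NameNormalizer and capitalize are illustrative names and are not part of the tutorial's project.

```java
import java.util.Locale;

// Hypothetical conversion helper, independent of Quarkus
public class NameNormalizer {

    // Upper-cases the first character and lower-cases the rest; null and empty values pass through unchanged
    static String capitalize(String value) {
        if (value == null || value.isEmpty()) {
            return value;
        }
        return Character.toUpperCase(value.charAt(0)) + value.substring(1).toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(capitalize("john") + " " + capitalize("SMITH")); // prints "John Smith"
    }
}
```

Inside process(), you could then return new Customer(NameNormalizer.capitalize(data.first_name), NameNormalizer.capitalize(data.last_name), data.email) instead of data.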


Then, we will add a simple resource that reads the in-memory “my-data-stream” channel and sends its items as server-sent events:

@Path("/customers")
public class CustomerResource {

    @Inject
    @Channel("my-data-stream") // the in-memory stream fed by CustomerConverter
    Publisher<Customer> customers;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) // denotes that server side events (SSE) will be produced
    @SseElementType("text/plain") // denotes that the contained data, within this SSE, is just regular text/plain data
    public Publisher<Customer> stream() {
        return customers;
    }
}
To capture the incoming Kafka messages, we need to define our “debezium-connector” channel in the application configuration file. We will also use a deserializer named CustomerDeserializer, which transforms the incoming JSON into a Customer object. Here is our file:


Here is our CustomerDeserializer class, which extends JsonbDeserializer and transforms the JSON payload into a Customer object:

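import io.quarkus.kafka.client.serialization.JsonbDeserializer;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import javax.json.Json;
import javax.json.JsonObject;

public class CustomerDeserializer extends JsonbDeserializer<Customer> {
    public CustomerDeserializer() {
        super(Customer.class);
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // tombstone record emitted after a delete
        }
        JsonObject jsonObject = Json.createReader(new StringReader(new String(data, StandardCharsets.UTF_8))).readObject();
        // The row state after the change is nested in payload.after
        JsonObject after = jsonObject.getJsonObject("payload").getJsonObject("after");
        if (after == null) {
            return null; // delete events carry no "after" state
        }
        return new Customer(after.getString("first_name"), after.getString("last_name"), after.getString("email"));
    }
}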


The logic contained in the deserializer is pretty simple. The JSON change event is composed of the following parts:

{
 "schema": { ... },
 "payload": { ... }
}

In our case, we are interested in the “payload” section, which contains a nested “after” JsonObject holding the state of the row after the change.
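Besides “after”, a Debezium payload carries an “op” field naming the operation (c for create, u for update, d for delete, r for a snapshot read) and a “before” field with the previous row state. For a delete, “after” is null, so code that always reads “after” cannot handle deletes. The sketch below models an already-parsed payload as a plain Map, an assumption made to stay within the JDK (ChangeEvents is a hypothetical name), and picks which row state to use. How complete “before” is depends on the database and its configuration.

```java
import java.util.Map;

// Hypothetical helpers over an already-parsed Debezium "payload" object
public class ChangeEvents {

    // Maps Debezium's "op" codes to readable names
    static String operation(String op) {
        switch (op) {
            case "c": return "create";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "read (snapshot)";
            default:  return "unknown";
        }
    }

    // Returns "after" when present, otherwise "before" (delete events have after == null)
    @SuppressWarnings("unchecked")
    static Map<String, Object> rowState(Map<String, Object> payload) {
        Object after = payload.get("after");
        return (Map<String, Object>) (after != null ? after : payload.get("before"));
    }
}
```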

To display the streamed data, you can add the following JavaScript section to an HTML page that contains an element with the id “content”:

<script src=""></script>
<script>
    var source = new EventSource("/customers/stream");
    source.onmessage = function (event) {
        document.getElementById("content").innerHTML = event.data;
    };
</script>

Now run your application as follows:

$ mvn install quarkus:dev

Then open the HTML page (in our case customer.html) to start streaming events.

Just leave it there and let’s add some data into our Database.

Changing the Database

The simplest way to test our application is to run a MySQL shell client inside the running container. The following command reuses docker-compose-mysql.yaml to log into MySQL:

$ docker-compose -f docker-compose-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 6
Server version: 5.7.32-log MySQL Community Server (GPL)

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.


Just add one more customer to the customers table:

mysql> select * from customers;
| id   | first_name | last_name | email                 |
| 1001 | Sally      | Thomas    | |
| 1002 | George     | Bailey    |    |
| 1003 | Edward     | Walker    |         |
| 1004 | Anne       | Kretchmar |    |
4 rows in set (0.00 sec)

mysql> insert into customers (first_name,last_name,email) values ('john','smith','');
Query OK, 1 row affected (0.02 sec)

You will see that the HTML page reflects the change, since the event has been captured by the Kafka consumer.

Consuming Kafka messages from the shell

If you want to quickly test how to consume messages from a Debezium topic without writing an application, just use the tooling that ships with Kafka. Its console consumer script is all you need:

docker-compose -f docker-compose-mysql.yaml exec kafka /kafka/bin/ \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

All the database change events will eventually be displayed on the console.

Source code for this tutorial:

Getting started with Debezium

Debezium is a project built on Apache Kafka, which it uses to stream changes from one system to another. One core feature of Debezium is change data capture (CDC). It captures data changes and pushes them into Kafka. In this tutorial we will learn how to configure Debezium and Apache Kafka to stream database changes generated by a Quarkus application.

Capturing database changes is nothing new: most popular databases support triggers. This, however, comes at a price:

  • You have to maintain extra logic in your database.
  • Although tables, constraints and indexes are easy to inspect, triggers are often hard to see.
  • Triggers are black magic. They are transparent to both server and client applications and therefore hard to debug.

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

As we said, Debezium is built on top of Apache Kafka, which in turn uses Apache ZooKeeper to store its metadata. So let’s start ZooKeeper and Apache Kafka.

Step 1: Starting Zookeeper and Apache Kafka

To start ZooKeeper, we can rely on its Docker image. We publish the client port (2181) that Kafka connects to, plus the ports ZooKeeper uses for peer communication (2888 and 3888):

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1

Check that Zookeeper started successfully:

Now let’s start Apache Kafka, linking its container to the ZooKeeper container:

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.1

Check that Apache Kafka started successfully:

Now let’s create a Database for our example.

Step 2: Configuring the Database

We will use PostgreSQL as the database, again started with Docker:

docker run --ulimit memlock=-1:-1 -it --rm=true --memory-swappiness=0 --name quarkus_test -e POSTGRES_USER=quarkus -e POSTGRES_PASSWORD=quarkus -e POSTGRES_DB=quarkusdb  -p 5432:5432 postgres:10.5 -c "wal_level=logical"

Note the “wal_level=logical” parameter. It enables logical decoding, which the Debezium PostgreSQL connector requires in order to stream changes.

Once the database is started, we will open a shell in its container and create an additional database named “quarkusdb_backup”, which will receive the changes replicated from the main database (“quarkusdb”):

$ docker exec -it quarkus_test bash
root@3c484b787b32:/# su postgres
postgres@3c484b787b32:/$ psql quarkusdb quarkus
psql (10.5 (Debian 10.5-2.pgdg90+1))
Type "help" for help.

quarkusdb=# create database quarkusdb_backup;

To stream database changes, we will now configure the source and sink connectors.

Step 3: Setting up Kafka Connect with the JDBC connector to stream the events to Postgres

Thanks to Debezium’s PostgreSQL Connector we can monitor and record row-level changes in the schemas of a PostgreSQL database.

The connector produces a change event for every row-level insert, update, and delete operation that was received, recording all the change events for each table in a separate Kafka topic. Your client applications read the Kafka topics that correspond to the database tables they’re interested in following, and react to every row-level event they see in those topics.
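With default settings, each topic name combines the connector's logical server name, the database (for MySQL) or schema (for PostgreSQL), and the table, separated by dots. Topic routing transformations can change this. That is why this tutorial's MySQL events land in dbserver1.inventory.customers and the PostgreSQL ones in fullfillment.public.customer. The helper below (TopicNames.topicFor is a hypothetical name) only spells out that convention:

```java
// Hypothetical helper illustrating Debezium's default topic naming
public class TopicNames {

    // Default topic name: <logical server name>.<database or schema>.<table>
    static String topicFor(String serverName, String databaseOrSchema, String table) {
        return String.join(".", serverName, databaseOrSchema, table);
    }

    public static void main(String[] args) {
        System.out.println(topicFor("dbserver1", "inventory", "customers")); // MySQL: database "inventory"
        System.out.println(topicFor("fullfillment", "public", "customer"));  // PostgreSQL: schema "public"
    }
}
```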

To write the events back to PostgreSQL, we will add the PostgreSQL JDBC driver and the Kafka Connect JDBC connector on top of the “debezium/connect” image.

Create the following Dockerfile:

FROM debezium/connect:0.10
ARG POSTGRES_VERSION=42.2.8

# Fetch and deploy PostgreSQL JDBC driver
RUN cd /kafka/libs && \
    curl -sO$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar

# Fetch and deploy Kafka Connect JDBC
RUN cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

Now build the image:

docker build .   

Sending build context to Docker daemon  2.56 kB
Step 1/7 : FROM debezium/connect:0.10
 ---> dcf13f1d9895
Step 2/7 : ARG POSTGRES_VERSION=42.2.8
 ---> Using cache
 ---> 8ae68ccdf0ea
 ---> Using cache
 ---> f9e73b728ea3
Step 4/7 : RUN cd /kafka/libs &&     curl -sO$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar
 ---> Using cache
 ---> 878c209e399b
 ---> Using cache
 ---> cf0381058ff3
 ---> Using cache
 ---> 079ddd91ced6
Step 7/7 : RUN cd $KAFKA_CONNECT_JDBC_DIR &&     curl -sO$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
 ---> Using cache
 ---> a60cc53d69ef
Successfully built a60cc53d69ef

Then tag the image as jdbc-sink:

docker tag a60cc53d69ef jdbc-sink

Here is your tagged image:

$ docker images | grep a60cc53d69ef
jdbc-sink                                                        latest                        a60cc53d69ef        2 weeks ago         680 MB

Good, you are ready to start Kafka Connect with the JDBC connector:

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link quarkus_test:quarkus_test jdbc-sink

Check that the output is successful:

Step 4: Creating the Source and Sink Connectors

Now that Kafka Connect is running, let’s create a source connector. Its configuration provides a name for the connector, how to connect to the database, and which table to read:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @postgresql-connect.json

Here is the content of postgresql-connect.json:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "quarkus_test",
    "": "pgoutput",
    "database.port": "5432",
    "database.user": "quarkus",
    "database.password": "quarkus",
    "database.dbname": "quarkusdb",
    "": "fullfillment",
    "table.whitelist": "public.customer"
  }
}

The source connector will capture the changes to the “customer” table of the “quarkusdb” database on the host “quarkus_test”.

Check from the output that the operation completed successfully:

HTTP/1.1 201 Created
Date: Sun, 17 May 2020 10:12:13 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 411
Server: Jetty(9.4.18.v20190429)
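A 201 response only means that Kafka Connect accepted the configuration. To check that the connector and its task are actually running, the Kafka Connect REST API also exposes GET /connectors/{name}/status. Here is a minimal Java sketch, assuming Connect at localhost:8083 and a connector name that needs no URL encoding (ConnectorStatus is a hypothetical class name):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper: queries the Kafka Connect status endpoint
public class ConnectorStatus {

    // GET <connect>/connectors/<name>/status returns the state of the connector and of each of its tasks
    static HttpRequest statusRequest(String connectUrl, String connectorName) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors/" + connectorName + "/status"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(statusRequest("http://localhost:8083", "inventory-connector"),
                      HttpResponse.BodyHandlers.ofString());
        // Both the connector and its tasks should report a RUNNING state
        System.out.println(response.body());
    }
}
```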


Now that we have the source connector, we similarly create a JSON file to configure the sink connector, which writes the row-level changes into the destination database (“quarkusdb_backup”):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json

Here is the content of jdbc-sink.json:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "fullfillment.public.customer",
    "": "PostgreSqlDatabaseDialect",
    "": "customer",
    "connection.url": "jdbc:postgresql://quarkus_test:5432/quarkusdb_backup?user=quarkus&password=quarkus",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "delete.enabled": "true"
  }
}

Check from the output that the operation completed successfully:

HTTP/1.1 201 Created
Date: Sun, 17 May 2020 10:13:52 GMT
Location: http://localhost:8083/connectors/jdbc-sink
Content-Type: application/json
Content-Length: 618
Server: Jetty(9.4.18.v20190429)
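Together, these sink settings keep the backup table in sync by primary key. ExtractNewRecordState unwraps each change event to the plain row. With its default settings it drops the delete event itself, but because drop.tombstones is false it keeps the tombstone (a record with a null value) that follows. With insert.mode=upsert, pk.mode=record_key and delete.enabled=true, a record with a value is inserted or updated, and a tombstone deletes the row. The in-memory model below only illustrates those semantics. It is not how the JDBC connector is implemented, and SinkModel and apply are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory model of upsert + delete semantics keyed by primary key
public class SinkModel {

    // The "target table": primary key -> name
    final Map<Long, String> table = new LinkedHashMap<>();

    // Upsert when the record has a value; a tombstone (null value) deletes the row
    void apply(long key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    public static void main(String[] args) {
        SinkModel sink = new SinkModel();
        sink.apply(10, "Spiderman");    // insert
        sink.apply(10, "Spider-Man");   // update of the same key
        sink.apply(3, "Wonder woman");
        sink.apply(3, null);            // tombstone following a delete
        System.out.println(sink.table); // prints {10=Spider-Man}
    }
}
```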


Great! Let’s trigger some changes with a simple Quarkus application.

Step 5: Triggering changes with a Quarkus application

Now that the source and sink connectors are active, let’s run a sample JPA application that executes SQL statements against quarkusdb.

We will be using the example Quarkus Hibernate application – Check this tutorial for more details on this application: Getting started with Quarkus and Hibernate

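package org.acme;

import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("customer")
@ApplicationScoped
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class ExampleResource {

    @Inject
    EntityManager entityManager;

    @GET
    public Customer[] get() {
        return entityManager.createNamedQuery("Customers.findAll", Customer.class)
                .getResultList().toArray(new Customer[0]);
    }

    @POST
    @Transactional
    public Response create(Customer customer) {
        if (customer.getId() != null) {
            throw new WebApplicationException("Id was invalidly set on request.", 422);
        }
        entityManager.persist(customer); // the resulting INSERT is what Debezium captures
        System.out.println("Creating " + customer);
        return Response.ok(customer).status(201).build();
    }

    @PUT
    @Path("{id}")
    @Transactional
    public Customer update(@PathParam Long id, Customer customer) {
        if (customer.getName() == null) {
            throw new WebApplicationException("Customer Name was not set on request.", 422);
        }
        Customer entity = entityManager.find(Customer.class, id);
        if (entity == null) {
            throw new WebApplicationException("Customer with id of " + id + " does not exist.", 404);
        }
        entity.setName(customer.getName());
        return entity;
    }

    @DELETE
    @Path("{id}")
    @Transactional
    public Response delete(@PathParam Long id) {
        Customer entity = entityManager.find(Customer.class, id);
        if (entity == null) {
            throw new WebApplicationException("Customer with id of " + id + " does not exist.", 404);
        }
        entityManager.remove(entity);
        return Response.status(204).build();
    }
}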

The entity for this application maps to the customer database table:

package org.acme;

import javax.persistence.*;

@Entity
@NamedQuery(name = "Customers.findAll", query = "SELECT c FROM Customer c ORDER BY c.name")
public class Customer {
    private Long id;
    private String name;

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "customerSequence")
    @SequenceGenerator(name = "customerSequence", sequenceName = "customerSeq", allocationSize = 1, initialValue = 10)
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}
The application is configured to store data into the “quarkusdb” schema:


As this example application contains an import.sql script, some Customer rows are created when you start it:

mvn clean install quarkus:dev

    create table Customer (
       id int8 not null,
        name varchar(255),
        primary key (id)
    INSERT INTO Customer(id, name) VALUES (1, 'Batman')
    INSERT INTO Customer(id, name) VALUES (2, 'Superman')
    INSERT INTO Customer(id, name) VALUES (3, 'Wonder woman')
2020-05-17 12:10:54,874 INFO  [io.quarkus] (Quarkus Main Thread) quarkus-hibernate /hello (powered by Quarkus 1.4.2.Final) started in 2.136s. Listening on:
2020-05-17 12:10:54,876 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2020-05-17 12:10:54,876 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, jdbc-postgresql, narayana-jta, resteasy, resteasy-jsonb]

Let’s switch to our PostgreSQL shell to verify that the 3 Customer rows have been replicated into “quarkusdb_backup”:

quarkusdb=# \c quarkusdb_backup

quarkusdb_backup=# select * from customer;
name         | id 
Batman       |  1
Superman     |  2
Wonder woman |  3
(3 rows)

Great! The data has been captured by the source connector, streamed through Kafka, and written into the backup database by the sink connector.

Let’s try adding another record:

curl -d '{"name":"Spiderman"}' -H "Content-Type: application/json" -X POST http://localhost:8080/customer
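The same request can also be sent from Java. This minimal sketch assumes the application listens on localhost:8080 and Java 11+. It builds the JSON body by hand with only basic escaping, to avoid a JSON library. CreateCustomer and its methods are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the example resource's POST /customer endpoint
public class CreateCustomer {

    // Builds {"name":"..."}, escaping backslashes and double quotes (enough for simple names)
    static String customerJson(String name) {
        String escaped = name.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"name\":\"" + escaped + "\"}";
    }

    static HttpRequest createRequest(String baseUrl, String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/customer"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(customerJson(name)))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(createRequest("http://localhost:8080", "Spiderman"), HttpResponse.BodyHandlers.ofString());
        // The example resource replies with status 201 on success
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```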

Let’s switch to the PostgreSQL shell:

select * from customer;
     name     | id 
 Batman       |  1
 Superman     |  2
 Wonder woman |  3
 Spiderman    | 10

Great! We have just demonstrated how to use Debezium to stream database changes generated by a Quarkus application.