Getting started with Debezium

Debezium is a project built on top of Apache Kafka and uses Kafka to stream changes from one system to another. One core feature of Debezium is Change Data Capture, which captures row-level changes in a database and pushes them into Kafka. In this tutorial we will learn how to configure Debezium and Apache Kafka to stream the database changes generated by a Quarkus application.

Capturing database changes isn't anything new, as most popular database solutions let you use triggers for this purpose. This comes, however, at a price:

  • You have to maintain extra logic in your database.
  • Although it is easy to view tables, constraints, and indexes, triggers are often difficult to inspect.
  • Triggers are black magic: they run invisibly to both server and client applications and are therefore hard to debug.

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

As we said, Debezium is built on top of Apache Kafka, which in turn uses Apache ZooKeeper to store its metadata. So let's start ZooKeeper and Apache Kafka.

Step 1: Starting ZooKeeper and Apache Kafka

In order to start ZooKeeper, we can rely on its Docker image, taking care to expose the ports required by Apache Kafka for communication:

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1

Check the console output to verify that ZooKeeper started successfully.
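For an additional check, ZooKeeper answers the four-letter "ruok" command with "imok". A minimal probe from the host, assuming nc is installed (on recent ZooKeeper versions the command may need to be whitelisted through the 4lw.commands.whitelist property):

echo ruok | nc localhost 2181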

Now let’s start Apache Kafka, linking its image with the Zookeeper image:

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.1

Check the console output to verify that Apache Kafka started successfully.
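You can also verify that the broker is reachable by listing its topics with the tooling shipped in the image (a quick probe; the script path assumes the /kafka layout of the debezium/kafka image):

docker exec -it kafka /kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list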

Now let’s create a Database for our example.

Step 2: Configuring the Database

We will be using PostgreSQL as the database, also starting it with Docker:

docker run --ulimit memlock=-1:-1 -it --rm=true --memory-swappiness=0 --name quarkus_test -e POSTGRES_USER=quarkus -e POSTGRES_PASSWORD=quarkus -e POSTGRES_DB=quarkusdb  -p 5432:5432 postgres:10.5 -c "wal_level=logical"

Notice the parameter “wal_level=logical”, which enables logical decoding and is required to stream data changes out of PostgreSQL.
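You can verify that the setting is active by querying the running server; it should print “logical”:

docker exec -it quarkus_test psql -U quarkus -d quarkusdb -c "SHOW wal_level;"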

Once the database is started, we will open a shell in its container and create an additional database, named “quarkusdb_backup“, which will be used to replicate the changes from the main database (“quarkusdb“):

$ docker exec -it quarkus_test bash
root@<container-id>:/# su postgres
postgres@<container-id>:/$ psql quarkusdb quarkus
psql (10.5 (Debian 10.5-2.pgdg90+1))
Type "help" for help.

quarkusdb=# create database quarkusdb_backup;
CREATE DATABASE

In order to stream database changes, we will now configure the Debezium connectors.

Step 3: Setting up Debezium’s JDBC connector to stream the events to Postgres

Thanks to Debezium’s PostgreSQL Connector we can monitor and record row-level changes in the schemas of a PostgreSQL database.

The connector produces a change event for every row-level insert, update, and delete operation, recording all the change events for each table in a separate Kafka topic. Your client applications read the Kafka topics that correspond to the database tables they're interested in, and react to every row-level event they see in those topics.

To start the Debezium’s JDBC connector we will add, on the top of the “debezium/connect” image the JDBC Driver for PostgreSQL.

Create the following Dockerfile:

FROM debezium/connect:0.10

ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.1

# Fetch and deploy PostgreSQL JDBC driver
RUN cd /kafka/libs && \
    curl -sO https://repo1.maven.org/maven2/org/postgresql/postgresql/$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar

# Fetch and deploy Kafka Connect JDBC
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
RUN mkdir $KAFKA_CONNECT_JDBC_DIR

RUN cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

Now build the Dockerfile:

docker build .   

Sending build context to Docker daemon  2.56 kB
Step 1/7 : FROM debezium/connect:0.10
 ---> dcf13f1d9895
Step 2/7 : ARG POSTGRES_VERSION=42.2.8
 ---> Using cache
 ---> 8ae68ccdf0ea
Step 3/7 : ARG KAFKA_JDBC_VERSION=5.3.1
 ---> Using cache
 ---> f9e73b728ea3
Step 4/7 : RUN cd /kafka/libs &&     curl -sO https://repo1.maven.org/maven2/org/postgresql/postgresql/$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar
 ---> Using cache
 ---> 878c209e399b
Step 5/7 : ENV KAFKA_CONNECT_JDBC_DIR $KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
 ---> Using cache
 ---> cf0381058ff3
Step 6/7 : RUN mkdir $KAFKA_CONNECT_JDBC_DIR
 ---> Using cache
 ---> 079ddd91ced6
Step 7/7 : RUN cd $KAFKA_CONNECT_JDBC_DIR &&     curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
 ---> Using cache
 ---> a60cc53d69ef
Successfully built a60cc53d69ef

Then tag the image as jdbc-sink:

docker tag a60cc53d69ef jdbc-sink

Here is your tagged image:

$ docker images | grep a60cc53d69ef
jdbc-sink                                                        latest                        a60cc53d69ef        2 weeks ago         680 MB
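As a shortcut, you can also build and tag the image in a single step:

docker build -t jdbc-sink .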

Good, you are ready to start the Kafka Connect container:

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link quarkus_test:quarkus_test jdbc-sink

Check the console output to verify that Kafka Connect started successfully.
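Besides the logs, Kafka Connect exposes a REST API on port 8083 which you can probe from the host. The second call should list, among the available plugins, the JDBC sink connector we added to the image:

curl -H "Accept:application/json" localhost:8083/
curl -H "Accept:application/json" localhost:8083/connector-plugins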

Step 4: Creating the Source and Sink Connectors

Now that the Debezium’s JDBC connector is available let’s create a source connector which provides a name for the connector, how to connect to the database and which table to read:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @postgresql-connect.json

Here is the content of postgresql-connect.json:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "quarkus_test",
    "plugin.name": "pgoutput",
    "database.port": "5432",
    "database.user": "quarkus",
    "database.password": "quarkus",
    "database.dbname" : "quarkusdb",
    "database.server.name": "fullfillment",
    "table.whitelist": "public.customer"
  }
}

The source connector will capture the content of the “customer” table of the “quarkusdb” database available on the host “quarkus_test“.

Check from the output that the operation completed successfully:

HTTP/1.1 201 Created
Date: Sun, 17 May 2020 10:12:13 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 411
Server: Jetty(9.4.18.v20190429)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"quarkus_test","plugin.name":"pgoutput","database.port":"5432","database.user":"quarkus","database.password":"quarkus","database.dbname":"quarkusdb","database.server.name":"fullfillment","table.whitelist":"public.customer","name":"inventory-connector"},"tasks":[],"type":"source"}

Now that we have the source connector, we similarly create a JSON file to configure the sink connector, which writes the captured row-level changes into the destination PostgreSQL database:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json

Here is the content of jdbc-sink.json:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "fullfillment.public.customer",
    "dialect.name": "PostgreSqlDatabaseDialect",
    "table.name.format": "customer",
    "connection.url": "jdbc:postgresql://quarkus_test:5432/quarkusdb_backup?user=quarkus&password=quarkus",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "delete.enabled": "true"
  }
}

Check from the output that the operation completed successfully:

HTTP/1.1 201 Created
Date: Sun, 17 May 2020 10:13:52 GMT
Location: http://localhost:8083/connectors/jdbc-sink
Content-Type: application/json
Content-Length: 618
Server: Jetty(9.4.18.v20190429)

{"name":"jdbc-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"fullfillment.public.customers","dialect.name":"PostgreSqlDatabaseDialect","table.name.format":"customer","connection.url":"jdbc:postgresql://quarkus_test:5432/quarkusdb_backup?user=quarkus&password=quarkus","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones":"false","auto.create":"true","insert.mode":"upsert","pk.fields":"id","pk.mode":"record_key","delete.enabled":"true","name":"jdbc-sink"},"tasks":[],"type":"sink"}

Great! Let’s trigger some changes with a simple Quarkus applications.

Step 5: Triggering changes with a Quarkus application

Now that the source and sink connectors are active, let's run a sample JPA application which executes SQL statements on the quarkusdb database.

We will be using the example Quarkus Hibernate application; check this tutorial for more details on it: Getting started with Quarkus and Hibernate

package org.acme;

import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;
import javax.ws.rs.*;
import javax.ws.rs.core.Response;
@Path("/customer")
@Produces("application/json")
@Consumes("application/json")
public class ExampleResource {

    @Inject
    EntityManager entityManager;

    @GET
    public Customer[] get() {
        return entityManager.createNamedQuery("Customers.findAll", Customer.class)
                .getResultList().toArray(new Customer[0]);
    }
    @POST
    @Transactional
    public Response create(Customer customer) {
        if (customer.getId() != null) {
            throw new WebApplicationException("Id was invalidly set on request.", 422);
        }
        System.out.println("Creating "+customer);
        entityManager.persist(customer);
        return Response.ok(customer).status(201).build();
    }

    @PUT
    @Path("{id}")
    @Transactional
    public Customer update(@PathParam Long id, Customer customer) {
        if (customer.getName() == null) {
            throw new WebApplicationException("Customer Name was not set on request.", 422);
        }

        Customer entity = entityManager.find(Customer.class, id);

        if (entity == null) {
            throw new WebApplicationException("Customer with id of " + id + " does not exist.", 404);
        }

        entity.setName(customer.getName());

        return entity;
    }

    @DELETE
    @Path("{id}")
    @Transactional
    public Response delete(@PathParam Long id) {
        Customer entity = entityManager.find(Customer.class, id);
        if (entity == null) {
            throw new WebApplicationException("Customer with id of " + id + " does not exist.", 404);
        }
        entityManager.remove(entity);
        return Response.status(204).build();
    }
}

The entity bean for this application maps to the customer database table:

package org.acme;

import javax.persistence.*;

@Entity
@NamedQuery(name = "Customers.findAll", query = "SELECT c FROM Customer c ORDER BY c.name")
public class Customer {
    private Long id;
    private String name;

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "customerSequence")
    @SequenceGenerator(name = "customerSequence", sequenceName = "customerSeq", allocationSize = 1, initialValue = 10)
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}

The application is configured to store data into the “quarkusdb” database:

quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=quarkus
quarkus.datasource.password=quarkus
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost/quarkusdb
quarkus.datasource.jdbc.max-size=8
quarkus.datasource.jdbc.min-size=2
 
quarkus.hibernate-orm.database.generation=drop-and-create
quarkus.hibernate-orm.log.sql=true
quarkus.hibernate-orm.sql-load-script=import.sql

As this example application contains an import.sql script, some Customer records will be created when you start it:

mvn clean install quarkus:dev

Hibernate: 
    
    create table Customer (
       id int8 not null,
        name varchar(255),
        primary key (id)
    )
Hibernate: 
    INSERT INTO Customer(id, name) VALUES (1, 'Batman')
Hibernate: 
    INSERT INTO Customer(id, name) VALUES (2, 'Superman')
Hibernate: 
    INSERT INTO Customer(id, name) VALUES (3, 'Wonder woman')
2020-05-17 12:10:54,874 INFO  [io.quarkus] (Quarkus Main Thread) quarkus-hibernate /hello (powered by Quarkus 1.4.2.Final) started in 2.136s. Listening on: http://0.0.0.0:8080
2020-05-17 12:10:54,876 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2020-05-17 12:10:54,876 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, jdbc-postgresql, narayana-jta, resteasy, resteasy-jsonb]
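Before looking at the backup database, you can peek at the raw change events that the source connector published to Kafka. The debezium/kafka image ships a watch-topic utility for this purpose; the topic name follows the <server>.<schema>.<table> pattern derived from our source connector configuration:

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:1.1 watch-topic -a -k fullfillment.public.customer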

Let’s switch to our PostgreSQL shell to verify that the 3 Customer objects have been replicated in the “quarkusdb_backup“:

quarkusdb=# \c quarkusdb_backup

quarkusdb_backup=# select * from customer;
     name     | id 
--------------+----
 Batman       |  1
 Superman     |  2
 Wonder woman |  3
(3 rows)

Great! Data has been streamed from the source database into Kafka and, through the sink connector, into the backup database.

Let’s try adding another record:

curl -d '{"name":"Spiderman"}' -H "Content-Type: application/json" -X POST http://localhost:8080/customer
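You can first confirm the new record through the application's own GET endpoint (the id 10 comes from the initialValue of the sequence generator defined on the entity):

curl http://localhost:8080/customer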

Let’s switch into the PostgreSQL shell:

quarkusdb_backup=# select * from customer;
     name     | id 
--------------+----
 Batman       |  1
 Superman     |  2
 Wonder woman |  3
 Spiderman    | 10
(4 rows)

Great! We have just demonstrated how to use Debezium to stream the database changes generated by a Quarkus application.