Getting started with Debezium

Debezium is a project built on Apache Kafka that uses Kafka to stream changes from one system to another. One core feature of Debezium is Change Data Capture (CDC), which captures data changes and pushes them into Kafka. In this updated tutorial we will learn how to configure Debezium and Apache Kafka to stream changes from a database.

Capturing database changes isn’t anything new, as you can use database triggers on most popular database solutions. This comes, however, at a price:

  • You have to maintain extra logic in your database.
  • Although it is easy to view tables, constraints and indexes, triggers are often difficult to view.
  • Triggers are black magic. They are invisible to both server and client applications and therefore hard to debug.

Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

As we said, Debezium is built on top of Apache Kafka, which in this setup uses Apache ZooKeeper to store its metadata.

To learn how to get started with Debezium we can use or extend some of the examples available on their GitHub repository.

Step 1: Set up your Containers with Docker Compose

In order to bootstrap your Debezium environment, the simplest option is to use a tool such as Docker Compose to start all Containers and define the links between them. You can learn more about Docker Compose in this step-by-step tutorial: Orchestrate containers using Docker compose

Since we will be using PostgreSQL as the database, create the following docker-compose-postgres.yaml file:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
     - 5432:5432
    environment:
     - POSTGRES_USER=postgres
     - POSTGRES_PASSWORD=postgres
  adminer:
    container_name: adminer
    image: adminer
    extra_hosts: [ 'host.docker.internal:host-gateway' ]
    restart: always
    environment:
      ADMINER_DEFAULT_SERVER: postgres
    depends_on:
     - postgres
    ports:
     - 8888:8080
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - postgres
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses

Here is a breakdown of the services in the Docker Compose file:

  • zookeeper: This service provides the ZooKeeper component, which performs the distributed coordination and configuration management for the Kafka cluster.
  • kafka: This service sets up the Kafka distributed streaming platform used for publishing and consuming messages.
  • postgres: This service runs a PostgreSQL database instance that will be the source of data changes.
  • adminer: This (optional) service sets up Adminer, allowing you to interact with the PostgreSQL database through a web interface.
  • connect: This service runs Debezium Connect, which you can use to configure and manage connectors that capture data changes and stream them to Kafka topics.

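The compose file takes the image tag from the DEBEZIUM_VERSION environment variable, so set it before starting Docker Compose (2.1 here, to match the 2.1.4.Final connectors shown in Step 2):

export DEBEZIUM_VERSION=2.1
docker-compose -f docker-compose-postgres.yaml up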

Then check that the containers are up and running, for example with docker-compose -f docker-compose-postgres.yaml ps.
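Kafka Connect can take a little while to answer after its container starts, so it may help to wait for its REST endpoint before moving on. The following is a minimal Python sketch, not part of the original setup. It assumes the 8083 port mapping from the compose file, and the helper names connect_is_up and wait_until are made up for illustration.

```python
import time
import urllib.error
import urllib.request


def connect_is_up(base_url="http://localhost:8083", timeout=2.0):
    """Return True if the Kafka Connect REST root answers with HTTP 200."""
    try:
        with urllib.request.urlopen(base_url + "/", timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or reset: the service is not ready yet.
        return False


def wait_until(probe, attempts=30, delay=2.0, sleep=time.sleep):
    """Call probe() until it returns True.

    Returns the number of the successful attempt, or None if every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        if probe():
            return attempt
        if attempt < attempts:
            sleep(delay)
    return None
```

Calling wait_until(connect_is_up) blocks for up to about a minute with these defaults and returns None if Kafka Connect never answered. The sleep parameter exists only so that the loop can be exercised without real waiting.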

Step 2: Start Postgres Connectors

First, check that the PostgreSQL Connector is available. You can do that with the following REST call:

curl -s http://localhost:8083/connector-plugins | jq
[
  {
    "class": "io.debezium.connector.db2.Db2Connector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.oracle.OracleConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
. . . .

]
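If you prefer to check the plugin list from a script rather than by eye, a small Python sketch along these lines could work. The find_plugin helper is hypothetical, and the sample data is a trimmed copy of the response above rather than a live call.

```python
import json

POSTGRES_CONNECTOR = "io.debezium.connector.postgresql.PostgresConnector"


def find_plugin(plugins, connector_class=POSTGRES_CONNECTOR):
    """Return the plugin entry whose "class" matches, or None if it is absent."""
    for plugin in plugins:
        if plugin.get("class") == connector_class:
            return plugin
    return None


# Two entries copied from the /connector-plugins response shown above.
sample = json.loads("""
[
  {"class": "io.debezium.connector.mysql.MySqlConnector",
   "type": "source", "version": "2.1.4.Final"},
  {"class": "io.debezium.connector.postgresql.PostgresConnector",
   "type": "source", "version": "2.1.4.Final"}
]
""")

plugin = find_plugin(sample)
print(plugin["version"] if plugin else "PostgresConnector not installed")
# prints 2.1.4.Final
```

In a real check, the list would come from parsing the body of http://localhost:8083/connector-plugins instead of the inline sample.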

To start the PostgresConnector you need a Connector definition JSON file. For example, here is the “inventory-connector”:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "postgres",
        "topic.prefix": "dbserver1",
        "schema.include.list": "inventory"
    }
}

Save it in a file register-postgres.json. Then, start the PostgreSQL Connector with:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
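The same registration can be sketched in Python, which also makes explicit how the connector settings relate to the topic consumed in Step 3. This is an illustrative sketch under the assumptions of this tutorial (Kafka Connect on localhost:8083, the settings from register-postgres.json). The function names build_connector, topic_for and register are hypothetical.

```python
import json
import urllib.request


def build_connector(name="inventory-connector", prefix="dbserver1", schema="inventory"):
    """Build the same payload as register-postgres.json."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": "postgres",
            "database.port": "5432",
            "database.user": "postgres",
            "database.password": "postgres",
            "database.dbname": "postgres",
            "topic.prefix": prefix,
            "schema.include.list": schema,
        },
    }


def topic_for(connector, schema, table):
    """Topic that receives changes for a table: <topic.prefix>.<schema>.<table>."""
    return ".".join([connector["config"]["topic.prefix"], schema, table])


def register(connector, base_url="http://localhost:8083"):
    """POST the payload to Kafka Connect and return the HTTP status code.

    urlopen raises urllib.error.HTTPError for error responses, for example
    when a connector with the same name is already registered.
    """
    request = urllib.request.Request(
        base_url + "/connectors/",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

With the default values, topic_for(build_connector(), "inventory", "customers") yields dbserver1.inventory.customers, which is the topic used by the consumer later in this tutorial.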

Check the Connector availability with:

curl http://localhost:8083/connectors/
["inventory-connector"]
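Listing the connector only shows that it is registered, not that it is running. Kafka Connect also exposes a status endpoint per connector, and a hedged Python sketch of reading it could look like this. The helper names are hypothetical, and the check that at least one task exists is a choice made for this example, not a Kafka Connect rule.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status from Kafka Connect and parse the JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as response:
        return json.load(response)


def is_healthy(status):
    """True when the connector and every task report RUNNING (at least one task)."""
    tasks = status.get("tasks", [])
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )
```

For example, is_healthy(fetch_status("inventory-connector")) should be True once the connector has started its single task.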

Step 3: Consume and Produce messages

We will now start a Kafka consumer that receives a message on the dbserver1.inventory.customers topic every time Debezium captures a change to the customers table of the inventory schema.

$ docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers

The above command executes kafka-console-consumer.sh on the kafka service (from docker-compose-postgres.yaml). The consumer will start. Meanwhile, connect to the Adminer web interface of the database:

http://localhost:8888/?pgsql=postgres&username=postgres&db=postgres&ns=inventory&select=customers

Then, edit one or more records and click on Save.

Back on the consumer command line, you should see the JSON message, whose payload includes the following part:

. . . . .
 "payload":{
      "before":{
         "id":1001,
         "first_name":"Sally",
         "last_name":"Thomas",
         "email":"[email protected]"
      },
      "after":{
         "id":1001,
         "first_name":"Franky",
         "last_name":"Thomas",
         "email":"[email protected]"
      },

Here is what these fields mean:

  • payload: This is the main part of the message that contains the information about the change.
    • before: This represents the state of the row before the change.
    • after: This represents the state of the row after the change. In this example, I have changed the first_name of that row.
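An application reading these events usually wants to know which columns changed. The Python sketch below compares the before and after parts of a payload. The changed_fields helper is hypothetical, and the sample event is trimmed from the output above, with the email column left out.

```python
import json


def changed_fields(payload):
    """Map each column whose value differs to a (before, after) pair.

    "before" is null for inserts and "after" is null for deletes,
    so both are treated as empty rows when missing.
    """
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


# Sample event trimmed from the consumer output above.
event = json.loads("""
{"payload": {
  "before": {"id": 1001, "first_name": "Sally", "last_name": "Thomas"},
  "after":  {"id": 1001, "first_name": "Franky", "last_name": "Thomas"}
}}
""")

print(changed_fields(event["payload"]))
# prints {'first_name': ('Sally', 'Franky')}
```

In a real consumer, each message value from the dbserver1.inventory.customers topic would be parsed with json.loads and passed through the same function.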

Conclusion

This article was a step-by-step guide to setting up Kafka and Debezium to capture change events from a PostgreSQL database. To learn how to capture data changes with a Quarkus application, check the following article: How to capture Data changes with Debezium
