Debezium is a project built on top of Apache Kafka that uses Kafka to stream changes from one system to another. One of Debezium's core features is Change Data Capture (CDC), which captures data changes and pushes them into Kafka. In this updated tutorial we will learn how to configure Debezium and Apache Kafka to stream changes from a database.
Capturing database changes is nothing new: you can use database triggers on most popular database solutions. This, however, comes at a price:
- You have to maintain extra logic in your database
- Although it is easy to view tables, constraints and indexes, triggers are often difficult to inspect.
- Triggers are black magic: they are invisible to both server and client applications and therefore hard to debug.
Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
As we said, Debezium is built on top of Apache Kafka, which in turn uses Apache ZooKeeper to store its metadata.
To learn how to get started with Debezium we can use or extend some of the examples available on their GitHub repository.
Step 1: Set up your Containers with Docker compose
In order to bootstrap your Debezium environment, the simplest option is to use a tool such as Docker Compose to start all Containers and define the links between them. You can learn more about Docker Compose in this step-by-step tutorial: Orchestrate containers using Docker compose
Since we will be using PostgreSQL as our database, create the following docker-compose-postgres.yaml file:
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  adminer:
    container_name: adminer
    image: adminer
    extra_hosts: [ 'host.docker.internal:host-gateway' ]
    restart: always
    environment:
      ADMINER_DEFAULT_SERVER: postgres
    depends_on:
      - postgres
    ports:
      - 8888:8080
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
Here is a breakdown of the services in the Docker Compose file:
- zookeeper: This service provides the ZooKeeper component, which performs the distributed coordination and configuration management for the Kafka cluster.
- kafka: This service sets up the Kafka distributed streaming platform used for publishing and consuming messages.
- postgres: This service runs a PostgreSQL database instance that will be the source of data changes.
- adminer: This (optional) service sets up Adminer, allowing you to interact with the PostgreSQL database through a web interface.
- connect: This service runs Debezium Connect, which you can use to configure and manage connectors that capture data changes and stream them to Kafka topics.
Start Docker Compose with:
docker-compose -f docker-compose-postgres.yaml up
Then check that the Connectors are up and running.
Step 2: Start Postgres Connectors
Firstly, check that the PostgreSQL Connector is available. You can do that with the following REST call:
curl -s http://localhost:8083/connector-plugins | jq

[
  {
    "class": "io.debezium.connector.db2.Db2Connector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.oracle.OracleConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "2.1.4.Final"
  },
  . . . .
]
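If you want to check for the PostgreSQL connector from code rather than by eye, a few lines of Python are enough. This is a minimal sketch: the find_plugin helper is my own illustration, and the sample list below is a trimmed copy of the REST response above (in a live setup you would fetch it from http://localhost:8083/connector-plugins).

```python
import json

def find_plugin(plugins, keyword):
    """Return the first connector plugin whose class name contains keyword."""
    for plugin in plugins:
        if keyword.lower() in plugin["class"].lower():
            return plugin
    return None

# Trimmed sample of the /connector-plugins response shown above
sample = json.loads("""
[
  {"class": "io.debezium.connector.mysql.MySqlConnector", "type": "source", "version": "2.1.4.Final"},
  {"class": "io.debezium.connector.postgresql.PostgresConnector", "type": "source", "version": "2.1.4.Final"}
]
""")

postgres = find_plugin(sample, "postgres")
print(postgres["class"])  # io.debezium.connector.postgresql.PostgresConnector
```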
To start the PostgresConnector you need a Connector definition JSON file. For example, here is the "inventory-connector":
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "schema.include.list": "inventory"
  }
}
Save it in a file register-postgres.json. Then, start the PostgreSQL Connector with:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
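The same registration can be done programmatically instead of with curl. The following is a sketch under my own naming: build_connector_config and register_connector are illustrative helpers, not part of the Debezium tooling; only the URL, headers and JSON body mirror the curl call above.

```python
import json
import urllib.request

def build_connector_config(name, host, port, user, password, dbname, prefix, schemas):
    """Assemble a Debezium PostgreSQL connector definition like register-postgres.json."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasks.max": "1",
            "database.hostname": host,
            "database.port": str(port),
            "database.user": user,
            "database.password": password,
            "database.dbname": dbname,
            "topic.prefix": prefix,
            "schema.include.list": schemas,
        },
    }

def register_connector(base_url, definition):
    """POST the definition to the Kafka Connect REST API (equivalent to the curl call)."""
    request = urllib.request.Request(
        base_url + "/connectors/",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )
    return urllib.request.urlopen(request)

definition = build_connector_config(
    "inventory-connector", "postgres", 5432, "postgres",
    "postgres", "postgres", "dbserver1", "inventory")
print(json.dumps(definition, indent=2))
# register_connector("http://localhost:8083", definition)  # requires the stack to be running
```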
Check the Connector availability with:
curl http://localhost:8083/connectors/

["inventory-connector"]
Step 3: Consume and Produce messages
We will now start a Kafka Consumer that receives a message on the topic whenever a change is captured in the database.
$ docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic dbserver1.inventory.customers
The above command executes kafka-console-consumer.sh in the kafka service (defined in docker-compose-postgres.yaml) and starts the Consumer. Meanwhile, connect to the Adminer web interface of the database:
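Note where the topic name in the consumer command comes from: Debezium names each change topic after the "topic.prefix" from the connector configuration, followed by the schema and the table. A tiny sketch (change_topic is my own helper name) makes the convention explicit:

```python
def change_topic(prefix, schema, table):
    """Debezium publishes row-level changes to a topic named <topic.prefix>.<schema>.<table>."""
    return f"{prefix}.{schema}.{table}"

# With topic.prefix=dbserver1 and the inventory.customers table from the example database:
print(change_topic("dbserver1", "inventory", "customers"))  # dbserver1.inventory.customers
```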
http://localhost:8888/?pgsql=postgres&username=postgres&db=postgres&ns=inventory&select=customers
Then, edit one or more records and click on Save:
Back in the Consumer command line, you should see the JSON message, whose payload includes the following part:
. . . . .
"payload": {
  "before": {
    "id": 1001,
    "first_name": "Sally",
    "last_name": "Thomas",
    "email": "[email protected]"
  },
  "after": {
    "id": 1001,
    "first_name": "Franky",
    "last_name": "Thomas",
    "email": "[email protected]"
  },
- payload: This is the main part of the message; it contains the information about the change.
- before: This represents the state of the row before the change.
- after: This represents the state of the row after the change. In this example, I have changed the first_name for that row.
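An application consuming these events typically deserializes the message value and compares before with after to decide what to do. Here is a minimal sketch under that assumption: the changed_columns helper is my own illustration, and the hard-coded payload is a trimmed copy of the example above.

```python
def changed_columns(payload):
    """Return {column: (old, new)} for every column that differs between before and after."""
    before = payload.get("before") or {}  # None for inserts
    after = payload.get("after") or {}    # None for deletes
    return {
        col: (before.get(col), after.get(col))
        for col in set(before) | set(after)
        if before.get(col) != after.get(col)
    }

# Trimmed copy of the update event shown above
payload = {
    "before": {"id": 1001, "first_name": "Sally", "last_name": "Thomas"},
    "after": {"id": 1001, "first_name": "Franky", "last_name": "Thomas"},
}
print(changed_columns(payload))  # {'first_name': ('Sally', 'Franky')}
```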
Conclusion
This article was a step-by-step guide to setting up a Kafka / Debezium stack that captures change events from a PostgreSQL database. To learn how to capture data changes with a Quarkus application, check the following article: How to capture Data changes with Debezium