Tutorial: Using Debezium JDBC Connector

Debezium is an open-source CDC (Change Data Capture) platform that allows you to capture and stream database changes in real time. The Debezium JDBC Connector is a sink connector that takes those change events from Kafka topics and writes them into a relational database, such as PostgreSQL.

In this tutorial, we will guide you through the process of using the Debezium JDBC Connector to stream changes captured from a MySQL database through Apache Kafka into a PostgreSQL database. We’ll use Docker to set up MySQL, PostgreSQL, Apache Kafka, and Debezium, creating an end-to-end example.

Overview of JDBC Connectors

Firstly, if you are new to Debezium, we recommend checking this article for a quick introduction to it: Getting started with Debezium

The Debezium JDBC connector is a Kafka Connect sink connector that consumes events from Kafka topics and writes them to a relational database. It supports many databases, including Db2, MySQL, Oracle, PostgreSQL, and SQL Server. In this article we will learn how to move data changes from a source database to a sink database.

In the context of the Debezium JDBC connector, “source” and “sink” are important concepts that describe how data flows between different systems (a simplified change event is shown right after the list below).

  1. Source: The “source” is where data originates or comes from. In the case of the Debezium JDBC connector, the source is the place where events or changes are happening. These events are usually generated in a database or some other system. The connector’s job is to capture these events from the source and make them available for further processing.
  2. Sink: The “sink” is where data ends up or is directed to. In the case of the Debezium JDBC connector, the sink is the relational database that the connector writes data into. The connector takes the events captured from the source and writes them into the sink, which is the target database. This allows you to store and manage the data in a structured way.
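
To make the data flow more concrete, here is a simplified sketch of the value of a Debezium change event for an insert into a customers table (the schema block and most of the source metadata are omitted, and the values are illustrative); the sink connector turns events like this back into rows in the target database:

{
    "before": null,
    "after": {
        "id": 1005,
        "first_name": "John",
        "last_name": "Doe",
        "email": "[email protected]"
    },
    "source": { "db": "inventory", "table": "customers" },
    "op": "c",
    "ts_ms": 1700000000000
}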

Here is an overview of a JDBC Connector pipeline that captures events from a Source (MySQL) and sends them to a Sink (PostgreSQL):

[Figure: JDBC Connector pipeline overview: a MySQL source database, Kafka, and a PostgreSQL sink database]

In the next section we will show how to set up the services we need to replicate this architecture. In this tutorial we will be using a simplified version of this Debezium example, which also includes Elasticsearch as an additional sink.

Set up the Containers with Docker Compose

Firstly, create a docker-compose-jdbc.yaml file with the list of Services:

version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw 
  postgres:
    image: quay.io/debezium/postgres:9.6
    ports:
     - "5432:5432"
    environment:
     - POSTGRES_USER=postgresuser
     - POSTGRES_PASSWORD=postgrespw
     - POSTGRES_DB=inventory
  connect:
    image: debezium/connect-jdbc-es:${DEBEZIUM_VERSION}
    build:
      context: debezium-jdbc-es
      args:
        DEBEZIUM_VERSION: ${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
     - 5005:5005
    links:
     - kafka
     - mysql
     - postgres
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_source_connect_statuses

Here is a brief description of the services that we will start with Docker Compose:

  • zookeeper: Runs a ZooKeeper instance required for managing distributed systems like Kafka.
  • kafka: Runs an Apache Kafka broker, the distributed message streaming platform.
  • mysql: Runs a MySQL database instance.
  • postgres: Runs a PostgreSQL database instance.
  • connect: Runs the Debezium Connect service, which is used to capture and stream database changes.
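
The Compose file references a ${DEBEZIUM_VERSION} variable. Besides exporting it in the shell (as we do in the next section), one possible alternative is a .env file placed next to docker-compose-jdbc.yaml, which Docker Compose reads automatically:

# .env (optional): lets Docker Compose resolve ${DEBEZIUM_VERSION} without exporting it
DEBEZIUM_VERSION=2.1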

The most interesting part, for the purpose of this tutorial, is the connect service, which is built from a Dockerfile available in the debezium-jdbc-es folder.

Here is the Dockerfile:

ARG DEBEZIUM_VERSION=2.1
FROM quay.io/debezium/connect:${DEBEZIUM_VERSION}
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc 

ARG POSTGRES_VERSION=42.5.1
ARG KAFKA_JDBC_VERSION=5.3.2


# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
	curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

The above Dockerfile performs the following steps:

  • Downloads the PostgreSQL JDBC Driver and stores it in the /kafka/libs folder of the container
  • Downloads the Kafka Connect JDBC Connector from the Confluent Maven repository and stores it in the $KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc folder

Building the above Dockerfile from Docker Compose results in an image named debezium/connect-jdbc-es:${DEBEZIUM_VERSION}.
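
If you want to build the image outside of Docker Compose, for example to inspect it first, an equivalent manual build should look roughly like this:

# Build the connect image from the debezium-jdbc-es folder, passing the Debezium version as a build argument
docker build --build-arg DEBEZIUM_VERSION=2.1 -t debezium/connect-jdbc-es:2.1 debezium-jdbc-es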

Start Debezium and the JDBC Connectors

The next step is to start the containers, passing the --build option so that the connect image is built from the Dockerfile:

export DEBEZIUM_VERSION=2.1
docker-compose -f docker-compose-jdbc.yaml up --build
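
Once the containers are running, you can optionally verify that Kafka Connect is reachable and that the JDBC plugins were deployed correctly, using the standard Kafka Connect REST API:

# Should return the Kafka Connect version information
curl -s http://localhost:8083/

# Should list the installed connector plugins, including
# io.confluent.connect.jdbc.JdbcSinkConnector and io.debezium.connector.mysql.MySqlConnector
curl -s http://localhost:8083/connector-plugins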

Then, we will start the PostgreSQL Sink Connector. To do that, we will define the Connector Configuration in a file, for example jdbc-sink.json:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.fields": "id",
        "pk.mode": "record_key"
    }
}

Next, start the PostgreSQL Connector through the jdbc-sink.json file and the REST API available at localhost:8083/connectors:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
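
You can verify that the sink connector was registered and is running by querying its status endpoint (the connector and its task should report a RUNNING state):

curl -s http://localhost:8083/connectors/jdbc-sink/status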

Then, start the MySQL Source Connector. For this purpose, define the Connector Configuration in the source.json file:

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "topic.prefix": "dbserver1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.include.list": "inventory",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}

Note that the route transform in the configuration above rewrites the default topic name (for example dbserver1.inventory.customers) to just the table name (customers), so that it matches the topics setting of the sink connector. Finally, start the MySQL Source Connector as follows:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
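
At this point both connectors should be registered. You can list them and check the status of the source connector as follows:

# Should return both connector names: "jdbc-sink" and "inventory-connector"
curl -s http://localhost:8083/connectors/

# Should report the connector and its task as RUNNING
curl -s http://localhost:8083/connectors/inventory-connector/status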

Testing the Source and Sink Connectors

To test the JDBC Connector we will launch a MySQL Client and insert some data. Then, we will verify from a PostgreSQL Client that the same data was streamed correctly to PostgreSQL.

Firstly, launch the MySQL Client shell:

docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

NOTE: The “docker-compose exec” command allows you to execute a command inside a running Docker container managed by a Docker Compose-defined service.

Then, let’s check the content of the Customers table:

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | [email protected] |
| 1002 | George     | Bailey    | [email protected]    |
| 1003 | Edward     | Walker    | [email protected]         |
| 1004 | Anne       | Kretchmar | [email protected]    |
+------+------------+-----------+-----------------------+
4 rows in set (0.00 sec)

Next, add one row to the Customers table:

insert into customers (id,first_name,last_name,email) values (1005, 'John','Doe','[email protected]');
Query OK, 1 row affected (0.01 sec)
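
Before checking PostgreSQL, you can optionally peek at the customers topic to see the change events produced by the source connector (this assumes the Kafka scripts are available under /kafka/bin, as in the Debezium example images):

docker-compose -f docker-compose-jdbc.yaml exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic customers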

We will now check whether the JDBC Connector also streamed the changes to the PostgreSQL database. From another terminal, we will run a PostgreSQL query as follows:

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'

The expected outcome is to see the same data also in the PostgreSQL database:

 last_name |  id  | first_name |         email         
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | [email protected]
 Bailey    | 1002 | George     | [email protected]
 Walker    | 1003 | Edward     | [email protected]
 Kretchmar | 1004 | Anne       | [email protected]
 Doe       | 1005 | John       | [email protected]
(5 rows)

Back to the MySQL Client, you can try removing the Customer with id 1005:

delete from customers where id=1005;

Again, from the PostgreSQL Client you should now see the original list of Customers:

docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email         
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | [email protected]
 Bailey    | 1002 | George     | [email protected]
 Walker    | 1003 | Edward     | [email protected]
 Kretchmar | 1004 | Anne       | [email protected]
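
When you are done experimenting, you can stop and remove all the containers with:

docker-compose -f docker-compose-jdbc.yaml down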

Conclusion

In conclusion, this article has provided a comprehensive guide on leveraging the Debezium JDBC Connector to capture changes in a MySQL database and replicate them to a PostgreSQL database.

By following the step-by-step instructions taken from Debezium examples, users can harness the power of Debezium to establish real-time data synchronization between these two widely used database systems.