Debezium is a project built on top of Apache Kafka that uses Kafka to stream changes from one system to another. One core feature of Debezium is Change Data Capture (CDC), which captures database changes and pushes them into Kafka. In this tutorial we will learn how to configure Debezium and Apache Kafka to stream the database changes generated by a Quarkus application.
Capturing database changes isn’t anything new: you can use database triggers on most popular database solutions. This, however, comes at a price:
- You have to maintain extra logic in your database.
- Although it is easy to inspect tables, constraints and indexes, triggers are often difficult to view and review.
- Triggers are black magic: they are invisible to both server and client applications and therefore hard to debug.
Debezium is a set of distributed services to capture changes in your databases so that your applications can see those changes and respond to them. Debezium records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
As we said, Debezium is built on top of Apache Kafka, which in turn uses Apache ZooKeeper to store its metadata. So let’s start ZooKeeper and Apache Kafka.
Step 1: Starting Zookeeper and Apache Kafka
In order to start ZooKeeper, we can rely on its Docker image, taking care to expose the ports required by Apache Kafka for communication:
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.1
Check that Zookeeper started successfully:
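A simple way to do that is to grep the container logs for the line where ZooKeeper binds to its client port; the exact wording may vary between ZooKeeper versions, so treat this as a rough sanity check:

docker logs zookeeper | grep -i "binding to port"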
Now let’s start Apache Kafka, linking its container to the Zookeeper container:
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.1
Check that Apache Kafka started successfully:
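Again, the container logs are the quickest check; the broker prints a "started" message once it is up (the exact log line may differ between Kafka versions):

docker logs kafka | grep -i started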
Now let’s create a database for our example.
Step 2: Configuring the Database
We will be using PostgreSQL as our database, and we will also start it using Docker:
docker run --ulimit memlock=-1:-1 -it --rm=true --memory-swappiness=0 --name quarkus_test -e POSTGRES_USER=quarkus -e POSTGRES_PASSWORD=quarkus -e POSTGRES_DB=quarkusdb -p 5432:5432 postgres:10.5 -c "wal_level=logical"
Please notice the parameter “wal_level=logical”, which enables logical decoding in PostgreSQL and is required to stream data changes.
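If you want to double-check that the setting is active, you can query it from the running container with psql (using the credentials defined above); it should print “logical”:

docker exec -it quarkus_test psql -U quarkus -d quarkusdb -c "SHOW wal_level;"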
Once the database is started, we will open a shell in its container and create an additional database, named “quarkusdb_backup“, which will be used to replicate the changes from the main database (“quarkusdb“):
$ docker exec -it quarkus_test bash
root@<container-id>:/# su postgres
postgres@<container-id>:/$ psql quarkusdb quarkus
psql (10.5 (Debian 10.5-2.pgdg90+1))
Type "help" for help.

quarkusdb=# create database quarkusdb_backup;
CREATE DATABASE
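If you prefer a non-interactive approach, the same database can be created with a single command, equivalent to the session above:

docker exec -it quarkus_test psql -U quarkus -d quarkusdb -c "CREATE DATABASE quarkusdb_backup;"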
In order to stream database changes, we will now configure the Debezium connectors.
Step 3: Setting up Debezium’s JDBC connector to stream the events to Postgres
Thanks to Debezium’s PostgreSQL Connector we can monitor and record row-level changes in the schemas of a PostgreSQL database.
The connector produces a change event for every row-level insert, update, and delete operation that was received, recording all the change events for each table in a separate Kafka topic. Your client applications read the Kafka topics that correspond to the database tables they’re interested in, and react to every row-level event they see in those topics.
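Once the source connector from Step 4 is registered, you can watch these change events yourself with the standard Kafka console consumer. Here is a minimal sketch, assuming the Kafka CLI tools are available under /kafka/bin in the debezium/kafka image and using the topic name configured later in this tutorial (you may need to adjust the bootstrap server to your environment):

docker exec -it kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fullfillment.public.customer --from-beginning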
To build the image for Debezium’s Kafka Connect service, we will add, on top of the “debezium/connect” image, the JDBC driver for PostgreSQL and the Kafka Connect JDBC connector.
Create the following Dockerfile:
FROM debezium/connect:0.10
ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.1

# Fetch and deploy PostgreSQL JDBC driver
RUN cd /kafka/libs && \
    curl -sO https://repo1.maven.org/maven2/org/postgresql/postgresql/$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar

# Fetch and deploy Kafka Connect JDBC
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
RUN mkdir $KAFKA_CONNECT_JDBC_DIR

RUN cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
Now build the Dockerfile:
docker build .
Sending build context to Docker daemon 2.56 kB
Step 1/7 : FROM debezium/connect:0.10
 ---> dcf13f1d9895
Step 2/7 : ARG POSTGRES_VERSION=42.2.8
 ---> Using cache
 ---> 8ae68ccdf0ea
Step 3/7 : ARG KAFKA_JDBC_VERSION=5.3.1
 ---> Using cache
 ---> f9e73b728ea3
Step 4/7 : RUN cd /kafka/libs && curl -sO https://repo1.maven.org/maven2/org/postgresql/postgresql/$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar
 ---> Using cache
 ---> 878c209e399b
Step 5/7 : ENV KAFKA_CONNECT_JDBC_DIR $KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
 ---> Using cache
 ---> cf0381058ff3
Step 6/7 : RUN mkdir $KAFKA_CONNECT_JDBC_DIR
 ---> Using cache
 ---> 079ddd91ced6
Step 7/7 : RUN cd $KAFKA_CONNECT_JDBC_DIR && curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
 ---> Using cache
 ---> a60cc53d69ef
Successfully built a60cc53d69ef
Then tag the image as jdbc-sink:
docker tag a60cc53d69ef jdbc-sink
Here is your tagged image:
$ docker images | grep a60cc53d69ef
jdbc-sink     latest     a60cc53d69ef     2 weeks ago     680 MB
Good, you are ready to start the Kafka Connect container with the JDBC connector:
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link quarkus_test:quarkus_test jdbc-sink
Check that the output is successful:
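If you no longer have the startup logs at hand, you can also verify that Kafka Connect is up through its REST API: the root resource returns version information, and the connectors list is still empty at this point:

curl -s localhost:8083/
curl -s localhost:8083/connectors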
Step 4: Creating the Source and Sink Connectors
Now that the Kafka Connect service is available, let’s create a source connector, providing a name for the connector, how to connect to the database and which table to read:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @postgresql-connect.json
The content of postgresql-connect.json follows here:
{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "quarkus_test", "plugin.name": "pgoutput", "database.port": "5432", "database.user": "quarkus", "database.password": "quarkus", "database.dbname" : "quarkusdb", "database.server.name": "fullfillment", "table.whitelist": "public.customer" } }
The source connector will replicate the content of the “customer” table of the “quarkusdb” database available on the host “quarkus_test“.
Check from the output that the operation completed successfully:
HTTP/1.1 201 Created
Date: Sun, 17 May 2020 10:12:13 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 411
Server: Jetty(9.4.18.v20190429)

{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"quarkus_test","plugin.name":"pgoutput","database.port":"5432","database.user":"quarkus","database.password":"quarkus","database.dbname":"quarkusdb","database.server.name":"fullfillment","table.whitelist":"public.customer","name":"inventory-connector"},"tasks":[],"type":"source"}
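You can also query the connector status at any time through the Kafka Connect REST API; both the connector and its task should report the RUNNING state:

curl -s localhost:8083/connectors/inventory-connector/status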
Now that we have the source connector, we similarly create a JSON file to configure the sink connector, that is, the destination database for the row-level changes captured from PostgreSQL:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @jdbc-sink.json
Here is the content of the jdbc-sink.json:
{ "name": "jdbc-sink", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "fullfillment.public.customer", "dialect.name": "PostgreSqlDatabaseDialect", "table.name.format": "customer", "connection.url": "jdbc:postgresql://quarkus_test:5432/quarkusdb_backup?user=quarkus&password=quarkus", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "auto.create": "true", "insert.mode": "upsert", "pk.fields": "id", "pk.mode": "record_key", "delete.enabled": "true" } }
Check from the output that the operation completed successfully:
HTTP/1.1 201 Created
Date: Sun, 17 May 2020 10:13:52 GMT
Location: http://localhost:8083/connectors/jdbc-sink
Content-Type: application/json
Content-Length: 618
Server: Jetty(9.4.18.v20190429)

{"name":"jdbc-sink","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"fullfillment.public.customer","dialect.name":"PostgreSqlDatabaseDialect","table.name.format":"customer","connection.url":"jdbc:postgresql://quarkus_test:5432/quarkusdb_backup?user=quarkus&password=quarkus","transforms":"unwrap","transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones":"false","auto.create":"true","insert.mode":"upsert","pk.fields":"id","pk.mode":"record_key","delete.enabled":"true","name":"jdbc-sink"},"tasks":[],"type":"sink"}
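As before, you can check that the sink connector is running:

curl -s localhost:8083/connectors/jdbc-sink/status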
Great! Let’s trigger some changes with a simple Quarkus application.
Step 5: Triggering changes with a Quarkus application
Once the JDBC source and sink connectors are active, let’s run a sample JPA application which executes SQL statements on the quarkusdb database.
We will be using the example Quarkus Hibernate application; check this tutorial for more details on it: Getting started with Quarkus and Hibernate
package org.acme;

import org.jboss.resteasy.annotations.jaxrs.PathParam;

import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.transaction.Transactional;
import javax.ws.rs.*;
import javax.ws.rs.core.Response;

@Path("/customer")
@Produces("application/json")
@Consumes("application/json")
public class ExampleResource {

    @Inject
    EntityManager entityManager;

    @GET
    public Customer[] get() {
        return entityManager.createNamedQuery("Customers.findAll", Customer.class)
                .getResultList().toArray(new Customer[0]);
    }

    @POST
    @Transactional
    public Response create(Customer customer) {
        if (customer.getId() != null) {
            throw new WebApplicationException("Id was invalidly set on request.", 422);
        }
        System.out.println("Creating " + customer);
        entityManager.persist(customer);
        return Response.ok(customer).status(201).build();
    }

    @PUT
    @Path("{id}")
    @Transactional
    public Customer update(@PathParam Long id, Customer customer) {
        if (customer.getName() == null) {
            throw new WebApplicationException("Customer Name was not set on request.", 422);
        }
        Customer entity = entityManager.find(Customer.class, id);
        if (entity == null) {
            throw new WebApplicationException("Customer with id of " + id + " does not exist.", 404);
        }
        entity.setName(customer.getName());
        return entity;
    }

    @DELETE
    @Path("{id}")
    @Transactional
    public Response delete(@PathParam Long id) {
        Customer entity = entityManager.find(Customer.class, id);
        if (entity == null) {
            throw new WebApplicationException("Customer with id of " + id + " does not exist.", 404);
        }
        entityManager.remove(entity);
        return Response.status(204).build();
    }
}
The entity bean for this application maps to the customer database table:
package org.acme;

import javax.persistence.*;

@Entity
@NamedQuery(name = "Customers.findAll", query = "SELECT c FROM Customer c ORDER BY c.name")
public class Customer {

    private Long id;
    private String name;

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "customerSequence")
    @SequenceGenerator(name = "customerSequence", sequenceName = "customerSeq", allocationSize = 1, initialValue = 10)
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Customer{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }
}
The application is configured to store data in the “quarkusdb” database:
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=quarkus
quarkus.datasource.password=quarkus
quarkus.datasource.jdbc.url=jdbc:postgresql://localhost/quarkusdb
quarkus.datasource.jdbc.max-size=8
quarkus.datasource.jdbc.min-size=2
quarkus.hibernate-orm.database.generation=drop-and-create
quarkus.hibernate-orm.log.sql=true
quarkus.hibernate-orm.sql-load-script=import.sql
As this example application contains an import.sql script, some Customer objects will be created when you start it:
mvn clean install quarkus:dev

Hibernate: create table Customer ( id int8 not null, name varchar(255), primary key (id) )
Hibernate: INSERT INTO Customer(id, name) VALUES (1, 'Batman')
Hibernate: INSERT INTO Customer(id, name) VALUES (2, 'Superman')
Hibernate: INSERT INTO Customer(id, name) VALUES (3, 'Wonder woman')
2020-05-17 12:10:54,874 INFO  [io.quarkus] (Quarkus Main Thread) quarkus-hibernate /hello (powered by Quarkus 1.4.2.Final) started in 2.136s. Listening on: http://0.0.0.0:8080
2020-05-17 12:10:54,876 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2020-05-17 12:10:54,876 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [agroal, cdi, hibernate-orm, jdbc-postgresql, narayana-jta, resteasy, resteasy-jsonb]
Let’s switch to our PostgreSQL shell to verify that the 3 Customer objects have been replicated in the “quarkusdb_backup“:
quarkusdb=# \c quarkusdb_backup
quarkusdb_backup=# select * from customer;
     name     | id
--------------+----
 Batman       |  1
 Superman     |  2
 Wonder woman |  3
(3 rows)
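The same check can also be run without an interactive shell, for example:

docker exec -it quarkus_test psql -U quarkus -d quarkusdb_backup -c "SELECT * FROM customer;"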
Great! The data has been streamed from the source database into Kafka, and from Kafka through the sink connector into the backup database.
Let’s try adding another record:
curl -d '{"name":"Spiderman"}' -H "Content-Type: application/json" -X POST http://localhost:8080/customer
Let’s switch into the PostgreSQL shell and query the backup database again. Notice that the new record gets id 10, because the entity’s customerSequence is defined with initialValue = 10:
select * from customer;
     name     | id
--------------+----
 Batman       |  1
 Superman     |  2
 Wonder woman |  3
 Spiderman    | 10
Great! We have just demonstrated how to use Debezium to stream the database changes generated by a Quarkus application.