In Part 1, we explored how Debezium works and how it integrates with other systems. In this post, we will implement the embedded Debezium engine, connect it to a PostgreSQL database, and export the captured records to Kafka.
Let's solve the following use case:
A small online bookstore wants to notify its customers in real time when books they are interested in become available or when stock levels are low, encouraging them to make a purchase decision.
For this experiment, let's implement a solution that creates a channel to notify users in real time about any new books inserted into the book table.
We can implement this end-to-end solution in three simple steps:
- Install Postgres and Kafka using Docker.
- Configure the necessary permissions for the Debezium user.
- Develop the embedded Debezium engine (using Spring Boot).
Step 1
The Docker Compose file below spins up the tech stack:
- Postgres (our active database)
- Kafka and Zookeeper (used by Debezium to export the DB records)
- Kafdrop (to see the data in action)
version: '3.8'
services:
  # PostgreSQL service
  postgres:
    image: postgres:latest
    container_name: my-postgres-container
    environment:
      POSTGRES_DB: book_store
      POSTGRES_USER: myuser
      POSTGRES_PASSWORD: mypassword
    ports:
      - "5432:5432"
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
      - ./custom-postgresql.conf:/etc/postgresql/postgresql.conf
    command: ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"]

  # Apache Kafka service using the Confluent image
  kafka:
    image: confluentinc/cp-kafka:6.2.1
    container_name: my-kafka-container
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
    ports:
      - "9092:9092"
    volumes:
      - ./kafka-data:/var/lib/kafka/data
    depends_on:
      - zookeeper

  # Zookeeper service required for Kafka
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    container_name: my-zookeeper-container
    ports:
      - "2181:2181"

  # Kafdrop service for the Kafka UI
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: my-kafdrop-container
    environment:
      KAFKA_BROKERCONNECT: kafka:9093
      JVM_OPTS: "-Xms32M -Xmx64M"
    ports:
      - "9000:9000"
    depends_on:
      - kafka
Save the Compose file and bring the stack up with Docker Compose:
docker-compose -f debezium.yaml up
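Once the stack is up, you can confirm that all four containers are running:
docker-compose -f debezium.yaml ps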
- Ensure that PostgreSQL is configured with wal_level=logical to enable logical replication. This setting tells PostgreSQL that logical decoding is needed, allowing Debezium to interpret the WAL (Write-Ahead Log) and extract detailed information about the affected rows. Our compose file mounts a custom-postgresql.conf for exactly this purpose; a minimal sketch follows.
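Here is what that mounted config file might contain. Only wal_level strictly needs changing; the other values are assumptions shown for completeness:
# custom-postgresql.conf — mounted into the postgres container above
listen_addresses = '*'     # accept connections from outside the container
wal_level = logical        # required so Debezium can use logical decoding
max_wal_senders = 10       # processes available to stream WAL
max_replication_slots = 10 # slots available for connectors like Debezium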
Step 2
Configure the Postgres publication to be used by Debezium:
create publication bookstore_replication for table book_store.book_inventory;
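The publication above assumes the book_store schema and book_inventory table already exist. The DDL below is a hypothetical sketch whose columns match what the handler reads later (id, name, price), followed by the permissions a dedicated, non-superuser Debezium role would need (the myuser account in our compose file owns the database, so it already has sufficient rights):
-- Hypothetical table matching the fields the handler reads later
CREATE SCHEMA IF NOT EXISTS book_store;
CREATE TABLE IF NOT EXISTS book_store.book_inventory (
    id    integer PRIMARY KEY,
    name  text,
    price double precision
);

-- For a dedicated Debezium role: logical replication plus read access
ALTER ROLE myuser WITH REPLICATION LOGIN;
GRANT USAGE ON SCHEMA book_store TO myuser;
GRANT SELECT ON book_store.book_inventory TO myuser;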
Step 3
- In this step, we create a simple Spring Boot application.
- We configure the app to subscribe to changes on our book_store inventory table.
- We read all DML operations from the database and publish them as events to Kafka.
Embedded Engine
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.tech.debezium</groupId>
<artifactId>embeddedDebeziumEngine</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>embeddedDebeziumEngine</name>
<description>EmbeddedDebeziumEngine</description>
<properties>
<version.debezium>2.5.2.Final</version.debezium>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>6.1.4</version>
</dependency>
<!-- Embedded Tomcat servlet container -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${version.debezium}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version> <!-- Replace with the latest version -->
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Spring Boot Application Class
package com.tech.debezium.embeddedDebeziumEngine;

import com.tech.debezium.embeddedDebeziumEngine.handler.PostgresEventHandler;
import io.debezium.config.Configuration;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@SpringBootApplication
public class EmbeddedDebeziumEngineApplication {

    private static DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;
    private static ExecutorService executorService;

    public static void main(String[] args) {
        SpringApplication.run(EmbeddedDebeziumEngineApplication.class, args);

        // Kafka producer properties handed to the event handler
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Debezium connector configuration for our PostgreSQL database
        Configuration postgresDebeziumConfig = io.debezium.config.Configuration.create()
                .with("name", "postgres-inventory-connector")
                .with("bootstrap.servers", "localhost:9092")
                .with("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore")
                .with("offset.storage.topic", "debezium_bookstore_lsn")
                .with("offset.storage.partitions", "1")
                .with("offset.storage.replication.factor", "1")
                .with("offset.flush.interval.ms", "6000")
                .with("database.hostname", "localhost")
                .with("database.port", "5432")
                .with("database.user", "myuser")
                .with("database.password", "mypassword")
                .with("database.dbname", "book_store")
                .with("topic.prefix", "book_store")
                .with("table.include.list", "book_store.book_inventory")
                .with("slot.name", "bookstore_replication")
                .with("publication.name", "bookstore_replication") // publication created in step 2
                .with("plugin.name", "pgoutput")
                .with("snapshot.mode", "initial")
                .build();

        PostgresEventHandler changeEventProcessor = new PostgresEventHandler(properties);
        debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(postgresDebeziumConfig.asProperties())
                .notifying(changeEventProcessor::handleChangeEvent)
                .build();

        // engine.run() blocks, so start the engine on its own thread
        // rather than also calling run() on the main thread.
        executorService = Executors.newSingleThreadExecutor();
        executorService.execute(debeziumEngine);
    }
}
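DebeziumEngine implements Closeable, so it is good practice to stop it cleanly when the JVM exits; otherwise the database connection and worker thread are torn down abruptly. A minimal sketch using the static fields above (a Spring @PreDestroy hook would work equally well):
// Close the engine and its worker thread on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    try {
        if (debeziumEngine != null) {
            debeziumEngine.close(); // signals the engine to stop polling
        }
    } catch (java.io.IOException e) {
        // nothing more we can do at shutdown; swallow and continue
    } finally {
        if (executorService != null) {
            executorService.shutdown();
        }
    }
}));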
Here is the handler class:
package com.tech.debezium.embeddedDebeziumEngine.handler;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tech.debezium.embeddedDebeziumEngine.model.InventoryEvent;
import io.debezium.data.Envelope;
import io.debezium.engine.RecordChangeEvent;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.Properties;
import static io.debezium.data.Envelope.FieldName.OPERATION;
public class PostgresEventHandler {
private final KafkaProducer<String, String> kafkaProducer;
private static final String TOPIC_NAME = "bookstore_inventory_stream";
private static final String ERROR_MESSAGE = "Exception occurred during event handling";
private static final Logger logger = LoggerFactory.getLogger(PostgresEventHandler.class);
private final ObjectMapper objectMapper = new ObjectMapper();
public PostgresEventHandler(Properties kafkaProperties) {
this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
}
public void handleChangeEvent(RecordChangeEvent<SourceRecord> recordChangeEvent) {
SourceRecord sourceRecord = recordChangeEvent.record();
Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
try {
Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
Optional<InventoryEvent> event = getProductEvent(sourceRecord, operation);
if (event.isPresent()) {
String jsonEvent = objectMapper.writeValueAsString(event.get());
kafkaProducer.send(new ProducerRecord<>(TOPIC_NAME, jsonEvent));
}
} catch (Exception e) {
logger.error(ERROR_MESSAGE, e);
}
}
}
private Optional<InventoryEvent> getProductEvent(SourceRecord event, Envelope.Operation op) {
final Struct value = (Struct) event.value();
Struct values = null;
// CREATE, READ and UPDATE are handled identically here:
// in all three cases the new row state lives in the "after" struct.
switch (op) {
case CREATE:
case READ:
case UPDATE:
values = value.getStruct("after");
break;
case DELETE:
values = value.getStruct("before");
if (values != null) {
Integer id = values.getInt32("id");
return Optional.of(new InventoryEvent(op.toString(), id, null, null));
} else {
return Optional.empty();
}
default:
// Ignore operation types we don't handle (e.g. TRUNCATE).
return Optional.empty();
}
if (values != null) {
String name = values.getString("name");
Integer id = values.getInt32("id");
Double price = (Double) values.get("price");
return Optional.of(new InventoryEvent(op.toString(), id, name, price));
} else {
return Optional.empty();
}
}
}
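The handler also references an InventoryEvent model that is not shown in this post. Here is a minimal sketch consistent with how the handler constructs it; the field names are assumptions, and it leans on the Lombok dependency already in the pom:
package com.tech.debezium.embeddedDebeziumEngine.model;

import lombok.AllArgsConstructor;
import lombok.Data;

// Payload published to Kafka for each inventory change.
@Data
@AllArgsConstructor
public class InventoryEvent {
    private String operation; // CREATE, READ, UPDATE or DELETE
    private Integer id;
    private String name;
    private Double price;
}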
Once everything is set up, let's see Debezium in action.
Start the Debezium application; it should subscribe to our database and push the received messages to Kafka.
Then verify in Kafdrop that all events are captured in Kafka. You should see two topics:
- bookstore_inventory_stream (contains the actual events corresponding to changes in our inventory table)
- debezium_bookstore_lsn (used by Debezium to store the log sequence number up to which the engine has read, so that after a restart it can resume streaming from exactly where it left off)
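To see an event flow end to end, insert a row into the table (a hypothetical example; the values are made up) and tail the stream topic with the console consumer shipped inside the Kafka container:
INSERT INTO book_store.book_inventory (id, name, price)
VALUES (1, 'Designing Data-Intensive Applications', 39.99);

docker exec -it my-kafka-container kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic bookstore_inventory_stream --from-beginning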
Once the data is available in Kafka, we can create multiple consumers to notify users as needed; we will cover that in a separate post. Here is the overall setup and how it looks:
The complete source code is available in the repository.