Track every PostgreSQL data change using Debezium

Emtiaj Hasan

Imagine we have Customer Relationship Management (CRM), sales, and inventory systems. As each system has a specific responsibility, it only stores domain-specific data in its own database.

Now, assume we need to generate some reports. Data is scattered in multiple databases; therefore, we must somehow gather data in one place.

One idea could be to make API requests to each system and combine the responses later. However, making systems communicate synchronously is definitely a bad idea. The good news is that we can use an asynchronous, event-driven approach instead.

Our goal is to keep a copy of every service's data in the reporting service so that it never needs to ask the other services for it. To achieve this, each service publishes an event whenever its data, e.g., the CRM service's customer-related info, is created, updated, or deleted, and the reporting service consumes the event and syncs its own copy.

Although it seems like a good idea, it can become a headache when a service has a lot of tables and data gets changed through a lot of internal APIs. For simplicity, imagine our inventory service has Product, Location, Supplier, and Stock tables. We have CREATE, UPDATE, and DELETE APIs for each table; therefore, we would need to publish a data-change event from every API handler. On top of that, what if we have some scripts that we execute manually to change data? Those would also need to notify the reporting service about changes.

Change data capture

In databases, change data capture is a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. - Wikipedia

Light-bulb moment, right? We are basically trying to build a system to capture every data change so that the reporting service always has the updated data.

Then I found a beautiful tool: Debezium.

Debezium

Just like the Watchers in the Marvel Comics universe, who observe and record events throughout the universe, Debezium watches the database, keeps track of changes, and makes it easy to send those changes to other systems in real time.

Whenever a new row is added, a row is updated, or a row is deleted, Debezium notices it immediately. It then packages up these changes and sends them as a continuous stream of events by leveraging the power of Apache Kafka.

PoC implementation

In our scenario, we need to integrate Debezium with each service so that events get published continuously for every row-level change. A consumer in the reporting service can then easily receive the data and store it in its database.

But for a PoC project, we will assume there is only one service, and that same service will consume its own changes. It may sound funny, but that is what we are going to build. As my favourite database is PostgreSQL, I will use it.

Project setup

Before diving in, a tiny piece of info: all the code is available on GitHub as a NestJS-based service.

Debezium has a nice tutorial, and I will basically follow it. But I won't use ZooKeeper, as KRaft is production-ready and Kafka 4.0 will remove ZooKeeper entirely. Moreover, I will use a docker-compose file to spin up the required containers in the simplest possible way.

version: '3.7'


networks:
   cdc-using-debezium-network:
       name: cdc-using-debezium-network
       driver: bridge
       external: false


services:
   cdc-using-debezium-postgres:
       image: debezium/postgres:11
       container_name: cdc-using-debezium-postgres
       hostname: cdc-using-debezium-postgres
       restart: always
       ports:
           - '5443:5432'
       environment:
           POSTGRES_PASSWORD: 123
           POSTGRES_USER: postgres
           POSTGRES_DB: cdc-using-debezium
       volumes:
           - 'cdc-using-debezium-postgres-data:/var/lib/postgresql/data'
       networks:
           - cdc-using-debezium-network


   cdc-using-debezium-kafka:
       image: bitnami/kafka:3.4
       container_name: cdc-using-debezium-kafka
       hostname: cdc-using-debezium-kafka
       restart: always
       ports:
           - '9092:9092'
       environment:
           KAFKA_CFG_NODE_ID: 1
           KAFKA_KRAFT_CLUSTER_ID: q0k00yjQRaqWmAAAZv955w # base64 UUID
           KAFKA_CFG_PROCESS_ROLES: controller,broker
           KAFKA_CFG_LISTENERS: INTERNAL://cdc-using-debezium-kafka:29092,CONTROLLER://cdc-using-debezium-kafka:29093,EXTERNAL://0.0.0.0:9092
           KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://cdc-using-debezium-kafka:29092,EXTERNAL://localhost:9092
           KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
           KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@cdc-using-debezium-kafka:29093
           KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
           KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
       networks:
           - cdc-using-debezium-network


   cdc-using-debezium-connect:
       image: debezium/connect:2.3
       container_name: cdc-using-debezium-connect
       hostname: cdc-using-debezium-connect
       restart: always
       ports:
           - '8083:8083'
       environment:
           BOOTSTRAP_SERVERS: cdc-using-debezium-kafka:29092
           GROUP_ID: 1
           CONFIG_STORAGE_TOPIC: my_connect_configs
           OFFSET_STORAGE_TOPIC: my_connect_offsets
           STATUS_STORAGE_TOPIC: my_connect_statuses
           ENABLE_DEBEZIUM_SCRIPTING: 'true'
       links:
           - cdc-using-debezium-kafka
           - cdc-using-debezium-postgres
       networks:
           - cdc-using-debezium-network


volumes:
   cdc-using-debezium-postgres-data:
       name: cdc-using-debezium-postgres-data
       driver: local

The above docker-compose.yml file defines and configures a set of Docker services that work together to set up a development environment: PostgreSQL as the source database, Kafka as the message broker, and Debezium Connect as the connector runtime. It also defines a custom network used for communication between the services.

Once all the required services are up and running, we can confirm it by executing the docker-compose ps command. However, we still need to verify a few settings.

| Name                        | State | Ports                                                        |
| --------------------------- | ----- | ------------------------------------------------------------ |
| cdc-using-debezium-postgres | Up    | 0.0.0.0:5443->5432/tcp,:::5443->5432/tcp                     |
| cdc-using-debezium-kafka    | Up    | 0.0.0.0:9092->9092/tcp,:::9092->9092/tcp                     |
| cdc-using-debezium-connect  | Up    | 0.0.0.0:8083->8083/tcp,:::8083->8083/tcp, 8778/tcp, 9092/tcp |

Verify PostgreSQL

By running the command below, let's check the Write-Ahead Logging (WAL) level.

docker exec cdc-using-debezium-postgres psql --username=postgres --dbname=cdc-using-debezium --command='SHOW wal_level'

The value should be logical. Otherwise, Debezium won't be able to capture data changes (e.g., when the value is replica).
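
If it is not, and we are not using the Debezium-provided PostgreSQL image (which already ships with the right value), we can change it ourselves. Below is a minimal sketch, assuming we are connected as a superuser; the server must be restarted afterwards for the setting to take effect.

-- A minimal sketch for a vanilla PostgreSQL instance (run as a superuser);
-- the debezium/postgres image used in this PoC already sets wal_level = logical.
ALTER SYSTEM SET wal_level = logical;
-- The change only takes effect after a restart, e.g.:
-- docker restart cdc-using-debezium-postgres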

One more configuration needs to be checked.

docker exec cdc-using-debezium-postgres psql --username=postgres --dbname=cdc-using-debezium --command='SHOW shared_preload_libraries'

The above command should return decoderbufs,wal2json.

Don't worry; our Docker setup already did it for us. By the way, to get detailed information about those specific configs, we can check the official documentation here and here. The linked pages also explain what happens behind the scenes when capturing data changes. For us, it is just magic, but it's the beauty of engineering. Worth reading!

Database setup

Our simple database will have only one table to keep users' data, and the User table will have id, email, and name columns.

The query to create the table can be found in my GitHub repository.
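
For reference, a definition along the lines of the sketch below would do the job. Treat it as an illustrative assumption rather than the exact query from the repository: it assumes the pgcrypto extension for UUID generation, and the email column has to be unique because the insert queries later rely on ON CONFLICT (email).

-- A minimal sketch only; the exact query lives in the GitHub repository.
-- Assumes the pgcrypto extension for gen_random_uuid() on PostgreSQL 11.
CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE TABLE "User" (
    id    UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email VARCHAR(255) NOT NULL UNIQUE,
    name  VARCHAR(255) NOT NULL
);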

Verify Kafka Connect

By making an API request to the Kafka Connect REST API, we can check its status.

curl localhost:8083 | jq '.'

I used jq just to prettify the JSON. Anyway, it will give the following response.

{
   "version": "3.4.0",
   "commit": "2e1947d240607d53",
   "kafka_cluster_id": "q0k00yjQRaqWmAAAZv955w"
}

Notice the cluster ID? It is the exact same value we defined in the docker-compose.yml file.

Deploying PostgreSQL connector

We are ready to deploy the Debezium PostgreSQL connector to start monitoring the cdc-using-debezium database.

The curl command below essentially sends a request to Debezium Connect to create a connector that captures changes from the PostgreSQL database and publishes them to Kafka topics.

# IP = $(hostname -I | cut -d ' ' -f 1)
curl --location 'http://localhost:8083/connectors' \
   --header 'Accept: application/json' \
   --header 'Content-Type: application/json' \
   --data '{
   "name": "cdc-using-debezium-connector",
   "config": {
       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
       "database.hostname": "192.168.1.110",
       "database.port": "5443",
       "database.user": "postgres",
       "database.password": "123",
       "database.dbname": "cdc-using-debezium",
       "database.server.id": "184054",
       "table.include.list": "public.User",
       "topic.prefix": "cdc-using-debezium-topic"
   }
}'

In the above, 192.168.1.110 is my machine's IP address, so please replace it with your own. Anyway, the Debezium tutorial explains the configuration that we send as the API payload; that's why I am not going to repeat it here.
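
To double-check that the connector was registered and its task is running, we can also query Kafka Connect's standard status endpoint; it should report both the connector and its task as RUNNING.

curl --silent localhost:8083/connectors/cdc-using-debezium-connector/status | jq '.'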

Verify Kafka

server=localhost:9092

docker exec cdc-using-debezium-kafka /opt/bitnami/kafka/bin/kafka-metadata-quorum.sh --bootstrap-server $server describe --status

docker exec cdc-using-debezium-kafka /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server $server --list

The first command gets the status of the Kafka metadata quorum. The second retrieves and displays a list of all the topics in the Kafka cluster.

By running the topics command, we should see the topic cdc-using-debezium-topic.public.User. The topic name is the combination of the topic.prefix and table.include.list values that we sent as the API payload while creating the connector.

Monitoring database changes

Now, we can capture every row-level change of the User table. Let’s execute some queries.

INSERT INTO "User" (email, name)
VALUES ('ehasan+1@firecrackervocabulary.com', CONCAT('name_', (random() * 1000)::INTEGER::VARCHAR))
ON CONFLICT (email) DO UPDATE SET name = EXCLUDED.name
RETURNING *;


INSERT INTO "User" (email, name)
VALUES ('ehasan+2@firecrackervocabulary.com', 'name_2');


UPDATE "User"
SET name = 'name_20'
WHERE email = 'ehasan+2@firecrackervocabulary.com';


DELETE
FROM "User"
WHERE email = 'ehasan+2@firecrackervocabulary.com';

Let's execute the Kafka-provided command to see all published events.

docker exec cdc-using-debezium-kafka /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cdc-using-debezium-topic.public.User --from-beginning | jq '.'

Let's look at the generated output. As it contains a lot of info, I am going to include only curated data. Don't worry, a sample of the full JSON can be found here.

[
   {
       "payload": {
           "before": null,
           "after": {
               "id": "ad9c4382-db22-4635-a647-a13de4c5bdce",
               "email": "ehasan+1@firecrackervocabulary.com",
               "name": "name_882"
           },
           "source": {
               "db": "cdc-using-debezium",
               "table": "User"
           },
           "op": "c",
           "ts_ms": 1693046762871
       }
   },
   {
       "payload": {
           "before": null,
           "after": {
               "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
               "email": "ehasan+2@firecrackervocabulary.com",
               "name": "name_2"
           },
           "op": "c",
           "ts_ms": 1693046762873
       }
   },
   {
       "payload": {
           "before": {
               "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
               "email": "ehasan+2@firecrackervocabulary.com",
               "name": "name_2"
           },
           "after": {
               "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
               "email": "ehasan+2@firecrackervocabulary.com",
               "name": "name_20"
           },
           "op": "u",
           "ts_ms": 1693046762874
       }
   },
   {
       "payload": {
           "before": {
               "id": "4f259f0d-434d-4c73-90de-ee20f5635a3f",
               "email": "ehasan+2@firecrackervocabulary.com",
               "name": "name_20"
           },
           "after": null,
           "op": "d",
           "ts_ms": 1693046762874
       }
   }
]

The JSON above represents a sequence of database changes – creations, updates, and deletions – in the "User" table. Cool!

An important note: We might not get the previous data (within the before key) when executing an UPDATE or a DELETE query if PostgreSQL’s REPLICA IDENTITY is DEFAULT. So, we need to change it to FULL by executing the below query.

ALTER TABLE "User"
   REPLICA IDENTITY FULL;
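
To confirm the change, we can peek at the pg_class catalog; this is just a quick sanity check, not something Debezium itself requires.

SELECT relname, relreplident
FROM pg_class
WHERE relname = 'User';
-- relreplident: 'd' = DEFAULT, 'f' = FULL, 'n' = NOTHING, 'i' = USING INDEX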

Implementing consumer

As we can see, events are published to Kafka for every change, so our task is to subscribe to them in the reporting service.

I will use the KafkaJS library to achieve it quickly. Let’s just paste the code here.

import { Injectable, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common';
import { Consumer, Kafka } from 'kafkajs';

enum OperationType {
    c = 'CREATE',
    u = 'UPDATE',
    d = 'DELETE',
}

@Injectable()
export class KafkaService implements OnApplicationShutdown, OnApplicationBootstrap {
    private readonly kafka: Kafka;

    private consumer: Consumer;

    constructor() {
        this.kafka = new Kafka({
            brokers: [`localhost:9092`],
        });
    }

    async onApplicationBootstrap(): Promise<void> {
        await this.listen();
    }

    async onApplicationShutdown(): Promise<void> {
        await this.consumer.disconnect();
    }

    private async listen(): Promise<void> {
        const consumer = this.kafka.consumer({
            groupId: 'console-consumer-24440',
        });
        this.consumer = consumer;

        await consumer.connect();
        await consumer.subscribe({ topics: ['cdc-using-debezium-topic.public.User'], fromBeginning: true });

        await consumer.run({
            eachMessage: async ({ topic, message }) => {
                // Debezium emits a tombstone record (null value) after a delete, so skip those
                if (!message.value) {
                    return;
                }

                const parsedMessage = JSON.parse(message.value.toString());
                const eventPayload = {
                    topic,
                    database: parsedMessage.payload.source.db,
                    table: parsedMessage.payload.source.table,
                    operationType: OperationType[parsedMessage.payload.op],
                    data: parsedMessage.payload.after,
                    previousData: parsedMessage.payload.before,
                };
                console.log({ eventPayload });
            },
        });
    }
}

The KafkaService class demonstrates how to consume messages from a Kafka topic and process them. The actual Kafka message consumption logic resides in the listen method.

When a message arrives, the eachMessage callback is triggered. The incoming message’s content is transformed into the eventPayload object. This object contains details about the event, including the topic, database, table, operation type, data after the operation, and previous data before the operation.

Voilà! Now, the reporting service can persist the data into its database, and as we have all the data, we can easily generate various types of reports and perform analytical operations.
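
For instance, instead of only logging the payload, the eachMessage callback could hand eventPayload over to a small handler that upserts or deletes rows in the reporting database. The sketch below is purely illustrative: ReportingUserRepository and its save/deleteById methods are hypothetical names, not part of the GitHub repository.

// Hypothetical repository abstraction over the reporting database;
// ReportingUserRepository, save() and deleteById() are illustrative names only.
interface ReportingUserRepository {
    save(user: { id: string; email: string; name: string }): Promise<void>;
    deleteById(id: string): Promise<void>;
}

export class UserChangeHandler {
    constructor(private readonly repository: ReportingUserRepository) {}

    // Receives the eventPayload built inside eachMessage and syncs the reporting copy
    async handle(eventPayload: {
        operationType: 'CREATE' | 'UPDATE' | 'DELETE';
        data: { id: string; email: string; name: string } | null;
        previousData: { id: string; email: string; name: string } | null;
    }): Promise<void> {
        if (eventPayload.operationType === 'DELETE') {
            // Delete events carry the removed row only in previousData (after is null)
            if (eventPayload.previousData) {
                await this.repository.deleteById(eventPayload.previousData.id);
            }
            return;
        }

        // Create and update events carry the latest row state in data
        if (eventPayload.data) {
            await this.repository.save(eventPayload.data);
        }
    }
}

Wiring an instance of such a handler into KafkaService would simply replace the console.log call inside listen().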

Conclusion

Debezium may not be a panacea for every problem, but in scenarios where we need to propagate changed data, I believe we can delegate that responsibility to it and concentrate on our actual business logic.

I hope you enjoyed it. Thank you for reading it.

NB: The cover image is taken from here.

Top comments (8)

daveybrown

Hey Emtiaj, I just worked through this. It's a really well written post, and I successfully got your app up and running and logging consumed events 🎉

In docker compose I used version 16 of Postgres - SHOW shared_preload_libraries didn't return wal2json for me, but everything still worked. The guide is a solid foundation for me to build upon, so thank you :)

I did it live on a 2-hour stream, with some commentary; if you like, I can share the link.

Emtiaj Hasan

That is awesome!

Yes, please. Sharing is caring! 🫠

By the way, I had planned to use AWS Kinesis instead of Kafka and write another blog post, but laziness is all with me. 😭

daveybrown

It's at youtu.be/kGixHsxirmA 🤲 maybe it can be useful for someone.

Kinesis looks cool, but I'm very disconnected from the AWS world. Would it be a big advantage over Kafka?

Emtiaj Hasan

I wanted to play with it as I heavily use SNS and SQS.

Just out of curiosity, you know!

daveybrown

Cool, I see :)

Also, thanks for the intro to NestJS. I'm researching it now. Do you use it professionally?

Emtiaj Hasan

I use it in my company and for the day-to-day implementation of my vocabulary flashcard app. 😁

A cool framework!

Krishnesh Kumar

What if we have two tables related with a many-to-many relationship? In this case, events from the related tables can go to different partitions of the topic. How can we handle that? I tried to add a partition key to the record via the Debezium configuration, but it didn't work for me.

I tried partition routing, but I am still getting the same result, i.e., one of my user records goes to partition 0, the m2m-related table's record goes to partition 1, and so forth. Here I am sharing my configuration:

{
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres-abcdef",
      "database.port": "5432",
      "database.user": "testx",
      "database.password": "qwerty123",
      "database.dbname": "dashboard",
      "database.server.name": "dashboard",
      "plugin.name": "pgoutput",
      "publication.name": "users_publication",
      "slot.name": "users_slot",
      "table.include.list": "public.zenauth_abcuser, public.zenauth_usercustomer",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.postgres",
      "snapshot.mode": "never",
      "topic.prefix": "dashboard",
      "snapshot.isolation.mode": "read_committed",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "key.converter.schemas.enable": "false",

      "transforms": "PartitionRouting,Reroute",

      "transforms.PartitionRouting.type":"io.debezium.transforms.partitions.PartitionRouting",
      "transforms.PartitionRouting.partition.topic.num": "3",
      "transforms.PartitionRouting.partition.payload.fields":"after.username",

      "transforms.PartitionRouting.predicate":"allTopic",
      "predicates":"allTopic",
      "predicates.allTopic.type":"org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
      "predicates.allTopic.pattern":"public.*",

      "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
      "transforms.Reroute.topic.regex": ".*",
      "transforms.Reroute.topic.replacement": "dashboard.users",


      "errors.tolerance": "all",
      "errors.log.enable": "true",
      "errors.log.include.messages": "true"
    }

Also, can you please help with this: in a delete event, the after object will be empty, and in a create event, the before object will be empty. How can we handle these types of conditions?

"transforms.PartitionRouting.partition.payload.fields":"after.username",

Also, I saw in the docs that we can refer to these using change.username. Is that the correct way?

Emtiaj Hasan

@krishnesh Kumar, sorry to say that I haven't experienced this type of scenario; therefore, I cannot help here. If I manage to replicate this scenario and fix it, I will surely share it with you. In the meantime, if you find a solution, please don't hesitate to share it here.

Thanks!