Introduction
In today's data-driven world, keeping data synchronized across systems is crucial. Change Data Capture (CDC) has emerged as a powerful pattern for tracking changes in a database and propagating them to other systems in real time. In this guide, we'll build a practical example using Debezium and Apache Kafka to create a robust CDC pipeline.
What We'll Build
We'll create a simple e-commerce scenario where order updates in a PostgreSQL database are automatically synchronized with an Elasticsearch instance for real-time search capabilities. This setup demonstrates a common real-world use case for CDC.
Prerequisites
- Docker and Docker Compose
- Java 11 or higher
- Maven
- Git
- PostgreSQL client (psql)
- curl (for testing)
Architecture Overview
Our architecture consists of several components:
- PostgreSQL database (source)
- Debezium connector
- Apache Kafka
- Kafka Connect
- Elasticsearch (target)
- Simple Spring Boot application for testing
graph LR
A[PostgreSQL] -->|Debezium| B[Kafka Connect]
B -->|Events| C[Kafka]
C -->|Sink Connector| D[Elasticsearch]
E[Spring Boot App] -->|Writes| A
E -->|Search| D
Implementation Steps
1. Setting Up the Environment
First, let's create our project structure:
mkdir cdc-demo
cd cdc-demo
git init
Create a docker-compose.yml file:
version: '3'
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=inventory
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./postgres/init:/docker-entrypoint-initdb.d
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  connect:
    image: debezium/connect:2.1
    ports:
      - "8083:8083"
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      BOOTSTRAP_SERVERS: kafka:29092
    depends_on:
      - kafka
      - postgres
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.0
    ports:
      - "9200:9200"
    environment:
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
2. Creating the Database Schema
Create postgres/init/init.sql:
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER NOT NULL,
    order_date TIMESTAMP NOT NULL,
    status VARCHAR(50) NOT NULL,
    total_amount DECIMAL(10,2) NOT NULL
);
ALTER TABLE orders REPLICA IDENTITY FULL;
The REPLICA IDENTITY FULL setting makes PostgreSQL write the complete previous row state to the WAL, so Debezium's update and delete events carry the full before-image rather than just the primary key.
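The debezium/postgres image ships with logical decoding already enabled. If you want to confirm it once the container is running, a quick check with psql (using the default credentials from the compose file above):
docker-compose exec postgres psql -U postgres -d inventory -c "SHOW wal_level;"
It should report logical; without that setting, the connector registration in the next step would fail.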
3. Configuring Debezium
After starting the containers, register the Debezium connector with Kafka Connect. Note that Debezium 2.x (used by the debezium/connect:2.1 image) requires topic.prefix, which replaces the older database.server.name property:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "dbserver1",
    "table.include.list": "public.orders",
    "plugin.name": "pgoutput"
  }
}'
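You can check that the connector was registered and is healthy via the Kafka Connect REST API:
curl http://localhost:8083/connectors/inventory-connector/status
The response should show both the connector and its task in the RUNNING state.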
4. Spring Boot Application
Create a new Spring Boot project with the following dependencies:
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
</dependencies>
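The application also needs datasource settings pointing at the PostgreSQL container. A minimal application.properties sketch, assuming the credentials from the compose file and that the schema is created by the init script (so Hibernate shouldn't manage it):
spring.datasource.url=jdbc:postgresql://localhost:5432/inventory
spring.datasource.username=postgres
spring.datasource.password=postgres
spring.jpa.hibernate.ddl-auto=none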
Create the Order entity:
import java.math.BigDecimal;
import java.time.LocalDateTime;
import javax.persistence.*; // jakarta.persistence.* if you are on Spring Boot 3

@Entity
@Table(name = "orders")
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private Long customerId;
    private LocalDateTime orderDate;
    private String status;
    private BigDecimal totalAmount;

    // Getters, setters, and constructors
}
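The controller below injects an OrderRepository. The original post doesn't show it, but a plain Spring Data JPA interface is all that's needed:
import org.springframework.data.jpa.repository.JpaRepository;

// Provides save(), findById() and friends used by the controller
public interface OrderRepository extends JpaRepository<Order, Long> {
}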
Create a REST controller for testing:
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;
import java.time.LocalDateTime;

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderRepository orderRepository;

    public OrderController(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    @PostMapping
    public Order createOrder(@RequestBody Order order) {
        order.setOrderDate(LocalDateTime.now());
        return orderRepository.save(order);
    }

    @PutMapping("/{id}")
    public Order updateOrder(@PathVariable Long id, @RequestBody Order order) {
        return orderRepository.findById(id)
                .map(existingOrder -> {
                    existingOrder.setStatus(order.getStatus());
                    existingOrder.setTotalAmount(order.getTotalAmount());
                    return orderRepository.save(existingOrder);
                })
                .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND));
    }
}
5. Testing the Pipeline
- Start all containers:
docker-compose up -d
- Create a test order:
curl -X POST http://localhost:8080/api/orders \
-H "Content-Type: application/json" \
-d '{
"customerId": 1,
"status": "NEW",
"totalAmount": 99.99
}'
- Update the order:
curl -X PUT http://localhost:8080/api/orders/1 \
-H "Content-Type: application/json" \
-d '{
"status": "PROCESSING",
"totalAmount": 99.99
}'
- Check Kafka topics to verify the CDC events:
docker-compose exec kafka kafka-console-consumer \
--bootstrap-server kafka:29092 \
--topic dbserver1.public.orders \
--from-beginning
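The steps above only carry change events as far as Kafka. To complete the Elasticsearch leg of the pipeline you also need a sink connector; the debezium/connect image does not bundle one, so you would have to add a plugin such as the Confluent Elasticsearch sink connector to the Connect image. A hedged sketch of its registration, assuming that plugin is installed (exact settings such as document ID mapping and type.name depend on the connector and Elasticsearch versions):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "dbserver1.public.orders",
    "connection.url": "http://elasticsearch:9200",
    "key.ignore": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}'
The ExtractNewRecordState transform flattens Debezium's change-event envelope (before/after/op) into a plain row so Elasticsearch indexes only the current state. By default the sink writes to an index named after the topic, so once it is running you can query it with, for example, curl 'http://localhost:9200/dbserver1.public.orders/_search?pretty'.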
Common Challenges and Solutions
- Data Consistency
  - Use transaction logs for accurate change capture
  - Implement idempotent consumers
  - Handle out-of-order events
- Performance Optimization
  - Batch updates when possible
  - Monitor Kafka partition lag
  - Tune PostgreSQL replication slots
- Error Handling
  - Implement dead letter queues (see the sketch after this list)
  - Set up proper monitoring and alerting
  - Create retry mechanisms
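For Kafka Connect sink connectors (such as the Elasticsearch sink above), dead-lettering can be enabled with Connect's built-in error-handling properties. A minimal sketch to add inside the sink connector's "config" block; the topic name here is just an example:
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq.orders",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true"
With these settings, records that fail conversion or delivery are routed to the dead letter queue topic instead of stopping the connector.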
Best Practices
- Schema Evolution
  - Use Avro for schema management
  - Plan for backward/forward compatibility
  - Test schema changes thoroughly
- Monitoring
  - Track replication lag
  - Monitor Kafka consumer group offsets (see the command sketch after this list)
  - Set up alerts for connector failures
- Security
  - Use SSL/TLS for communication
  - Implement proper authentication
  - Follow least privilege principle
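Consumer group lag for sink connectors can be inspected with the standard Kafka CLI. The group name below is an assumption based on Kafka Connect's default naming of sink groups as connect-<connector-name>:
docker-compose exec kafka kafka-consumer-groups \
  --bootstrap-server kafka:29092 \
  --describe --group connect-elasticsearch-sink
The LAG column shows how far the sink is behind the change events produced by Debezium.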
Conclusion
CDC with Debezium and Kafka provides a robust solution for real-time data synchronization. This setup can be extended to handle more complex scenarios like:
- Multi-region deployment
- Multiple target systems
- Complex transformation pipelines
- High availability requirements