The problem to solve
Some months back, I encountered a scenario where I needed to synchronize two databases: one based on SQL Server and the other on MongoDB. The SQL Server database was the primary source of information, derived from the client's business logic, and the MongoDB database was the source of information for our proprietary e-commerce sites.
We needed to replicate some of the client's SQL Server operations into our MongoDB.
The problem in pieces
After some research, we found a way to achieve this using an event-driven architecture pattern:
- We must capture events from SQL Server (INSERT, DELETE, UPDATE, etc.)
- Stream those events
- Consume the events and update MongoDB.
Solution
We used SQL Server's change data capture (CDC) feature in combination with Debezium connectors to capture the SQL operations (inserts, deletes, updates, etc.) and send them to Apache Kafka, an open-source event streaming platform. For the consumer part, we developed our own consumers using Python.
The architecture will look like this:
Image from Debezium documentation
Tutorial
I will explain how to do it using Minikube (local Kubernetes), which is ideal for testing before production. You will probably need some basic knowledge of Kubernetes.
The first part of this process is to enable change data capture for the tables you are interested in (Microsoft Docs):
-- CDC must be enabled at the database level first
USE MyDB
GO
EXEC sys.sp_cdc_enable_db
GO

-- Enable CDC for a table, specifying a filegroup
EXEC sys.sp_cdc_enable_table
  @source_schema = N'dbo',
  @source_name = N'MyTable',
  @role_name = N'MyRole',
  @filegroup_name = N'MyDB_CT',
  @supports_net_changes = 1
GO
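If you want to double-check from code that CDC is actually on, here is a minimal sketch. It assumes pyodbc and the Microsoft ODBC driver are installed and uses placeholder credentials; none of that is part of the original setup.
# Optional sanity check that CDC is enabled (assumes pyodbc and ODBC Driver 18)
import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=your-sql-server-hostname;DATABASE=MyDB;"
    "UID=your-user;PWD=your-password;TrustServerCertificate=yes"
)
cursor = conn.cursor()

# CDC flag at the database level
cursor.execute("SELECT is_cdc_enabled FROM sys.databases WHERE name = DB_NAME()")
print("database CDC enabled:", bool(cursor.fetchone()[0]))

# Tables currently tracked by CDC
cursor.execute("SELECT name FROM sys.tables WHERE is_tracked_by_cdc = 1")
print("tracked tables:", [row.name for row in cursor.fetchall()])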
Once CDC is enabled, we can start using Minikube to deploy our cluster locally.
Deploying architecture in Minikube
To deploy the architecture locally, I strongly suggest following the steps in the Debezium documentation. It's straightforward, and the only thing I could do here is rewrite it, which is not worth it. Instead, I will highlight some key steps to use it with the SQL Server connector instead of the MySQL connector (which is the current example in the docs).
Configuring Kafka Connect
In order to connect to SQL Server, you need to use the Debezium SQL Server connector plugin for Kafka Connect (debezium-connector-sqlserver). You can find it here: https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/
Then use the following KafkaConnect configuration file to create the connector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  # the KafkaConnector below references this name through its strimzi.io/cluster label
  name: debezium-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.5.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093  # your Kafka cluster's bootstrap service
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output: # (2)
      type: docker
      image: {here-go-your-container-repository-registry}
      pushSecret: {your-secret}  # the registry secret you create below, e.g. secret-kafka
    plugins: # (3)
      - name: debezium-sqlserver-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/2.3.3.Final/debezium-connector-sqlserver-2.3.3.Final-plugin.tar.gz
As you can see at (2), in the image field of the YAML file you need to write a container registry reference pointing to the Docker image that will be built for the Kafka Connect container (Strimzi builds an image that includes the Debezium plugin and pushes it to that registry). Example: gustavo/kafka-connect-sql-server:latest; use your own. I used Docker Hub for this one.
And finally, you need to create a Kubernetes secret with your registry credentials so the built image can be pushed. You can name it as you wish, for example: secret-kafka.
Here is an example:
kubectl create secret docker-registry secret-kafka -n kafka \
--docker-email=your-user-email-goes-here \
--docker-username=your-user-name-goes-here \
--docker-password=your-password-goes-here
Note that in this example we are creating the secret in the kafka namespace (-n kafka). If you are following the Debezium tutorial, you should use the same namespace.
Creating the Debezium SQL Server Connector
Finally, this is how your KafkaConnector resource should look:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-sqlserver
  labels:
    # must match the name of the KafkaConnect resource created above
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  tasksMax: 1
  config:
    schema.history.internal.kafka.bootstrap.servers: debezium-cluster-kafka-bootstrap:9092  # your Kafka cluster's bootstrap service
    database.hostname: your-sql-server-hostname
    database.port: your-sql-server-port
    database.user: your-sql-server-user
    database.password: your-sql-server-password
    database.dbname: your-sql-server-db
    database.server.name: your-sql-server-name
    topic.prefix: choose-a-prefix
    database.names: database-name
    database.encrypt: true
    database.trustServerCertificate: true
    database.include.list: database-list-comma-separated
    table.include.list: table-list-comma-separated
See the SQL Server connector docs for more details.
Example:
table.include.list = dbo.table0, dbo.table1, dbo.table2
topic.prefix = dev-prefix
database.names = SQLENTERPRISE
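With these example values, Debezium writes each table's change events to its own Kafka topic, named after the pattern topic.prefix.databaseName.schemaName.tableName; these are the topic names the consumers in the next section subscribe to. A quick sketch of the resulting names (the prefix, database, and tables are just the example values above):
# Debezium SQL Server topic naming: <topic.prefix>.<database>.<schema>.<table>
prefix = "dev-prefix"
database = "SQLENTERPRISE"
tables = ["dbo.table0", "dbo.table1", "dbo.table2"]

topics = [f"{prefix}.{database}.{table}" for table in tables]
print(topics)
# ['dev-prefix.SQLENTERPRISE.dbo.table0',
#  'dev-prefix.SQLENTERPRISE.dbo.table1',
#  'dev-prefix.SQLENTERPRISE.dbo.table2']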
Consumers
By this point you should have a running cluster in Minikube.
The last step is to write some consumers that capture the Kafka events. You can refer to the Confluent documentation to install the confluent-kafka Python client, which will help us interact with the Kafka cluster.
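If you want to confirm the client is installed before writing any code, a quick sanity check (the package installs as confluent-kafka but imports as confluent_kafka):
# Quick sanity check that the Confluent Kafka client is importable
import confluent_kafka

print(confluent_kafka.version())     # client version
print(confluent_kafka.libversion())  # bundled librdkafka version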
Let's create a file that will help us listen to Kafka events:
decorators.py (placed inside a util package, to match the import in main.py below)
from confluent_kafka import Consumer, KafkaError

# Define the Kafka consumer configuration
consumer_config = {
    'bootstrap.servers': '10.10.1.1:9092',  # Replace with your Kafka broker(s) address
    'group.id': 'connect-cluster-id',
    'auto.offset.reset': 'earliest',  # Start consuming from the beginning of the topic
}

# Create a Kafka consumer instance
consumer = Consumer(consumer_config)

# Subscribe to the Kafka topics created by Debezium (replace with your own topic names)
consumer.subscribe([
    'dev-prefix.SQLENTERPRISE.dbo.table0',
    'dev-prefix.SQLENTERPRISE.dbo.table1',
    'dev-prefix.SQLENTERPRISE.dbo.table2',
])
print("Subscribed to topics")

def loop(func):
    def wrapper():
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    # Handle any errors that occurred during consumption
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        print(f"Reached end of partition for topic {msg.topic()} partition {msg.partition()}")
                    else:
                        print(f"Error while consuming message: {msg.error()}")
                else:
                    # Hand the message over to the decorated function
                    print(f"Consumed message: {msg.value()}")
                    func(msg)
        except KeyboardInterrupt:
            pass
        finally:
            # Close the Kafka consumer when done
            consumer.close()
    return wrapper
main.py
from util.decorators import loop
import json

@loop
def main(msg):
    key = str(msg.topic()).replace("dev-prefix.SQLENTERPRISE.dbo.", '').lower()
    value = json.loads(msg.value().decode("utf-8"))
    payload = {key: value}
    if msg.topic() == "dev-prefix.SQLENTERPRISE.dbo.table0":
        print(payload)
    elif msg.topic() == "dev-prefix.SQLENTERPRISE.dbo.table1":
        print(payload)
    elif msg.topic() == "dev-prefix.SQLENTERPRISE.dbo.table2":
        print(payload)
    else:
        print("topic not found")

main()
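For reference, the value of each message is a Debezium change-event envelope serialized as JSON. The exact shape depends on your converter settings (with the default JSON converter it may be wrapped in an extra schema/payload layer), but a trimmed, illustrative example of what json.loads(msg.value()) can return for an update looks like this; the column names and values are made up:
# Illustrative (trimmed) Debezium change event for an UPDATE; field values are made up
event = {
    "before": {"id": 42, "price": 10.0},  # row state before the change (None for inserts)
    "after": {"id": 42, "price": 12.5},   # row state after the change (None for deletes)
    "source": {"db": "SQLENTERPRISE", "schema": "dbo", "table": "table1"},
    "op": "u",                            # c = insert, u = update, d = delete, r = snapshot read
    "ts_ms": 1700000000000,               # when the connector processed the event
}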
Finally, you could replace the print(payload) calls with a POST request to a microservice that handles the MongoDB updates, or you could do the update right there.
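As a sketch of the "do it right there" option, here is one way to push the change into MongoDB with pymongo. Everything here is an assumption for illustration: the connection string, the ecommerce database name, the sync_to_mongo helper, the one-collection-per-table layout, and the existence of an id column in each table.
# Sketch only: write Debezium changes into MongoDB (assumes pymongo and an "id" column)
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # replace with your MongoDB URI
db = client["ecommerce"]                           # hypothetical database name

def sync_to_mongo(key, value):
    # Unwrap the envelope if the JSON converter added a schema/payload wrapper
    change = value.get("payload", value)
    op = change.get("op")
    if op == "d":
        # Row deleted in SQL Server: remove it from the matching collection
        before = change.get("before") or {}
        db[key].delete_one({"_id": before.get("id")})
    else:
        # Insert, update, or snapshot read: upsert the "after" image
        after = change.get("after") or {}
        db[key].replace_one({"_id": after.get("id")}, after, upsert=True)
In main.py you would then call sync_to_mongo(key, value) instead of print(payload); if you prefer the microservice route, a requests.post call carrying the same payload does the job.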
And that's all! 🎉🎉
In the future, I will explain how to deploy it to production using Google Cloud.