The aim of this tutorial is to capture every change (insert, update, and delete) in a MySQL table and sync it to ClickHouse.
Prerequisites
- MySQL
- Zookeeper
- Kafka
- Kafka Connect
- ClickHouse
We can set up all of these services with a simple docker-compose file (Source).
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: quay.io/debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuse
  clickhouse:
    image: clickhouse/clickhouse-server:23.2.4.12
    links:
      - kafka
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    ports:
      - 8123:8123
      - 9000:9000
You can read more about the options of every service in this tutorial.
After saving the YAML file as docker-compose.yml, run:
export DEBEZIUM_VERSION=2.2
docker compose up
Now we have a MySQL container with a simple database named inventory, a Kafka container, a Zookeeper container that manages the Kafka cluster, a connect instance that adds Kafka Connect capabilities to Kafka, and a ClickHouse instance. All prerequisites are now in place.
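Before moving on, it can help to confirm that each service is reachable. The following is a minimal Python sketch, assuming the containers publish the same host ports as the compose file above and that you run it on the Docker host. The names SERVICE_PORTS, port_open, and report are made up for this illustration. An open port only shows that the container is listening, not that the service has finished starting.

```python
import socket

# Host ports published by the docker-compose file above.
SERVICE_PORTS = {
    "zookeeper": 2181,
    "kafka": 9092,
    "mysql": 3306,
    "connect": 8083,
    "clickhouse": 8123,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def report(host: str = "localhost") -> dict:
    """Map each service name to whether its published port accepts connections."""
    return {name: port_open(host, port) for name, port in SERVICE_PORTS.items()}


if __name__ == "__main__":
    for name, is_up in report().items():
        print(f"{name:<10} {'up' if is_up else 'not reachable'}")
```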
Deploy Debezium connector
We can interact with Kafka Connect through its REST API.
Base request:
curl -i -X {Request_Type} -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/
See current connectors:
curl -i -X GET -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/
Delete {my-conn} connector:
curl -i -X DELETE -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/{my-conn}
Add connector:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{connector-config-as-json}'
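The same calls can be scripted. Below is a small Python sketch using only the standard library, assuming Kafka Connect is reachable at localhost:8083 as in the curl examples. The helper names build_request and send are invented for this example and are not part of any Kafka Connect client.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors/"  # same endpoint as the curl calls
HEADERS = {"Accept": "application/json", "Content-Type": "application/json"}


def build_request(method, name=None, config=None):
    """Build a request for the connectors endpoint without sending it."""
    url = CONNECT_URL + (name or "")
    data = json.dumps(config).encode("utf-8") if config is not None else None
    return urllib.request.Request(url, data=data, headers=HEADERS, method=method)


def send(request):
    """Send the request and return (HTTP status, decoded JSON body or None)."""
    with urllib.request.urlopen(request) as response:
        body = response.read().decode("utf-8")
        return response.status, (json.loads(body) if body else None)


# Usage (requires the connect container to be running):
#   send(build_request("GET"))                              # list connectors
#   send(build_request("POST", config=connector_config))    # add a connector
#   send(build_request("DELETE", name="mysql-connector"))   # delete a connector
```

Separating build_request from send keeps the request easy to inspect before anything goes over the network.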
Config for MySQL Connector
{
  "name": "mysql-connector",
  "config": {
    "tasks.max": "1",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debezium",
    "database.include.list": "inventory",
    "table.include.list": "inventory.orders",
    "database.server.id": "1",
    "message.key.columns": "inventory.orders:order_number",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "dbz.inventory.history",
    "snapshot.mode": "schema_only",
    "topic.prefix": "dbz.inventory.v2",
    "transforms": "unwrap",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}
- name: The name of the connector.
- config: The connector’s configuration.
- tasks.max: Only one task should operate at any one time. Because the MySQL connector reads the MySQL server’s binlog, using a single connector task ensures proper order and event handling. The Kafka Connect service uses connectors to start one or more tasks that do the work, and it automatically distributes the running tasks across the cluster of Kafka Connect services. If any of the services stop or crash, those tasks will be redistributed to running services.
- connector.class: The type of connector, one of these.
- database.hostname: The database host, which is the name of the Docker container running the MySQL server (mysql). Docker manipulates the network stack within the containers so that each linked container can be resolved with /etc/hosts using the container name for the hostname. If MySQL were running on a normal network, you would specify the IP address or resolvable hostname for this value.
- database.user & database.password: The username and password of a MySQL user with these privileges. For this example, I use the root user and password.
- database.include.list: Only changes in the inventory database will be detected.
- topic.prefix: A unique topic prefix. This name will be used as the prefix for all Kafka topics.
- schema.history.internal.kafka.bootstrap.servers & schema.history.internal.kafka.topic: The connector will store the history of the database schemas in Kafka using this broker (the same broker to which you are sending events) and topic name. Upon restart, the connector will recover the schemas of the database that existed at the point in time in the binlog when the connector should begin reading.
- transforms*: These transformations are needed to insert data into ClickHouse. More explanation here.
Full reference of configs for MySQL connector can be found here.
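To make the effect of the unwrap transform more concrete, here is a simplified Python sketch of what ExtractNewRecordState does with delete.handling.mode set to rewrite, as far as I understand it: the change-event envelope is flattened to the row itself, and a __deleted field holding the string "true" or "false" is added. The unwrap function and the sample events are illustrative assumptions, not Debezium code, and the real transform handles more cases (tombstones and schema handling, for example).

```python
def unwrap(envelope):
    """Flatten a Debezium-style change event, roughly like rewrite mode."""
    if envelope["op"] == "d":
        # A delete has no "after" state, so the last known row is kept and flagged.
        row = dict(envelope["before"])
        row["__deleted"] = "true"
    else:
        # Creates, updates, and snapshot reads carry the new row in "after".
        row = dict(envelope["after"])
        row["__deleted"] = "false"
    return row


# Illustrative events (values are made up for this example).
update_event = {
    "op": "u",
    "before": {"order_number": 10001, "quantity": 1},
    "after": {"order_number": 10001, "quantity": 5},
}
delete_event = {
    "op": "d",
    "before": {"order_number": 10001, "quantity": 5},
    "after": None,
}

print(unwrap(update_event))
print(unwrap(delete_event))
```

Because a delete arrives as an ordinary row with __deleted set, it can be inserted into ClickHouse like any other change and filtered out at query time.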
Consume Messages From Kafka
To see the list of topics in our Kafka broker, we first open a bash shell inside the Kafka container:
docker exec -it {kafka-container-name} /bin/bash
Then:
/kafka/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
Note that the topic corresponding to our orders table in MySQL has this format: {topic.prefix}.{database_name}.{table_name}. In this example, it becomes dbz.inventory.v2.inventory.orders.
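The naming rule is simple enough to express in a few lines. The Python sketch below derives the expected topic names from the topic.prefix and table.include.list values in the connector config. The function names topic_name and topics_for are made up for this illustration.

```python
def topic_name(prefix, database, table):
    """Build a topic name as {topic.prefix}.{database_name}.{table_name}."""
    return f"{prefix}.{database}.{table}"


def topics_for(prefix, table_include_list):
    """Expand a comma-separated table.include.list into topic names."""
    topics = []
    for entry in table_include_list.split(","):
        database, table = entry.strip().split(".", 1)
        topics.append(topic_name(prefix, database, table))
    return topics


print(topics_for("dbz.inventory.v2", "inventory.orders"))
```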
To consume all messages from a topic:
/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbz.inventory.v2.inventory.orders --from-beginning
Set Up Clickhouse Tables
As mentioned in this article in the ClickHouse docs, we need 3 tables:
- A table with the Kafka engine
- A materialized view
- A MergeTree table
Kafka Engine Table
As mentioned in the docs, we should specify the format of the messages arriving from the Kafka topic (one of these). We could use Kafka Schema Registry, but here we want to parse the JSON directly. So, with the help of the solution provided in this post, we read each message as a single string using the JSONAsString format and then parse it in the materialized view.
CREATE TABLE `default`.kafka_orders
(
`msg_json_str` String
)
Engine=Kafka('kafka:9092', 'dbz.inventory.v2.inventory.orders', 'clickhouse', 'JSONAsString')
Full doc of Kafka engine in Clickhouse.
MergeTree Table
As mentioned at the beginning of this article, we want to capture deletes and updates, so we use ReplacingMergeTree:
CREATE TABLE default.stream_orders
(
`order_number` Int16,
`order_date` DATE ,
`purchaser` Int16 ,
`quantity` Int16,
`product_id` Int16,
`__deleted` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY (order_number)
SETTINGS index_granularity = 8192
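As a rough mental model of what ReplacingMergeTree does here: when parts are merged, rows sharing the same ORDER BY key (order_number) are collapsed into one, and without a version column the most recently inserted row is the one kept. The Python sketch below imitates only that outcome. It is an illustration under those assumptions, not how ClickHouse implements merges, and the sample rows are made up.

```python
def replacing_merge(rows, key="order_number"):
    """Keep only the last inserted row for each key value."""
    latest = {}
    for row in rows:  # rows are assumed to be in insertion order
        latest[row[key]] = row  # a later row replaces an earlier one
    return list(latest.values())


rows = [
    {"order_number": 10001, "quantity": 1, "__deleted": "false"},
    {"order_number": 10002, "quantity": 2, "__deleted": "false"},
    {"order_number": 10001, "quantity": 5, "__deleted": "false"},  # update
    {"order_number": 10002, "quantity": 2, "__deleted": "true"},   # delete
]

for row in replacing_merge(rows):
    print(row)
```

Until a merge actually happens, all four rows are still visible in the table, which is why queries need to deduplicate on their own.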
Mat. View
We parse the JSON using the JSONExtract functions in ClickHouse.
Note that Debezium represents the DATE data type as the number of days since 1970-01-01 (Source). That is why we combine toDate with JSONExtractInt.
CREATE MATERIALIZED VIEW default.consumer__orders TO default.stream_orders
(
`order_number` Int16,
`order_date` DATE ,
`purchaser` Int16 ,
`quantity` Int16,
`product_id` Int16,
`__deleted` Nullable(String)
) AS
SELECT
JSONExtractInt(msg_json_str,'payload','order_number') AS order_number,
(toDate('1970-01-01')+JSONExtractInt(msg_json_str,'payload','order_date')) AS order_date,
JSONExtractInt(msg_json_str,'payload','purchaser') AS purchaser,
JSONExtractInt(msg_json_str,'payload','quantity') AS quantity,
JSONExtractInt(msg_json_str,'payload','product_id') as product_id,
JSONExtractString(msg_json_str,'payload','__deleted') AS __deleted
FROM default.kafka_orders
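To check the logic of the materialized view outside ClickHouse, the same per-message transformation can be sketched in Python: read the payload object from the raw JSON string and turn the day count into a calendar date. The parse_order function and the sample message are illustrative assumptions. A real message also carries a schema section next to payload.

```python
import json
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)


def parse_order(msg_json_str):
    """Mirror the SELECT of the materialized view for one Kafka message."""
    payload = json.loads(msg_json_str)["payload"]
    return {
        "order_number": int(payload["order_number"]),
        # Debezium sends DATE as days since 1970-01-01.
        "order_date": EPOCH + timedelta(days=int(payload["order_date"])),
        "purchaser": int(payload["purchaser"]),
        "quantity": int(payload["quantity"]),
        "product_id": int(payload["product_id"]),
        "__deleted": payload.get("__deleted"),
    }


# Sample message with made-up values, shaped like the unwrapped payload.
sample = json.dumps({
    "payload": {
        "order_number": 10001,
        "order_date": 16816,
        "purchaser": 1001,
        "quantity": 1,
        "product_id": 102,
        "__deleted": "false",
    }
})

print(parse_order(sample))
```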
A View (Optional)
ClickHouse merges the stream_orders table on an irregular schedule, so we can't rely on seeing only the latest version of each row at all times. But we can use a view to achieve this:
CREATE VIEW orders(
`order_number` Int16,
`order_date_` DATE ,
`purchaser` Int16 ,
`quantity` Int16,
`product_id` Int16
) AS
SELECT
order_number,
max(order_date) as order_date_,
argMax(purchaser,order_date) as purchaser,
argMax(quantity,order_date) as quantity,
argMax(product_id,order_date) as product_id
FROM default.stream_orders
WHERE `__deleted`= 'false'
GROUP BY order_number
We can also use the FINAL modifier instead of GROUP BY, but it's not recommended in a production environment.
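To reason about what the GROUP BY view returns, here is a Python sketch that imitates it on a handful of made-up rows: filter on __deleted first, then keep, per order_number, the row with the greatest order_date. The latest_orders function and the sample data are illustrative assumptions, not ClickHouse behavior in every detail.

```python
from datetime import date


def latest_orders(rows):
    """Imitate the view: WHERE __deleted = 'false', then argMax by order_date."""
    best = {}
    for row in rows:
        if row["__deleted"] != "false":  # the WHERE clause runs before GROUP BY
            continue
        current = best.get(row["order_number"])
        if current is None or row["order_date"] > current["order_date"]:
            best[row["order_number"]] = row
    return best


rows = [
    {"order_number": 10001, "order_date": date(2016, 1, 16), "quantity": 1, "__deleted": "false"},
    {"order_number": 10001, "order_date": date(2016, 1, 17), "quantity": 5, "__deleted": "false"},
    {"order_number": 10002, "order_date": date(2016, 1, 17), "quantity": 2, "__deleted": "false"},
    {"order_number": 10002, "order_date": date(2016, 1, 17), "quantity": 2, "__deleted": "true"},
]

for order_number, row in latest_orders(rows).items():
    print(order_number, row["order_date"], row["quantity"])
```

Two things are worth noticing in the sketch. Versions are ranked by order_date, so an update that leaves order_date unchanged is not guaranteed to be the one returned. And because the filter runs before the grouping, order 10002 still appears as long as its earlier, non-deleted version has not been merged away.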
Troubleshooting
If there is an error, or data is missing from the tables, check the ClickHouse server logs located in /var/log/clickhouse-server/clickhouse-server.err.log