Kafka Connect is a framework and toolset for building and running data pipelines between Apache Kafka and other data systems. This article provides an overview of Kafka Connect and its key components, such as workers, connectors, tasks, transformations, and converters. It also includes code examples to demonstrate how to configure these components to build a data pipeline from a MySQL database into Kafka.
Kafka Connect workers
You can configure Kafka Connect as a standalone system with a single worker (server) or a distributed system with multiple workers. Each worker is assigned a unique identifier. The worker executes connectors and tasks, oversees their lifecycle, and provides scalability.
In a distributed Kafka Connect setup, multiple workers coordinate task divisions and share metadata and configuration information. If one worker fails, another worker takes over the failed worker's tasks to ensure no disruption to data processing. This allows for parallel data processing and fault tolerance.
Starting Kafka Connect workers
To start a Kafka Connect worker, you must create a properties file (e.g., connect-worker-mysql.properties) containing the configuration details. Some of the key attributes required to set up a worker are:
- bootstrap.servers - Kafka broker connection string
- group.id - Unique identifier for the worker group
- rest.port - REST interface port number
Additionally, you need to specify the converter classes that determine the serialization formats for keys and values. Other important configurations relate to offset storage, status updates, and plugin paths.
# Example
# connect-worker-mysql.properties
bootstrap.servers=localhost:9092
group.id=connect-worker-1
rest.port=8083
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
plugin.path=/path/to/connector/jar
To start the worker in distributed mode:
$ bin/connect-distributed.sh config/connect-worker-mysql.properties
This command spins up a Kafka Connect worker using the settings defined in the properties file. Make sure the Kafka services are running beforehand.
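Once the worker is up, a quick way to confirm it is running is to query its REST interface. The commands below assume the localhost address and rest.port of 8083 from the properties file above:
$ curl http://localhost:8083/
$ curl http://localhost:8083/connectors
The first request returns the worker's version information, and the second lists the connectors currently deployed (an empty list on a fresh worker).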
With multiple workers running, Kafka Connect can distribute connectors and tasks across workers to scale data pipelines.
Kafka Connect connectors
Connectors in Kafka Connect provide a simplified way to integrate with external systems. They are reusable plugins that are responsible for connecting to external data sources or destinations.
Types of connectors
There are two types of connectors in Kafka Connect.
Source connectors are used for ingesting data from external sources into Kafka. For example, a MySQL source connector captures row changes from a database table and produces messages to a Kafka topic.
Sink connectors are used for writing data from Kafka topics to external systems such as HDFS, S3, Elasticsearch, etc. For example, an Elasticsearch sink connector consumes messages from Kafka and writes to an Elasticsearch index.
Connector configuration
To set up a connector, you need to create a JSON configuration file that specifies details such as:
- Connector class name
- External system connection parameters
- Topics to publish data to or consume data from
- Number of tasks
- Converters for serialization
See the example below:
{
"name": "mysql-source-connector",
"config": {
"connector.class": "MySqlSourceConnector",
"connection.url": "jdbc:mysql://localhost:3306/mydatabase",
"table.whitelist": "users",
"tasks.max": 2,
"topic.prefix": "mysql-topic-",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
}
}
This MySQL source connector configuration uses two tasks to stream changes from the "users" table to a Kafka topic named "mysql-topic-users" in JSON format.
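A sink connector is configured in the same way. The sketch below shows what an Elasticsearch sink consuming the topic above could look like, assuming the Confluent Elasticsearch sink connector plugin is installed; the class and property names vary between Elasticsearch connectors and versions:
{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "mysql-topic-users",
    "connection.url": "http://localhost:9200",
    "tasks.max": "1",
    "key.ignore": "true"
  }
}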
The connector configuration can be submitted to the Kafka Connect REST API to instantiate the connector. Kafka Connect then executes the connector's tasks and publishes data to the desired topics.
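For example, assuming the configuration above is saved to a file named mysql-source-connector.json (a hypothetical filename) and the worker's REST interface listens on port 8083, the connector can be created with:
$ curl -X POST -H "Content-Type: application/json" --data @mysql-source-connector.json http://localhost:8083/connectors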
Kafka Connect tasks
Tasks are independent units of work that enable parallel data processing in Kafka Connect. When a connector is created, Kafka Connect divides its work into multiple tasks based on the configured level of parallelism.
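You can inspect how a connector's work has been divided by asking the worker's REST interface for its tasks. The example below assumes the mysql-source-connector created earlier and the default port 8083:
$ curl http://localhost:8083/connectors/mysql-source-connector/tasks
Each entry in the response contains a task ID and the configuration that the task runs with.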
Task assignment
Each task processes a subset of the data for a connector. For example, if the source system is a database table, you may assign each task a partition of the table data based on some criteria like a column value.
Some ways in which tasks can be assigned partitions:
- Round-robin for non-partitioned tables
- Hash partitioning based on the hash of the primary key column
- Time-based date or timestamp ranges
- Custom assignment based on application-specific logic
If there are no natural partitions, the work can simply be distributed across tasks in a round-robin fashion.
Task configuration
Tasks inherit most configurations from their parent connector but also allow some custom settings such as:
- transforms - Data manipulation logic
- converters - Serialization formats
For example:
{
"connector.class": "MySqlSourceConnector",
"tasks.max": "4",
"transforms": "insertTopic",
"transforms.insertTopic.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTopic.topic.field": "topic"
}
This MySQL source connector runs with four tasks and uses an InsertField transformation to add a "topic" field to each record before publishing it to Kafka.
Fault tolerance
If a worker fails, its tasks are reassigned to the remaining workers in the group, and a task that fails with an error can be restarted through the REST API. This provides fault tolerance for data pipelines built using Kafka Connect tasks.
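To check task health and restart a failed task, you can use the worker's REST interface. The commands below assume the mysql-source-connector from earlier and a failed task with ID 0:
$ curl http://localhost:8083/connectors/mysql-source-connector/status
$ curl -X POST http://localhost:8083/connectors/mysql-source-connector/tasks/0/restart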
Coordinating partitions across a pool of tasks enables Kafka Connect pipelines to scale the ingestion and delivery of data streams.
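One way to scale a running pipeline is to raise tasks.max through the REST API. PUT /connectors/<name>/config replaces the connector's entire configuration, so the sketch below resubmits the earlier configuration with a higher task count:
$ curl -X PUT -H "Content-Type: application/json" \
    --data '{"connector.class": "MySqlSourceConnector", "connection.url": "jdbc:mysql://localhost:3306/mydatabase", "table.whitelist": "users", "topic.prefix": "mysql-topic-", "tasks.max": "4", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter"}' \
    http://localhost:8083/connectors/mysql-source-connector/config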
What's next
This is just a brief overview and doesn't include many important implementation details. To explore these concepts in more depth, visit the original Redpanda guide - What is Kafka Connect.