Introduction
Apache Kafka is an open-source distributed event streaming platform, originally developed at LinkedIn and now maintained by the Apache Software Foundation. It is designed to handle large volumes of real-time data and is used to build real-time data pipelines and streaming applications.
For instance, if you want to fetch a real-time event log from a source (Twitter, etc.) and insert it into a target (CSV, SQLite, etc.), you can use a Kafka client library in Python. To do that, you first need to understand what producers and consumers are in Kafka.
Producer and Consumer
- Producer is a program or application that sends data or messages to a Kafka topic.
- Consumer is a program or application that reads data or messages from a Kafka topic.
How to set up Kafka with Python?
Let's start with the requirements for using Kafka from Python.
Requirements
Kafka server
You can start with a local installation (or use AWS or GCP for a more advanced setup). In this tutorial, I launch the Kafka server with Docker Compose.
This assumes Docker is installed and running; run `docker --version` to confirm it is installed.
Create a Docker Compose file (docker-compose.yaml) and copy in the snippet below.
```yaml
version: '2'
services:
  zookeeper:
    # Pinned to a 7.x release: ZooKeeper mode was removed in Apache Kafka 4.0
    # (Confluent Platform 8.0), so `latest` may no longer work with this setup
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Other containers reach the broker at kafka:9092; the host uses localhost:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
Then start the containers in the background:

```shell
docker-compose up --build --force-recreate -d
```
To confirm the containers are up, check their status:

```
╰─➤ docker-compose ps
      Name                    Command             State                     Ports
---------------------------------------------------------------------------------------------------
kafka_kafka_1       /etc/confluent/docker/run   Up      0.0.0.0:29092->29092/tcp, 9092/tcp
kafka_zookeeper_1   /etc/confluent/docker/run   Up      0.0.0.0:22181->2181/tcp, 2888/tcp, 3888/tcp
```
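`docker-compose ps` only shows that the containers are running. As an extra check from the host, the sketch below tries to open a plain TCP connection to localhost:29092, the port published for the broker in the compose file. `is_port_open` is a hypothetical helper using only the standard library; a successful connection shows that something is listening on that port, not that Kafka itself is healthy.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # Resolves the host and performs a TCP handshake; the socket is closed on exit
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, or name-resolution failure
        return False


if __name__ == "__main__":
    # 29092 is the host port published for the broker in docker-compose.yaml
    print("Kafka port reachable:", is_port_open("localhost", 29092))
```

If this prints `False` while the containers show `Up`, the broker may still be starting; wait a few seconds and try again.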
Both containers are up and running. Next, let's create a topic and send a message.
Kafka producer (Python)
Create a topic
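```python
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({"bootstrap.servers": "localhost:29092"})

topic_name = "my_first_topic"
num_partitions = 3
# Replication factor 1 because this setup runs a single broker
new_topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=1)

# create_topics() is asynchronous: it returns a dict of {topic_name: future}
futures = admin_client.create_topics([new_topic])
for topic, future in futures.items():
    try:
        future.result()  # raises if the topic could not be created
        print(f"{topic} is created!!")
    except Exception as e:
        print(f"{topic} is not created!! ({e})")
```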
I am using the confluent_kafka library (install it with `pip install confluent-kafka`) to connect to the Kafka server.
We can also set the number of partitions for the topic. Note that partitions can be added to an existing topic later, but their number can never be decreased, and adding partitions changes which partition a given message key maps to.
Partitions are the unit of parallelism: if a topic has 10 partitions, up to 10 consumers in the same consumer group can process its messages in parallel, each reading from the partitions assigned to it. Any additional consumers in that group stay idle.
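To make the partition-to-consumer relationship concrete, here is a simplified model of how a consumer group could split one topic's partitions among its members, in the style of Kafka's range assignor. It is an illustration only, not the code Kafka actually runs (the real strategy is chosen by the consumer's `partition.assignment.strategy` setting), and `assign_partitions` is a hypothetical helper.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified range-style assignment of one topic's partitions to a consumer group.

    Each partition goes to exactly one consumer. When there are more consumers
    than partitions, the extra consumers get an empty list (they sit idle).
    """
    if not consumers:
        return {}
    members = sorted(consumers)
    per_member, extra = divmod(num_partitions, len(members))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members (in sorted order) get one additional partition
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# my_first_topic has 3 partitions; a group with 4 consumers leaves one idle
print(assign_partitions(3, ["consumer-a", "consumer-b", "consumer-c", "consumer-d"]))
# {'consumer-a': [0], 'consumer-b': [1], 'consumer-c': [2], 'consumer-d': []}
```

Because a partition is never shared by two consumers in the same group, the partition count is the ceiling on parallelism for that group.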
```
╰─➤ python kafka_topic.py
my_first_topic is created!!
```
Okay, the topic has been created!
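Note: If you forget to create the topic first, don't worry. With the default broker setting `auto.create.topics.enable=true`, Kafka creates the topic automatically the first time you post a message to it. An auto-created topic gets the broker's default partition count (`num.partitions`, which defaults to 1), not the 3 partitions chosen above.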
Post a message
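```python
import json
import uuid

from confluent_kafka import Producer


def delivery_report(err, msg):
    # Called by flush()/poll() once the broker acknowledges (or rejects) the message
    if err is not None:
        print(f"failed to post message: {err}")
    else:
        comment = json.loads(msg.value())["comment"]
        print(f"message posted!! --> {comment}")


def post_message(producer, data):
    # produce() only queues the message locally; delivery happens asynchronously
    producer.produce(
        "my_first_topic",
        json.dumps(data).encode("utf-8"),
        callback=delivery_report,
    )
    # Block until all queued messages are delivered (or fail)
    producer.flush()


data = {
    "user_session_id": str(uuid.uuid4()),
    "user_name": "John Doe",
    "comment": "Malaysia Boleh!",
}

producer = Producer({"bootstrap.servers": "localhost:29092"})
post_message(producer, data)
```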
When you run the Python script, it posts the message:
```
╰─➤ python producer.py
message posted!! --> Malaysia Boleh!
```
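The producer above sends messages without a key, so the client spreads them across the topic's partitions. If you pass a key (for example `producer.produce("my_first_topic", value, key=data["user_session_id"])`), messages with the same key always land in the same partition, which preserves their order. The sketch below models this with a CRC32 hash, in the spirit of librdkafka's default `consistent_random` partitioner that confluent_kafka uses; it is a simplified illustration, not guaranteed to reproduce the client's exact partition choice, and `pick_partition` is a hypothetical helper.

```python
import zlib


def pick_partition(key: bytes, num_partitions: int) -> int:
    """Simplified key-based partitioning: hash the key, then take it modulo the partition count."""
    # Same key -> same CRC32 -> same partition, so per-key ordering is kept
    return zlib.crc32(key) % num_partitions


session_id = b"5321202b-694a-4d82-9d2b-97174303595e"
# Deterministic: repeated calls always pick the same partition
print(pick_partition(session_id, 3) == pick_partition(session_id, 3))  # True

# Adding partitions changes the mapping for many existing keys
keys = [f"user-{i}".encode() for i in range(100)]
moved = sum(pick_partition(k, 3) != pick_partition(k, 4) for k in keys)
print(f"{moved} of 100 keys map to a different partition with 4 partitions instead of 3")
```

The second part shows why growing a topic's partition count breaks the "same key, same partition" guarantee for keys produced before the change.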
Kafka consumer (Python)
Now let's write a consumer to check that the message was really posted.
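```python
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:29092",
        "group.id": "python-consumer",
        # Start from the oldest message when this group has no committed offset yet
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["my_first_topic"])

try:
    while True:
        # Wait up to 1 second for a message; returns None on timeout
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Error: {}".format(msg.error()))
            continue
        data = msg.value().decode("utf-8")
        topic = msg.topic()
        ts = msg.timestamp()  # (timestamp_type, milliseconds since epoch)
        print(data, topic, ts)
except KeyboardInterrupt:
    pass  # stop with Ctrl+C
finally:
    # Commit final offsets and leave the consumer group cleanly
    consumer.close()
```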
Here is the output when you run the script:
```
╰─➤ python consumer.py
{"user_session_id": "5321202b-694a-4d82-9d2b-97174303595e", "user_name": "John Doe", "comment": "Malaysia Boleh!"} my_first_topic (1, 1678561082329)
```
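`msg.timestamp()` returns a `(timestamp_type, timestamp)` tuple, with the timestamp in milliseconds since the Unix epoch. In the output above, type `1` corresponds to confluent_kafka's `TIMESTAMP_CREATE_TIME` (the time set by the producer). The hypothetical helper below converts such a tuple into a readable UTC datetime using only the standard library; its labels mirror the library's `TIMESTAMP_*` constants (0, 1, 2).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

# Labels for the timestamp types; the numbers mirror confluent_kafka's
# TIMESTAMP_NOT_AVAILABLE (0), TIMESTAMP_CREATE_TIME (1) and
# TIMESTAMP_LOG_APPEND_TIME (2) constants
TIMESTAMP_TYPES = {0: "not available", 1: "create time", 2: "log append time"}

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def describe_timestamp(ts: Tuple[int, int]) -> Tuple[str, Optional[datetime]]:
    """Turn a (timestamp_type, milliseconds) tuple into a label and a UTC datetime."""
    ts_type, millis = ts
    if ts_type == 0:
        # No timestamp attached to the message
        return TIMESTAMP_TYPES[0], None
    # Integer milliseconds -> exact timedelta (avoids float rounding)
    return TIMESTAMP_TYPES[ts_type], EPOCH + timedelta(milliseconds=millis)


kind, when = describe_timestamp((1, 1678561082329))
print(kind, when.isoformat())  # create time 2023-03-11T18:58:02.329000+00:00
```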
In the demo, as soon as a message is posted, the consumer prints it within a second.
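The introduction mentions SQLite as a possible target. As a minimal sketch of that last step, the consumer loop could store each message instead of printing it. The `comments` table and the helpers `open_db` and `save_comment` are hypothetical names, not part of the original setup, and the code uses only the standard library.

```python
import json
import sqlite3


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) the SQLite database and make sure the target table exists."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS comments ("
        "user_session_id TEXT PRIMARY KEY, user_name TEXT, comment TEXT)"
    )
    return conn


def save_comment(conn: sqlite3.Connection, raw_value: bytes) -> None:
    """Decode one consumed message (UTF-8 JSON bytes) and insert it as a row."""
    record = json.loads(raw_value.decode("utf-8"))
    # INSERT OR IGNORE: a redelivered message with the same session id is skipped
    conn.execute(
        "INSERT OR IGNORE INTO comments (user_session_id, user_name, comment) "
        "VALUES (?, ?, ?)",
        (record["user_session_id"], record["user_name"], record["comment"]),
    )
    conn.commit()


if __name__ == "__main__":
    conn = open_db()
    # Same payload as the consumer output above, as the raw bytes msg.value() returns
    sample = (
        b'{"user_session_id": "5321202b-694a-4d82-9d2b-97174303595e", '
        b'"user_name": "John Doe", "comment": "Malaysia Boleh!"}'
    )
    save_comment(conn, sample)
    save_comment(conn, sample)  # duplicate delivery is ignored
    print(conn.execute("SELECT user_name, comment FROM comments").fetchall())
    # [('John Doe', 'Malaysia Boleh!')]
```

Inside the consumer loop you would call `save_comment(conn, msg.value())` in place of `print(...)`. Kafka consumers can see the same message more than once (for example after a restart before offsets are committed), which is why the sketch keys rows on the session id and ignores duplicates.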
Enjoy ~