Abhinav Nath

Building a Simple Kafka Producer and Consumer using Python

Let's create a mini project to interact with Kafka using Python.

We will build a simple message Producer and Consumer. Mini projects like this are helpful when you want to quickly test some behaviour of Kafka. So let's get started.

First off, let's spin up all the required components using this docker-compose.yml. It will start Zookeeper, Kafka and Kafdrop containers on your machine.
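Before moving on, it can help to confirm that the Kafka port is actually accepting connections. Here is a minimal sketch using only the standard library. The helper name broker_reachable is made up for this post, and the default port 29092 is the one used in the code later on. An open port only tells us that something is listening there, not that Kafka has finished starting up.

```python
import socket


def broker_reachable(host="localhost", port=29092, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    Hypothetical helper: it checks that the port is open, nothing Kafka-specific.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host not resolvable
        return False


if __name__ == "__main__":
    print("Kafka port reachable:", broker_reachable())
```

If this prints False, check that the containers are up before running the Producer.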


Let's create our message Producer now.

We need to import KafkaProducer from the kafka library (the kafka-python package, which can be installed with pip install kafka-python).

We have to specify the address of our Kafka server (which we created in the above step) while creating a KafkaProducer.

We pass these parameters to the producer.send() method:

  1. topic : the topic name; we will use my_test_topic
  2. value : the message itself, as bytes
  3. key (optional) : the key of the message, also as bytes

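from kafka import KafkaProducer

bootstrap_servers = ['localhost:29092']
topic_name = 'my_test_topic'

producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# send() is asynchronous and returns a future; key and value are passed as bytes
future = producer.send(topic_name, key=b'1', value=b'This is message 1')

# Block until the broker acknowledges the message (raises on failure or timeout)
metadata = future.get(timeout=10)

print('message sent successfully on topic %s partition %s' % (metadata.topic, metadata.partition))

# Flush any pending messages and release the connection
producer.close()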
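The Producer above passes the key and value as bytes literals. If you would rather send strings or dictionaries, KafkaProducer accepts key_serializer and value_serializer callables that convert each key and value to bytes before sending. The sketch below is one way to set that up. The helper names encode_key, encode_value and build_producer are made up for this post, and JSON is just an assumed message format.

```python
import json


def encode_key(key):
    """Encode a key as UTF-8 bytes; None is passed through because the key is optional."""
    return None if key is None else str(key).encode("utf-8")


def encode_value(value):
    """Serialize a JSON-compatible object to UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def build_producer(bootstrap_servers):
    """Create a KafkaProducer that applies the serializers above to every message."""
    # Imported here so the helpers can be tried out without a Kafka setup
    from kafka import KafkaProducer

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        key_serializer=encode_key,
        value_serializer=encode_value,
    )


# Usage, assuming the broker from this post is running:
# producer = build_producer(['localhost:29092'])
# producer.send('my_test_topic', key='1', value={'text': 'This is message 1'})
```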

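When we run our Producer, the specified Kafka topic (my_test_topic) gets created automatically and the message is sent to the topic. This relies on the broker allowing automatic topic creation (the auto.create.topics.enable setting, which is on by default).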
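If you prefer not to rely on automatic creation, the topic can also be created explicitly with the admin client that ships with kafka-python. This is only a sketch: the ensure_topic helper and the TOPIC_SETTINGS values (one partition, replication factor 1, assuming a single local broker) are assumptions for this post, not part of the original project.

```python
# Assumed settings for a single local broker
TOPIC_SETTINGS = {
    "name": "my_test_topic",
    "num_partitions": 1,
    "replication_factor": 1,
}


def ensure_topic(bootstrap_servers, settings=TOPIC_SETTINGS):
    """Create the topic if it does not exist. Returns True if it was created."""
    # Imported here so the module loads even without kafka-python installed
    from kafka.admin import KafkaAdminClient, NewTopic
    from kafka.errors import TopicAlreadyExistsError

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    try:
        admin.create_topics([NewTopic(**settings)])
        return True
    except TopicAlreadyExistsError:
        # The topic is already there, nothing to do
        return False
    finally:
        admin.close()


# Usage, assuming the broker from this post is running:
# ensure_topic(['localhost:29092'])
```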


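Let's create the Consumer now. We need the server and topic name as above. We also set a group_id to identify the consumer group, auto_offset_reset = 'earliest' so that a group with no committed offset starts reading from the oldest available message, and enable_auto_commit = True so that offsets are committed periodically in the background.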

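from kafka import KafkaConsumer
import sys

bootstrap_servers = ['localhost:29092']
topic_name = 'my_test_topic'

consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',  # start from the oldest message if the group has no committed offset
    enable_auto_commit=True,       # commit consumed offsets periodically in the background
    group_id='my-group-1'
)

try:
    # Iterating over the consumer blocks and yields messages as they arrive
    for message in consumer:
        print("Message received - %s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))
except KeyboardInterrupt:
    # Ctrl+C stops the loop
    sys.exit()
finally:
    # Leave the consumer group cleanly
    consumer.close()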

Run the Consumer and it will receive the message sent by the Producer.
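Note that the Consumer prints the key and value as raw bytes (for example b'1'), since no deserializer is configured. A small sketch of decoding them for display is shown below. The Record namedtuple is only a stand-in for the message objects yielded by the Consumer so that the snippet runs without Kafka, and format_record is a made-up helper name.

```python
from collections import namedtuple

# Stand-in for a consumed message, with the same fields the Consumer code prints
Record = namedtuple("Record", "topic partition offset key value")


def format_record(record, encoding="utf-8"):
    """Decode the key and value from bytes and build a printable line."""
    key = record.key.decode(encoding) if record.key is not None else None
    value = record.value.decode(encoding) if record.value is not None else None
    return "%s:%d:%d: key=%s value=%s" % (
        record.topic, record.partition, record.offset, key, value)


sample = Record("my_test_topic", 0, 0, b"1", b"This is message 1")
print(format_record(sample))
# prints: my_test_topic:0:0: key=1 value=This is message 1
```

Inside the Consumer loop, the same function could be called as print(format_record(message)).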


Thanks for reading! The source code is available in this GitHub repo.
