Intro to Kafka using Docker and Python

Boyu

Note: To get a copy of all the code used in this tutorial, clone this GitHub repository. Refer to the README file for how to quickly run all experiments mentioned in this tutorial.

Object vs Log

In database design, the common practice is to think of the world in terms of things. This way of thinking underlies the design of SQL databases like MySQL and PostgreSQL. Developers come up with objects to describe things in the world, and the objects get stored in tables with a defined schema. For example, to describe a light bulb we might record its brand, manufacturer, brightness, energy consumption, and current state (on or off). This works well when the focus is storing data or creating a digital copy of a real-world object. However, object databases face challenges in handling streaming data. In the light bulb example, if the bulb is used to transmit Morse code, its state constantly switches between on and off, and it becomes difficult to come up with a single object representation of the changing light bulb. This is when thinking of data as event logs becomes useful.

Light Bulb Example
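To make the contrast concrete, here is a minimal sketch (with hypothetical field names) of the two representations: an object that only keeps the latest state versus an append-only log of state-change events.


import time

# Object view: a single record that is overwritten; history is lost.
bulb = {"id": "light-1", "brand": "Acme", "state": "off"}
bulb["state"] = "on"  # the previous state is gone

# Log view: every state change is appended as an event; history is preserved.
events = []
events.append({"bulb_id": "light-1", "state": "on",  "ts": time.time()})
events.append({"bulb_id": "light-1", "state": "off", "ts": time.time()})

# The current state can always be derived from the log (the last event),
# and the full on/off sequence (the Morse code) is still available.
print(events[-1]["state"])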

What is Kafka?

Kafka is a distributed system designed for streaming data. It replaces traditional object-based database storage with an event-based system. Kafka can be thought of as a distributed log, with information stored as events that carry data attached to them.

Kafka's design works well with the modern containerization trend and the shift from one large application that does everything to many small containers that each handle an independent task. By thinking of data in terms of logs, processes can be easily decoupled by splitting an application into many independent read and write tasks. Kafka facilitates data communication by providing write access to data producers and read access to data consumers.

Kafka Architecture

Kafka can handle large traffic by being a distributed system. A Kafka deployment can have many broker servers, which allows for horizontal scaling. Event data in Kafka is organized by topic, and each topic is split into multiple partitions. This allows horizontal scaling of Kafka, since new broker nodes can be assigned to handle and rebalance the partition load. Kafka ensures data fault tolerance by replicating each partition multiple times.
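Conceptually, a record's key determines which partition it lands in, so all records with the same key stay in order within one partition. The snippet below only illustrates that idea; it is not Kafka's actual hashing algorithm (the real clients use their own hash-based partitioners).


# Illustrative only: records with the same key always map to the same partition,
# which is why per-key ordering is preserved. Kafka clients use their own
# hash function, not this toy one.
def pick_partition(key: str, num_partitions: int) -> int:
    return sum(key.encode()) % num_partitions  # deterministic toy hash

print(pick_partition("light-1", 3))  # same key -> same partition every time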

Deploy Kafka on Docker

We will deploy a Kafka cluster with three broker nodes using Docker. For this example, we will use the Kafka-Zookeeper setup. An additional Kafdrop node will be used to provide a web user interface for monitoring the Kafka cluster. The architecture looks like the following:

Kafka Interface

Environment Setup

Docker, information on installing Docker here
Docker Compose, information on installing Docker Compose here

Writing Docker Compose

The configuration for Zookeeper. We use the zookeeper Docker image and expose port 2181, the default port for Zookeeper.



zookeeper:
  image: zookeeper:3.4.9
  hostname: zookeeper
  ports:
    - "2181:2181"
  environment:
    ZOO_MY_ID: 1
    ZOO_PORT: 2181
    ZOO_SERVERS: server.1=zookeeper:2888:3888
  volumes:
    - ./data/zookeeper/data:/data
    - ./data/zookeeper/datalog:/datalog



The configuration for the Kafka broker nodes. We use the Confluent Docker image for Kafka and configure the Kafka node to communicate with the Zookeeper node. The following configuration is replicated three times with hostnames kafka1, kafka2, and kafka3 on ports 9091, 9092, and 9093.



kafka1:
  image: confluentinc/cp-kafka:5.3.0
  hostname: kafka1
  ports:
    - "9091:9091"
  environment:
    KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
    KAFKA_BROKER_ID: 1
  volumes:
    - ./data/kafka1/data:/var/lib/kafka/data
  depends_on:
    - zookeeper



The configuration for Kafdrop. We only need to point Kafdrop at one of the Kafka brokers; Kafdrop can discover the other brokers from the cluster metadata.



kafdrop:
  image: obsidiandynamics/kafdrop
  restart: "no"
  ports:
    - "9000:9000"
  environment:
    KAFKA_BROKERCONNECT: "kafka1:19091"
  depends_on:
    - kafka1
    - kafka2
    - kafka3



The final configuration should look like this docker-compose.yml file.

Start the Kafka cluster by running docker-compose up. This will deploy 5 Docker containers, which you can check using docker ps.

Go to localhost:9000, and you should see the Kafdrop page showing your Kafka deployment with the three broker nodes kafka1, kafka2, and kafka3.

Kafdrop interface

Kafka Dataflow

With the local Kafka service running, we can start interacting with Kafka. In this part, we will build a publisher using the light bulb Morse code example mentioned at the beginning, and a consumer to count the number of "dots" and "dashes" being transmitted.

Create a Kafka Topic

We first need to create a topic in Kafka for the publisher to send data to. In Kafdrop, click on New at the bottom of the page, name the new topic light_bulb, and set the number of partitions to 3. For the replication factor, we can keep the default setting of 3, which means each partition of light_bulb will be replicated three times. Back on the home page, you should see the topic light_bulb with 3 partitions. Topics can also be created programmatically, as shown in the sketch after the screenshot below.
Kafka Interface
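As an alternative to the Kafdrop UI, the same topic can be created with the admin client that ships with confluent-kafka. This is a minimal sketch, assuming the cluster from the compose file above is running locally.


from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9091'})

# Request a topic with 3 partitions, each replicated 3 times.
futures = admin.create_topics(
    [NewTopic('light_bulb', num_partitions=3, replication_factor=3)]
)

# create_topics() is asynchronous; wait for the result of each request.
for topic, future in futures.items():
    try:
        future.result()
        print(f"Topic {topic} created")
    except Exception as e:
        print(f"Failed to create topic {topic}: {e}")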

Building Publisher

To add data to the light_bulb topic, we need a publisher that talks to Kafka. A simple way is to use a Python library. Install Confluent's Kafka Python connector with pip install confluent-kafka, and we can start sending data to Kafka using:



from confluent_kafka import Producer

# Connect through one of the advertised broker addresses.
p = Producer({'bootstrap.servers': 'localhost:9091'})
# Asynchronously queue one record for the light_bulb topic.
p.produce('light_bulb', key='hello', value='world')
# Block for up to 30 seconds until all queued messages are delivered.
p.flush(30)



The Producer class takes a configuration dictionary in which we specify the address of a Kafka broker. Only one address is needed because each broker holds metadata for reaching the other brokers. The produce function sends data asynchronously without waiting for confirmation; it takes three inputs: the Kafka topic name, a key that is used to determine which partition the data is added to, and a value string holding the log data. The flush function blocks until all outstanding messages have been delivered, and should be called before the process exits.
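Because produce is asynchronous, a delivery callback can be attached if you want confirmation that a message actually reached the broker. Here is a minimal sketch using the same local cluster:


from confluent_kafka import Producer

def delivery_report(err, msg):
    # Called once per message, from poll() or flush(), with the delivery result.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [partition {msg.partition()}]")

p = Producer({'bootstrap.servers': 'localhost:9091'})
p.produce('light_bulb', key='light-1', value='on', callback=delivery_report)
p.poll(0)    # serve delivery callbacks for previously sent messages
p.flush(30)  # wait for outstanding messages and their callbacks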

This producer.py file provides an implementation of the Morse-code-sending light bulb example. We can have the light bulb send Morse code by publishing on/off status logs to the Kafka cluster.



python3 producer.py --key="light-1" --string="XYZ"


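The repository's producer.py handles the full command-line interface; as a rough sketch of the core idea (assuming messages are plain "on"/"off" strings, which is an assumption about the repo's format), a Morse encoder might look like this:


import time
from confluent_kafka import Producer

# Hypothetical sketch of the producer loop; the repository's producer.py is the
# actual implementation. Dots are short "on" pulses, dashes are long ones.
MORSE = {'X': '-..-', 'Y': '-.--', 'Z': '--..'}  # just the letters used below
UNIT = 0.2  # seconds per Morse time unit

p = Producer({'bootstrap.servers': 'localhost:9091'})

def blink(key, symbol):
    p.produce('light_bulb', key=key, value='on')
    time.sleep(UNIT if symbol == '.' else 3 * UNIT)
    p.produce('light_bulb', key=key, value='off')
    time.sleep(UNIT)  # gap between symbols

for letter in "XYZ":
    for symbol in MORSE[letter]:
        blink('light-1', symbol)
    time.sleep(3 * UNIT)  # gap between letters

p.flush(30)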

With some data published, we can see information about the messages in the Kafdrop interface.
Kafka Interface

Building Consumer

Here is a simple example of consuming messages using the Python library.



from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9091',
    'group.id': 'counting-group',  # consumers sharing a group id split the partitions
    'enable.auto.commit': True,    # periodically commit the consumed offsets
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'}  # start from the earliest offset
})

c.subscribe(['light_bulb'])
while True:
    msg = c.poll(0.1)  # wait up to 0.1s for a new message
    if msg is None:
        continue
    elif not msg.error():
        print(msg.value())
    elif msg.error().code() == KafkaError._PARTITION_EOF:
        print("End of partition reached")
    else:
        print("Error: {}".format(msg.error()))



The Consumer class also takes a configuration dictionary. We specify the name of the consumer group, enable auto-commit of the consumer offset, set the session timeout, and configure the offset to start at the smallest (earliest) element. We use the consumer object to subscribe to a list of topics, and the while loop continuously checks for new log data in the subscribed topics.

This consumer.py file provides an implementation that counts the number of "dots" and "dashes" in the Morse code transmitted in the light bulb example.
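The repository's consumer.py contains the actual counting logic. As a simplified sketch of one way to do it, assuming the producer publishes plain "on"/"off" values and that the time between an "on" and the following "off" distinguishes a dot from a dash (both are assumptions about the message format):


from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9091',
    'group.id': 'counting-group',
    'auto.offset.reset': 'earliest',
})
c.subscribe(['light_bulb'])

counts = {'dots': 0, 'dashes': 0}
on_ts = None  # timestamp (ms) of the most recent "on" event

while True:
    msg = c.poll(0.1)
    if msg is None or msg.error():
        continue
    value = msg.value().decode()
    _, ts = msg.timestamp()  # (timestamp type, milliseconds)
    if value == 'on':
        on_ts = ts
    elif value == 'off' and on_ts is not None:
        # Short pulses count as dots, long pulses as dashes (0.4s threshold assumed).
        counts['dots' if ts - on_ts < 400 else 'dashes'] += 1
        on_ts = None
        print(counts)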

Failure Handling

Single Node Failure

Because Kafka is designed to be fault-tolerant, we can simulate node failure by stopping a container. This simulates the effect of a server being brought down by faults in the software, hardware, or network. To stop broker node number 3, run docker stop kafka_kafka3_1.

Kafka without broker node 3

Refreshing the Kafdrop page, we should see that host kafka3 is missing and the partitions that were served by kafka3 are now reallocated between kafka1 and kafka2, so no data was lost by the failure of broker node kafka3. Clicking into the topic light_bulb, under Partitions we can see that every partition that was led by kafka3 now has another available node as its leader. Also, the loss of broker node 3 leaves all of the partitions under-replicated, since the configuration calls for three replicas but only two broker nodes are available.
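Besides Kafdrop, the rebalanced leadership can also be checked programmatically from the cluster metadata. A small sketch, assuming the remaining brokers are still reachable on their published ports:


from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9091,localhost:9092'})
meta = admin.list_topics(timeout=10)

print("Brokers still registered:", sorted(meta.brokers))
for pid, p in sorted(meta.topics['light_bulb'].partitions.items()):
    # leader is a broker id; replicas/isrs show where copies live and which are in sync
    print(f"partition {pid}: leader={p.leader} replicas={p.replicas} in-sync={p.isrs}")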

Multi-Node Failure

We will also stop broker node number 2 by running docker stop kafka_kafka2_1. In this case, because we configured the replication factor as three, failing two nodes still does not result in data loss. We can see that the leader for all partitions has switched to the remaining broker node 1.
Kafka without broker node 2 and 3

All Nodes Failure

To stop the last remaining node, run docker stop kafka_kafka1_1. This will result in data loss, as we lose all replicas of the data. However, this is an unlikely situation; losing every broker at once is more likely caused by a network issue than by all of the broker nodes actually failing, so when the network recovers the data can usually still be retrieved. With no broker nodes available, though, the Kafka service is unavailable.

Summary

In general, Kafka is a highly configurable distributed system that fits the needs of modern agile applications. It serves as the data service layer between the publishers and consumers of application data.

Top comments (2)

Gaurav Sharma

Hello,
When I am running your code on macOS Mojave, docker-compose version 1.29.2, build 5becea4c, I am getting below error:

zookeeper_1 | chown: /datalog: Operation not permitted
zookeeper_1 | chown: /datalog: Operation not permitted
kafka-light-bulb_zookeeper_1 exited with code 1

Any idea how can I resolved the same?

snow-leopard999

Hi! Thanks for the post. Everything works perfectly, except consumer.poll returns None. I basically copy pasta'd everything from the post, but can't consume messages. Do you have any idea why?