
Running Apache Kafka on Containers

Apache Kafka is one of the most popular data stores. It's the go-to tool for collecting streaming data at scale and processing it with either Kafka Streams or Apache Spark.

Getting started with Kafka is challenging, as it involves familiarising yourself with a lot of new concepts such as topics, replication, and offsets, and then you have to understand what ZooKeeper is.

Confluent is a company specialising in Kafka, with a cloud-based offering called Confluent Cloud. Confluent is one of the biggest contributors to the Kafka open-source project, and they have created some great tools to work with Kafka, such as ksqlDB, which lets us query streaming data (it's amazing, try it).

Apart from that, Confluent has great articles on understanding Kafka internals.

Kafka with Docker

To get started with Kafka on Docker, we are going to use Confluent's Kafka images.

  1. Create a docker-compose.yaml file with one ZooKeeper and one Kafka container:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: broker
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Start both containers in detached mode:

docker-compose up -d

docker-compose starts ZooKeeper on port 2181 and Kafka on port 9092, along with a few configuration options:

  1. Zookeeper

    1. ZOOKEEPER_CLIENT_PORT - The port where ZooKeeper is available.
    2. ZOOKEEPER_TICK_TIME - The length of a single tick in milliseconds, the basic time unit used by ZooKeeper.
  2. Kafka

    1. KAFKA_BROKER_ID - A unique ID for this broker within the cluster.
    2. KAFKA_ZOOKEEPER_CONNECT - Instructs Kafka how to connect to ZooKeeper.
    3. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP - Defines key/value pairs for the security protocol to use, per listener name.
    4. KAFKA_ADVERTISED_LISTENERS - A comma-separated list of listeners with their host/IP and port. Read more about Kafka listeners here.
    5. KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR - The equivalent of the broker configuration offsets.topic.replication.factor, which is the replication factor for the offsets topic. Since we are running just one Kafka node, we need to set this to 1.

Read more about how the connection to a Kafka broker works in a Docker container here.
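Before moving on, it's worth verifying that both containers are actually up. One quick sanity check (the exact log line varies by Kafka version, so treat it as indicative):

# List the compose services along with their state and mapped ports
docker-compose ps

# Tail the broker logs; a line similar to "started (kafka.server.KafkaServer)" means the broker is ready
docker-compose logs kafka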

Create Topics in Kafka

Kafka topics are like database tables. Just as with a database, where we have to create a table before we can store data, in Kafka we have to create a topic first.

Unlike a database, which has a dedicated command for this, Kafka comes with some utility scripts; the one that creates a topic requires the topic name as a mandatory argument, along with a few other optional arguments.

  1. Log in to the Kafka container:

    docker-compose exec kafka bash
    
  2. Create a topic named kafka-test:

    /usr/bin/kafka-topics \
                 --bootstrap-server broker:9092 \
                 --create \
                 --topic kafka-test
    Created topic kafka-test.
    

Try running this command again and you will get this error:

Error while executing the topic command: Topic 'kafka-test' already exists.

Not very CI-friendly, right? The --if-not-exists flag creates the topic only if it doesn't already exist and returns exit code 0 either way.

/usr/bin/kafka-topics \
             --bootstrap-server broker:9092 \
             --create \
             --if-not-exists \
             --topic kafka-test

There are a couple of other arguments that are essential for a good understanding of Kafka:

  1. --partitions - Kafka topics are partitioned, i.e. the data of a topic is spread across multiple brokers for scalability.
  2. --replication-factor - To make the data in a topic fault-tolerant and highly available, every topic can be replicated, so that there are always multiple brokers holding a copy of the data. See the sketch below.
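As a minimal sketch of both flags (the topic name kafka-test-partitioned is just an example; with a single broker, the replication factor can't be more than 1):

/usr/bin/kafka-topics \
             --bootstrap-server broker:9092 \
             --create \
             --if-not-exists \
             --topic kafka-test-partitioned \
             --partitions 3 \
             --replication-factor 1

# Inspect how the partitions of the new topic are laid out
/usr/bin/kafka-topics \
             --bootstrap-server broker:9092 \
             --describe \
             --topic kafka-test-partitioned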

What's a Kafka Message

Once we have the topic created, we can start sending messages to it. A message consists of headers, a key, and a value. Let's talk about each of these.

  1. Headers - Headers are key-value pairs that give us the ability to attach metadata to a Kafka message. Read the original KIP (Kafka Improvement Proposal) proposing headers here.

  2. Key - The key of the Kafka message. The key can be null. Randomly chosen keys (e.g. serial numbers and UUIDs) are the best example of message keys. Read more about when you should use a key here.

  3. Value - The actual data to be stored in Kafka. It can be a string, JSON, Protobuf, or Avro.

Writing to a Kafka Topic

Kafka provides a Producer API to send messages to a Kafka topic. This API is available in Java via the kafka-clients library and in Python via the kafka-python package.

Luckily for us, we don't have to use either of these. Kafka comes with an out-of-the-box script, kafka-console-producer, that allows us to write data to a Kafka topic.

Run the command, and as soon as it prints a > prompt on a new line, enter the JSON message:

/usr/bin/kafka-console-producer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test
>{"tickers": [{"name": "AMZN", "price": 1902}, {"name": "MSFT", "price": 107}, {"name": "AAPL", "price": 215}]}

We have successfully sent a message to the topic.

Press Control + C to stop the script.

However, this message was sent without a key. To send a key, we have to set the property parse.key to true.

The default key separator is \t (tab); we can change it by setting the property key.separator, e.g. --property "key.separator=:".

Let's try to send a message with a random key stocks-123 this time:

/usr/bin/kafka-console-producer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test \
    --property "parse.key=true"
>stocks-123 {"tickers": [{"name": "AMZN", "price": 1902}, {"name": "MSFT", "price": 107}, {"name": "AAPL", "price": 215}]}
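To see key.separator in action, here is a sketch using : as the separator (choose a separator that won't collide with your keys; the value reuses a shortened version of the earlier JSON):

/usr/bin/kafka-console-producer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test \
    --property "parse.key=true" \
    --property "key.separator=:"
>stocks-123:{"tickers": [{"name": "AMZN", "price": 1902}]}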

With the release of Kafka version 3.2.0, it's possible to send headers using the ConsoleProducer by setting the property parse.headers to true.

Headers are metadata about the Kafka message, so the source of these stock prices would be a good candidate for one. Let's add a header with key stock_source and value nyse to the Kafka message:

/usr/bin/kafka-console-producer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test \
    --property "parse.key=true" \
    --property "parse.headers=true"
>stock_source:nyse  stocks-123  {"tickers": [{"name": "AMZN", "price": 1902}, {"name": "MSFT", "price": 107}, {"name": "AAPL", "price": 215}]}

Now we have successfully sent a Kafka message with a header, key, and value.


Check out the supported properties for kafka-console-producer here.

Reading from a Kafka Topic

Kafka provides a Consumer API to read messages from a Kafka topic. This API is available in Java via the kafka-clients library and in Python via the kafka-python package.

Kafka also comes with an out-of-the-box script, kafka-console-consumer, to read messages from a Kafka topic:

/usr/bin/kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test

However, this command only prints the values of the Kafka messages. To print the key and headers, we have to set the properties print.headers and print.key to true. We can also print the timestamp of the message with the property print.timestamp.

/usr/bin/kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test \
    --property print.headers=true \
    --property print.key=true \
    --property print.timestamp=true

Other information, such as the partition and offset, can be printed by setting the properties --property print.offset=true and --property print.partition=true, as in the sketch below.
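A minimal sketch combining the two (produce a message from another shell so there is output to print):

/usr/bin/kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test \
    --property print.partition=true \
    --property print.offset=true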

Every time we read from a Kafka topic, Kafka keeps track of the last offset the consumer read and resumes from that point next time; however, we can always read from the beginning by passing the --from-beginning argument.

To read a Kafka topic from the beginning:

/usr/bin/kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test \
    --from-beginning \
    --property print.headers=true \
    --property print.key=true \
    --property print.timestamp=true
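One caveat: the offset tracking described above is tied to a consumer group, and by default the console consumer joins a randomly named, throwaway group. To actually resume where you left off across runs, pass an explicit group (the name my-console-group below is just an example):

/usr/bin/kafka-console-consumer \
    --bootstrap-server kafka:9092 \
    --topic kafka-test \
    --group my-console-group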

Is this too much to remember?
Don't worry, we have an easier way of reading from and writing to Kafka topics.


kcat Utility

kcat is an awesome tool that makes our lives easier: it allows us to read from and write to Kafka topics without tons of scripts, and in a more user-friendly way.

As Confluent puts it, "It is a swiss-army knife of tools for inspecting and creating data in Kafka".

kcat has two modes: it runs in producer mode when you pass the argument -P and in consumer mode when you pass -C. It also automatically selects its mode depending on the terminal or pipe type: if data is being piped to kcat, it automatically selects producer (-P) mode; if data is being piped from kcat (e.g. to the standard terminal output), it automatically selects consumer (-C) mode.

  1. To read data from a Kafka topic, simply run

    kcat -b localhost:9092 -t kafka-test
    
  2. To write data to a Kafka topic, run

    kcat -P -b localhost:9092 -t kafka-test
    

Take a look at the examples here to find out more about the usage.
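For instance, kcat's -f flag takes a format string, which makes it easy to print the key, headers, partition, and offset of every message at a glance (a sketch; -e makes kcat exit once it reaches the end of the topic):

kcat -C -b localhost:9092 -t kafka-test -e \
     -f 'Key: %k\nValue: %s\nHeaders: %h\nPartition: %p, Offset: %o\n\n'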

Here are some tips and tricks for using Kafka.
