Kafka topics are the categories used to organize events. You create different topics to hold different kinds of events, and different topics to hold filtered and transformed versions of the same kind of event. As a developer, the topic is among the first things you think about when designing how events flow through your system. You create topics, then write events to them and read events from them.
In a DevOps-driven team where infrastructure is managed and automated via code, how do you automate the creation or deletion of Kafka topics?
In this post, I'm going to show you how to do that using JavaScript. There are different ways to achieve the same thing for different infrastructure setups. For this post, let's assume we're running a Kafka cluster that's only accessible within a VPC, and that we also have a Kubernetes cluster. The solution will be a JavaScript app that runs as a Kubernetes Job whenever a topic needs to be created or deleted.
Why JavaScript? It's an easy way to write a script without the complexities of Bash, and if your team already writes JavaScript, other developers can review and contribute to the script. If you're a Python shop, the same approach can be implemented in Python.
Set Up The Application
The solution is a Node.js application, so you will need a Node.js project. If you don't have Node.js and npm, download and install them from nodejs.org/en/download. Open your terminal in the directory where you want to create the app and run npm init -y to create a new project, then install the Kafka JavaScript client as a dependency with npm install kafkajs.
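At this point your package.json should look roughly like the one below. The project name and the kafkajs version are only illustrative, and the start script is my own addition: the Heroku Node.js buildpack used later in this post typically picks up the start script as the image's default process, which lets the Kubernetes Job run index.js without extra configuration.

{
  "name": "kafka-topics-job",
  "version": "1.0.0",
  "main": "index.js",
  "scripts": {
    "start": "node index.js"
  },
  "dependencies": {
    "kafkajs": "^2.2.0"
  }
}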
Implementing The Solution
The application will read a list of topics to create/delete through a JSON file. What I want to achieve here is a workflow where anyone can make changes to a JSON file in a GitHub repo, and open a PR with their change. Once the PR is merged into the main branch, the code reads the data from that file and then creates or deletes a list of topics as desired.
To achieve this, create a JSON file named topics.json with the following content:
{
  "create": [],
  "delete": []
}
That structure gives you an array of strings containing the names of topics to create and another for topics to delete. Looking at that file in the source control system also gives anyone an idea of which topics have been created in Kafka.
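For example, a pull request that adds two topics and retires one might change the file to the following (the topic names are only placeholders):

{
  "create": ["orders", "orders.enriched"],
  "delete": ["orders.legacy"]
}

Topics stay listed under create after they've been made, since the script below skips names that already exist, so the file keeps doubling as a record of the cluster's topics.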
Next, create a file api.js. Copy the following code snippet and paste it into api.js.
async function createTopics(topics, kafkaAdmin) {
  if (topics.length > 0) {
    await kafkaAdmin.createTopics({
      topics: topics.map((topic) => ({
        topic,
        numPartitions: 1,
        replicationFactor: 3,
        configEntries: [{ name: "min.insync.replicas", value: "2" }],
      })),
    });
  }
}

async function deleteTopics(topics, kafkaAdmin) {
  if (topics.length > 0) {
    await kafkaAdmin.deleteTopics({ topics: topics });
  }
}

module.exports = { createTopics, deleteTopics };
This module exports functions to create and delete Kafka topics. The createTopics function takes an array of topic names and a Kafka admin client instance as arguments, then calls kafkaAdmin.createTopics to create the topics. The number of partitions, replication factor, and config entries specified here are just an example; configure them to match your setup.
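If different topics need different settings, one possible variation (my own extension, not part of the original setup) is to let entries in topics.json be either a plain topic name or an object carrying overrides. A minimal sketch of what createTopics could look like in that case:

// Hypothetical entry shapes in topics.json's "create" array:
//   "orders"                                              -> use the defaults
//   { "name": "orders", "partitions": 6, "replicas": 3 }  -> per-topic overrides
async function createTopics(topics, kafkaAdmin) {
  if (topics.length === 0) return;
  await kafkaAdmin.createTopics({
    topics: topics.map((entry) => {
      const name = typeof entry === "string" ? entry : entry.name;
      const partitions = typeof entry === "string" ? 1 : entry.partitions ?? 1;
      const replicas = typeof entry === "string" ? 3 : entry.replicas ?? 3;
      return {
        topic: name,
        numPartitions: partitions,
        replicationFactor: replicas,
        configEntries: [{ name: "min.insync.replicas", value: "2" }],
      };
    }),
  });
}

If you adopt that shape, the duplicate filtering in index.js below also has to compare topic names rather than raw entries.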
Create a new file index.js and paste the following code into it.
const { Kafka } = require("kafkajs");
const { createTopics, deleteTopics } = require("./api");
const topics = require("./topics.json");

const username = process.env.KAFKA_USERNAME;
const password = process.env.KAFKA_PASSWORD;
const brokers = process.env.KAFKA_URL ? process.env.KAFKA_URL.split(",") : [];

if (!username || !password || brokers.length === 0) {
  throw new Error("Missing Kafka Client Credential");
}

const kafka = new Kafka({
  clientId: "admin-script",
  brokers: brokers,
  ssl: {
    rejectUnauthorized: false,
  },
  sasl: {
    mechanism: "scram-sha-512",
    username,
    password,
  },
});

const admin = kafka.admin();

admin.connect().then(async () => {
  const existingTopics = await admin.listTopics();

  const newTopics = topics.create.filter((x) => !existingTopics.includes(x));
  await createTopics(newTopics, admin);

  const deletionTopics = topics.delete.filter((x) =>
    existingTopics.includes(x)
  );
  await deleteTopics(deletionTopics, admin);

  await admin.disconnect();
});
The code above creates a Kafka client and connects to the Kafka admin API. Once connected, it lists the existing topics and filters the create and delete lists against them, so it never tries to create a topic that already exists or delete one that doesn't. It then calls createTopics and deleteTopics accordingly and disconnects.
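The snippet above doesn't handle failures explicitly. Because the script runs as a Kubernetes Job, you likely want it to exit with a non-zero status when topic creation or deletion fails, so the Job is marked as failed and retried up to its backoffLimit. One way to restructure the bottom of index.js to do that is shown below; this is a sketch of my own, not part of the original code.

const run = async () => {
  await admin.connect();
  const existingTopics = await admin.listTopics();

  await createTopics(
    topics.create.filter((topic) => !existingTopics.includes(topic)),
    admin
  );
  await deleteTopics(
    topics.delete.filter((topic) => existingTopics.includes(topic)),
    admin
  );

  await admin.disconnect();
};

run().catch(async (error) => {
  console.error("Failed to reconcile Kafka topics:", error);
  // Best-effort cleanup; ignore errors if the connection was never established.
  await admin.disconnect().catch(() => {});
  process.exit(1);
});

You can also run the script locally against a reachable cluster by setting the three environment variables before invoking node index.js (the values depend on your broker setup).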
Automation Using GitHub Actions
Let's assume your code lives in a GitHub repository. Whenever a change to topics.json is merged into the main branch, you want a Kubernetes Job to run the Node.js app. We will do that using GitHub Actions. The workflow below triggers on every push to main; if you want it to run only when topics.json changes, you can narrow the trigger with a paths filter.
Add the file kafka.yml to the directory .github/workflows.
name: Deploy Kafka Topics Job

on:
  push:
    branches: [main]

env:
  JOB_NAME: kafka-topics
  AWS_REGION: eu-west-1
  KUBERNETES_CLUSTER: demo-cluster
  KUBERNETES_NAMESPACE: default

jobs:
  build-and-push:
    name: Build & Push to ECR
    runs-on: ubuntu-latest
    steps:
      - name: Git checkout
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Add short commit hash
        id: short-commit-hash
        run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"

      - name: Build Docker container and push to ECR
        uses: dfreilich/pack-action@v2.1.1
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
        with:
          args: "build ${{ env.ECR_REGISTRY }}/${{ env.JOB_NAME }}:${{ env.IMAGE_TAG }} --builder heroku/buildpacks --buildpack heroku/nodejs --publish"

  deploy-job:
    name: Deploy to Kubernetes
    needs: [build-and-push]
    runs-on: ubuntu-latest
    steps:
      - name: Git checkout
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1

      - name: Login to Amazon ECR
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Add short commit hash
        id: short-commit-hash
        run: echo "::set-output name=sha_short::$(git rev-parse --short HEAD)"

      - name: Set Image Name
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          IMAGE_TAG: ${{ steps.short-commit-hash.outputs.sha_short }}
        run: 'echo "IMAGE_NAME=$(echo ${ECR_REGISTRY})/$(echo ${JOB_NAME}):$(echo ${IMAGE_TAG})" >> $GITHUB_ENV'

      - name: Create Job
        env:
          SHA: ${{ steps.short-commit-hash.outputs.sha_short }}
        run: |
          aws eks update-kubeconfig \
            --region ${AWS_REGION} \
            --name ${KUBERNETES_CLUSTER}

          cat <<EOF | kubectl apply -f -
          apiVersion: batch/v1
          kind: Job
          metadata:
            name: ${JOB_NAME}-${SHA}
            namespace: ${KUBERNETES_NAMESPACE}
            labels:
              jobgroup: ${JOB_NAME}
          spec:
            ttlSecondsAfterFinished: 259200
            template:
              spec:
                containers:
                  - name: ${JOB_NAME}-${SHA}
                    image: ${IMAGE_NAME}
                    envFrom:
                      - secretRef:
                          name: kafka-secrets
                restartPolicy: Never
            backoffLimit: 2
          EOF
The workflow above has two jobs. The build-and-push job uses the Pack CLI (via pack-action) with the heroku/nodejs buildpack to build a container image and pushes it to Amazon ECR. The deploy-job then creates a Kubernetes Job that runs the built image.
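The Job manifest pulls its environment variables from a Kubernetes secret named kafka-secrets, which is expected to already exist in the target namespace and to contain the KAFKA_URL, KAFKA_USERNAME, and KAFKA_PASSWORD values that index.js reads. If you don't already manage that secret elsewhere, one way to create it is shown below; the values are placeholders you would replace with your own.

kubectl create secret generic kafka-secrets \
  --namespace default \
  --from-literal=KAFKA_URL=broker-1:9096,broker-2:9096 \
  --from-literal=KAFKA_USERNAME=admin-script \
  --from-literal=KAFKA_PASSWORD=change-me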
The workflow assumes the use of AWS and GitHub Actions, but the same approach can be replicated with other source control and CI/CD systems. The point here is to use JavaScript, a JSON file, and GitHub Actions to automate creating or deleting Kafka topics whenever the JSON file changes. The concept can be adapted to satisfy your use case.