DEV Community

Cover image for Kafka Microservice using Nest Js
Kannan
Kannan

Posted on

Kafka Microservice using Nest Js

Hello All,

Hope you are doing well.
In this article, we are going to learn about how to set up a microservice using NestJs and Kafka.

Why did I start writing this?
I recently wanted to set up one and I was struggling to find a good example. After hours of googling, I was able to. So to make life easier for my fellow developers here I am.

Note: In this, I will not be explaining about Kafka or NestJs. So some basic knowledge on those would be helpful while implementing it.

Let's Get Started!!!

To install on windows/ubuntu please refer:
windows
ubuntu

To install Kafka on Mac using Homebrew:

$ brew cask install java
$ brew install Kafka
Enter fullscreen mode Exit fullscreen mode

To start Zookeeper:

zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
Enter fullscreen mode Exit fullscreen mode

Note: Zookeeper should be always started before starting the Kafka server.

To start Kafka:

kafka-server-start /usr/local/etc/kafka/server.properties
Enter fullscreen mode Exit fullscreen mode

Lets check whether Kafka is working properly

To create a topic:

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-first-topic
Enter fullscreen mode Exit fullscreen mode

To create a Producer console:

kafka-console-producer --broker-list localhost:9092 --topic my-first-topic
Enter fullscreen mode Exit fullscreen mode

To create a Consumer console:

kafka-console-consumer --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning
Enter fullscreen mode Exit fullscreen mode

Now Producer console will be able to accept user inputs. Whatever we type on the Producer console should be visible in the Consumer console.

Let's write some code now

Let's create a NestJs application

nest new kafka-consumer
Enter fullscreen mode Exit fullscreen mode

Once the application is created we need the microservices module and Kafka client library.

npm i --save @nestjs/microservices
npm i --save kafkajs
Enter fullscreen mode Exit fullscreen mode

Now we need to update our main.ts to accept Kafka Transport and provide Kafka configurations

// main.ts

import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';

import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:9092'],
      },
      consumer: {
          groupId: 'my-kafka-consumer',
      }
    }
  });

  app.listen(() => console.log('Kafka consumer service is listening!'))
}
bootstrap();

Enter fullscreen mode Exit fullscreen mode

In app.controller.ts we are listening to our topic.

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from "@nestjs/microservices";
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) { }

  @MessagePattern('my-first-topic') // Our topic name
  getHello(@Payload() message) {
    console.log(message.value);
    return 'Hello World';
  }
}
Enter fullscreen mode Exit fullscreen mode

Start our server

npm start
Enter fullscreen mode Exit fullscreen mode

Now let's go back to the producer console and send a message now it should be logged in our application console.

Voila!. Now we have set up our Kafka microservice successfully.

In case you want to set up producer inside a nest js application Please follow along.

Create an application and install the necessary dependencies

nest new kafka-producer
npm i --save @nestjs/microservices
npm i --save kafkajs
Enter fullscreen mode Exit fullscreen mode

In app.controller.ts will set up our producer:

import { Controller, Get } from '@nestjs/common';
import { Client, ClientKafka, Transport } from "@nestjs/microservices";
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) { }

  @Client({
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'kafkaSample',
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'my-kafka-consumer' // Should be the same thing we give in consumer
      }
    }
  })
  client: ClientKafka;

  async onModuleInit() {
    // Need to subscribe to topic 
    // so that we can get the response from kafka microservice
    this.client.subscribeToResponseOf('my-first-topic');
    await this.client.connect();
  }

  @Get()
  getHello() {
    return this.client.send('my-first-topic', 'Hello Kafka'); // args - topic, message
  }
}

Enter fullscreen mode Exit fullscreen mode

Quit the producer console and consumer console that we started earlier if not already.

Start our server

npm start
Enter fullscreen mode Exit fullscreen mode

Now open any Rest API client and hit
GET: http://localhost:3000/

Now we can see the Hello World as our response sent from Kafka microservice.

You can find the source code here:
Consumer Sample
Producer Sample

P.S This is my first blog post. Please feel free to give feedbacks.

Top comments (14)

Collapse
 
igortas profile image
Igor Tashevski

This is weird example. Why the producer service, has @client decorator?

Collapse
 
kannndev profile image
Kannan

I am not sure why do you call this weird. Producer is the service which generates the message so that is why it has a client decorator.

Collapse
 
igortas profile image
Igor Tashevski

I was more thinking, if is possible to annotate the services in a way, what is what, server-service, client-service. There is ServerKafka class, but I can't find way how to use it. There is nothing like @client decorator, to be replaced, like @server.

Thread Thread
 
kannndev profile image
Kannan

Why would you need a decorator for Server. Here the consumer is a microservice server listening on kafka queues. A single service class cannot become a server isnt it?

Thread Thread
 
igortas profile image
Igor Tashevski

When it's appropriate to use ServerKafka class?

Thread Thread
 
kannndev profile image
Kannan

Can you point me to the documentation where you found this.

Thread Thread
 
igortas profile image
Igor Tashevski

docs.nestjs.com/microservices/kafk...
But I can't find nothing related to where ServerKafka class, made sense to use it.

Thread Thread
 
kannndev profile image
Kannan

I dont think server kafka is a class here. Its just the name tht they use to distinguish between client and server

Collapse
 
abomb302 profile image
abomb302

Thank you! not found any repo demo full communication from producer to consumer until this.

Collapse
 
theringleman profile image
Sam Ringleman

Excellent write up, quick concise and to the point! Thanks for taking the time!

Collapse
 
thiagonovato profile image
Thiago Novato

I'd like only produce a message. In this case, need i use some decorator (EventPattern or MessagePattern) to consume a message of a topic? Because in your example, whitouth a subscriber, do not work.

Collapse
 
____jackson___ profile image
jackson

感谢你的分享帮助到我!! Thank you for sharing and helping me.

Collapse
 
lmdinez profile image
lmdinez

how do i create kafka topic partition when send data to kafka topic

Collapse
 
rais_joel profile image
joel beyantumba

Wonderful, you give me the first tuto that worked finally! Thanks a lot