Hello All,
Hope you are doing well.
In this article, we are going to learn about how to set up a microservice using NestJs and Kafka.
Why did I start writing this?
I recently wanted to set up one and I was struggling to find a good example. After hours of googling, I was able to. So to make life easier for my fellow developers here I am.
Note: In this, I will not be explaining about Kafka or NestJs. So some basic knowledge on those would be helpful while implementing it.
Let's Get Started!!!
To install on windows/ubuntu please refer:
windows
ubuntu
To install Kafka on Mac using Homebrew:
$ brew cask install java
$ brew install Kafka
To start Zookeeper:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
Note: Zookeeper should be always started before starting the Kafka server.
To start Kafka:
kafka-server-start /usr/local/etc/kafka/server.properties
Lets check whether Kafka is working properly
To create a topic:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-first-topic
To create a Producer console:
kafka-console-producer --broker-list localhost:9092 --topic my-first-topic
To create a Consumer console:
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-first-topic --from-beginning
Now Producer console will be able to accept user inputs. Whatever we type on the Producer console should be visible in the Consumer console.
Let's write some code now
Let's create a NestJs application
nest new kafka-consumer
Once the application is created we need the microservices module and Kafka client library.
npm i --save @nestjs/microservices
npm i --save kafkajs
Now we need to update our main.ts to accept Kafka Transport and provide Kafka configurations
// main.ts
import { NestFactory } from '@nestjs/core';
import { Transport, MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my-kafka-consumer',
}
}
});
app.listen(() => console.log('Kafka consumer service is listening!'))
}
bootstrap();
In app.controller.ts we are listening to our topic.
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from "@nestjs/microservices";
import { AppService } from './app.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) { }
@MessagePattern('my-first-topic') // Our topic name
getHello(@Payload() message) {
console.log(message.value);
return 'Hello World';
}
}
Start our server
npm start
Now let's go back to the producer console and send a message now it should be logged in our application console.
Voila!. Now we have set up our Kafka microservice successfully.
In case you want to set up producer inside a nest js application Please follow along.
Create an application and install the necessary dependencies
nest new kafka-producer
npm i --save @nestjs/microservices
npm i --save kafkajs
In app.controller.ts will set up our producer:
import { Controller, Get } from '@nestjs/common';
import { Client, ClientKafka, Transport } from "@nestjs/microservices";
import { AppService } from './app.service';
@Controller()
export class AppController {
constructor(private readonly appService: AppService) { }
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'kafkaSample',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'my-kafka-consumer' // Should be the same thing we give in consumer
}
}
})
client: ClientKafka;
async onModuleInit() {
// Need to subscribe to topic
// so that we can get the response from kafka microservice
this.client.subscribeToResponseOf('my-first-topic');
await this.client.connect();
}
@Get()
getHello() {
return this.client.send('my-first-topic', 'Hello Kafka'); // args - topic, message
}
}
Quit the producer console and consumer console that we started earlier if not already.
Start our server
npm start
Now open any Rest API client and hit
GET: http://localhost:3000/
Now we can see the Hello World
as our response sent from Kafka microservice.
You can find the source code here:
Consumer Sample
Producer Sample
P.S This is my first blog post. Please feel free to give feedbacks.
Top comments (14)
This is weird example. Why the producer service, has @client decorator?
I am not sure why do you call this weird. Producer is the service which generates the message so that is why it has a client decorator.
I was more thinking, if is possible to annotate the services in a way, what is what, server-service, client-service. There is ServerKafka class, but I can't find way how to use it. There is nothing like @client decorator, to be replaced, like @server.
Why would you need a decorator for Server. Here the consumer is a microservice server listening on kafka queues. A single service class cannot become a server isnt it?
When it's appropriate to use ServerKafka class?
Can you point me to the documentation where you found this.
docs.nestjs.com/microservices/kafk...
But I can't find nothing related to where ServerKafka class, made sense to use it.
I dont think server kafka is a class here. Its just the name tht they use to distinguish between client and server
Thank you! not found any repo demo full communication from producer to consumer until this.
Excellent write up, quick concise and to the point! Thanks for taking the time!
I'd like only produce a message. In this case, need i use some decorator (EventPattern or MessagePattern) to consume a message of a topic? Because in your example, whitouth a subscriber, do not work.
感谢你的分享帮助到我!! Thank you for sharing and helping me.
how do i create kafka topic partition when send data to kafka topic
Wonderful, you give me the first tuto that worked finally! Thanks a lot