So previously, we looked at how to quickly get Quarkus up and running and create a basic UI to send messages back and forth between the client and the server. In this article, we take it to the next level and introduce Kafka as a messaging platform, with incoming messages from a Kafka topic pushed directly to the user interface.
There is no true session management in this article (we can cover that in the future), but it does demonstrate how easy it is to manage some basic users and broadcast to all of them.
Getting Kafka Up and Running
For this to work, we are going to need a Kafka instance up and running, so we will start with that.
These are the requirements for this article:
- Java 11
- Apache ZooKeeper
- Kafka 2.3.0
- The source code will be on this branch
We will refer to the location where you unzipped Kafka as KAFKA_HOME.
Starting ZooKeeper
Once you've downloaded ZooKeeper, unzip it to a directory and make sure Java 11 is the current JDK.
Next, we want to create a conf/zoo.cfg file with the following properties:
conf/zoo.cfg
tickTime=2000
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=60
You can configure the dataDir to any location as long as the server can write to that directory. You can then start ZooKeeper with:
$ bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
Next, we will set up Kafka.
Kafka Up and Running
To get Kafka running we first need to make sure we have Java 11 set as the JDK.
Next start up Kafka with:
$ bin/kafka-server-start.sh config/server.properties
INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
INFO starting (kafka.server.KafkaServer)
INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
INFO [ZooKeeperClient Kafka server] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.version=14.0.2 (org.apache.zookeeper.ZooKeeper)
INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
...
INFO Log directory /tmp/kafka-logs not found, creating it. (kafka.log.LogManager)
INFO Loading logs. (kafka.log.LogManager)
INFO Logs loading complete in 10 ms. (kafka.log.LogManager)
INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
INFO [SocketServer brokerId=0] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
There will be a lot of messages, but the most important one is the listener that was started: EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
This indicates that we can connect to Kafka using a non-secured connection on port 9092.
Create our Topic
We need to manually create a topic that we can read from and write to. Open a terminal, navigate to the KAFKA_HOME directory, and execute the following command:
$ bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic chat-messages --partitions 1 --replication-factor 1
Created topic chat-messages.
This will create a new topic for us called chat-messages.
Updating the WebSocket API
In order to continue, we will need some more dependencies in our WebSocket API to connect to Kafka.
- io.quarkus:quarkus-kafka-streams
- org.testcontainers:testcontainers
- org.testcontainers:kafka
Update the pom.xml dependencies with:
pom.xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>testcontainers</artifactId>
    <version>1.14.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.14.3</version>
    <scope>test</scope>
</dependency>
Configure the Kafka Connection
Next, we want to make sure the application is configured to connect to our Kafka server. Open src/main/resources/application.properties and make the following changes:
quarkus.kafka-streams.application-server=localhost:8011
quarkus.kafka-streams.application-id=${quarkus.application.name}
quarkus.kafka-streams.bootstrap-servers=${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}
quarkus.kafka-streams.topics=chat-messages
For the Kafka host, we have defined the KAFKA_HOST environment variable with a fallback of localhost, and a port set to the KAFKA_PORT environment variable with a fallback of 9092. We have also set a default topic of chat-messages, which we created earlier.
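The ${VAR:default} syntax is resolved by Quarkus at startup. In plain Java, the fallback logic amounts to something like the following sketch; the BootstrapServers class and envOr helper are hypothetical, purely for illustration of what Quarkus does for you:

```java
// Sketch of the fallback behaviour behind ${KAFKA_HOST:localhost}:${KAFKA_PORT:9092}.
// The envOr helper is hypothetical; Quarkus performs this resolution itself.
public class BootstrapServers {

    // Return the environment variable's value, or the fallback when it is unset.
    static String envOr(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    static String bootstrapServers() {
        return envOr("KAFKA_HOST", "localhost") + ":" + envOr("KAFKA_PORT", "9092");
    }

    public static void main(String[] args) {
        // With neither variable set, this prints "localhost:9092".
        System.out.println(bootstrapServers());
    }
}
```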
⚠️ NOTE
If you want to run a quick compile from here, there are a couple of things we need to add to our test configuration:
src/test/resources/application.properties
quarkus.application.name=test-websockets
quarkus.log.category."com.brightfield.streams".level=ALL
quarkus.kafka-streams.topics=chat-messages
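Incidentally, Quarkus also passes any property prefixed with kafka-streams. straight through to the Kafka Streams engine, which is handy for tuning. The values below are illustrative only and not required for this article:

```properties
# Passed through verbatim to the Kafka Streams engine by Quarkus.
# Illustrative values only; not required for this article.
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
```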
Create the Kafka Consumer
In order to do this, we will be updating our SocketEndpoint class.
First, let's create a method to broadcast to all users who are connected:
private void broadcast(String message) {
    socketSessions.values().forEach(s -> {
        s.getAsyncRemote().sendText(message, result -> {
            if (result.getException() != null) {
                log.error("Unable to send message: {}", result.getException().getMessage(), result.getException());
            }
        });
    });
}
As you can see, we iterate through the Map of user sessions, indexed by user name, and use the async remote endpoint to send the text message to each user.
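Stripped of the WebSocket types, this is simply a fan-out over a map of per-user sinks. Here is a dependency-free sketch of the same pattern, where a Consumer&lt;String&gt; stands in for a Session's async remote endpoint (BroadcastSketch and its names are illustrative, not part of the project):

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Dependency-free sketch of the broadcast pattern: a Consumer<String>
// stands in for a WebSocket Session's async remote endpoint.
public class BroadcastSketch {

    // User name -> message sink, like the endpoint's socketSessions map.
    static final Map<String, Consumer<String>> sessions = new ConcurrentHashMap<>();

    static void broadcast(String message) {
        // Fan the message out to every connected user.
        sessions.values().forEach(sink -> sink.accept(message));
    }

    public static void main(String[] args) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        sessions.put("alice", m -> delivered.add("alice:" + m));
        sessions.put("bob", m -> delivered.add("bob:" + m));
        broadcast("hello");
        System.out.println(delivered.size()); // prints 2: both users received the message
    }
}
```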
Next, let's add the consumer. Again in the SocketEndpoint class, we want to add the following code:
@Produces
public Topology buildTopology() {
    log.info("Building the Topology...");
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("chat-messages", Consumed.with(Serdes.String(), Serdes.String()))
        .peek((id, message) -> {
            log.info("Incoming transaction: {}", message);
            broadcast(message);
        });
    return builder.build();
}
Here we specify the stream we want to listen to and use a String Serde for both the key and the value to read messages from the topic. We then log each message and broadcast it to all users connected over the WebSocket.
Updating the Unit Tests
If we try to build the service without a Kafka server running, we will hit a wall when the tests execute. If a Kafka server is running, we will instead find the unit tests getting stuck, because there is no shutdown process in the test. This is where testcontainers come into play.
In the unit test we created in the previous article, we are going to enhance it to use a new lifecycle for our test Kafka server.
First, we will create our test Kafka instance:
src/test/java/com/brightfield/streams/InfrastructureTestResource.java
package com.brightfield.streams;

import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class InfrastructureTestResource implements QuarkusTestResourceLifecycleManager {

    private final Logger log = LoggerFactory.getLogger(InfrastructureTestResource.class);
    private final KafkaContainer kafkaContainer = new KafkaContainer("5.5.1");

    @Override
    public int order() {
        return 1;
    }

    @Override
    public void init(Map<String, String> initArgs) {
        log.info("Initialising...");
    }

    @Override
    public Map<String, String> start() {
        log.info("Starting kafka test container...");
        this.kafkaContainer.start();
        log.info("Creating topic...");
        createTopics("chat-messages");
        return configurationParameters();
    }

    @Override
    public void stop() {
        this.kafkaContainer.close();
    }

    private void createTopics(String... topics) {
        var newTopics =
            Arrays.stream(topics)
                .map(topic -> new NewTopic(topic, 1, (short) 1))
                .collect(Collectors.toList());
        try (var admin = AdminClient.create(Map.of("bootstrap.servers", getKafkaBrokers()))) {
            admin.createTopics(newTopics);
        }
    }

    private String getKafkaBrokers() {
        return String.format("%s:%d", kafkaContainer.getContainerIpAddress(), kafkaContainer.getMappedPort(KafkaContainer.KAFKA_PORT));
    }

    private Map<String, String> configurationParameters() {
        log.info("Returning configurationParameters...");
        final Map<String, String> conf = new HashMap<>();
        String bootstrapServers = getKafkaBrokers();
        log.info("Brokers: {}", bootstrapServers);
        conf.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        conf.put("quarkus.kafka-streams.bootstrap-servers", bootstrapServers);
        conf.put("mp.messaging.outgoing.delivery.bootstrap.servers", bootstrapServers);
        return conf;
    }
}
Next we want our test to use this Resource:
src/test/java/com/brightfield/streams/SocketEndpointTest.java
@QuarkusTest
@QuarkusTestResource(value = InfrastructureTestResource.class)
public class SocketEndpointTest {
...
}
When you compile and run the unit tests, you should now see the WebSocket tests run, connect to the Kafka container, and then disconnect without getting stuck. By creating the InfrastructureTestResource, we have essentially added a lifecycle to how the Kafka container is managed:
- First, the init() method is called. In our scenario, we just log that the init() method has been called.
- Next, the start() method is called, which creates the topics on the test container we want to use and then returns the configuration of the Kafka container.
- When the tests are complete, the stop() method is called to clean up and shut down the Kafka container.
⚠️ NOTE
We won't be implementing full testing of the Kafka integration in this article; what we have for now will be sufficient to test from the command line.
Running our Stack
Everything should now be in place. Let's start our service and Angular client application and see if it works!
Sending some test messages through the web interface should function as it did before:
To test our broadcast capabilities, we will revert to the command line and publish the messages from there.
Go to the KAFKA_HOME directory in a terminal window and enter:
$ bin/kafka-console-producer.sh --broker-list=localhost:9092 --topic chat-messages
>Below
>People
You should see the user interface update with the same values:
Conclusion
With this as your base, you can build a straightforward, fully-fledged messaging tool: listing users, their statuses, and even group messages.
In a future article, we will explore how to test the Kafka component. In the meantime, happy chatting!