This is a two-part series to help you get started with Rust and Kafka. We will be using the rust-rdkafka crate, which itself is based on librdkafka (the C library).
In this post, we will cover the Kafka Consumer API.
Initial setup
Make sure you install a Kafka broker - a local setup should suffice. Of course, you will need to have Rust installed as well - version 1.45 or above.
Before you begin, clone the GitHub repo:
git clone https://github.com/abhirockzz/rust-kafka-101
cd part2
Simple Consumer
Creating a low-level consumer (BaseConsumer) is strikingly similar to how you'd create its counterpart, the BaseProducer. The only difference is that you will have to cast the output to the right type (which in this case is BaseConsumer).
let consumer: BaseConsumer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("group.id", "my_consumer_group")
    .create()
    .expect("invalid consumer config");
consumer
    .subscribe(&["rust"])
    .expect("topic subscribe failed");
Notice that the group.id config has also been included - .set("group.id", "my_consumer_group") - it's mandatory for a consumer.
Once a BaseConsumer is created, one can subscribe to one or more topics (in this case, it's just one topic with the name rust).
To fetch messages from the topic, we start (spawn) a new thread:
thread::spawn(move || loop {
    for msg_result in consumer.iter() {
        let msg = msg_result.unwrap();
        let key: &str = msg.key_view().unwrap().unwrap();
        let value = msg.payload().unwrap();
        let user: User = serde_json::from_slice(value).expect("failed to deser JSON to User");
        println!(
            "received key {} with value {:?} in offset {:?} from partition {}",
            key,
            user,
            msg.offset(),
            msg.partition()
        )
    }
});
It accepts a closure, which in this case happens to be an infinite loop that:
- Receives messages, and,
- Prints out the key and value (a User, sketched below), along with offset and partition info
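For reference, the User type is the serde-derived struct from part one of this series. Here is a minimal sketch of what it could look like, with the fields inferred from the log output shown later (it is not copied verbatim from the repo):

use serde::Deserialize;

// Sketch of the User struct - fields inferred from log lines such as
// User { id: 3, email: "user-3@foobar.com" }. Debug enables the {:?}
// formatting; Deserialize is what serde_json::from_slice relies on.
#[derive(Debug, Deserialize)]
struct User {
    id: u32,
    email: String,
}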
Calling iter on the consumer is just a shortcut for invoking poll without a timeout.
Other variations are also possible. You can use poll directly:
loop {
    let message = consumer.poll(Duration::from_secs(2));
    ...
}
Or, use this format:
for message in &consumer {
    ...
}
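As a point of reference, here is a minimal sketch (not from the repo) of handling what poll actually returns - an Option wrapping a KafkaResult - so both the "no message within the timeout" and error cases are dealt with explicitly:

// Sketch: poll returns Option<KafkaResult<BorrowedMessage>>; handle all cases.
// Assumes the same `consumer` as above and `use std::time::Duration;`.
loop {
    match consumer.poll(Duration::from_secs(2)) {
        // no message arrived within the two-second timeout
        None => println!("no message received"),
        // polling failed, e.g. a broker or protocol error
        Some(Err(e)) => eprintln!("error receiving message: {}", e),
        // got a message - print its coordinates
        Some(Ok(msg)) => println!(
            "received message at offset {} in partition {}",
            msg.offset(),
            msg.partition()
        ),
    }
}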
Run the program
- Rename the file src/1_consumer_simple.rs to main.rs, and
- Execute cargo run
Output:
sending message
sending message
produced message with key user-1 in offset 25 of partition 2
produced message with key user-2 in offset 12 of partition 4
sending message
produced message with key user-3 in offset 20 of partition 0
received key user-3 with value User { id: 3, email: "user-3@foobar.com" } in offset 20 from partition 0
sending message
produced message with key user-4 in offset 24 of partition 3
received key user-4 with value User { id: 4, email: "user-4@foobar.com" } in offset 24 from partition 3
sending message
produced message with key user-5 in offset 25 of partition 3
received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 25 from partition 3
sending message
produced message with key user-6 in offset 26 of partition 3
received key user-6 with value User { id: 6, email: "user-6@foobar.com" } in offset 26 from partition 3
sending message
produced message with key user-7 in offset 27 of partition 3
received key user-7 with value User { id: 7, email: "user-7@foobar.com" } in offset 27 from partition 3
As expected:
- You see the producer callbacks, confirming that the messages were sent to Kafka
- The consumer received the messages as well, as confirmed by the logs
What about Consumer callbacks?
Yes, just like the producer, the consumer API also has callbacks for:
- Re-balancing
- Offset commit
To do this, we need to implement the ConsumerContext trait. We will:
- Define a struct
- Provide an empty implementation for ClientContext
- Override the following methods of the ConsumerContext trait: pre_rebalance, post_rebalance, and commit_callback
struct ConsumerCallbackLogger;
impl ClientContext for ConsumerCallbackLogger {}
impl ConsumerContext for ConsumerCallbackLogger {
...
}
We will skip the pre_rebalance method and focus on post_rebalance in this example:
fn post_rebalance<'a>(&self, rebalance: &rdkafka::consumer::Rebalance<'a>) {
    println!("post_rebalance callback");
    match rebalance {
        Rebalance::Assign(tpl) => {
            for e in tpl.elements() {
                println!("rebalanced partition {}", e.partition())
            }
        }
        Rebalance::Revoke => {
            println!("ALL partitions are REVOKED")
        }
        Rebalance::Error(err_info) => {
            println!("Post Rebalance error {}", err_info)
        }
    }
}
Rebalance is an enum. As part of the implementation, we match it against all the possible variants (partitions assigned, partitions revoked, rebalance error) and simply log them.
fn commit_callback(
    &self,
    result: rdkafka::error::KafkaResult<()>,
    offsets: &rdkafka::TopicPartitionList,
) {
    match result {
        Ok(_) => {
            for e in offsets.elements() {
                println!(
                    "committed offset {:?} in partition {}",
                    e.offset(),
                    e.partition()
                )
            }
        }
        Err(err) => {
            println!("error committing offset - {}", err)
        }
    }
}
For commit callback events, we match on the KafkaResult (available in the commit_callback parameter) to check whether the commit was successful. If it was, we print out the committed offset for each partition; otherwise, we log the error that occurred during the commit process.
Once this is done, we simply need to plug-in our new implementation:
let consumer: BaseConsumer<ConsumerCallbackLogger> = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    ....
    .create_with_context(ConsumerCallbackLogger {})
    .expect("invalid consumer config");
To do this, we made a couple of changes:
- Use create_with_context (instead of create)
- Use BaseConsumer<ConsumerCallbackLogger>
Run the program
- Rename the file src/2_consumer_callback.rs to main.rs, and
- Execute cargo run
sending message
sending message
produced message with key user-1 in offset 0 of partition 2
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
produced message with key user-2 in offset 0 of partition 4
sending message
produced message with key user-3 in offset 0 of partition 0
received key user-3 with value User { id: 3, email: "user-3@foobar.com" } in offset 0 from partition 0
sending message
committed offset Offset(1) in partition 0
committed offset Offset(1) in partition 4
produced message with key user-4 in offset 0 of partition 3
received key user-4 with value User { id: 4, email: "user-4@foobar.com" } in offset 0 from partition 3
As expected, the re-balance events were logged along with the successful commits.
Trigger a Re-balance
Partition assignment happens the first time you start the application, and you're able to witness it thanks to our ConsumerContext implementation. You can also trigger a rebalance by starting a new instance of the application. Since there are now two instances in the same consumer group, the topic partitions will be rebalanced. For example, if the topic had 6 partitions, they will be split equally between the two instances.
You should see log messages similar to this:
....
# instance 1
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
...
# instance 2
post_rebalance callback
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
Switching to Manual commit
By default, the offset commit process is taken care of by the library itself. But we can exercise more control over it by switching to manual mode.
The first thing to do is to set enable.auto.commit to false - .set("enable.auto.commit", "false").
At-least once delivery
To achieve this, we need to make sure we have indeed processed the message successfully before committing the offset. To simulate this, let's write a function (named process) that can fail randomly. We will then use it in our consumer loop and commit only when this function returns successfully.
use rand::Rng; // brings gen_bool into scope

fn process(u: User) -> Result<(), ()> {
    let mut rnd = rand::thread_rng();
    let ok = rnd.gen_bool(1.0 / 2.0); // 50% probability of returning true
    match ok {
        true => {
            println!("SUCCESSFULLY processed User info {:?}", u);
            Ok(())
        }
        false => {
            println!("FAILED to process User info {:?}", u);
            Err(())
        }
    }
}
We will need to modify our consumer loop to:
- Add a manual offset commit based on the response from the process function, and
- Add a label ('consumer_thread) to our thread loop
thread::spawn(move || 'consumer_thread: loop {
    for msg_result in consumer.iter() {
        //..... omitted
        println!(
            "received key {} with value {:?} in offset {:?} from partition {}",
            key,
            user,
            msg.offset(),
            msg.partition()
        );
        let processed = process(user);
        match processed {
            Ok(_) => {
                consumer.commit_message(&msg, CommitMode::Sync);
            }
            Err(_) => {
                println!("loop encountered processing error");
                break 'consumer_thread;
            }
        }
    }
});
We call process to simulate the processing of each record received by the consumer. In case the processing succeeds (returns Ok), we commit the record using commit_message.
Note that the commit itself may fail. This should ideally be handled in the commit_callback implementation of ConsumerContext.
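Another option, sketched below (this is not in the repo), is to react to a failed synchronous commit right in the loop, since commit_message returns a KafkaResult - for example, with a bounded retry inside the Ok arm of the match shown above:

// Sketch: inside the Ok(_) arm above, retry a failed synchronous commit
// up to three times before giving up on the consumer thread.
let mut attempts = 0;
while let Err(e) = consumer.commit_message(&msg, CommitMode::Sync) {
    attempts += 1;
    eprintln!("commit attempt {} failed: {}", attempts, e);
    if attempts >= 3 {
        break 'consumer_thread;
    }
}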
Run the program
- Rename the file src/3_manual_commit.rs to main.rs, and
- Execute cargo run
The program output is lengthy, but bear with me.
Output:
produced message with key user-1 in offset 22 of partition 2
produced message with key user-2 in offset 28 of partition 4
post_rebalance callback
rebalanced partition 0
rebalanced partition 1
rebalanced partition 2
rebalanced partition 3
rebalanced partition 4
rebalanced partition 5
received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 52 from partition 3
SUCCESSFULLY processed User info User { id: 5, email: "user-5@foobar.com" }
committed offset Offset(53) in partition 3
received key user-2 with value User { id: 2, email: "user-2@foobar.com" } in offset 28 from partition 4
SUCCESSFULLY processed User info User { id: 2, email: "user-2@foobar.com" }
produced message with key user-3 in offset 35 of partition 0
committed offset Offset(29) in partition 4
received key user-1 with value User { id: 1, email: "user-1@foobar.com" } in offset 22 from partition 2
FAILED to process User info User { id: 1, email: "user-1@foobar.com" }
loop encountered processing error. closing consumer...
post_rebalance callback
ALL partitions have been REVOKED
Notice these log messages when process returns successfully:
- received key user-5 with value User { id: 5, email: "user-5@foobar.com" } in offset 52 from partition 3
- SUCCESSFULLY processed User info User { id: 5, email: "user-5@foobar.com" }
- committed offset Offset(53) in partition 3
For a failure scenario:
- received key user-1 with value User { id: 1, email: "user-1@foobar.com" } in offset 22 from partition 2
- FAILED to process User info User { id: 1, email: "user-1@foobar.com" }
- loop encountered processing error. closing consumer...
We ended up stopping the consumer when processing failed. The question here is:
How do we handle messages that did not get processed?
Note that failure could happen due to many reasons. A couple of them are:
- Processing failed (this is what we simulated in this example), or,
- Processing was successful, but the commit failed
If we continue with our consumer loop after a failed message, we could end up losing messages (data loss). Why? It's because the commit_message method also marks smaller offsets (less than the one being handled) as committed. For example, if offset 20 from partition 5 failed to get processed (and committed), but you continue and offset 21 from partition 5 is processed and committed successfully, you will end up missing the data from offset 20 - committing offset 21 also commits offsets 20 and below. Even after you restart the application, this will not be detected.
To prevent this, you can either:
- Halt the consumer process after detecting the first failure. In this example, we do this by exiting our consumer thread itself (this is not acceptable for real-world applications though). When you restart the application, processing will begin from the last committed offset, and the failed message will be picked up and re-processed.
- Even better - handle this in commit_callback by sending the data to another Kafka topic (also known as a "dead letter topic") which can be processed separately; a sketch of the idea follows this list.
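To illustrate the idea, here is a rough sketch (not from the repo) of forwarding a record to a dead letter topic using the BaseProducer covered in part one - the topic name rust-dlt and the helper function are hypothetical:

use rdkafka::producer::{BaseProducer, BaseRecord};

// Hypothetical helper: forward a failed record to a "dead letter topic".
// The topic name "rust-dlt" is made up for this sketch. As in part one,
// the producer still needs to be polled/flushed for actual delivery.
fn send_to_dead_letter_topic(producer: &BaseProducer, key: &str, payload: &[u8]) {
    if let Err((e, _record)) = producer.send(BaseRecord::to("rust-dlt").key(key).payload(payload)) {
        println!("failed to enqueue record to the dead letter topic: {}", e);
    }
}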
Other considerations
This is by no means an exhaustive list or coverage of all the delivery semantics:
- We did not cover
at-most once
andexactly once
. - You may want to choose the use Async commit mode - this has it's own set of caveats.
- Committing each and every message (even asynchronously) carries overhead. You may want to commit messages/offsets in batches. As always, you need to take care of a lot of corner cases here as well.
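As a rough illustration of the batching idea (again, a sketch rather than code from the repo - the threshold of 100 is arbitrary):

// Sketch: commit the consumer's current position every 100 messages
// instead of after every single one. The threshold is arbitrary.
let mut count: u64 = 0;
for msg_result in consumer.iter() {
    let _msg = msg_result.unwrap();
    // ... process the message ...
    count += 1;
    if count % 100 == 0 {
        // commits the current offsets for all assigned partitions
        consumer
            .commit_consumer_state(CommitMode::Async)
            .expect("batch commit failed");
    }
}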
That's all for this two-part series on getting started with Rust and Kafka using the rust-rdkafka library. We covered:
- A simple producer
- Producer with delivery callback
- How to send JSON payloads
- A basic consumer
- Handling re-balance and offset commit callbacks
- Manual commits and at-least once delivery semantics