In today's post we'll explore how to create a basic background job in Rust, simulating Rust channels with a Vector-based queue.
First things first
Generally, a background job operates on one or more threads that continuously consume messages from a queue.
In this post, we'll use a Vector to represent our queue.
More precisely, we'll use VecDeque from the Rust standard library. VecDeque is a double-ended queue implemented as a growable ring buffer, so pushing at the back and popping from the front are both cheap.
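To make the queue behaviour concrete, here is a minimal sketch of VecDeque used as a FIFO queue. The helper name `drain_in_fifo_order` is made up for illustration; only `push_back` and `pop_front` come from the standard library.

```rust
use std::collections::VecDeque;

/// Pushes the given jobs at the back of the queue and drains them from
/// the front, returning them in the order they were consumed.
fn drain_in_fifo_order(jobs: &[&str]) -> Vec<String> {
    let mut queue: VecDeque<String> = VecDeque::new();
    for job in jobs {
        queue.push_back(job.to_string()); // enqueue at the tail
    }

    let mut consumed = Vec::new();
    // pop_front returns None once the queue is empty, which ends the loop
    while let Some(job) = queue.pop_front() {
        consumed.push(job);
    }
    consumed
}
```

Calling `drain_in_fifo_order(&["first", "second"])` hands the jobs back in the same order they were pushed, which is exactly the behaviour we want from a job queue.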
Data model
To make our solution more organized, we can define 3 structs:
Transmitter
The transmitter (tx) holds a store, which is the queue (VecDeque) wrapped in an Arc/Mutex, and an emitter, which is a Condvar used for condition-based synchronization.
Receiver
The receiver (rx), pretty much like the transmitter, also holds a store and an emitter.
Channel
Channel holds a transmitter and a receiver.
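use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

struct Transmitter<T> {
    store: Arc<Mutex<VecDeque<T>>>, // shared queue
    emitter: Arc<Condvar>,          // signals that data is available
}

struct Receiver<T> {
    store: Arc<Mutex<VecDeque<T>>>,
    emitter: Arc<Condvar>,
}

struct Channel<T> {
    tx: Transmitter<T>,
    rx: Receiver<T>,
}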
What is an Arc in Rust?
The queue (VecDeque) is going to be shared across the channel for one or more threads.
In Rust, such a problem calls for shared ownership, which a reference counter (Rc) provides. Rc is not thread-safe, though, and we are in a multi-threaded scenario. That's why we need an atomic reference counter, or simply Arc, which is indeed thread-safe.
You can learn more details about smart pointers by reading my post on Understanding the basics of smart pointers in Rust.
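As a small illustration of shared ownership across threads, the sketch below shares one read-only vector between several threads through Arc. The function name `sum_on_each_thread` and the data are assumptions made up for this example.

```rust
use std::sync::Arc;
use std::thread;

/// Shares one read-only vector between `workers` threads and returns
/// the sum each thread computed through its own Arc handle.
fn sum_on_each_thread(workers: usize) -> Vec<i32> {
    let shared = Arc::new(vec![1, 2, 3]);

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Arc::clone bumps the atomic counter; the vector is not copied
            let local = Arc::clone(&shared);
            thread::spawn(move || local.iter().sum::<i32>())
        })
        .collect();

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Swapping Arc for Rc here would not compile, because Rc does not implement Send and so cannot be moved into a spawned thread.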
How about Mutex?
Arc only gives shared, immutable access to the data it wraps. To mutate the underlying data we need interior mutability, which in single-threaded code is provided by RefCell.
The smart pointers post mentioned above also covers interior mutability; check it out for further details.
In the same way as Rc, RefCell is not thread-safe. For a thread-safe scenario, we need to synchronize access to data using locks. That's where mutual exclusion (Mutex) comes in.
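Putting Arc and Mutex together, a rough sketch of several threads mutating the same queue could look like this. The function `push_from_threads` is hypothetical and only exists to show the locking pattern.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Spawns `producers` threads that each push one value into a shared
/// queue, then returns how many items ended up in the queue.
fn push_from_threads(producers: usize) -> usize {
    let queue = Arc::new(Mutex::new(VecDeque::new()));

    let handles: Vec<_> = (0..producers)
        .map(|id| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                // lock() returns a guard; the lock is released when the
                // guard is dropped at the end of this statement
                queue.lock().unwrap().push_back(id);
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let len = queue.lock().unwrap().len();
    len
}
```

Arc takes care of sharing the queue, while Mutex guarantees that only one thread touches it at a time.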
Okay, and Condvar? What the heck is that?
Condvar is a primitive for synchronization in concurrent systems where we can put a thread to "wait" (suspended) until a given condition is met.
For blocking queues, we basically want the following condition (pseudo-code):
queue = some_array
mutex = os_lock
emitter = os_condvar

// The thread is suspended until the array gets some data,
// so no CPU is consumed while waiting
while queue is empty
  emitter.wait(mutex)
end

// Someone emitted a signal
data = queue.pop

In another thread:

queue.push(data)
emitter.signal
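The pseudo-code above maps fairly directly onto std::sync::Condvar. Below is a minimal sketch, assuming a single consumer thread and a made-up helper called `wait_for_one_item`; the Mutex and Condvar are bundled in one Arc only to keep the example short.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Starts a consumer that waits for data, pushes `item` from the calling
/// thread, and returns what the consumer popped from the queue.
fn wait_for_one_item(item: i32) -> i32 {
    let shared = Arc::new((Mutex::new(VecDeque::<i32>::new()), Condvar::new()));

    let consumer = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let (mutex, emitter) = &*shared;
            let mut queue = mutex.lock().unwrap();
            // wait() releases the lock while the thread is suspended and
            // re-acquires it on wake-up. The loop re-checks the condition
            // to guard against spurious wake-ups.
            while queue.is_empty() {
                queue = emitter.wait(queue).unwrap();
            }
            queue.pop_front().unwrap()
        })
    };

    // "In another thread": push the data and emit the signal
    let (mutex, emitter) = &*shared;
    mutex.lock().unwrap().push_back(item);
    emitter.notify_one();

    consumer.join().unwrap()
}
```

If the item happens to be pushed before the consumer takes the lock, the consumer finds the queue non-empty and skips the wait entirely, so no signal can be missed.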
Data modeling implementation
Now, let's implement the methods send and recv (receive) in our simulated channel.
Transmitter
The transmitter (tx) will have a method called send, which basically:
- locks the shared queue (store.lock().unwrap())
- pushes data to the queue (push_back(data))
- emits a signal (emitter.notify_one) to notify a suspended thread that is waiting for data in the queue
impl<T> Transmitter<T> {
    fn send(&self, data: T) {
        self.store.lock().unwrap().push_back(data);
        self.emitter.notify_one();
    }
}
Receiver
The receiver (rx) has a method called recv (short for receive) which:
- locks the shared queue (store.lock().unwrap())
- suspends the current thread until the condition is met (emitter.wait); in other words, while the queue is empty, the thread is suspended by the operating system and does not consume CPU
- once the thread is awakened, pops the data from the queue (store.pop_front())
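impl<T> Receiver<T> {
    fn recv(&self) -> Option<T> {
        let mut store = self.store.lock().unwrap();
        // wait releases the lock while suspended and re-acquires it on wake-up
        while store.is_empty() {
            store = self.emitter.wait(store).unwrap();
        }
        // The queue is non-empty at this point, so this is always Some
        store.pop_front()
    }
}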
Moreover, the Receiver struct can have an extra method called try_recv, which does not block the thread because it skips the Condvar wait and returns None right away when the queue is empty:
// Inside impl<T> Receiver<T>
fn try_recv(&self) -> Option<T> {
    self.store.lock().unwrap().pop_front()
}
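To show how a non-blocking receive might be used, here is a small sketch that drains whatever is currently queued and then returns. The Receiver below is a simplified stand-in without the emitter field, and `drain_available` is a hypothetical helper, not part of the channel described in this post.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Simplified stand-in for the Receiver in this post (no emitter field)
struct Receiver<T> {
    store: Arc<Mutex<VecDeque<T>>>,
}

impl<T> Receiver<T> {
    fn try_recv(&self) -> Option<T> {
        self.store.lock().unwrap().pop_front()
    }
}

/// Collects every item that is available right now, without ever blocking.
fn drain_available<T>(rx: &Receiver<T>) -> Vec<T> {
    let mut items = Vec::new();
    // try_recv returns None as soon as the queue is empty
    while let Some(item) = rx.try_recv() {
        items.push(item);
    }
    items
}
```

This style suits workers that have other things to do between polls; the trade-off is that a job pushed right after the drain is only seen on the next poll.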
Channel
Once the Transmitter and Receiver are implemented, the implementation of Channel is a piece of cake:
impl<T> Channel<T> {
    fn new() -> Self {
        let store = Arc::new(Mutex::new(VecDeque::new()));
        let emitter = Arc::new(Condvar::new());

        Channel {
            tx: Transmitter { store: Arc::clone(&store), emitter: Arc::clone(&emitter) },
            rx: Receiver { store: Arc::clone(&store), emitter: Arc::clone(&emitter) },
        }
    }
}
Note that both Mutex and Condvar are encapsulated in an Arc (atomic reference counter), because we have to share them across tx and rx at the same time.
Main
The main function can be implemented as follows:
- create a channel and bind its tx and rx
- the channel holds a shared Mutex/VecDeque and a Condvar
- tx is used to send data to the channel
- rx is used from the inner thread to receive data from the channel
use std::thread;

fn main() {
    // Initialize channel
    let channel = Channel::new();
    let (tx, rx) = (channel.tx, channel.rx);

    // Push data to the channel
    tx.send("Some job to do: 1");
    tx.send("Another job: 2");

    // Process the channel
    let worker = thread::spawn(move || {
        loop {
            let job = rx.recv(); // we could use try_recv too
            match job {
                Some(job) => println!("Job: {}", job),
                // Never reached with recv, which blocks while the queue is empty
                None => break,
            }
        }
    });

    // Push more data to the channel
    tx.send("Yet another job");

    // Blocks for as long as the worker keeps waiting for jobs
    worker.join().unwrap();
}
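We run the code and, yay, all three jobs are processed in order. Since recv blocks whenever the queue is empty, the worker then keeps waiting for more jobs, as a background job usually does: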
Job: Some job to do: 1
Job: Another job: 2
Job: Yet another job
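Because recv only returns once the queue has data, the worker above has no way to learn that no more jobs are coming. One possible way to let it stop is sketched below. Everything here is an assumption layered on top of the post's design: the `closed` flag, the `close` method, and the names `ClosableChannel` and `run_worker` are all hypothetical, and the channel is collapsed into a single struct to keep the example short.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical shared state: the queue plus a `closed` flag
struct Shared<T> {
    queue: VecDeque<T>,
    closed: bool,
}

struct ClosableChannel<T> {
    state: Mutex<Shared<T>>,
    emitter: Condvar,
}

impl<T> ClosableChannel<T> {
    fn new() -> Arc<Self> {
        Arc::new(ClosableChannel {
            state: Mutex::new(Shared { queue: VecDeque::new(), closed: false }),
            emitter: Condvar::new(),
        })
    }

    fn send(&self, data: T) {
        self.state.lock().unwrap().queue.push_back(data);
        self.emitter.notify_one();
    }

    /// Marks the channel as closed and wakes every waiting receiver.
    fn close(&self) {
        self.state.lock().unwrap().closed = true;
        self.emitter.notify_all();
    }

    /// Blocks until data arrives; returns None once the channel is
    /// closed and the queue has been drained.
    fn recv(&self) -> Option<T> {
        let mut state = self.state.lock().unwrap();
        while state.queue.is_empty() && !state.closed {
            state = self.emitter.wait(state).unwrap();
        }
        state.queue.pop_front()
    }
}

/// Sends every job to a worker thread, closes the channel, and returns
/// the jobs in the order the worker processed them.
fn run_worker(jobs: Vec<&'static str>) -> Vec<&'static str> {
    let channel = ClosableChannel::new();

    let worker = {
        let channel = Arc::clone(&channel);
        thread::spawn(move || {
            let mut done = Vec::new();
            while let Some(job) = channel.recv() {
                done.push(job);
            }
            done
        })
    };

    for job in jobs {
        channel.send(job);
    }
    channel.close();

    worker.join().unwrap()
}
```

With this variant the None branch becomes meaningful: the worker drains whatever is left in the queue after close and then exits, so join returns.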
Rust channels to the rescue
You may be wondering:
Hey Leandro, why doesn't Rust bring all this stuff already built-in? Do we really need to implement a raw queue and use synchronization primitives on our own every time we want to create a channel for threads?
Today is your lucky day. Rust does ship with channels in its standard library, built on the same ideas described in this very post, but more robust, of course:
use std::sync::mpsc;
use std::thread;

fn main() {
    // Initialize channel
    let (tx, rx) = mpsc::channel();

    // Push data to the channel
    tx.send("Some job to do: 1").unwrap();
    tx.send("Another job: 2").unwrap();

    let worker = thread::spawn(move || {
        loop {
            let job = rx.recv();
            match job {
                Ok(job) => println!("Job: {}", job),
                Err(_) => break, // every sender is gone: the channel is closed
            }
        }
    });

    // Push more data to the channel
    tx.send("Yet another job").unwrap();

    // Drop the sender so that rx.recv() returns Err and the worker can exit
    drop(tx);
    worker.join().unwrap();
}
- mpsc stands for multiple producers, single consumer
- mpsc::channel creates a channel with an internal shared queue and returns a transmitter (tx) and a receiver (rx)
- pretty much like our custom implementation, tx.send sends data to the channel, whereas rx.recv reads/pops data from the channel
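The "multiple producers" part can be sketched by cloning the transmitter, one clone per producer thread. The function `collect_from_producers` is a made-up name for this example; `clone`, `send` and `iter` are the standard mpsc calls.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send their own id through a
/// cloned Sender, then returns everything the single receiver collected.
fn collect_from_producers(producers: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();

    let handles: Vec<_> = (0..producers)
        .map(|id| {
            let tx = tx.clone(); // each producer thread owns its own Sender
            thread::spawn(move || tx.send(id).unwrap())
        })
        .collect();

    // Drop the original Sender so the channel closes once the clones are gone
    drop(tx);

    for handle in handles {
        handle.join().unwrap();
    }

    // iter() yields messages until every Sender has been dropped
    let mut received: Vec<usize> = rx.iter().collect();
    received.sort(); // arrival order across threads is not deterministic
    received
}
```

Note that the receiver cannot be cloned, which is the "single consumer" half of the name.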
How cool is that?
References
https://doc.rust-lang.org/book/ch16-00-concurrency.html
https://doc.rust-lang.org/std/vec/struct.Vec.html
https://doc.rust-lang.org/std/collections/struct.VecDeque.html
https://dev.to/leandronsp/understanding-the-basics-of-smart-pointers-in-rust-3dff