In this blog post, we will build a Redis server in the Rust programming language. If you are new to Rust and want to learn the language, this is a good starting point: no prior knowledge of the language is assumed, although having read the Rust book or a starter tutorial will help. If you have already done some basic Rust, this is a good way to go deeper by building a complete program.
The second part of this blog series.
The final implementation will have a fully functioning Redis server with get and set commands. You can get the code for this blog here.
This is going to be a 2 blog post series:
- In this first blog post, we will create a basic single-threaded client-server interaction, with data persistence and retrieval on the server.
- In the second post, we will implement multi-threaded client-server interaction and shutdown notification to clients using channels.
Prerequisite
You must have Rust installed on your computer. If you do not, follow the instructions in the official installation guide.
Project Setup
Here we will create the application boilerplate, install all the necessary dependencies, and configure the project.
To get started, create a new Rust project with the following commands.
$ cargo new my_redis
$ cd my_redis
Open the my_redis project in Visual Studio Code or any other editor of your choice to start coding.
You will get 2 main files in the project: Cargo.toml and src/main.rs. The Cargo.toml file contains the metadata of the project, such as the project name and, most importantly, the dependencies used in the project. Later, we will add libraries under the [dependencies] section of this file.
Dependencies used
main.rs has the main function, which is the entry point of our Rust program whenever we run the command cargo run from the terminal.
Redis consists of 2 parts: a server and a client. The client sends a request to the server to save or retrieve data, and the server is responsible for storing that data and returning it. So, on the command line we will have 2 terminal windows, one running the server and one running the client.
By default, src/main.rs is the crate's executable. However, since we need 2 executables, one each for the server and the client, we will put them in the src/bin folder, which we will create soon. Cargo treats every .rs file in this folder as a separate executable, which is exactly what we need to run the server and the client in separate terminals.
So, let's create a new folder inside src called bin. Next, create server.rs and client.rs inside the bin folder. The folder structure at this point should be:
src/
├── main.rs
└── bin/
    ├── server.rs
    └── client.rs
Add a main() function to both files to get rid of the error about missing main functions. Since these are executable files, each must contain a main function, which is the entry point of a Rust program.
To run these executables, use the following commands:
$ cargo run --bin server
$ cargo run --bin client # in a new terminal window
The --bin argument tells Cargo which executable to run when there are multiple executables.
After this step, we don't need the main.rs file, since we have our separate binaries ready to be executed. So go ahead and delete the main.rs file.
Server and client will interact over a socket connection. The server will listen for incoming connection requests, and the client will connect to the server.
Intro to Tokio
Tokio is an asynchronous runtime for Rust that also provides async utilities. In this blog post, we will mainly use its TcpListener and TcpStream types from the tokio::net module. Their I/O methods are async functions, so they return futures that we drive with .await. In short, when we use .await on an async function call, the current task pauses at that point until the call has completed, while the runtime stays free to run other tasks.
To use Tokio, add the tokio dependency to the Cargo.toml file.
File Cargo.toml
[dependencies]
tokio = {version="1", features = ["full"]}
Server implementation
Now, add the #[tokio::main] macro right before the async fn main() line. This macro transforms the async main function into a synchronous main function that starts a Tokio runtime and runs the original body inside an async block. We need this because using .await on Tokio functions requires the enclosing function to be async, but Rust expects the program's entry point to be a synchronous main function. #[tokio::main] does that transformation for us.
#[tokio::main]
async fn main() {
println!("in main");
}
// gets transformed to
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // block_on takes a future (here an async block), so any `.await` inside the block is fine.
    rt.block_on(async {
        println!("in main");
    })
}
Always remember: if you want to use .await inside a function or block of code, that function or block must be async.
Next, we want to listen for incoming socket connections in the server.rs file. Import TcpListener from the tokio crate with use tokio::net::TcpListener;.
Now, in the main function, call TcpListener::bind() with an address and a port. Since bind is an async method, we use .await to wait until it returns. The ? operator unwraps the value inside a Result on success, or returns early and propagates the error to the caller on failure. You can read more about it here.
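File bin/server.rs
use tokio::net::TcpListener;

#[tokio::main]
pub async fn main() -> Result<(), std::io::Error> {
    let listener = TcpListener::bind("127.0.0.1:8081").await?;
    // The loop never exits on its own, so no trailing Ok(()) is needed.
    loop {
        // Wait for the next client; the second tuple element is the peer address.
        let (mut socket, _) = listener.accept().await?;
        println!("connection accepted {:?}", socket);
    }
}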
At this point, please note that we have changed the return type of the main function to Result, which is std::result::Result<T, E>. Errors are propagated by the ? operator as explained above.
If we run the server with the command $ cargo run --bin server, it will run the program and wait at listener.accept() for new connections. With this, our server is listening for socket connections on a single thread.
Now, let's move to the client to establish a socket connection with the server.
Since the client initiates the socket connection, we will use TcpStream from the tokio::net module. We will make the main function async and annotate it with the #[tokio::main] macro, which is the same thing we did in the server setup.
To create a new connection, use the same host address and port as the server.
use tokio::net::TcpStream;

#[tokio::main]
pub async fn main() -> Result<(), std::io::Error> {
    let mut stream = TcpStream::connect("127.0.0.1:8081").await?;
    Ok(())
}
As in the server, the main function returns a Result so that the ? operator can propagate connection errors.
Let's check this minimal server and client interaction. We expect the server to accept the client connection, print the connection accepted message, and then keep waiting for further connections.
Run the server and client in the separate terminal windows with the commands shared above and you will notice that server will print something like this.
connection accepted PollEvented { io: Some(TcpStream { addr: 127.0.0.1:8081, peer: 127.0.0.1:61643, fd: 10 }) }
It means, our connection is established and socket is ready to start sending and receiving data.
Writing to socket from client
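The first thing we want to do is send a command from the client to the server. For simplicity, we will send the data as a string (in practice, a sequence of bytes) over the socket.
Let's use the write_all() method on the stream, which takes a byte slice and writes all of it to the socket. It comes from the AsyncWriteExt trait, so add use tokio::io::AsyncWriteExt; at the top of client.rs.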
stream.write_all(b"set foo bar").await?;
In Rust, prefixing a string literal with b makes it a byte string literal, i.e. a sequence of bytes rather than a &str. If you want to understand the difference between a string literal, a String and a byte sequence, check out this awesome post.
That's all we need to write data to the socket connection.
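To make the byte string idea concrete, here is a minimal std-only sketch. The helper names command_bytes and wire_len are made up for this illustration and are not part of the project code. It shows that a b"..." literal holds the same bytes as the UTF-8 encoding of the string, and that the byte length can differ from the number of characters.

```rust
/// Returns the command as raw bytes, the form that `write_all` expects.
/// (Hypothetical helper, for illustration only.)
fn command_bytes() -> &'static [u8] {
    // A byte string literal has type &'static [u8; N]; it coerces to a slice.
    b"set foo bar"
}

/// Number of bytes `text` occupies on the wire when sent as UTF-8.
/// (Hypothetical helper, for illustration only.)
fn wire_len(text: &str) -> usize {
    text.as_bytes().len()
}
```

Calling wire_len("foo") and wire_len("ƒoo") gives different results even though both strings have three characters, because ƒ needs two bytes in UTF-8.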
Reading from socket in server
To read data from the socket on the server side, we will use the read_buf method (from the tokio::io::AsyncReadExt trait, which must be imported). This method accepts one argument, a buffer, and copies data from the socket into it.
We will create the buffer as a BytesMut, which comes from the bytes crate. First, add the bytes dependency under [dependencies] in the Cargo.toml file.
[dependencies]
bytes = { version = "1" }
Then import BytesMut from the bytes crate in the server.rs file with use bytes::BytesMut;.
Let's create a mutable BytesMut with a capacity. The buf object is mutable because we will write bytes into it. Next, read the data from the socket into the buffer using the read_buf method.
BytesMut::with_capacity() takes the number of bytes to allocate for the buffer. The buffer will grow automatically as needed, but it is more efficient to start with a generous size.
Note that capacity is measured in bytes, not characters: for example, "foo" takes 3 bytes but "ƒoo" takes 4 bytes. Let's start with 1024 bytes as the initial size of our buffer.
let mut buf = BytesMut::with_capacity(1024);
socket.read_buf(&mut buf).await?;
println!("buffer {:?}", buf); // printing the data in buffer
This is all we need to read data from the socket on the server. Let's start the server and the client to see if our program works as expected.
You will get buffer b"set foo bar" as output in the server terminal.
If you have reached this point, congratulations are in order: you have successfully sent data from a client to a server, and in Rust at that.
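For readers who want to see the whole write-then-read round trip in one place, here is a rough sketch that swaps Tokio for the blocking std::net types, so it needs no dependencies. This is not the code used in this post: round_trip is a hypothetical helper, the client runs on a plain thread, and the listener binds to port 0 so the operating system picks a free port.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Sends `message` from a client thread to a listener in this thread and
/// returns the bytes the server side received. (Blocking std::net stand-in
/// for the Tokio TcpListener/TcpStream pair; illustration only.)
fn round_trip(message: &[u8]) -> std::io::Result<Vec<u8>> {
    // Port 0 asks the OS for any free port, so this never clashes with 8081.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let payload = message.to_vec();
    let client = thread::spawn(move || -> std::io::Result<()> {
        let mut stream = TcpStream::connect(addr)?;
        stream.write_all(&payload)?;
        Ok(()) // dropping the stream closes the connection
    });

    // Server side: accept one connection and read until the client closes it.
    let (mut socket, _peer) = listener.accept()?;
    let mut received = Vec::new();
    socket.read_to_end(&mut received)?;

    client.join().expect("client thread panicked")?;
    Ok(received)
}
```

The shape is the same as in the Tokio version (bind, accept, connect, write, read), only without .await, which is why this version can serve just one blocking operation at a time per thread.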
Command matching
Now we have read the data sent by the client, which is a command like "set foo bar". The first element is the command name and the next 2 are the key and the value.
Depending on the command, we will perform either a get or a set.
So, let's get started by converting the buffer to a Vec (a growable array) of Strings. Then we can take the first string from the vector and map it to a variant of the Command enum. We will create the Command enum later in this post.
Let's create a function for this conversion called buffer_to_array. Create a new file src/helper.rs and put this function in it. Since this file is in the src folder, we also need to create a file called lib.rs and declare the helper module, which corresponds to the new file helper.rs. Add pub mod helper; to the src/lib.rs file. This exposes the helper module as part of the library crate. In Rust, main.rs (or a file in src/bin) is a binary executable and lib.rs is the shared library code. If you are creating a library, lib.rs is where you declare all your modules.
File src/helper.rs
use bytes::{Buf, BytesMut}; // Buf provides get_u8

pub fn buffer_to_array(buf: &mut BytesMut) -> Vec<String> {
    let mut vec = vec![];
    let length = buf.len();
    let mut word = "".to_string();
    for i in 0..length {
        match buf.get_u8() {
            b' ' => {
                // a space ends the current word
                vec.push(word);
                word = "".to_string();
            }
            other => {
                // append the byte to the current word
                word.push(other as char);
                if i == length - 1 {
                    // last byte of the buffer: flush the final word
                    vec.push(word.clone());
                }
            }
        }
    }
    vec
}
The get_u8 method comes from the bytes::Buf trait. Since BytesMut implements the Buf trait, we can call get_u8 on a BytesMut.
The buffer_to_array function essentially breaks the input at every space (b' ') and returns a vector of strings, Vec<String>. Let's call it in the loop in the server.rs file to get a list of strings, e.g. ["set", "foo", "bar"].
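As a point of comparison, the same splitting can be sketched with only the standard library. This is an alternative, not the function used in the rest of this post: split_command is a hypothetical name, it takes a plain byte slice instead of a BytesMut, and it behaves slightly differently, since it collapses repeated spaces and decodes multi-byte UTF-8 characters instead of casting each byte to a char.

```rust
/// Splits a raw command such as b"set foo bar" into its words.
/// (Hypothetical std-only alternative to buffer_to_array.)
fn split_command(input: &[u8]) -> Vec<String> {
    // from_utf8_lossy replaces invalid UTF-8 with U+FFFD instead of failing.
    String::from_utf8_lossy(input)
        .split_whitespace() // also skips leading, trailing and repeated spaces
        .map(str::to_owned)
        .collect()
}
```

With a BytesMut in hand, a call would look like split_command(&buf), because BytesMut dereferences to a byte slice.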
use my_redis::helper::buffer_to_array; // the crate name matches the project created with cargo new my_redis
main() {
    loop {
        ...// previous code
        let attrs = buffer_to_array(&mut buf);
    }
}
The next step is to find out the command. For this, take the first string from the vector and compare it with the list of known commands. In our case, Get or Set.
Let's create another file src/cmd.rs. We will create an enum for the commands inside this file. These are the set of commands we will be implementing.
pub enum Command {
Get,
Set,
Invalid,
}
Now, we will try to match the first string and, depending on the match, return a valid command. Let's create a function inside src/cmd.rs, in an impl block for Command, that returns the command (enum variant) matching the first string.
impl Command {
    // Takes &str so callers can pass either a &String or a string literal.
    pub fn get_command(name: &str) -> Command {
        match name.as_bytes() {
            b"set" => Command::Set,
            b"get" => Command::Get,
            _ => Command::Invalid,
        }
    }
}
Since this file is in the src folder, let's declare the module in lib.rs.
File lib.rs
pub mod cmd;
pub use cmd::Command;
Let's use this function to get the command.
loop {
.. // previous code
let attrs = buffer_to_array(&mut buf);
let command = Command::get_command(&attrs[0]);
}
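One thing to keep in mind is that indexing with attrs[0] panics if the client sent an empty message. The following sketch shows one possible way to guard against that and to check the number of arguments at the same time. The function parse_command and the derived traits are assumptions made for this example; the post itself keeps using Command::get_command.

```rust
#[derive(Debug, PartialEq)]
enum Command {
    Get,
    Set,
    Invalid,
}

/// Maps the split command to a variant, also checking the argument count.
/// (Hypothetical helper, not part of the original cmd.rs.)
fn parse_command(attrs: &[String]) -> Command {
    // first() returns None for an empty slice instead of panicking like attrs[0].
    match (attrs.first().map(String::as_str), attrs.len()) {
        (Some("get"), 2) => Command::Get, // get <key>
        (Some("set"), 3) => Command::Set, // set <key> <value>
        _ => Command::Invalid,
    }
}
```

With this shape, a message like "set foo" (missing its value) is reported as Command::Invalid rather than causing an out-of-bounds index later on.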
Persisting key and value
For our use case, we need to store key-value pairs in the database. A data structure that is well suited to storing key-value pairs is HashMap. Any new connection request will take a reference to the db and either update (set/write) data in it or get (read) data from it. We will create one instance of db when the server starts, and every new connection will use that instance to read or write data.
Let's create a struct for the database which will hold our key-value pairs inside a HashMap. This struct should be added in its own file, src/db.rs.
Let's declare this module in our src/lib.rs like we did previously for cmd.rs and helper.rs.
File lib.rs
pub mod db;
pub use db::Db;
File src/db.rs
use bytes::Bytes;
use std::collections::HashMap;
pub struct Db {
entries: HashMap<String, Bytes>
}
Next, let's create a new instance of this Db struct in the main function of server.rs. This must be done before the server starts accepting socket connections in the loop {}, since we want the database object to live for the whole scope of main. If we created db inside the loop, it would be dropped at the end of each iteration. That's not what we want.
File server.rs
let mut db = Db::new(); // mut because set commands will modify it
loop {
    let (mut socket, _) = listener.accept().await?;
}
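The Db::new() constructor is called above but not shown in this post. A minimal sketch of what it could look like follows. To keep the example free of external crates, it stores Vec<u8> values as a stand-in for the Bytes type used in src/db.rs, and the len method is added only so the example can be checked; both are assumptions of this sketch.

```rust
use std::collections::HashMap;

pub struct Db {
    // Stand-in for HashMap<String, Bytes> from the bytes crate.
    entries: HashMap<String, Vec<u8>>,
}

impl Db {
    /// Creates an empty database.
    pub fn new() -> Db {
        Db {
            entries: HashMap::new(),
        }
    }

    /// Number of stored keys (illustrative helper, not in the original).
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
```

In the real src/db.rs, the same new function would sit inside impl Db next to write and read, with Bytes as the value type.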
Let's create a new function process_query() in the server.rs file. It will accept the command, attrs, socket and db parameters and write the attributes to the db object.
pub async fn main() -> Result<(), std::io::Error> {
    // previous main fn code...
    process_query(command, attrs, &mut socket, &mut db).await?;
}

// Needs: use tokio::io::AsyncWriteExt; and use tokio::net::TcpStream;
async fn process_query(
    command: Command,
    attrs: Vec<String>,
    socket: &mut TcpStream,
    db: &mut Db,
) -> std::io::Result<()> {
    match command {
        Command::Get => {
            Ok(())
        }
        Command::Set => {
            let resp = db.write(&attrs);
            match resp {
                Ok(result) => {
                    println!("set result: {}", result);
                    socket.write_all(result.as_bytes()).await?;
                }
                Err(_err) => {
                    socket.write_all(b"").await?;
                }
            }
            Ok(())
        }
        Command::Invalid => Ok(()),
    }
}
In the process_query() function, we match the command against Command::Get, Command::Set and Command::Invalid.
In Command::Set we call db.write(&attrs). As of now, this write method on db is not implemented, so let's go ahead and do that.
File src/db.rs
impl Db {
    pub fn write(&mut self, arr: &[String]) -> Result<&str, &'static str> {
        let key = &arr[1];
        let value = &arr[2];
        let val = value.clone();
        // insert returns the previous value for this key, if there was one
        let res: &Option<Bytes> = &self.entries.insert(String::from(key), Bytes::from(val));
        match res {
            Some(_res) => Ok("r Ok"),
            None => Ok("Ok"),
        }
    }
}
We clone value because it is only a reference (&String) into arr, while Bytes::from() needs data it can own: it accepts an owned String (or a &'static str), not a borrowed string with a shorter lifetime. Cloning gives us an owned String that can be moved into the Bytes.
The insert() function returns None if the key was not present in the hashmap, and Some(old_value) if the key was already present. We return "r Ok" when the key was already present and "Ok" when it was not. This lets the client show an appropriate message to the user.
Some and None are the two variants of the Option type. Option is used for optional values: either there is a value, Some(value), or there is none, None.
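The behaviour of insert can be tried in isolation with a plain HashMap. The sketch below is only an illustration: set_status is a hypothetical function, and it uses String values instead of Bytes so that it compiles with the standard library alone.

```rust
use std::collections::HashMap;

/// Inserts the pair and reports whether the key was new or overwritten.
/// (Hypothetical helper mirroring the match in Db::write.)
fn set_status(entries: &mut HashMap<String, String>, key: &str, value: &str) -> &'static str {
    match entries.insert(key.to_string(), value.to_string()) {
        Some(_old_value) => "r Ok", // key existed, old value was replaced
        None => "Ok",               // key was not present before
    }
}
```

Setting the same key twice therefore yields "Ok" the first time and "r Ok" the second time, and the map keeps the latest value.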
Now, coming back to bin/server.rs, notice that we store the result of db.write(&attrs) in resp. We use match to inspect the response from db.write(...) and write the value directly into the socket using socket.write_all(result.as_bytes()).await?;. This sends the bytes over the socket, to be read by the client.
client side result
File bin/client.rs
// previous main function code...
let mut buf = BytesMut::with_capacity(1024);
let _length = stream.read_buf(&mut buf).await?;
match std::str::from_utf8(&buf) {
    Ok(resp) => {
        if resp == "r Ok" {
            println!("key updated");
        } else if resp == "Ok" {
            println!("key set");
        }
    }
    Err(err) => {
        // failed to convert bytes into string slice (err is a Utf8Error)
        println!("error: {}", err);
    }
}
We create buf, of type BytesMut, and read the data from the socket into it. Why we use BytesMut is explained above.
Next, we match on the string received from the socket and print a message based on whether the key was freshly written or updated. We also handle the error from std::str::from_utf8, which returns a Utf8Error if the bytes are not valid UTF-8.
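The decoding step can be pulled into a small function that is easy to try on its own. This is a sketch under the assumption that the server replies with exactly "Ok" or "r Ok" as described above; describe_reply is a name invented for this example.

```rust
/// Turns the raw reply bytes into the message the client prints.
/// (Hypothetical helper, for illustration only.)
fn describe_reply(reply: &[u8]) -> String {
    match std::str::from_utf8(reply) {
        Ok("r Ok") => "key updated".to_string(),
        Ok("Ok") => "key set".to_string(),
        Ok(other) => format!("unexpected reply: {}", other),
        // from_utf8 fails with a Utf8Error when the bytes are not valid UTF-8
        Err(err) => format!("error: {}", err),
    }
}
```

In the client, this could be called as describe_reply(&buf) after read_buf has filled the buffer.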
At this point, our implementation of the set command is mostly complete, and we can now test it from the command line.
Let's go ahead and test this by starting the server and then the client, hoping to see a success message. It will be either "key set" or "key updated".
$ cargo run --bin client
# response should be "key set" in terminal.
So far, we have set a key-value pair in our server. Let's implement the get command to fetch the corresponding value in the next section.
Read data from server
To fetch the value associated with the key, we will create another socket connection in main() of client.rs. On this socket we will write the command get foo.
File bin/client.rs
let mut stream = TcpStream::connect("127.0.0.1:8081").await?;
stream.write_all(b"get foo").await?;
Next, we will change bin/server.rs to handle Command::Get in the match expression of the process_query function.
File bin/server.rs
match command {
    Command::Get => {
        let result = db.read(&attrs);
        match result {
            Ok(result) => {
                socket.write_all(&result).await?;
            }
            Err(err) => {
                println!("no key found {:?}", err);
                // an empty reply tells the client that the key does not exist
                socket.write_all(b"").await?;
            }
        }
        Ok(())
    }
    Command::Set => {
        // already implemented above
    }
    Command::Invalid => Ok(()),
}
The server returns the response by writing the value from the db to the socket. Next, we will update the src/db.rs file to implement the read() method.
// inside impl Db; reading does not modify the map, so &self is enough
pub fn read(&self, arr: &[String]) -> Result<&Bytes, &'static str> {
    let key = &arr[1];
    // get returns Some(&value) if the key exists, None otherwise
    match self.entries.get(key) {
        Some(value) => Ok(value),
        None => Err("no such key found"),
    }
}
The error returned is a string slice, &'static str. The 'static lifetime means that the reference is valid for the entire run of the program. That holds here because the error message is a string literal: literals are stored in the program's binary rather than created inside the function, so they are not dropped when the function returns. Annotating the error type as &'static str tells the compiler that the error does not borrow from self or arr. Rust lifetimes are not a simple concept; check out this awesome tutorial for a better understanding.
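The two lifetimes involved can be seen side by side in a small standalone function. This is a sketch, not the post's Db::read: lookup is a hypothetical free function over a plain HashMap with Vec<u8> values, chosen so it needs only the standard library.

```rust
use std::collections::HashMap;

/// Looks up `key`. The success value borrows from `entries` (lifetime 'a),
/// while the error is a string literal and therefore 'static.
/// (Hypothetical helper, for illustration only.)
fn lookup<'a>(
    entries: &'a HashMap<String, Vec<u8>>,
    key: &str,
) -> Result<&'a [u8], &'static str> {
    entries
        .get(key)
        .map(Vec::as_slice)
        .ok_or("no such key found") // the literal lives in the binary itself
}
```

The returned value cannot outlive the map it points into, whereas the error message can be kept around for as long as the program runs.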
The final step is to read the response in the client.rs file and print the value on the command line.
let mut buf = BytesMut::with_capacity(1024);
let _length = stream.read_buf(&mut buf).await?;
println!("buffer: {:?}", &buf);
match std::str::from_utf8(&buf) {
    Ok(resp) => {
        if resp == "" {
            // the server sends an empty reply when the key is missing
            println!("no such key found");
        } else {
            println!("value: {}", resp);
        }
    }
    Err(err) => {
        // the reply was not valid UTF-8
        println!("error: {}", err);
    }
}
This is all we need to implement a basic single-threaded Redis server with set and get commands.
If you run the server and then the client, you will get the response value: bar in the terminal. Hurray!!!
CLAP implementation
So far we have hardcoded our Redis commands in the client.rs file, which is not how a client should send data. So, let's accept the command from the command line instead.
Clap is a command line argument parser crate for Rust. It makes accepting command line arguments super easy.
First things first, add the dependency in Cargo.toml
clap = { version = "3.1.18", features = ["derive"] }
Now, let's import it in the client.rs file and add the needed code. We are changing the way the client sends commands to the server. Previously, we created two socket connections from the client file. Now, we will create only one socket connection per run of the client, using the command line arguments.
File bin/client.rs
use bytes::BytesMut;
use clap::{Parser, Subcommand};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[derive(Parser, Debug)]
struct Cli {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Subcommand, Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        value: String,
    },
}

#[tokio::main]
pub async fn main() -> Result<(), std::io::Error> {
    let args = Cli::parse();
    let mut stream = TcpStream::connect("127.0.0.1:8081").await?;
    match args.command {
        Command::Set { key, value } => {
            // send "set <key> <value>"
            stream.write_all(b"set").await?;
            stream.write_all(b" ").await?;
            stream.write_all(key.as_bytes()).await?;
            stream.write_all(b" ").await?;
            stream.write_all(value.as_bytes()).await?;
            let mut buf = BytesMut::with_capacity(1024);
            let _length = stream.read_buf(&mut buf).await?;
            match std::str::from_utf8(&buf) {
                Ok(resp) => {
                    if resp == "r Ok" {
                        println!("updated key");
                    } else if resp == "Ok" {
                        println!("key set");
                    }
                }
                Err(err) => {
                    // failed to convert bytes into string slice
                    println!("error: {}", err);
                }
            }
        }
        Command::Get { key } => {
            // send "get <key>"
            stream.write_all(b"get").await?;
            stream.write_all(b" ").await?;
            stream.write_all(key.as_bytes()).await?;
            let mut buf = BytesMut::with_capacity(1024);
            let _length = stream.read_buf(&mut buf).await?;
            match std::str::from_utf8(&buf) {
                Ok(resp) => {
                    if resp == "" {
                        println!("no such key found");
                    } else {
                        println!("key: {} => value: {}", key, resp);
                    }
                }
                Err(err) => {
                    println!("error: {}", err);
                }
            }
        }
    }
    Ok(())
}
In the code above, we are defining clap with subcommand, for this we are creating an Enum of commands with additional parameters and types.
We are doing simple matching on command supplied from the command line and writing the appropriate command on the socket connection for the server to read and execute.
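A possible refinement is to assemble the whole command before touching the socket. Since TCP is a byte stream, several small writes are not guaranteed to arrive in a single read on the server, so sending one buffer keeps the message together as far as the client can. The sketch below is an assumption of this note rather than part of the original client, and encode_command is a made-up name.

```rust
/// Joins the command parts with single spaces and returns the bytes to send.
/// (Hypothetical helper, for illustration only.)
fn encode_command(parts: &[&str]) -> Vec<u8> {
    parts.join(" ").into_bytes()
}
```

In the Set branch this would replace the five write_all calls with a single one, for example stream.write_all(&encode_command(&["set", &key, &value])).await?;.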
What next?
The current implementation is a single-threaded program that handles one connection at a time. In reality, this is not what we want. Imagine 5000 requests coming to our Redis server: the current program waits for the first request to complete before processing the next one. However, we want requests to be handled without waiting on each other. This can be achieved using multi-threading. In the next post, we will change this single-threaded implementation to a multi-threaded one. That is where Tokio shines the most, with tokio::spawn and the tokio::select! macro.
We will also implement a shutdown mechanism powered by Tokio channels such as broadcast and mpsc. Imagine there are 5000 active connections: what happens when the server has to shut down? Graceful shutdown of all the connection handlers will be covered in the next post.
Happy Programming!