Rafael Camargo Leite

Build your own Dynamo-like key/value database - Part 1 - TCP Server

Intro

This is the second blog post in the Build your own Dynamo-like key/value database series. You can find part one here.

Part 1 - Exposing APIs to clients

1. Background

Every database needs to somehow vend its APIs for clients to consume.
This is not strictly relevant to our goal of studying the internals of a database system, but we nevertheless need an entry point for clients to interact with, even if only to allow us to write proper end-to-end tests.

2. Requirements

2.1. Non-functional

  1. Readability
    • This is going to be a requirement for every component, but since this is our first one, better to state it clearly. If, at any point, going through the code base yields too many "wtf"s, we messed up.
  2. Minimal amount of dependencies
    • This is another cornerstone of every component - our goal is to learn how all of these data structures and algorithms are implemented. If we just include dependencies for all of them, what's the point of even building this database!?
  3. Simplicity and extensibility
    • Similar to requirement 1 - making components as simple as possible definitely helps with readability.
    • Let's make sure adding new commands doesn't require lots of changes/boilerplate.

2.2. Functional

The only functional requirement I'll add is the ability to trace requests based on request_ids - i.e., for every request our database receives, we have to be able to trace it for debugging purposes.

3. Design

Different databases provide different interfaces for clients to interact with. A few examples are:

  • SQLite - This is a SQL database that runs embedded in the process that consumes it. For this reason, there's a C client library that exposes all necessary APIs for database management and query execution.
  • Redis - Redis is an in-memory database that provides a wide variety of APIs to access different data structures through TCP. It uses a custom protocol on top of TCP called RESP, so clients that want to integrate with Redis have to implement RESP on top of TCP to be able to issue commands against it.
  • CouchDB - exposes a RESTful HTTP API for clients to consume.

As previously stated, in our case, the actual interface is less relevant - our goal is to study the internals of storage systems, not to write a production-ready, performant database for broad consumption. For that reason, I'll use non-functional requirements 2 (as few dependencies as possible) and 3 (simplicity) to choose to expose our APIs via TCP using a thin serialization protocol based on JSON.

Our serialization protocol will be based on Message - a struct composed of a small binary header followed by an optional JSON payload that represents the Command we want to execute.
After parsing a Message from the network, we will then build a Command which, in turn, can be executed.
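
Concretely, a Message on the wire looks like the frame below. This is exactly the layout the parsing code later in this post reads back; the length fields are big-endian u32s, which is what tokio's read_u32 expects:

+--------+-------------------+-------------+------------------+------------+
| cmd_id | request_id length | request_id  | payload length   | payload    |
| u8     | u32 (big-endian)  | utf-8 bytes | u32 (big-endian) | JSON bytes |
+--------+-------------------+-------------+------------------+------------+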

4. Implementation

We will start by defining what a Command is.
Think of a Command as a Controller in an MVC application. A command basically interprets the request a client has issued and generates the appropriate response by calling the internal methods where the business logic lives.
Let's get a bit more concrete by implementing Ping.
From a client's perspective, Ping is as simple as:

  1. Client sends a Ping request over TCP,
  2. Cluster node parses the received bytes into a Message,
  3. Cluster node constructs a Ping command from the Message parsed in step 2,
  4. Cluster node replies to the client with a "PONG" message.

The ping command can be found here in rldb. The core of its implementation is shown below:

...
#[derive(Debug)]
pub struct Ping;

#[derive(Serialize, Deserialize)]
pub struct PingResponse {
    pub message: String,
}

impl Ping {
    pub async fn execute(self) -> Result<PingResponse> {
        Ok(PingResponse {
            message: "PONG".to_string(),
        })
    }
}
...

Now, how do we construct a Ping Command from a TCP connection?
That's where the Message definition comes into play. Message is our serialization protocol on top of TCP.
It defines the format/frame of the bytes a client has to send in order for our server to be able to interpret it.
Think of it as an intermediate abstraction that enables our database server to properly route requests to the specific Command based on the arguments provided.

A Message is defined as follows:

pub struct Message {
    /// Used as a way of identifying the format of the payload for deserialization
    pub cmd_id: CommandId,
    /// A unique request identifier - used for request tracing and debugging
    /// Note that this has to be encoded as utf8 otherwise parsing the message will fail
    pub request_id: String,
    /// the Request payload
    pub payload: Option<Bytes>,
}

Every field prior to the payload can be thought of as the header of the message, while payload is what actually encodes the Command we are trying to execute. Many other fields could be included as part of the header. Two fields will most likely be added in the future:

  1. a checksum of the payload (maybe one including the overall message as well?)
  2. the timestamp of the message (just for debugging purposes)

Now let's construct a Message from a TCP connection:

impl Message {
    pub async fn try_from_async_read(tcp_connection: &mut TcpStream) -> Result<Self> {
        // Header: a 1-byte command id followed by a u32 length-prefixed request id
        let cmd_id = CommandId::try_from(tcp_connection.read_u8().await?)?;
        let request_id_length = tcp_connection.read_u32().await?;
        let request_id = {
            let mut buf = vec![0u8; request_id_length as usize];
            tcp_connection.read_exact(&mut buf).await?;
            String::from_utf8(buf).map_err(|_| {
                Error::InvalidRequest(InvalidRequest::MessageRequestIdMustBeUtf8Encoded)
            })?
        };

        // Payload: a u32 length prefix followed by that many bytes (0 means no payload)
        let payload_length = tcp_connection.read_u32().await?;

        let payload = if payload_length > 0 {
            let mut buf = vec![0u8; payload_length as usize];
            tcp_connection.read_exact(&mut buf).await?;
            Some(buf.into())
        } else {
            None
        };

        Ok(Self {
            cmd_id,
            request_id,
            payload,
        })
    }
}

Three notes:

  1. I removed most of the error handling from this snippet to make it more readable. Read the actual implementation here if curious.
  2. If you check the real implementation, you will see that the signature of the function is slightly different. Instead of
pub async fn try_from_async_read(tcp_connection: &mut TcpStream) -> Result<Message>

we have

pub async fn try_from_async_read<R: AsyncRead + Unpin>(reader: &mut R) -> Result<Self>

This is an important distinction that is worth discussing. If we took the TcpStream as an argument, every test would require that we set up a real TCP server, create TCP connections and send/receive bytes via localhost.
Not only would this make tests slow, it would also make writing tests much more complicated than they have to be.

Here is an example of a test that we can write without setting up any TCP connection:

#[tokio::test]
async fn test_max_message_size_exceeded() {
    let mut reader = MaxMessageSizeExceededAsyncRead::default();
    let err = Message::try_from_async_read(&mut reader)
        .await
        .err()
        .unwrap();

    match err {
        Error::InvalidRequest(InvalidRequest::MaxMessageSizeExceeded { max, got }) => {
            assert_eq!(max, MAX_MESSAGE_SIZE);
            assert_eq!(got, MAX_MESSAGE_SIZE + 1);
        }
        _ => {
            panic!("Unexpected error: {}", err);
        }
    }
}

In this example, I chose to implement AsyncRead for my custom struct MaxMessageSizeExceededAsyncRead. But we could've used UnixStream or any of the many other types that impl AsyncRead to inject the bytes we are interested in. The sketch below shows the same idea with a plain byte slice.
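
As a further illustration - this is a hypothetical happy-path test, not one from rldb's test suite - a plain &[u8] also implements AsyncRead, so we can hand-craft the frame bytes in memory (assuming cmd_id 1 maps to Ping, as in the example request shown later in this post):

#[tokio::test]
async fn test_parse_ping_message() {
    // Hand-craft the frame: u8 cmd_id, u32 request_id length,
    // request_id bytes, u32 payload length (0 means no payload).
    let mut bytes = Vec::new();
    bytes.push(1u8); // cmd_id - assumed to map to Ping
    bytes.extend_from_slice(&4u32.to_be_bytes());
    bytes.extend_from_slice(b"abcd");
    bytes.extend_from_slice(&0u32.to_be_bytes());

    // &[u8] implements AsyncRead + Unpin, so no TCP setup is needed.
    let mut reader = &bytes[..];
    let message = Message::try_from_async_read(&mut reader).await.unwrap();

    assert_eq!(message.request_id, "abcd");
    assert!(message.payload.is_none());
}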

  3. You will likely notice that no timeouts are set on any of the TCP connection interactions. This is something that has to be included if we are building a production-ready service, but I chose to skip it at this point as it is not the focus of our work. Still, it has to be stated that if you plan on deploying anything that interacts with the network to a production environment, handling timeouts/slow requests is mandatory.

Now that we can build a Message out of a TcpStream, we must be able to serialize a Message into bytes so that it can be sent over the network. The snippet below depicts how this can be done (again without error handling):

impl Message {
    pub fn serialize(self) -> Bytes {
        // Compute the exact buffer size upfront so we never have to resize
        let payload_len = self.payload.as_ref().map_or(0, |payload| payload.len());
        let mut buf = BytesMut::with_capacity(
            self.request_id.len() + payload_len + 2 * size_of::<u32>() + size_of::<u8>(),
        );

        buf.put_u8(self.cmd_id as u8);
        buf.put_u32(self.request_id.len() as u32);
        buf.put(self.request_id.as_bytes());
        buf.put_u32(payload_len as u32);
        if let Some(payload) = self.payload {
            buf.put(payload);
        }

        assert_eq!(buf.capacity(), buf.len());
        buf.freeze()
    }
}

Let's go over some notes here as well:

  1. You will quickly see that I rely on the bytes crate heavily throughout this project. Bytes is a very useful tool for writing network-related applications that allows you to work with contiguous byte buffers while avoiding memcopies almost entirely. It also provides some neat traits like BufMut, which gives us methods like put_u32 etc. Please refer to its documentation for more information.
  2. I included an assertion about buf len and capacity that is there to make sure that an important invariant holds: if we allocate a buffer of size X, we must completely fill it without ever resizing it. This guarantees that we are properly computing the buffer size prior to filling it (which is important if we care about performance, for example).

Finally, given that we have a cmd_id field in Message, we can easily choose how to serialize the payload field for each specific Command and vice versa.
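
To make that concrete, here is a hypothetical dispatch sketch. The real routing lives in rldb's Command module, and names like CommandId::Ping are assumptions about the enum's variants:

// Sketch only: cmd_id tells us how to interpret the payload and which
// Command to run. rldb's actual routing may differ.
async fn execute_message(message: Message) -> Result<Bytes> {
    match message.cmd_id {
        // Ping has no payload, so there's nothing to deserialize.
        CommandId::Ping => {
            let response = Ping.execute().await?;
            // Assumes rldb's Result alias converts serde_json errors.
            Ok(Bytes::from(serde_json::to_vec(&response)?))
        }
        // A command with a payload would deserialize it first, e.g.:
        // CommandId::Get => { let req: GetRequest = serde_json::from_slice(&payload)?; ... }
        _ => unimplemented!(),
    }
}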

For Ping, a request Message would look like:

Message {
    cmd_id: 1,
    request_id: <some string>,
    payload: None 
}

and a response Message would look like:

Message {
    cmd_id: 1,
    request_id: <some string>,
    payload: Some(Bytes::from(serde_json::to_string(&PingResponse { message: "PONG".to_string() }).unwrap())),
}

And that's it: A Client needs to know only about Message in order to be able to interact with our database.
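
To illustrate, here is a hypothetical client sketch (not rldb's actual client API). It assumes CommandId::Ping exists and that rldb's Result alias converts the I/O and serde errors:

use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

// Hypothetical client: connect, send a Ping Message, read the reply
// back using the same frame format.
async fn ping(addr: &str) -> Result<PingResponse> {
    let mut conn = TcpStream::connect(addr).await?;

    let request = Message {
        cmd_id: CommandId::Ping,
        request_id: "my-request-id".to_string(),
        payload: None,
    };
    conn.write_all(&request.serialize()).await?;

    // Responses use the exact same frame, so we can reuse the parser.
    let response = Message::try_from_async_read(&mut conn).await?;
    let payload = response.payload.expect("PING responses carry a payload");
    Ok(serde_json::from_slice(&payload)?)
}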

If you want to walk through the code yourself, here are the pointers to the four larger components:

  1. The TcpListener
  2. Message
  3. Command
  4. Ping

The IntoMessage trait and request_id (covers the functional requirement around tracing requests)

For any type that we want to convert into a Message, we can implement the IntoMessage trait:

pub trait IntoMessage {
    /// Same as [`Message::cmd_id`]
    fn cmd_id(&self) -> CommandId;
    /// Same as [`Message::payload`]
    fn payload(&self) -> Option<Bytes> {
        None
    }
    fn request_id(&self) -> String {
        REQUEST_ID
            .try_with(|rid| rid.clone())
            .unwrap_or("NOT_SET".to_string())
    }
}

You will see that this trait has two default implementations:

  1. payload -> Commands like Ping don't have a payload, so Ping doesn't have to implement this specific function and can just rely on the default behavior (see the sketch after this list).
  2. request_id -> This is the more interesting one: every Message has to have a request_id associated with it. This is going to be extremely important once we have to analyze logs/traces for requests received by the database.
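
As an illustration, implementing IntoMessage for a payload-less command like Ping boils down to providing cmd_id. This is a sketch; CommandId::Ping is again an assumption about the enum's variants:

// Sketch: only cmd_id is required; payload and request_id fall back
// to the trait's default implementations.
impl IntoMessage for Ping {
    fn cmd_id(&self) -> CommandId {
        CommandId::Ping
    }
}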

The way request_id is handled by rldb at the time of this writing is: the request_id is injected, either by the client or by the server, into a tokio::task_local as soon as the Message is parsed from the network.
If you really think about it, it's a bit strange that we let the client set a request_id that is internal to the database. We allow this to happen (at least for now) because rldb nodes talk to each other. In requests like Get, an rldb cluster node will have to talk to at least two other nodes to issue internal GET requests. In order for us to be able to connect all of the logs generated for a client request, we have to provide a mechanism for an rldb node to inject the request_id being executed when issuing requests to other nodes in the cluster.
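
For reference, here is a minimal sketch of that task-local pattern. The REQUEST_ID name matches the trait snippet above; the handler function is illustrative:

tokio::task_local! {
    // The task-local that IntoMessage's default request_id reads from.
    pub static REQUEST_ID: String;
}

// Once a Message is parsed, the server runs the rest of the request
// inside the task-local scope, so any code deeper in the call stack
// (including IntoMessage::request_id) can read the id back.
async fn handle(message: Message) {
    REQUEST_ID
        .scope(message.request_id.clone(), async move {
            // ... route and execute the command here ...
        })
        .await;
}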

Final thoughts

This post covered how rldb handles incoming client requests. It described how requests are serialized/deserialized and how request ids are propagated throughout the request lifecycle.

Next chapter

  • Part 3 - Bootstrapping our cluster: Node discovery and failure detection

It will cover how gossip protocols work and go over the rldb gossip implementation.
