What we'll cover
So far we've mainly worked with Strings, and arrays of Strings. A command is received as an array of Strings. GET
accepts a String and return a String, or a nil String. SET
works with two Strings, a key and a value. We've also handled a few integers here and there, like for the result value of the TTL
& PTTL
commands.
We are going to add full support for List related commands in this chapter.
Redis Data Types
When it comes to key/value pairs stored in the main keyspace, that is, the @data_store
Dict
instance, Redis supports other data types beside Strings:
- Lists: An ordered collection of one or more Strings, sorted by order of insertion.
- Sets: An unordered collection of unique Strings
- Sorted Sets: An ordered collection of unique Strings. Strings are always added with a score, a float value, and the set orders them by their score. Uniqueness is enforced for the String value, multiple elements can have the same score. An example would be a list of user names, sorted by their age. The usernames are unique, but not the age:
{ <'pierre', 30>, <'james', 31>, <'jane', 32>, <'john', 32> }
- Hashes: An unordered collection of key/value pairs, where keys are unique. Keys and values are Strings. This is essentially the same data type we implemented in Chapter 6.
- Bit arrays (or bitmaps): This one is slightly different than the other data types as it uses Strings under the hood, but through dedicated commands allows users to perform operations using said Strings as arrays of bits. An example of such command is
BITOP AND dest-key k1 k2
, which will perform anAND
operation between the values at keysk1
andk2
and store the value indest-key
. - HyperLogLogs: A HyperLogLog (HLL for short) is a data structure optimized to trade space for efficiency when counting elements in a set. The main operation of an HLL is to count the number of unique elements. The result might not be exact, but the storage used by HLLs will always be extremely small regardless of how many items were counted. Similarly to Bit arrays, HLLs are implemented using Strings. A Set could be used to achieve a similar count operation, the difference being that the set will store each element, allowing it to return an accurate count, at the cost of its memory footprint being proportional to the size of the set. A HyperLogLog in Redis cannot grow larger than 12,288 (12k) bytes.
- Streams: An append-only collection, conceptually similar to a log structure. Elements are key/value pairs and are by default ordered by the time they were added to the stream. Streams are a fairly complicated data structure, well documented on the official website and will not be covered in this book.
Note about empty collections: Lists, Sets, Sorted Sets & Hashes cannot be empty, once the last element is deleted, the collection is removed from the keyspace.
Note about integers: Redis handles integers in a way that can seem confusing. The type of a value is never specified, it is always a String. Whether you send the command SET a-string-key a-string
or SET an-integer-key 100
, the value is sent as a string. Internally Redis knows whether or not what was received as a String can be treated as an integer. In some cases it will optimize the way it stores the data based on whether or not it is an integer. There are even commands that will only work if a value can be treated as a number, such as the INCR command.
List related terms
Let's clarify some list related terms first. A list is often represented as a left to right sequence of elements. The left-most element is called the head and the right-most element is called the tail.
It is also common to use the term "tail" to describe all the elements after the head. According to this definition, the tail is another list, empty if the list itself has zero or one element, non empty otherwise. This is not the definition we'll use throughout this book.
An empty list has no head and no tail, or it could be said that they're both nil. For a one-element list, the head and the tail are equal.
Elements can be added or removed from either side, left or right, as well as from within the list. Adding elements is commonly referred to as a push and removing elements as a pop. The terms shift and unshift are also used, where shift commonly means popping an element to the left of the list and unshift pushing an element to the left.
Finally, the terms append & prepend can also be used. In this context append is similar to a right push, and prepend is similar to a left push, or unshift.
Lists and other data structures
Lists can be used as the backing data structures for various more specific structures such as queues and stacks.
A list can be used as a queue by implementing the push operation as a "left push", that is, new elements are added to the left of the list, and the new element becomes the head of the list. Elements are removed with a "right pop", that is, elements are removed from the right of the list, the tail is removed and the element that preceded the tail becomes the new tail. This satisfies the First In First Out constraint of a queue.
A list can be used as a stack by implementing the push operation as a "right push", that is, new elements are added to the right of the list, and the new element becomes the new tail. Elements are removed as a "right pop", similarly to how we described a queue above. This satisfies the Last In First Out constraint of a stack.
The choice of right and left in these two examples is arbitrary but also very common within western countries where languages are read and written left to right.
Identical data structures could be implemented by reversing the side of each operations. A queue could be implemented by pushing new elements with a right push and removing them with a left pop. A stack could be implemented by pushing new elements with a left push and removing them with a left pop.
Redis List commands
Redis supports eighteen list related commands:
- LINDEX: Gets an element by its index
- LINSERT: Inserts an element
- LLEN: Returns the length of the list
- LPOP: Removes an element on the left of the list
- LPOS: Returns the position of one or elements
- LPUSH: Adds an element to the left of the list, creating it if needed
- LPUSHX: Same as LPUSH, but does not create a new list if it doesn't already exist
- LRANGE: Returns elements from the list
- LREM: Removes one or more elements from the list
- LSET: Replaces an element in a list
- LTRIM: Keeps a subset of the list
- RPOP: Removes an element on the right of the list
- RPOPLPUSH: Removes an element to the right of a list and adds it to the left of another list
- RPUSH: Adds an element to the right of the list, creating it if needed
- RPUSHX: Same as RPUSH, but does not create a new list if it doesn't already exist
- BLPOP: Blocking variant of LPOP
- BRPOP: Blocking variant of RPOP
- BRPOPLPUSH: Blocking variant of RPOPLPUSH
How does Redis do it
We've actually already implemented a list in the previous chapter to handle hash collisions and store multiple pairs in the same bucket.
We could use a similar structure to implement all the list commands detailed previously but the list we used for the Dict
implementation has a few limitations with regards to performance that would be problematic in some cases.
The need for a doubly linked list
The following is what we used to create a list of entries in a bucket:
module BYORedis
class DictEntry
attr_accessor :next, :value
attr_reader :key
def initialize(key, value)
@key = key
@value = value
@next = nil
end
end
end
listing 7.1 The linked list used for the Dict
class
We used the class
syntax to control which getters and setters we wanted to be generated, with attr_accessor
& attr_reader
, but it could be simplified as the following:
Node = Struct.new(:value, :next)
listing 7.2 The Node struct
Given the nature of Ruby's type system, value
could be anything, and could therefore be a key/value pair if we passed a two-element array such as [ 'a-key', 'a-value' ]
.
This implementation is a "singly linked list". Each node has one "link" to its successor in the list, the next
attribute in the previous example. If the link value is empty, then there's no successor and the element is the last one in the list.
One problem with this approach is that adding or deleting an element at the end of list requires to iterate through the whole list, which will become slower and slower as the list grows. We covered this problem in the previous chapter when we explained the need for growing the hash table.
A solution to this problem is to use what is commonly called a sentinel node, essentially a way to hold a reference to the tail of the list, for easy access to it, this would look like this:
class List
def initialize
@head = nil
@tail = nil
end
def prepend(element)
# The new node is created with its next value set to @head, if @head was nil, the list was
# empty, and new_node is now the head, its next value will also be nil.
# If @head was not nil, new_node now points to what used to be the head
new_node = Node.new(element, @head)
# The head of the list is now the new node
@head = new_node
if @tail.nil?
# If @tail is nil, the list was empty, and has now one element, the head and the tail are
# the same node
@tail = new_node
end
new_node
end
def append(element)
# new_node will be the new tail, so its next value must be nil
new_node = Node.new(element, nil)
if @tail
@tail.next = new_node
else
# If the list is empty, both @head and @tail are nil
@head = @tail
end
# Update the @tail value to now point to new_node
@tail = new_node
new_node
end
end
listing 7.3 A singly linked list with prepend & append operations
The append operation is now O(1) with this implementation, it will require the same amount of steps regardless of the size of the list, it runs in constant time.
While this is a big win, another common list operation, removing the tail, also known as right pop, cannot be optimized from O(n) to O(1) with this approach:
def right_pop
# If @tail is nil, the list is empty
return nil if @tail.nil?
# When the list has only one element, @head will be equal to @tail. In this case, popping from
# the right or the left is effectively the same, and we can do so by setting both variables to
# nil. The list is now empty
if @head == @tail
@head = nil
@tail = nil
nil
elsif @head.next == @tail
# If the second element, the one pointed at by @head.next is equal to @tail, the list has
# only two elements
@head.next = nil
tail = @tail
@tail = @head
tail
else
# We now need a reference to the element pointing at @tail, so we can set its next value to
# nil, instead of @tail, and set @tail to it, doing so will require iterating through all
# the element in the list:
cursor = @head.next
# The number of steps this loop will go through is proportional to the size of the list,
# making this method O(n)
while cursor.next != @tail
cursor = cursor.next
end
# We exited the loop, so the condition cursor.next == @tail is now true
cursor.next = nil
tail = @tail
@tail = cursor
tail
end
end
listing 7.4 right_pop operation for a singly linked list
We could keep going down this road, and optimize the right_pop
method by adding a third instance variable to the List
class, one that always holds a value the second to last node. With such a value we would not have to iterate through the list in the else
branch. Maintaining the value would require a few more steps in the append
, prepend
& pop
operations to always keep it pointing at the second to last element.
This would still not allow us to efficiently implement all operations. The LRANGE
command is used to return elements in the list, given two values start
& stop
. LRANGE list 0 1
means "return the first two items of the list list
, the one at index 0 and the one at index 1. LRANGE list 0 3
means "return the first four items, the ones at indices 0, 1, 2 & 3, or put differently, all items between index 0 and 3. But the LRANGE
command also supports a different syntax, with negative indices, to count elements starting from the tail. -1 means the last element in the list, -2, the second to last, and so on. Using this syntax we can return all the elements in the list with LRANGE 0 -1
. We could also return the last three elements of the list with LRANGE -3 -1
.
The main benefit of negative indices is that you don't need to know the size of the list to return the last n elements. In the previous examples, if we knew the list had 10 elements, we could have used LRANGE 7 9
to return the last three elements. But if an 11th element is added to the list, we would need to now send LRANGE 8 10
. Using the negative index syntax, we can always use LRANGE -3 -1
, regardless of the size of the list.
As we've already discussed a few times, Redis should be able to handle really big data sets, in the case of lists, ideally ones with thousands, and even millions of elements. Being able to return the last n elements of large lists should ideally not require to iterate through the whole list, which, sadly, with a singly linked list, we'd be forced to do.
A solution to this problem is to use a doubly linked list, one where each node contains a link to the next element but also one to the previous element. It's important to address right away that this approach comes with an important trade-off, we're now storing twice as much metadata per node, in exchange for potential speedups.
A doubly linked list can be implemented with the following Ruby struct:
DoublyLinkedNode = Struct.new(:value, :prev_node, :next_node)
listing 7.5 The Doubly Linked List Struct
We can now use this DoublyLinkedNode
class in combination with the sentinel approach to implement the LRANGE
command in an efficient manner:
def range(start, stop)
# The complete range method, which handles all the possible edge cases is implemented later
# on, we're only showing a simplified version here, to highlight the benefits of the doubly
# linked list
# The "real" method should handle the case where only one of the two bounds is negative, such
# as LRANGE 0 -1
list_subset = []
if start < 0 && stop < 0 && start <= stop
current_index = -1
cursor = @tail
while current_index >= start
if current_index <= stop
list_subset.prepend(cursor.value)
end
cursor = cursor.prev_node
current_index -= 1
end
end
list_subset
end
listing 7.6 A partial range method implementation leveraging a doubly linked list
As detailed in the comment, the previous example is not complete, but it shows how we can use the prev_node
field to iterate from right to left instead of being forced to iterated from left to right with a singly linked list.
We can also simplify the right_pop
method. The prev_node
field on each node removes the need for a third instance variable to hold the second to last node.
def right_pop
return if @tail.nil?
tail = @tail
@tail = @tail.prev_node
if @tail
@tail.next_node = nil
else
# If the list had only element, we removed it and the list is now empty, @tail is now nil
# and we also need to set @head to nil
@head = nil
end
tail
end
listing 7.7 The right_pop operation leveraging a doubly linked list
Redis uses a doubly linked list approach, for the reasons we mentioned above, but it uses a more optimized version to reduce memory usage.
On a 64-bit CPU, which is what most modern CPUs use, a pointer is a 64-bit integer, which uses 8 bytes (8 * 8 = 64 bits). So regardless of what the node actually stores, you need two pointers per node, one for the next node, one for the previous node, that's 16 bytes, per node. In a common case where each node would store an integer, which in C can have different sizes, as small as one byte and as big as 8, it means that even if you're storing a large integer, one that takes 8 bytes, you end up with 16 bytes of metadata, for 8 bytes of actual data. The problem is potentially even worse if you're storing smaller integer, say that the integers stored are small, 2 bytes for instance, you would end up with a node of 18 bytes where only 2 bytes are the actual data. This problem might not seem too bad with small lists but as the list grows, it would cause the server to waste a lot of memory to store these pointers.
Redis uses two data structures to optimize for this problem. At the top level, it uses a structure it calls Quicklist, which is a doubly linked list where each element is a Ziplist. A Ziplist is a compact optimization of a doubly linked list that does not suffer from the metadata storage problem we described.
Briefly summarizing a Ziplist is a complicated task, but it is conceptually close to how arrays work. It allocates a contiguous chunk of memory with malloc(3)
. A key difference is that all elements in an array have the same size, so there's not need for metadata beyond the size of the array. If you're trying to get the 5th element in an array, you do so by multiplying the size of an element in the array by five, and use the value as an offset from the beginning of the array, and the value you found at that location is the fifth element in the array.
Elements can have different sizes with a Ziplist, small integers, with a value between 0 and 12 will only use one byte, whereas larger integers, with values greater than 2,147,483,647 - the maximum value of an int32_t, which uses 4 bytes - will require 9 total bytes, 8 bytes for the value, an int64_t, and one byte of metadata. These variations in element sizes, which allows for greater optimizations, small elements take a small amount of space, means that the list needs to store some metadata to allow traversal of the list and access to the different elements. Appendix A explains in more details about Ziplist works, but does not provide a Ruby implementation.
The Ziplist approach also shares some similarities with how SQL database systems organize data into pages. For reference this page of the Postgres documentation explains in details the page layout. A key difference is that pages have a fixed size, whereas a ziplist will grow until it reaches its max size. The similarity is in the fact that they both operate on a set amount of memory which starts with some metadata followed by the actual data, organized in a compact sequence.
The reason why Redis does not exclusively uses Ziplists is because as Ziplists grow, update operations such as adding or removing elements, become more and more costly. This mixed approach allows Redis to benefit from the best of both worlds, the quicklist maintains a list of ziplists, which actually hold the elements in the list. Each ziplist has a maximum size, and when it reaches it, a new ziplist node is created in the parent quicklist.
This implementation is fairly complex so for the sake of simplicity we will use the basic doubly linked list approach shown above in this chapter.
The quicklist/ziplist approach might be implemented in a future chapter
It is now time to add support for all the list commands to our server.
Handling Incorrect Types
Now that our server is about to handle more than one type, Strings and Lists, we need to consider the type of each key/value pair when processing commands. For instance, if a pair is first created as a String, with the SET
command, calling LLEN
on it, which attempts to return the length of a List, is invalid:
127.0.0.1:6379> SET a-string a-value
OK
127.0.0.1:6379> LLEN a-string
(error) WRONGTYPE Operation against a key holding the wrong kind of value
Some commands work regardless of the type of the pair, such as TTL
and PTTL
:
127.0.0.1:6379> TTL a-string
(integer) -1
127.0.0.1:6379> RPUSH a-list 1
(integer) 1
127.0.0.1:6379> TTL a-list
(integer) -1
Redis has a TYPE
command that can be used to return the type of a pair, the possible return values are: string
, list
, set
, zset
, hash
and stream
. If the key does not exist, it returns none
.
Let's add support for the TYPE
command.
So far we've repeated some code between all the commands, let's improve this by defining a BaseCommand
class:
module BYORedis
class BaseCommand
def initialize(data_store, expires, args)
@logger = Logger.new(STDOUT)
@logger.level = LOG_LEVEL
@data_store = data_store
@expires = expires
@args = args
end
def call
raise NotImplementedError
end
end
end
listing 7.8 The BaseCommand class, parent of all command classes
We can now define the TypeCommand
class, which inherits from BaseCommand
:
module BYORedis
class TypeCommand < BaseCommand
def call
Utils.assert_args_length(1, @args)
key = @args[0]
ExpireHelper.check_if_expired(@db, key)
value = @db.data_store[key]
case value
when nil
RESPSimpleString.new('none')
when String
RESPSimpleString.new('string')
else
raise "Unknown type for #{ value }"
end
end
end
end
listing 7.9 The TypeCommand class
The command is not that useful for now, it either returns none
or string
. We will add a new branch to the case/when
statement when we add the List
type.
The next step is to add it to the COMMANDS
constant in the Server
class:
# ...
require_relative './pttl_command'
require_relative './type_command'
require_relative './command_command'
module BYORedis
class Server
COMMANDS = Dict.new
COMMANDS.set('command', CommandCommand)
# ...
COMMANDS.set('type', TypeCommand)
MAX_EXPIRE_LOOKUPS_PER_CYCLE = 20
# ...
end
end
listing 7.10 Updates to the Server class to support the TYPE command
All the command classes we've created need describe
class method, which is in turn used by the CommandCommand
class to return the description of the command. The approach worked well so far but it is a bit verbose, and as we add more commands, becomes more and more annoying.
Let's simplify this process with a struct, Describe
, defined in BaseCommand
:
module BYORedis
class BaseCommand
Describe = Struct.new(:name, :arity, :flags, :first_key_position, :last_key_position,
:step_count, :acl_categories) do
def serialize
[
name,
arity,
flags.map { |flag| RESPSimpleString.new(flag) },
first_key_position,
last_key_position,
step_count,
acl_categories.map { |category| RESPSimpleString.new(category) },
]
end
end
end
# ...
end
listing 7.11 The Describe Struct defined in the BaseCommand class
We need to update the CommandCommand
class as well, to use this new class:
module BYORedis
class CommandCommand < BaseCommand
# ...
def call
RESPArray.new(SORTED_COMMANDS.map { |command_class| command_class.describe.serialize })
end
def self.describe
Describe.new('command', -1, [ 'random', 'loading', 'stale' ], 0, 0, 0,
[ '@slow', '@connection' ])
end
end
end
listing 7.12 The CommandCommand class using the Describe Struct
We can now define the self.describe
method in the TypeCommand
with the following:
module BYORedis
class TypeCommand < BaseCommand
...
def self.describe
Describe.new('type', 2, [ 'readonly', 'fast' ], 1, 1, 1,
[ '@keyspace', '@read', '@fast' ])
end
end
end
listing 7.13 The TypeCommand class using the Describe struct
We also need to update all the existing command classes: DelCommand
, GetCommand
, SetCommand
, TtlCommand
, & PTtlCommand
. Doing so is fairly mechanical and is therefore not included here. You can find all the code from this chapter on GitHub
Adding List Commands
Now that we've explored the landscape of lists in Redis, it's time to add these functionalities to our server. We're going to start with the two commands that allow us to create a list, LPUSH
& RPUSH
.
Creating a list with LPUSH & RPUSH
There are two commands that allow clients to create a list, LPUSH
& RPUSH
. If the given key does not already exist and only one new element is passed to the command, they behave identically, they create a list and add the given element to it. The difference with this two commands occurs when the list already exists and has some elements in it or if multiple elements are passed. LPUSH
adds them to the Left and RPUSH
adds them to the Right.
Both commands accepts one or more values,
127.0.0.1:6379> LPUSH a-list a b c d
(integer) 4
127.0.0.1:6379> LRANGE a-list 0 -1
1) "d"
2) "c"
3) "b"
4) "a"
In the previous example we can see that LPUSH
added four elements to the newly created list a-list
. It first added a
, then b
to the left, as the new head, and then c
and d
to the left as well, the final list is [ 'd', 'c', 'b', 'a' ]
.
We can try the same example but with RPUSH
:
127.0.0.1:6379> DEL a-list
(integer) 1
127.0.0.1:6379> RPUSH a-list a b c d
(integer) 4
127.0.0.1:6379> LRANGE a-list 0 -1
1) "a"
2) "b"
3) "c"
4) "d"
The subsequent elements, b
, c
& d
were added as new tails, to the right, and the final list is: [ 'a', 'b', 'c', 'd' ]
.
Finally, if the key already exists and is not a List, Redis returns an error:
127.0.0.1:6379> SET a b
OK
127.0.0.1:6379> LPUSH a a
(error) WRONGTYPE Operation against a key holding the wrong kind of value
127.0.0.1:6379> RPUSH a a
(error) WRONGTYPE Operation against a key holding the wrong kind of value
Because most of the list commands will be small and related to each other, instead of creating one file per command as we've done so far, this time we'll create a single file list_commands.rb
and define all the command classes in there. Let's start with LPUSH
.
Before creating the LPushCommand
, we will start by reorganizing how the Server
class and the different command classes interact with each other. So far we've passed the @data_store
and @expires
Dict
instances, as well as the command arguments, to each command class.
We will need to create more data structures to handle various list related commands, so to simplify this, we will wrap all these data structures inside a DB
class. It's worth noting that this approach is conceptually similar to how the code is organized in the Redis codebase. Redis defines a C struct for a DB type.
The process of a looking up a list is slightly different than it is from a String. For commands such as RPUSH
& LPUSH
, we want to see if a list exists for the given key, but if it doesn't exist, we want to create a new list. Let's create a method in the DB
class for this purpose:
module BYORedis
class DB
attr_reader :data_store, :expires
attr_writer :ready_keys
def initialize
@logger = Logger.new(STDOUT)
@logger.level = LOG_LEVEL
@data_store = Dict.new
@expires = Dict.new
end
def lookup_list(key)
list = @data_store[key]
raise WrongTypeError if list && !list.is_a?(List)
list
end
def lookup_list_for_write(key)
list = lookup_list(key)
if list.nil?
list = List.new
@data_store[key] = list
end
list
end
end
end
listing 7.14 The new DB class
We'll define the WrongTypeError
shortly, when we add a few other utility methods.
Let's now refactor the BaseCommand
class to use the DB class:
module BYORedis
class BaseCommand
# ...
def initialize(db, args)
@logger = Logger.new(STDOUT)
@logger.level = LOG_LEVEL
@db = db
@args = args
end
end
end
listing 7.15 Updates to the BaseCommand class to support the DB class
And finally, let's update the Server
class:
# ...
module BYORedis
class Server
# ...
def initialize
@logger = Logger.new(STDOUT)
@logger.level = LOG_LEVEL
@clients = Dict.new
@db = DB.new
@server = TCPServer.new 2000
# ...
end
# ...
def handle_client_command(command_parts)
@logger.debug "Received command: #{ command_parts }"
command_str = command_parts[0]
args = command_parts[1..-1]
command_class = COMMANDS[command_str.downcase]
if command_class
command = command_class.new(@db, args)
command.call
else
# ...
end
end
# ...
end
end
listing 7.16 Updates to the Server class to support the DB class
Both LPUSH
& RPUSH
accept at least two arguments, the key and one or more elements to add. The validation of the number of arguments is very similar across commands, so let's create a helper method for that. Let's first create a Utils
module and add it there:
module BYORedis
InvalidArgsLength = Class.new(StandardError) do
def resp_error(command_name)
RESPError.new("ERR wrong number of arguments for '#{ command_name }' command")
end
end
WrongTypeError = Class.new(StandardError) do
def resp_error
RESPError.new('WRONGTYPE Operation against a key holding the wrong kind of value')
end
end
module Utils
def self.assert_args_length(args_length, args)
if args.length != args_length
raise InvalidArgsLength, "Expected #{ args_length }, got #{ args.length }: #{ args }"
end
end
def self.assert_args_length_greater_than(args_length, args)
if args.length <= args_length
raise InvalidArgsLength,
"Expected more than #{ args_length } args, got #{ args.length }: #{ args }"
end
end
end
end
listing 7.17 The new Utils module
We can now call Utils.assert_args_length_greater_than(1, @args)
to validate the number of arguments for the LPUSH
& RPUSH
commands. Doing so will raise a InvalidArgsLength
exception. Let's add a rescue
statement for this exception, as well as for the WrongTypeError
we used earlier in the DB
class. Let's do this in the BaseCommand
so that all classes benefit from it:
module BYORedis
class BaseCommand
# ...
def execute_command
call
rescue InvalidArgsLength => e
@logger.debug e.message
command_name = self.class.describe.name.upcase
e.resp_error(command_name)
rescue WrongTypeError => e
e.resp_error
end
def call
raise NotImplementedError
end
end
end
listing 7.18 Updates to the BaseCommand class to catch shared exceptions
This is an application of the template method pattern. The base class defines logic around the call
method, and it is up to the subclasses to define the actual logic of the call
method. We need to update the Server
class to now call the execute_command
method instead:
module BYORedis
class Server
# ...
def handle_client_command(command_parts)
@logger.debug "Received command: #{ command_parts }"
command_str = command_parts[0]
args = command_parts[1..-1]
command_class = COMMANDS[command_str.downcase]
if command_class
command = command_class.new(@db, args)
command.execute_command
else
# ...
end
end
end
end
listing 7.19 Updates to the Server class to use the execute_command method
We already know we are going need a very similar feature soon, for the RPUSH
command, so we're first defining a set of shared methods:
module BYORedis
module ListUtils
def self.common_lpush(list, elements)
elements.each { |element| list.left_push(element) }
RESPInteger.new(list.size)
end
def self.common_rpush(list, elements)
elements.each { |element| list.right_push(element) }
RESPInteger.new(list.size)
end
def self.common_find(args)
Utils.assert_args_length_greater_than(1, args)
yield args[0]
end
def self.find_or_create_list(db, args)
common_find(args) do |key|
db.lookup_list_for_write(key)
end
end
end
end
With all this refactoring out of the way, we can now define the LPushCommand
:
require_relative './list'
module BYORedis
class LPushCommand < BaseCommand
def call
list = ListUtils.find_or_create_list(@db, @args)
values = @args[1..-1]
ListUtils.common_lpush(list, values)
end
def self.describe
Describe.new('lpush', -3, [ 'write', 'denyoom', 'fast' ], 1, 1, 1,
[ '@write','@list', '@fast' ])
end
end
end
listing 7.20 Adding list_commands.rb and the LPushCommand
class
We first use Utils.assert_args_length_greater_than
to validate that we have at least two arguments. We then lookup a list, creating it if doesn't exist and failing if the key already exists and is of a different type. Finally, we iterate over all the remaining arguments and push them to the left of the list with list.left_push
and return the size of list.
There's a major problem, the List
class does not exist yet! Let's address this now:
module BYORedis
class List
ListNode = Struct.new(:value, :prev_node, :next_node)
attr_accessor :head, :tail, :size
def initialize
@head = nil
@tail = nil
@size = 0
end
def left_push(value)
new_node = ListNode.new(value, nil, @head)
if @head.nil?
@tail = new_node
else
@head.prev_node = new_node
end
@head = new_node
@size += 1
end
end
end
listing 7.21 The List
class
The class defines a doubly linked list, similar to what we shown earlier in the chapter. The list is initialized with a size of 0, and nil values for @head
and @tail
. The left_push
method behaves differently depending on whether the list was empty or not. We start by creating a new node, with the given value, and setting the prev_node
attribute to nil
, given that the new element will be the new head, it does not have a previous element in the list. The next element is set to @head
, which may be nil
if the list was empty.
If the list was empty, which we check with if @head.nil?
, then the new element is also the tail of the list, and we update the @tail
attribute as such. Otherwise, we update the old head's previous node, which is still the value held by @head
, to the new node. In other words, the old head now has a previous element in the list.
Finally, we update @head
to the new node and increment the size of the list.
We need to require the list_commands
file in the server.rb
file as well as adding LPushCommand
to the COMMANDS
constant:
# ...
require_relative './list_commands'
# ...
module BYORedis
class Server
COMMANDS = Dict.new
COMMANDS.set('command', CommandCommand)
COMMANDS.set('del', DelCommand)
COMMANDS.set('get', GetCommand)
COMMANDS.set('set', SetCommand)
COMMANDS.set('ttl', TtlCommand)
COMMANDS.set('pttl', PttlCommand)
COMMANDS.set('lpush', LPushCommand)
# ...
end
end
listing 7.22 Updates to the Server class to support the LPUSH command
Now that lists can be added to the keyspace, let's handle List
values in the ListCommand
class:
module BYORedis
class TypeCommand < BaseCommand
def call
# ...
case value
when nil
RESPSimpleString.new('none')
when String
RESPSimpleString.new('string')
when List
RESPSimpleString.new('list')
else
raise "Unknown type for #{ value }"
end
end
# ...
end
end
listing 7.23 Update to the TypeCommand to support the List type
The RPushCommand
is very similar to the LPushCommand
:
module BYORedis
# ...
class RPushCommand < BaseCommand
def call
list = ListUtils.find_or_create_list(@db, @args)
values = @args[1..-1]
ListUtils.common_rpush(list, values)
end
def self.describe
Describe.new('rpush', -3, [ 'write', 'denyoom', 'fast' ], 1, 1, 1,
[ '@write', '@list', '@fast' ])
end
end
end
listing 7.24 The RPushCommand class
The class is almost identical to LPushCommand
, except that we call right_push
on the list instead, so let's add this method to the List
class:
module BYORedis
class List
# ...
def right_push(value)
new_node = ListNode.new(value, @tail, nil)
if @head.nil?
@head = new_node
else
@tail.next_node = new_node
end
@tail = new_node
@size += 1
end
end
end
listing 7.25 The List#right_push method
The right_push
method is also similar to left_push
and also behaves differently whether or not the list was empty. We start by creating the new node, where the previous node value is set to what the tail was. If the list was empty, this value will be nil
and the new element will be the only element in the list, otherwise its previous node value will point at the old tail. If the list was empty, we also need to update the head value. Otherwise, we need to update the next node value of the old tail to point to the new node.
Finally, we update the value of @tail
and increment the size of list.
The *X variants, LPUSHX & RPUSHX
There are two more commands, almost identical to LPUSH
& RPUSH
, LPUSHX
& RPUSHX
. The only difference is that these two commands only push new elements to the list if it already exists. If the list does not already exist, it returns 0, otherwise it returns the new size of the list, like LPUSH
& RPUSH
do.
Given that these commands share a good amount of logic, let's add a few more helpers to the ListUtils
module:
module BYORedis
# ...
module ListUtils
# ...
def self.find_list(db, args)
common_find(args) do |key|
db.lookup_list(key)
end
end
def self.common_xpush(list)
if list.nil?
RESPInteger.new(0)
else
yield
end
end
end
# ...
end
listing 7.26 The ListUtils module
We can now create the LPushXCommand
and RPushXCommand
classes:
module BYORedis
# ...
class LPushXCommand < BaseCommand
def call
list = ListUtils.find_list(@db, @args)
values = @args[1..-1]
ListUtils.common_xpush(list) do
ListUtils.common_lpush(list, values)
end
end
def self.describe
Describe.new('lpushx', -3, [ 'write', 'denyoom', 'fast' ], 1, 1, 1,
[ '@write', '@list', '@fast' ])
end
end
end
listing 7.27 The LPushXCommand
The only difference is that we call @db.lookup_list(key)
instead of @db.lookup_list_for_write(key)
.
module BYORedis
# ...
class RPushXCommand < BaseCommand
def call
list = ListUtils.find_list(@db, @args)
values = @args[1..-1]
ListUtils.common_xpush(list) do
ListUtils.common_rpush(list, values)
end
end
def self.describe
Describe.new('rpushx', -3, [ 'write', 'denyoom', 'fast' ], 1, 1, 1,
[ '@write', '@list', '@fast' ])
end
end
end
listing 7.28 The RPushXCommand
We now have four List related commands, which allow us to add elements to a list, but we don't have any way to read the content of the list. This is what the LRANGE
command was created for.
Reading a list with LRANGE
The LRANGE
command accepts three arguments, a key, a start index and a stop index. It returns all the elements in the list stored at the given key, between the start and stop indices. As discussed earlier in the chapter, it supports a special syntax, with negative indices, to index element from the right instead of from the left.
Another important piece of the LRANGE
command is that it does not return errors for out of bound indices or for out of order start and stop values. Let's look at few examples first.
127.0.0.1:6379> RPUSH a-list a b c d e f
(integer) 6
The a-list
list contains the following elements, [ 'a', 'b', 'c', 'd', 'e', 'f' ]
. The index of the element, 'f'
is 5, but LRANGE
silently ignores this if we ask for all the elements up to index 10 for instance:
127.0.0.1:6379> LRANGE a-list 0 10
1) "a"
2) "b"
3) "c"
4) "d"
5) "e"
6) "f"
It returned all the elements in the given range, and ignored the fact that there are no elements between index 6 and 10. The same logic works the other way, with negative indices, 'f'
is at index -1
, and 'a'
is at index -6
. There is no element at index -7
and beyond:
127.0.0.1:6379> LRANGE a-list -10 -6
1) "a"
And Redis returns empty arrays if the whole request is outside the list:
127.0.0.1:6379> LRANGE a-list -10 -7
(empty array)
127.0.0.1:6379> LRANGE a-list 6 10
(empty array)
Finally, the following commands don't make any sense, since the start value is after the stop value, and Redis returns an empty array in this case:
127.0.0.1:6379> LRANGE a-list 2 1
(empty array)
127.0.0.1:6379> LRANGE a-list -1 -2
(empty array)
Let's implement the logic for the LRANGE
command:
module BYORedis
# ...
class LRangeCommand < BaseCommand
def call
Utils.assert_args_length(3, @args)
key = @args[0]
start = OptionUtils.validate_integer(@args[1])
stop = OptionUtils.validate_integer(@args[2])
list = @db.lookup_list(key)
if list.nil?
EmptyArrayInstance
else
ListSerializer.new(list, start, stop)
end
end
def self.describe
Describe.new('lrange', 4, [ 'readonly' ], 1, 1, 1, [ '@read', '@list', '@slow' ])
end
end
end
listing 7.29 The LRange Command
We use another argument length validator, in this case we always require three arguments. We then need to validate that the second and third arguments are integers, we do so with a new validator method, validate_integer
:
module BYORedis
ValidationError = Class.new(StandardError) do
def resp_error
RESPError.new(message)
end
end
module OptionUtils
def self.validate_integer(str)
Integer(str)
rescue ArgumentError, TypeError
raise ValidationError, 'ERR value is not an integer or out of range'
end
def self.validate_float(str, field_name)
Float(str)
rescue ArgumentError, TypeError
raise ValidationError, "ERR #{ field_name } is not a float or out of range"
end
end
end
listing 7.30 The OptionsUtils module
The module also includes a validate_float
method, which is very similar to the validate_integer
method. We will need the validate_float
method later on in this chapter.
Similarly to InvalidArgsLength
, we need to handle ValidationError
in the execute_command
method in BaseCommand
:
module BYORedis
class BaseCommand
# ...
def execute_command
call
rescue InvalidArgsLength => e
@logger.debug e.message
command_name = self.class.describe.name.upcase
e.resp_error(command_name)
rescue WrongTypeError, ValidationError => e
e.resp_error
end
end
end
listing 7.31 Updates to the BaseCommand class to support ValidationError exceptions
We delegate the actual serialization logic to a dedicated class, ListSerializer
, given that the logic requires the handling of many edge cases. Let's look at the class:
module BYORedis
# ...
class ListSerializer
attr_reader :start, :stop, :list
attr_writer :start, :stop
def initialize(list, start, stop)
@list = list
@start = start
@stop = stop
end
def serialize
@stop = @list.size + @stop if @stop < 0
@start = @list.size + @start if @start < 0
@stop = @list.size - 1 if @stop >= @list.size
@start = 0 if @start < 0
return EmptyArrayInstance.serialize if @start > @stop
response = ''
size = 0
distance_to_head = @start
distance_to_tail = @list.size - @stop
if distance_to_head <= distance_to_tail
iterator = List.left_to_right_iterator(@list)
within_bounds = ->(index) { index >= @start }
stop_condition = ->(index) { index > @stop }
accumulator = ->(value) { response << RESPBulkString.new(value).serialize }
else
iterator = List.right_to_left_iterator(@list)
within_bounds = ->(index) { index <= @stop }
stop_condition = ->(index) { index < @start }
accumulator = ->(value) { response.prepend(RESPBulkString.new(value).serialize) }
end
until stop_condition.call(iterator.index)
if within_bounds.call(iterator.index)
accumulator.call(iterator.cursor.value)
size += 1
end
iterator.next
end
response.prepend("*#{ size }\r\n")
end
end
end
listing 7.32 ListSerializer
in list.rb
The class implements an interface similar to the classes in resp_types.rb
, which allows us to return it from the call
method, and let the Server
class call .serialize
on the value returned by execute_command
.
The serialize
method is fairly long, so let's look at it one line at a time:
The first two lines take care of negative indices. If either start
or stop
are negative, we convert them to a 0-based index by adding the negative value to the size of list. Let's illustrate this with an example.
list = [ 'a', 'b', 'c' ] # list.size == 3
stop = -1
stop = 3 + stop # stop == 2
list[stop] == 'c' # stop is the last index of the array
start = -2
start = 3 + start # start == 1
list[start] == 'b' # start is the second to last index of the array
The next two lines take care of out of bound indices. If stop
is greater than the last index of the list, size - 1
, then we set it to that value. There's no need to keep iterating once we reached the last element. Similarly, if stop is lower than 0
, we set it to 0
, there is no element before the one at index 0
, so there's no need to look there.
Once the indices have been sanitized, we can return early if start > stop
, as we've shown above, this is nonsensical and we return an empty array right away.
The next step is an optimization to speed up the iteration process. Depending on the values of start
& stop
, it might be more interesting to start iterating from the head of the list, or from the tail. Imagine a list with one million elements it would be great if LRANGE -1 -1
, which returns the tail of the list, would run in O(1) time, and not in O(n) time. Our List
class holds a reference to the tail, so it is feasible to return it without iterating through the list.
We achieve this with the distance_to_head
& distance_to_tail
variables. If start
is 0
, there is no need for "empty iterations" to reach the first element we need to return, but if start
were, say, 100, we would need to iterate 100 times to reach the first element we need to return.
The same goes for distance_to_tail
, if list.size - stop
is equal to 0, the last element of the list needs to be returned, so there's no need for empty iterations, but if stop were, say, 2, in a list of 100 elements, we'd need 98 empty iterations from the right to reach the last element that needs to be returned.
If distance_to_head
is smaller than distance_to_tail
, it'll be faster to reach the sublist that needs to be returned if we start from the head, and if distance_to_tail
is smaller, then it'll be faster to start from the tail.
We could have written two while/until
loops in each branch of the if/else
condition, but instead we chose to define a few proc
s that will determine the direction of the iteration, so that we can write a single loop below.
Because this is a common pattern across list commands, we define an Iterator
struct as well as two helpers to return the two common iterators:
module BYORedis
class List
# ...
Iterator = Struct.new(:cursor, :index, :cursor_iterator, :index_iterator) do
def next
self.cursor = cursor_iterator.call(cursor)
self.index = index_iterator.call(index)
end
end
# ...
def self.left_to_right_iterator(list)
# cursor, start_index, iterator, index_iterator
Iterator.new(list.head, 0, ->(node) { node.next_node }, ->(i) { i + 1 })
end
def self.right_to_left_iterator(list)
# cursor, start_index, iterator, index_iterator
Iterator.new(list.tail, list.size - 1, ->(node) { node.prev_node }, ->(i) { i - 1 })
end
# ...
end
end
listing 7.33 The new Iterator class in the List class
We define the starting index for the iteration, 0
in the left to right iteration, size - 1
for a left to right iteration.
The cursor
attribute of the Iterator
instance will be set to each node of the list, its initial value is list.head
if we start from the left and list.tail
if we start from the right.
The cursor_iterator
attribute is a proc that determines how to get the following node in the list. If we're iterating from left to right, we need to get the node at next_node
, and if we're iterating from right to left, we need to get the node at prev_node
.
Similarly, index_iterator
is a proc that updates the current index value, it increments it for a left to right iteration and decrements it for a right to left iteration.
within_bounds
is a proc that returns a boolean for an index, which indicates if the current index is within the bounds of the requested range. If we're iterating from left to right, as soon as the current index reaches the value of start
, we need to start accumulating the elements we find. The condition is different for a right to left iteration, we need to start accumulating elements as soon as the current index is lower or equal to stop. Let's look at few examples to illustrate this:
list = [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ] # size is 7, last index is 6
If start
is 1
and stop
is 2
, the returned list should be [ 'b', 'c' ]
and we'll start iterating from the head. index
will be initialized at 0
, on the next iteration it'll be 1
, which will cause the within_bounds
proc to return true
since 1 >= 1
. On the next iteration index
will be two and within_bounds
will return true
again. On the next iteration index
will be 3, which it outside the bounds of the given range, so we could return at this point.
We achieve this with the stop_condition
proc. In this example it'll be index > stop
, and since 3 > 2
, the until
loop will stop.
The accumulation process works with the response
string. In a left to right iteration we append new values to this string with <<
and in a right to left operation we prepend to it with the prepend
method.
The last step of the method is to add the size of the RESP array at the beginning of the string, to follow the RESP protocol. We do this with response.prepend("*#{ size }\r\n")
.
Now that we can add elements to a list, and read elements from a list, it's time to add commands to remove elements.
Removing items with LPOP & RPOP
The main two commands to remove elements from a list are LPOP
& RPOP
. LPOP
pops an element from the Left and RPOP
pops an element from the Right. Both commands accept a single arguments, the key for the list. If a pair exists for this key, but is not a list, an error is returned. If no pairs exist, a nil
string is returned. Redis does not keep empty lists in memory, as can be shown in the next example, so if the last element is popped form the list, we need to remove the pair from @db.data_store
:
127.0.0.1:6379> RPUSH a-list a b
(integer) 2
127.0.0.1:6379> LPOP a-list
"a"
127.0.0.1:6379> LPOP a-list
"b"
127.0.0.1:6379> TYPE a-list
none
Let's create the LPopCommand
class, but first, similarly to how we approached the LPUSH
& RPUSH
commands, we know we're going to need very similar functionality with LPOP
& RPOP
, so let's define shared methods in ListUtils
first:
module BYORedis
# ...
module ListUtils
# ...
def self.common_pop(db, args)
Utils.assert_args_length(1, args)
key = args[0]
list = db.lookup_list(key)
if list.nil?
NullBulkStringInstance
else
value = yield key, list
RESPBulkString.new(value)
end
end
end
# ...
class LPopCommand < BaseCommand
def call
ListUtils.common_pop(@db, @args) do |key, list|
@db.left_pop_from(key, list)
end
end
def self.describe
Describe.new('lpop', 2, [ 'write', 'fast' ], 1, 1, 1, [ '@write', '@list', '@fast' ])
end
end
end
listing 7.34 The LPopCommand using a shared method from ListUtils
We could call the not implemented yet left_pop
method from the call
method directly, but because the process of checking if the list is now empty and removing it from the keyspace is so common, we wrap this logic in the DB#left_pop_from
method. Let's create this method:
module BYORedis
class DB
# ...
def left_pop_from(key, list)
generic_pop_wrapper(key, list) do
list.left_pop
end
end
private
def generic_pop_wrapper(key, list)
popped = yield
@data_store.delete(key) if list.empty?
if popped
popped.value
else
@logger.warn("Unexpectedly popped from an empty list or a nil value: #{ key }")
nil
end
end
end
end
listing 7.35 The proxy method in the DB class to handle deletions of empty lists
left_pop_from
starts by calling generic_pop_wrapper
with the key
& list
values as well as with a block calling left_pop
on the list.
generic_pop_wrapper
allows us to define logic that is agnostic of which pop operations we're performing. It starts by calling yield
, which is assuming to pop a value from the list, either from the left or from the right. It then proceeds to delete the list from @data_store
if the list is now empty. It is expected that popped
will never be nil
, since we only keep non-empty lists in @data_store
, but as a way to be extra cautious, we log a warning and return nil
if popped
happened to be nil
.
We can now define RPopCommand
:
module BYORedis
# ...
class RPopCommand < BaseCommand
def call
ListUtils.common_pop(@db, @args) do |key, list|
@db.right_pop_from(key, list)
end
end
def self.describe
Describe.new('rpop', 2, [ 'write', 'fast' ], 1, 1, 1, [ '@write', '@list', '@fast' ])
end
end
end
listing 7.36 The RPopCommand class
Let's also define right_pop_from
in DB
:
module BYORedis
class DB
# ...
def right_pop_from(key, list)
generic_pop_wrapper(key, list) do
list.right_pop
end
end
end
end
listing 7.37 The proxy method in the DB class to handle empty list deletions after a right pop
Removing from a list and adding to another with RPOPLPUSH
A common use case for Redis is to use lists as queues, where elements are added from the left, with LPUSH
, and removes from the right, with RPOP
, this makes the list a FIFO, First In First Out, queue, where the order of insertion defines the order of processing.
For an extra layer of reliability, it can be convenient to keep the popped element in another list to make sure that even if the process in charge of processing the message fails to complete it, the message won't be lost and will still be in the processing queue.
A use case for RPOPLPUSH
An example of such use case is an e-commerce site with a subscription component. Let's imagine that there is a system in charge of processing subscriptions, each day at 1am it processes the subscriptions for the day, creating orders for customers and charging them for the amount of the order. We could implement this process by calling LPUSH
for each subscription that needs to be processed that day and having one or worker processes continuously calling RPOP
for that list. Each message would contain enough information to process the order.
This approach of queuing messages for processing by separate workers is fairly common as it allows for easy scaling of the number of workers. It only needs a single process to run on a schedule, find all the subscriptions that need to be processed and adds them to the queue. On the other end of the queue, there could be one or more workers, picking up messages and processing them. There could even be an auto-scaling systems that adds workers based on the size of the queues. If tens of thousand of subscriptions need to be processed, many workers could be started to speed up the process.
Now, imagine that the system in charge of handling the payment portion is having some issues and is causing some errors, we would still want to process these orders later on, when the payment system is back online. This is where the idea of a queue dedicated to keeping the items being processed is useful.
The worker processes would call RPOP
, and before processing the message, would call LPUSH
to add the message to the processing queue. With this approach, regardless of what happens to the worker, the message is safe in the processing list. There might be another process in charge of scanning the processing list and retrying the message at a later point. If no errors happen, the worker must delete the message from the processing queue, to signify that it is fully processed.
The problem with this approach is that things can still go wrong, networks encounter issues, and there is no guarantee that the LPUSH
operation will work, even if the RPOP
operation was successful. In other words, with RPOP
, the messages leaves the Redis server completely, and even if we try to put it back with RPUSH
, this is not guaranteed to succeed.
Another worst-case scenario, which is not that uncommon in my experience, is that something goes wrong on the client side, after receiving the message from RPOP
but before sending it back with RPUSH
. There are different ways in which something could go wrong, like an unhandled exception for instance. That being said, many workers are setup with some try/catch
/begin/rescue
wrappers to handle such cases, but the likelihood of a bug is still there.
Additionally, with some platforms such as the JVM, it is also possible to observe exceptions such as OutOfMemoryError
. These errors are usually rare, but still likely to happen, at some point, for any large enough project.
This is the problem that RPOPLPUSH
solves, in one operation, it pops an element from the right of a list, and pushes it to the left of another. The message never leaves Redis.
The RPopLPushCommand
class
If the source key is nil, a nil
string is returned, and if it exists but is not a list, an error is returned. Otherwise, it pops an element, again deleting the list from the keyspace if the list ends up empty, and pushes the element to the destination. Destination is created if it does not already exist, and an error is returned if it already exists and is not a list:
module BYORedis
class RPopLPushCommand < BaseCommand
def call
Utils.assert_args_length(2, @args)
source_key = @args[0]
source = @db.lookup_list(source_key)
if source.nil?
NullBulkStringInstance
else
destination_key = @args[1]
if source_key == destination_key && source.size == 1
source_tail = source.head.value
else
destination = @db.lookup_list_for_write(destination_key)
source_tail = @db.right_pop_from(source_key, source)
destination.left_push(source_tail)
end
RESPBulkString.new(source_tail)
end
end
def self.describe
Describe.new('rpoplpush', 3, [ 'write', 'denyoom' ], 1, 2, 1,
[ '@write', '@list', '@slow' ])
end
end
end
listing 7.38 The RPopLPushCommand class
We do not use any new methods here. We initially call lookup_list
for the source list, and return early if it doesn't exist. We then call lookup_list_for_write
for the destination, creating the list if it does not already exist.
We then pop from the source, with the right_pop_from
method, which deletes the list if it is now empty, and then call left_push
to the destination
list. The returned value is the element that was popped and pushed.
It's worth nothing that RPOPLPUSH
can be used with the same list as source and destination, which ends up moving the tail of the list to the head. This creates an edge case if the list has a single element, if so, we don't have to do anything, and we can simply return the head of the list, or the tail, since they're the same value. We have to do this because otherwise calling @db.right_pop_from(source_key, source)
would cause the deletion of the list, which we want to avoid.
A bunch of useful utility commands
Redis defines a few more list commands:
LINDEX
- The element at a given index
LINDEX
accepts two arguments, a key and the index of the element we want to return. Similarly to LRANGE
, LINDEX
accept negative indices. If there is an existing pair and it is not a list, an error is returned. If the index is out of bounds, a nil
string is returned.
module BYORedis
# ...
class LIndexCommand < BaseCommand
def call
Utils.assert_args_length(2, @args)
key = @args[0]
index = OptionUtils.validate_integer(@args[1])
list = @db.lookup_list(key)
if list.nil?
NullBulkStringInstance
else
value_at_index = list.at_index(index)
if value_at_index
RESPBulkString.new(value_at_index)
else
NullBulkStringInstance
end
end
end
def self.describe
Describe.new('lindex', 3, [ 'readonly' ], 1, 1, 1, [ '@read', '@list', '@slow' ])
end
end
end
listing 7.39 The LIndex class
Let's add the List#at_index
method:
module BYORedis
class List
# ...
def at_index(index)
index += @size if index < 0
return if index >= @size || index < 0
distance_to_head = index
distance_to_tail = @size - index
if distance_to_head <= distance_to_tail
iterator = List.left_to_right_iterator(self)
else
iterator = List.right_to_left_iterator(self)
end
while iterator.cursor
return iterator.cursor.value if iterator.index == index
iterator.next
end
end
end
end
listing 7.40 The List#index method
The at_index
method makes use of the Iterator
helpers, since we also want to iterate from the right side if the given index is closer to the tail. We also perform the same sanitation step to transform a negative index into a positive 0-based index.
LPOS
- The index (or indices) of element(s) matching the argument
The LPOS
command supports three options: COUNT
, MAXLEN
& RANK
. COUNT
determines the maximum number of elements that can be returned if multiple elements match the argument. By default LPOS
returns one or zero index, for the first element being equal to the argument, starting from the left, but if COUNT
is given, it returns an array, empty if no elements were found, or containing all the indices of element being equal to the argument.
RANK
can be used to skip some matches and return the n-th one, from the left with a positive rank and from the right with a negative rank. A rank value cannot be zero or negative, and a rank value of 1 is the default, return the first match starting from the head. A rank value of -1 will return the last match. A rank value of 2 will return the second match, from the left, and -2, the second to last element, or second element from the right, and so on.
The last option, MAXLEN
, can be used to limit the number of elements that will be scanned. By default LPOS
will scan up to the whole list, but with MAXLEN n
, it will stop after n elements.
The options can be combined, for instance LPOS a-list element RANK -1 MAXLEN 10
will only look at the last ten elements of the list when trying to find the index of element.
module BYORedis
ZeroRankError = Class.new(StandardError) do
def message
'ERR RANK can\'t be zero: use 1 to start from the first match, 2 from the second, ...'
end
end
NegativeOptionError = Class.new(StandardError) do
def initialize(field_name)
@field_name = field_name
end
def message
"ERR #{ @field_name } can\'t be negative"
end
end
# ...
class LPosCommand < BaseCommand
def initialize(db, args)
super
@count = nil
@maxlen = nil
@rank = nil
end
def call
Utils.assert_args_length_greater_than(1, @args)
key = @args.shift
element = @args.shift
list = @db.lookup_list(key)
parse_arguments unless @args.empty?
if list.nil?
NullBulkStringInstance
else
position = list.position(element, @count, @maxlen, @rank)
if position.nil?
NullBulkStringInstance
elsif position.is_a?(Array)
RESPArray.new(position)
else
RESPInteger.new(position)
end
end
rescue ZeroRankError, NegativeOptionError => e
RESPError.new(e.message)
end
def self.describe
Describe.new('lpos', -3, [ 'readonly' ], 1, 1, 1, [ '@read', '@list', '@slow' ])
end
private
def parse_arguments
until @args.empty?
option_name = @args.shift
option_value = @args.shift
raise RESPSyntaxError if option_value.nil?
case option_name.downcase
when 'rank'
rank = OptionUtils.validate_integer(option_value)
raise ZeroRankError if rank == 0
@rank = rank
when 'count'
count = OptionUtils.validate_integer(option_value)
raise NegativeOptionError, 'COUNT' if count < 0
@count = count
when 'maxlen'
maxlen = OptionUtils.validate_integer(option_value)
raise NegativeOptionError, 'MAXLEN' if maxlen < 0
@maxlen = maxlen
else
raise RESPSyntaxError
end
end
end
end
end
listing 7.41 The LPosCommand class
The LPosCommand
class takes care of parsing the list of arguments and storing the values in the @count
, @maxlen
& @rank
instance variables. We perform a few validations to make sure that if a rank is given it is an integer, but not zero and that MAXLEN
& COUNT
are positive integers.
We add the RESPSyntaxError
class to the utils.rb
file now that we need to use it from more than one command, for SET
& LPOS
.
module BYORedis
# ...
RESPSyntaxError = Class.new(StandardError) do
def message
'ERR syntax error'
end
end
# ...
end
listing 7.42: Adding RESPSyntaxError
to utils.rb
We also need to add it to the list of exceptions rescued in BaseCommand
.
module BYORedis
class BaseCommand
# ...
def execute_command
call
rescue InvalidArgsLength => e
@logger.debug e.message
command_name = self.class.describe.name.upcase
e.resp_error(command_name)
rescue WrongTypeError, RESPSyntaxError, ValidationError => e
e.resp_error
end
end
end
listing 7.43 Updates to the BaseCommand class to support RESPSyntaxError exceptions
The actual logic is in the List#position
method:
module BYORedis
class List
# ...
def position(element, count, maxlen, rank)
return if count && count < 0
return if @size == 0
return if rank && rank == 0
match_count = 0
maxlen = @size if maxlen == 0 || maxlen.nil?
indexes = [] if count
if rank.nil? || rank >= 0
iterator = List.left_to_right_iterator(self)
else
iterator = List.right_to_left_iterator(self)
end
while iterator.cursor
if (rank.nil? || rank >= 0) && iterator.index >= maxlen
break
elsif (rank && rank < 0) && (@size - iterator.index - 1) >= maxlen
break
end
if element == iterator.cursor.value
match_count += 1
reached_rank_from_head = rank && rank > 0 && match_count >= rank
reached_rank_from_tail = rank && rank < 0 && match_count >= (rank * -1)
if rank.nil? || reached_rank_from_head || reached_rank_from_tail
return iterator.index if indexes.nil?
indexes << iterator.index
end
return indexes if indexes && indexes.size == count
end
iterator.next
end
indexes
end
end
end
listing 7.44 The List#position method
There's a lot going on in List#position
, let's break it down one step at a time:
The first three lines are sanity checks for the three optional arguments, if any of these values are invalid, there's no need to continue.
We initialize a variable to count the number of matches, match_count
. We also give maxlen
a default value of the size of the list if it wasn't set. If a count value is given then we instantiate an array to store all the indices that need to be returned.
If no rank is given or if rank is positive, we create a left to right iterator, otherwise we create a right to left iterator, we then iterate through the list with the newly created iterator.
If we are in a left to right iteration, checked with the if (rank.nil? || rank >= 0)
condition, we can stop iterating once the index of the iterator reached maxlen
, we've seen enough elements, there's no need to continue. We perform a similar check in the case of a right to left iteration, checked with if (rank && rank < 0)
, but this time we know that we've seen enough elements if @size - iterator.index - 1
is greater than or equal to maxlen
. Using a list of size 10 as an example, if maxlen
is set to 3, and rank is negative, we need to inspect the last three values, at index 9, 8 & 7. So once we reach index 6, 10 - 6 - 1
is equal to 3
, which is the value of maxlen
and the loop will exit. If neither of these conditions match, we need to keep going through the list.
If the current element does not equal element
, we can jump to the next element in the list, and ignore it, otherwise, we found a match, and the steps to take depend on the rank
and maxlen
arguments.
Regardless of the arguments value, we found a new match, so we increment match_count
.
If a rank value was given, we need to check if the current match should be accounted for based on the rank value. In other words, we might have to ignore the match. For instance, if rank is 3, and this is the first match, we need to ignore the match, neither reached_rank_from_head
or reached_rank_from_tail
will be initialized. On the other hand, if we're dealing with the third match, then reached_rank_from_head
will be true. The reached_rank_from_tail
variable works the same way but for negative ranks.
If no rank was given, or if either of the two previous variables was set to true, we have found a valid match. If indexes
is nil, there's no array to accumulate the values into, no count was given and we can return right away. Otherwise, we add the match to the indexes
variable.
Once these checks have been performed for the rank value, we need to check if we can return right away, which is the case if no count was given, or if there is a count and we have found enough matches.
LINSERT
- Add a new element before or an after an element in the list
LINSERT
inserts a new element before or after a given pivot. If no values in the list match the pivot, it does nothing and return -1.
module BYORedis
class LInsertCommand < BaseCommand
def call
Utils.assert_args_length(4, @args)
if ![ 'before', 'after' ].include?(@args[1].downcase)
raise RESPSyntaxError
else
position = @args[1].downcase == 'before' ? :before : :after
end
pivot = @args[2]
element = @args[3]
list = @db.lookup_list(@args[0])
return RESPInteger.new(0) if list.nil?
new_size =
if position == :before
list.insert_before(pivot, element)
else
list.insert_after(pivot, element)
end
RESPInteger.new(new_size)
end
def self.describe
Describe.new('linsert', 5, [ 'write', 'denyoom' ], 1, 1, 1,
[ '@write', '@list', '@slow' ])
end
end
end
listing 7.45 The LInsertCommand
The LInsertCommand
class parses the list of arguments to see if the new element needs to be added before or after the pivot and delegates the operation to the List
class.
We now need to add the List#insert_before
and List#insert_after
methods:
module BYORedis
class List
# ...
def insert_before(pivot, element)
generic_insert(pivot) do |node|
new_node = ListNode.new(element, node.prev_node, node)
if @head == node
@head = new_node
else
node.prev_node.next_node = new_node
end
node.prev_node = new_node
end
end
def insert_after(pivot, element)
generic_insert(pivot) do |node|
new_node = ListNode.new(element, node, node.next_node)
if @tail == node
@tail = new_node
else
node.next_node.prev_node = new_node
end
node.next_node = new_node
end
end
private
def generic_insert(pivot)
cursor = @head
while cursor
break if cursor.value == pivot
cursor = cursor.next_node
end
if cursor.nil?
-1
else
@size += 1
yield cursor
@size
end
end
end
end
listing 7.46 The List#insert_before & List#insert_after methods
The first step is the same for each method, and we share the logic in the generic_insert
method. We iterate from the left, until we find the pivot, the difference occurs when we do find the pivot.
In the insert_before
case, we create a new node with the new value, with its prev_node
value to the node that was before the pivot, node.prev_node
and its next_node
value to the pivot, node
. If the pivot was not the head, it will have a prev_node
value, and we need to update the next_node
value on that node to now point to the new node. Otherwise, if it was the head, then we need to update the @head
reference to be the new node.
The element preceding the pivot is now the new node with node.prev_node = new_node
.
The logic in insert_after
is quite similar. We start by creating a new node, where prev_node
is set to the pivot, node
, and its next_node
is set to the element that follows the pivot, node.next_node
. If the pivot was the tail, then we need to update the @tail
reference, otherwise, we need to update the prev_node
value of the element following pivot to now point to the new node, node.next_node.prev_node = new_node
.
The element following the pivot is now the new node with node.next_node = new_node
.
LLEN
- Return the length of a list
The LLenCommand
class is more straightforward than the previous ones. It takes a single argument, the key for the list, and returns its length, or 0 if the list does not exist:
module BYORedis
class LLenCommand < BaseCommand
def call
Utils.assert_args_length(1, @args)
key = @args[0]
list = @db.lookup_list(key)
if list.nil?
RESPInteger.new(0)
else
RESPInteger.new(list.size)
end
end
def self.describe
Describe.new('llen', 2, [ 'readonly', 'fast' ], 1, 1, 1, [ '@read', '@list', '@fast' ])
end
end
end
listing 7.47 The LLenCommand class
LREM
- Remove one more element from a list
The LRemCommand
commands accepts three arguments, the key for the list, a count value and the element that we intend to remove. The meaning of the count argument is the following:
- count > 0: Remove elements equal to element moving from head to tail.
- count < 0: Remove elements equal to element moving from tail to head.
- count = 0: Remove all elements equal to element.
module BYORedis
class LRemCommand < BaseCommand
def call
Utils.assert_args_length(3, @args)
key = @args[0]
count = OptionUtils.validate_integer(@args[1])
element = @args[2]
list = @db.lookup_list(key)
if list.nil?
RESPInteger.new(0)
else
RESPInteger.new(list.remove(count, element))
end
end
def self.describe
Describe.new('lrem', 4, [ 'write' ], 1, 1, 1, [ '@write', '@list', '@slow' ])
end
end
end
listing 7.48 The LRemCommand class
The LRemCommand
classes performs the necessary validations, such as validating that the value for count is an integer, and then delegates the actual removal operation to the List#remove
method:
module BYORedis
class List
def remove(count, element)
delete_count = 0
if count >= 0
iterator = List.left_to_right_iterator(self)
else
iterator = List.right_to_left_iterator(self)
end
while iterator.cursor
cursor = iterator.cursor
if cursor.value == element
if @head == cursor
@head = cursor.next_node
else
cursor.prev_node.next_node = cursor.next_node
end
if @tail == cursor
@tail = cursor.prev_node
else
cursor.next_node.prev_node = cursor.prev_node
end
delete_count += 1
@size -= 1
if count != 0 && (delete_count == count || delete_count == (count * -1))
break
end
end
iterator.next
end
delete_count
end
end
end
listing 7.49 The List#remove method
The first steps should look familiar at this point. Because of the difference in logic between a negative or positive/zero count, we will need to iterate from the left or the right, and create the necessary iterator.
The main loop does nothing if the current element does not match the given element we want to remove. If there is a match, we always start by deleting the node. The deletion step in a doubly linked list requires a few steps:
- If the cursor is the head, then we update the head to be the element following the cursor
- Otherwise we update the
next_node
value of the element preceding the cursor, to point at the element following the cursor - If the cursor is the tail, then we update the list to be the element preceding the cursor
- Otherwise we update the
prev_node
value of the element following the cursor to the element preceding the cursor
Finally, we increment the delete_count
variable, and decrement the size of the list. The last step of the loop is to check if we should exit based on the count value. If count is zero, we have to iterate through the whole list to delete all matches, so we do not break early. Otherwise, we stop if we have deleted enough elements.
LSET
- Update the value of the element at the given index
The LSET
command takes three arguments, the key for the list, the index of the element to be updated, and the new value. ERR index out of range
is returned if the index is out of range. If the key does not exist, it returns the ERR no such key
error.
Like many of the previous commands, LSET
supports negative indices.
module BYORedis
class LSetCommand < BaseCommand
def call
Utils.assert_args_length(3, @args)
key = @args[0]
index = OptionUtils.validate_integer(@args[1])
new_value = @args[2]
list = @db.lookup_list(key)
if list.nil?
RESPError.new('ERR no such key')
elsif list.set(index, new_value)
OKSimpleStringInstance
else
RESPError.new('ERR index out of range')
end
end
def self.describe
Describe.new('lset', 4, [ 'write', 'denyoom' ], 1, 1, 1, [ '@write', '@list', '@slow' ])
end
end
end
listing 7.50 The LSetCommand class
The LSetCommand
class performs the necessary validations, and delegates the update process to the List#set
method. If the method returns nil
, then LSetCommand#call
returns the out of range error back to the Server
class.
module BYORedis
class List
# ...
def set(index, new_value)
# Convert a negative index
index += @size if index < 0
return if index < 0 || index >= @size
distance_from_head = index
distance_from_tail = @size - index - 1
if distance_from_head <= distance_from_tail
iterator = List.left_to_right_iterator(self)
else
iterator = List.right_to_left_iterator(self)
end
while iterator.index != index
iterator.next
end
iterator.cursor.value = new_value
end
end
end
listing 7.51 The List#set method
Like the previous commands supporting negative indices, we first convert the index to a 0-based index and return nil
if the index is out of bounds. We then perform the same optimizations we performed earlier to decide if we should initiate the iteration from the head or from the tail.
Finally, we iterate until we reach the desired index and update the value in place.
LTRIM
- Keep a subset of the list and discard the rest
The last utility command is LTRIM
, it accepts three arguments, the key for the list, a start index and a stop index. It only keeps the elements in the range delimited by the start and stop indices, all other elements are discarded. If the range is empty, then the list is deleted. Like other commands, it supports negative indices.
module BYORedis
class LTrimCommand < BaseCommand
def call
Utils.assert_args_length(3, @args)
key = @args[0]
start = OptionUtils.validate_integer(@args[1])
stop = OptionUtils.validate_integer(@args[2])
list = @db.lookup_list(key)
if list
@db.trim(key, list, start, stop)
end
OKSimpleStringInstance
end
def self.describe
Describe.new('ltrim', 4, [ 'write' ], 1, 1, 1, [ '@write', '@list', '@slow' ])
end
end
end
listing 7.52 The LTrimCommand class
The LTrimCommand
class validates that the start and stop indices are integer, and delegates the trim operation to the DB#trim
method. It always returns +OK
, even if the list does not exist.
We now need to add the DB#trim
& List#trim
methods:
module BYORedis
class DB
# ...
def trim(key, list, start, stop)
list.trim(start, stop)
@data_store.delete(key) if list.empty?
end
end
end
listing 7.53 The DB#trim proxy method to handle deletions of empty lists
The DB#trim
method deletes the list from the database if the resulting list is empty.
module BYORedis
class List
# ...
def trim(start, stop)
current_head = @head
# Convert negative values
stop = @size + stop if stop < 0
stop = @size - 1 if stop >= @size
start = @size + start if start < 0
start = 0 if start < 0
if start >= @size || start > stop
@size = 0
@head = nil
@tail = nil
return
end
return if start == 0 && stop == @size - 1
distance_to_start = start
distance_to_stop = @size - stop - 1
if distance_to_start <= distance_to_stop
iterator = List.left_to_right_iterator(self)
target_index = start
else
iterator = List.right_to_left_iterator(self)
target_index = stop
end
new_head = nil
new_tail = nil
while iterator.index != target_index
iterator.next
end
# We reached the closest element, either start or stop
# We first update either the head and the nail and then find the fastest way to get to the
# other boundary
if target_index == start
new_head = iterator.cursor
target_index = stop
# We reached start, decide if we should keep going right from where we are or start from
# the tail to reach stop
if distance_to_stop < stop - iterator.index
iterator = List.right_to_left_iterator(self)
end
else
new_tail = iterator.cursor
target_index = start
# We reached stop, decide if we should keep going left from where we are or start from
# the head to reach start
if distance_to_start < iterator.index - start
iterator = List.left_to_right_iterator(self)
end
end
while iterator.index != target_index
iterator.next
end
# We now reached the other boundary
if target_index == start
new_head = iterator.cursor
else
new_tail = iterator.cursor
end
@head = new_head
@head.prev_node = nil
# If start == stop, then there's only element left, and new_tail will not have been set
# above, so we set here
if start == stop
new_tail = new_head
@size = 1
else
# Account for the elements dropped to the right
@size -= (@size - stop - 1)
# Account for the elements dropped to the left
@size -= start
end
@tail = new_tail
@tail.next_node = nil
end
end
end
listing 7.54 The List#trim method
We perform the usual steps with negative indices, converting to 0-based indices. We then check if they're out of order and if so, clear the list right away, there's no need to iterate in this case. We also check that if start == 0 && stop == -1
, in this case, there is nothing to do, we can keep the list as is.
This method is pretty long and pretty verbose in order to minimize the number of empty iterations required to reach the nodes at indices start
& stop
. The algorithm can be described as:
- First, find the fastest node to get to. It is
start
if the distance between the head andstart
is lower than the distance betweenstop
and the tail. It isstop
otherwise. - Once the closest node is reached, try to reach the second boundary in the most efficient manner, there are four options:
- if the node we reached is
start
, we can reachstop
in two ways: - continue with the same iterator if
stop
is closer to the iterator than it is from the tail - iterate from the tail with a new iterator otherwise
- if the node we reached is
stop
, we can reachstart
in two ways: - continue with the same iterator if
start
is closer to the iterator than it is from the head - iterate from the head with a new iterator otherwise
- if the node we reached is
- Once we found each nodes, we update
@head
&@tail
accordingly and adjust the size to accommodate for the dropped nodes.
This algorithm means that calling LTRIM 997 998
on a 1,000 element list will start from the right, reach the stop
node in one iteration, and continue with the same iterator to reach the start
node.
Likewise, calling LTRIM 1 2
on a 1,000 element list will start from the left, reach the start
node in one iteration and continue with the same iterator to reach the stop
node.
Finally, calling LTRIM 1 998
on a 1,000 element list will start from the left, reach the start
node in one iteration and create a new iterator, to start from the tail to reach the stop
node in one iteration.
We've now implemented most of the commands, there are only three more commands, that implement existing logic, but in a blocking manner.
The blocking variants, BLPOP, BRPOP & BRPOPLPUSH
The last three commands we need to implement are almost identical to LPOP
, RPOP
& RPOPLPUSH
, with the difference that they are Blocking.
Let's start with the BLPOP
command. It accepts two arguments or more. The first one is the key for a list, and the last one must be a number, integer or float, which is the timeout the command can block for. It accepts more than one list keys. The following are valid BLPOP
commands: BLPOP a 1
, BLPOP a b c 1.2
. But the following is invalid BLPOP a b
, b
is not a valid number and cannot be used to describe a timeout. It's worth noting that since keys can be numbers, which Redis represents at Strings internally, the following is valid BLPOP 1 1
. It means: "Block for up to 1s for the key '1'".
A timeout value of 0
means an infinite timeout. The server will only unblock the client if a new element is added to one of the lists it is blocked on.
Redis looks at all the keys from left to right, and as soon as it finds a list, it will pop an element from the left and return it, alongside the key, so that the client knows which list the element was popped from:
127.0.0.1:6379> LPUSH b b1
(integer) 1
127.0.0.1:6379> BLPOP a b 1
1) "b"
2) "b1"
In the previous example, if the list at key a
had contained at least one element, it would have been returned instead.
The blocking behavior happens when all the lists are empty, in this case Redis will not send a response to the client, effectively blocking it. Any other commands received while blocked will be accumulated and replied to once the blocking operation is unblocked.
There are three conditions that can unblock a client:
- One of the keys the client is blocked on receives a push before the timeout expires
- The timeout expires before any of the list receives a push
- The client disconnects before any of the two previous conditions are met
In the first case, the return value is the same as if the command returned right away without blocking, it returns the key of the list and the element that was popped.
In the second case, it returns a nil array.
In the last example, the client disconnects, so whether one of the list receives a push, or the timeout expires, nothing will happen, the server needs to clear its internal state to make sure it doesn't actually pop any elements since there's no clients to send the data to. Let's look at an example:
Let's block for 100 seconds in one session and close it with Ctrl-C
right after starting it:
127.0.0.1:6379> BLPOP a 100
^C
Reopen another redis-cli
shell and push a value to the list a
:
127.0.0.1:6379> RPUSH a a1
(integer) 1
127.0.0.1:6379> LRANGE a 0 -1
1) "a1"
If all went well, we did perform all these operations under 100s, but the element we pushed to the list a
was not popped, because when the client disconnected, the server knew that there was no client blocked anymore.
If we'd perform the same operations with two sessions, without closing the first one, this is what would happen:
127.0.0.1:6379> BLPOP a 100
In a second session:
127.0.0.1:6379> RPUSH a a1
(integer) 1
And back in the first session, we see the result of BLPOP
alongside the amount of time it was blocked for thanks to redis-cli
:
127.0.0.1:6379> BLPOP a 100
1) "a"
2) "a1"
(19.50s)
If we let the timeout expire, we can see that the server returns a nil array:
127.0.0.1:6379> BLPOP a 1
(nil)
(1.05s)
Note that redis-cli
displays nil strings and nil arrays the same way, with (nil)
, we can see which one it is with nc
:
> nc -c localhost 6379
BLPOP a 1
*-1
Blocking operations use cases
Before implementing the command, it's worth stopping to discuss the benefits of the blocking variants.
There are two main benefits to the blocking variants of the LPOP
, RPOP
& RPOPLPUSH
commands:
- It reduces the latency between the moment when an element is pushed to a list and when it is read by a client
- It reduces the number of potential "empty"/"useless" commands between clients and the server
Latency improvement
The latency improvement aspect is interesting if there is a benefit to elements added to the list being popped and sent to a client as fast as possible. A common example of such system is a web application processing jobs in the background.
Let's use a password reset flow as an example. It is not unusual for web applications to offer a password reset flow, so that users can reset their passwords if they forgot it. The idea is that a user provides their email address and they'll receive an email providing information about the steps to take to reset their password.
While it could be considered over-engineered, it's not unusual for web applications to use a background worker process system to implement such flow. The motivation behind this approach is that web applications often rely on third party providers to actually send emails, such as SendGrid, MailChimp or Braze. While often reliable, it is always possible that the APIs provided by these services occasionally fail, but once the email address has been read, the requests can be retried until it finally succeeds, this is where a background worker can be useful. The web server receives the email from the client, queues the request in Redis with an LPUSH
command, and returns a successful response to the client. The user will see a successful response even though the email might not have yet been actually sent. It is then up to a background worker to fetch the message from the list, with RPOP
and attempt to actually send the email, retrying if it fails.
In this scenario, we'd like the worker to pick up the message as soon as the message is pushed, but without a blocking variant, the only option we have is to use a polling mechanism. The worker issues sends RPOP
commands, if something is returned, it processes it, otherwise, it optionally waits and issues another RPOP
command. Rinse & repeat.
The problem with this approach is that if we program our workers to wait after receiving an empty response, to prevent spamming the server with unnecessary work, we could end up with a delay between when a message is pushed and when it is popped. Let's imagine that workers wait thirty seconds after each empty pop, if we're unlucky, a worker might send a pop command right before the message is pushed and only process it thirty seconds later.
While the impact in this example might be minimal, a thirty second delay for an email could be considered negligible, the fastest we process the email, the more likely we are to prevent confusion for the user. Any delays might cause confusion on their end, wondering if the process worked as expected.
Empty responses reduction
One approach that could improve the problem mentioned in the previous section would be to reduce the wait time between each poll. We started with thirty seconds in the previous example, and could decide to reduce is to one hundred milliseconds.
With such a short interval, we would we be guarantee to process the email within 100 milliseconds of the user requesting it. The problem is that the worker is now sending up to 10 requests per second, for a system that might see less than one message per minute.
This approach would create a lot of empty responses and effectively send a lot of useless commands. While Redis is optimized to process responses fast, no commands to process will always be faster that some.
The blocking variants avoid these issues by blocking the clients until an element is available, or until the timeout expires. With this approach, and with a long enough timeout, the number of empty responses can be reduced to almost zero. A worker would send the RPOP email-reset-queue 60
command, to either get directly a message from the email-reset-queue
or wait up to 60s until a message is added to the queue. An empty response would only happen if no customers request a password reset within 60s.
The blocking commands also solve the latency issue since there's now no delay between the moment when the message is pushed to the queue and when it is received by a worker.
Blocking commands caveats
There are some caveats that should be considered when using blocking commands. Redis will accumulate any further commands sent while the client that initiated the command is blocked. This means that if the application that communicates with Redis needs to perform other related operations, it would need to create a new connection.
This can be a problem for applications using connection pooling. The idea behind connection pooling is that an application will create a bunch of connections, the pool, and keep them available for future use. This removes the need for opening and closing connections, which is unnecessary since once a response has been sent, the same connection can be reused to send a different command.
This approach is particularly useful for web application where one or more processes or threads might be running to serve all the incoming HTTP requests. Let's use a multi-threaded web server as an example, running 100 threads. Let's imagine that about a tenth of the incoming requests require a connection to Redis.
A naive approach would be give each thread a connection, but this would be wasteful, given that most connections would be idle most of the time based on the 10% example we defined above. Instead, we could create a smaller number of connections, in a pool, and make each thread take a connection from the pool, use it and return it to the pool. Concurrency access concerns are really important here as only one thread should use a connection to prevent unexpected things to happen, such as two threads writing their own command to the connection, it would be impossible to read the responses and assign them to each thread.
Now, while connection pools are great to avoid creating too many connections and reuse existing connections to prevent unnecessary work with closing and creating connections, there might be issues if the pool has a fixed size and cannot grow. Different connection pool libraries might provide different features, but it is very common to have a maximum size of connections. The connection_pool gem is a common library in Ruby, and HikariCP is a very popular one in Java, both use fixed size pools.
In our example, if the pool was created with ten connections, and nine threads send a BLPOP
command with a long timeout, only one connection is left available for the other ninety threads, which would cause delay as each of these threads would need to wait for the previous command to complete.
The BLPopCommand
class
Implementing the blocking behavior will require some changes to the Server
class, but let's start by adding the BLPopCommand
class in list_commands.rb
:
module BYORedis
module ListUtils
# ...
def self.timeout_timestamp_or_nil(timeout)
if timeout == 0
nil
else
Time.now + timeout
end
end
def self.common_bpop(db, args, operation)
Utils.assert_args_length_greater_than(1, args)
timeout = OptionUtils.validate_float(args.pop, 'timeout')
list_names = args
list_names.each do |list_name|
list = db.lookup_list(list_name)
next if list.nil?
popped = yield list_name, list
return RESPArray.new([ list_name, popped ])
end
BYORedis::Server::BlockedState.new(ListUtils.timeout_timestamp_or_nil(timeout),
list_names, operation)
end
end
# ...
class BLPopCommand < BaseCommand
def call
ListUtils.common_bpop(@db, @args, :lpop) do |list_name, list|
@db.left_pop_from(list_name, list)
end
end
def self.describe
Describe.new('blpop', -3, [ 'write', 'noscript' ], 1, -2, 1,
[ '@write', '@list', '@slow', '@blocking' ])
end
end
end
listing 7.55 The BLPopCommand using a shared method from ListUtils
Most of the code in the common_bpop
method uses existing methods, the only difference is if none of the lists in list_names
exist, we return an instance of BlockedState
. This is a new struct defined in the Server
class:
module BYORedis
class Server
# ...
BlockedState = Struct.new(:timeout, :keys, :operation, :target, :client)
# ...
end
end
listing 7.56 The BlockedState Struct
The BlockedState
struct allows us to store information about blocked clients in the Server
class. This is necessary to handle both timeouts and unblocking clients when lists they are blocked on are pushed to and a response should be sent to the blocked clients. We also need to remember which type of operation was initially sent, so that we use left_pop
or right_pop
when it is time to unblock the client.
The process_poll_events
method is starting to get big, so we're extracting logic from it when processing client buffers.
module BYORedis
class Server
# ...
def process_poll_events(sockets)
sockets.each do |socket|
if socket.is_a?(TCPServer)
socket = safe_accept_client
next unless socket
@clients[socket.fileno.to_s] = Client.new(socket)
elsif socket.is_a?(TCPSocket)
client = @clients[socket.fileno.to_s]
# ...
if client_command_with_args.nil?
# ...
else
client.buffer += client_command_with_args
process_client_buffer(client)
end
else
raise "Unknown socket type: #{ socket }"
end
end
end
def process_client_buffer(client)
split_commands(client.buffer) do |command_parts|
return if client.blocked_state
response = handle_client_command(command_parts)
if response.is_a?(BlockedState)
block_client(client, response)
else
@logger.debug "Response: #{ response.class } / #{ response.inspect }"
serialized_response = response.serialize
@logger.debug "Writing: '#{ serialized_response.inspect }'"
unless Utils.safe_write(client.socket, serialized_response)
disconnect_client(client)
end
handle_clients_blocked_on_keys
end
end
rescue IncompleteCommand
# Not clearing the buffer or anything
rescue ProtocolError => e
client.socket.write e.serialize
disconnect_client(client)
end
def block_client(client, blocked_state)
if client.blocked_state
@logger.warn "Client was already blocked: #{ blocked_state }"
return
end
blocked_state.client = client
# Add the state to the client
client.blocked_state = blocked_state
if blocked_state.timeout
@db.client_timeouts << blocked_state
end
# Add this client to the list of clients waiting on this key
blocked_state.keys.each do |key|
client_list = @db.blocking_keys[key]
if client_list.nil?
client_list = List.new
@db.blocking_keys[key] = client_list
end
client_list.right_push(client)
end
end
# ...
end
end
listing 7.57 The blocking related method in the Server class
The blocked_state
instance is only added to client_timeouts
if there is a timeout. If the client specified a value of 0
, then the server will never unblock the client after a timeout and there is therefore no need to keep track of it here.
We added a new method to the Utils
module, to wrap the logic around writing to a socket, while handling potential exceptions if the socket was closed and the write operation fails:
module BYORedis
module Utils
# ...
def self.safe_write(socket, message)
socket.write(message)
true
rescue Errno::ECONNRESET, Errno::EPIPE
false
end
end
end
listing 7.58 The Utils.safe_write helper method
The block_client
method is making use of a few new elements. First it stores the BlockedState
instance in the client, so we need to update the Client
struct to add a new attribute:
module BYORedis
class Server
# ...
Client = Struct.new(:socket, :buffer, :blocked_state) do
attr_reader :id
def initialize(socket)
@id = socket.fileno.to_s
self.socket = socket
self.buffer = ''
end
end
# ...
end
end
listing 7.59 Updates to the Client Struct to support
We also need to add a few more data structures to the DB
class, namely: ready_keys
, :blocking_keys
, client_timeouts
& unblocked_clients
.
The first two, ready_keys
and blocking_keys
, are Dict
instances. With ready_keys
we use a Dict
even though we only care about keys, and will use nil
for all values. The purpose of this dictionary is essentially to be a set, a collection that does not store duplicates. Whenever a list receives a push, we will add the list's key to ready_keys
if it was blocked on. There is no need to store this information more than once, so using a Dict
as a set works perfectly. We will be able to know if it was blocked by inspecting blocking_keys
. blocking_keys
is also a dictionary where keys will be keys of lists that are being blocked on, and the values will be lists of clients blocked. By using a list as the value and appending to the list, we can maintain an order so that the first client to block for a key will be the first client to receive a response when elements will be pushed to the list.
client_timeouts
is necessary to handle clients that should receive an empty response if the timeout expired before elements were pushed to the list. Without an extra data structure to store timeouts, we would have to iterate over all the connected clients, and manually inspect their blocked_state
attribute to check if it expired or not. This would be extremely inefficient, especially given that we need to perform this check on each iteration of the event loop. This operation would have an O(n) complexity, where n is the number of connected clients. If no clients are blocked, even if thousands of clients are connected, we want to be able to know that there's no need to check for expired clients.
This is the problem that the client_timeouts
attribute on DB
solves. It is a sorted array, which we provide an implementation for in Appendix B
Whenever a blocked command is received and there is no element to return right away, a BlockedState
instance will be pushed to client_timeouts
unless the timeout was 0
. Using a regular array to store these BlockedState
instances would already be an improvement compared to the scenario described above. We would only check if clients are expired within the set of blocked clients, but we would still have to check all the clients, one by one, to see if they are expired or not. This is also a O(n) operation, where n is the number of blocked clients.
Using a sorted array turns the operation into an O(1) operation, we can inspect the first element in the sorted array, if it is not expired, there's no need to inspect any other clients, we know their expiration is later than the first one. If the first client in the sorted array is expired, we need to handle it as such and look at the next element, and stop as soon as we find a non expired client.
Using a sorted array also helps with the cleanup tasks required once a client was unblocked after the list it was blocked on received a push. When this happens, we need to remove the client from the client_timeouts
list since it is not blocked anymore. The sorted array class guarantees that this operation happens in O(logn) time, where n is the number of blocked clients, instead of O(n) if the array was not sorted.
This implementation is a deviation from the Redis one. Redis uses a Radix Tree to store client timeouts. A Radix tree provides similar time complexity for the iteration of clients from the ones with the earliest expirations, but it provides a better time complexity for the deletion use case.
We decided to not implement a Radix tree here given the complexity of such implementation. A sorted array provides a good middle ground approach, by being fairly short to implement and still being a great improvement compared to a regular array.
Finally, unblocked_clients
is a List
instance, which will be appended to when clients are unblocked, either because their timeouts expired or if the list they were blocked on received a push. Regardless of the reason, we need to check if the client sent commands while it was blocked, and process them.
module BYORedis
class DB
attr_reader :data_store, :expires, :ready_keys, :blocking_keys, :client_timeouts,
:unblocked_clients
attr_writer :ready_keys
def initialize
@logger = Logger.new(STDOUT)
@logger.level = LOG_LEVEL
@data_store = Dict.new
@expires = Dict.new
@ready_keys = Dict.new
@blocking_keys = Dict.new
@client_timeouts = SortedArray.new(:timeout)
@unblocked_clients = List.new
end
# ...
end
end
listing 7.60 New data structures in the DB class
At this point, once we're done processing a BLPOP
command, we stored the BlockedState
information in client_timeouts
and in the Client
instance. We also added a new entry if necessary in blocking_keys
, where the value is a list containing the client that issued the BLPOP
command.
We're now ready to handle the two possible outcomes for a blocked client. Either its timeout expires, in which case we return a nil
array, and process any commands sent while blocked. If one of the lists the client was blocked on receives a push before the timeout expires, we return the head of the list to the client, alongside the list key and also process any commands sent while blocked.
Handling timeouts
Let's first add two new steps to the event loop, handle_blocked_clients_timeout
and process_unblocked_clients
:
module BYORedis
class Server
# ...
def start_event_loop
loop do
handle_blocked_clients_timeout
process_unblocked_clients
timeout = select_timeout
@logger.debug "select with a timeout of #{ timeout }"
result = IO.select(client_sockets + [ @server ], [], [], timeout)
sockets = result ? result[0] : []
process_poll_events(sockets)
process_time_events
end
end
# ...
end
end
listing 7.61 Updates to the event loop in the Server class to handle blocked client timeouts
We now start each iteration of the event loop by first checking if any of the blocked clients are expired, and then processing any commands sent while blocked. As mentioned in the previous section, the time complexity of these two methods is important since they would otherwise add a delay at the beginning of each event loop iteration.
module BYORedis
class Server
# ...
def handle_blocked_clients_timeout
@db.client_timeouts.delete_if do |blocked_state|
client = blocked_state.client
if client.blocked_state.nil?
@logger.warn "Unexpectedly found a non blocked client in timeouts: #{ client }"
true
elsif client.blocked_state.timeout < Time.now
@logger.debug "Expired timeout: #{ client }"
unblock_client(client)
unless Utils.safe_write(client.socket, NullArrayInstance.serialize)
@logger.warn "Error writing back to #{ client }: #{ e.message }"
disconnect_client(client)
end
true
else
# Impossible to find more later on since client_timeouts is sorted
break
end
end
end
def unblock_client(client)
if client.socket.closed?
@logger.warn 'RETURNING EARLY'
return
end
@db.unblocked_clients.right_push client
return if client.blocked_state.nil?
# Remove this client from the blocking_keys lists
client.blocked_state.keys.each do |key2|
list = @db.blocking_keys[key2]
if list
list.remove(1, client)
@db.blocking_keys.delete(key2) if list.empty?
end
end
client.blocked_state = nil
end
def process_unblocked_clients
return if @db.unblocked_clients.empty?
cursor = @db.unblocked_clients.left_pop
while cursor
client = cursor.value
if @clients.include?(client.id)
process_client_buffer(client)
else
@logger.warn "Unblocked client #{ client } must have disconnected"
end
cursor = @db.unblocked_clients.left_pop
end
end
end
end
listing 7.62 Updates to the Server class to process timeouts for blocked clients
We need to add one more piece of logic to the server with regards to timeouts. If a client disconnects before its timeout expired, we want to clean things of up so that we don't keep track of this client anymore. We need to more that deleting the client from the @clients
array in process_poll_events
.
module BYORedis
class Server
# ...
def safe_accept_client
@server.accept
rescue Errno::ECONNRESET, Errno::EPIPE => e
@logger.warn "Error when accepting client: #{ e }"
nil
end
def safe_read(client)
client.socket.read_nonblock(1024, exception: false)
rescue Errno::ECONNRESET, Errno::EPIPE
disconnect_client(client)
end
def process_poll_events(sockets)
sockets.each do |socket|
if socket.is_a?(TCPServer)
socket = safe_accept_client
next unless socket
@clients[socket.fileno.to_s] = Client.new(socket)
elsif socket.is_a?(TCPSocket)
client = @clients.find { |client| client.socket == socket }
client_command_with_args = socket.read_nonblock(1024, exception: false)
client = @clients[socket.fileno.to_s]
client_command_with_args = safe_read(client)
if client_command_with_args.nil?
disconnect_client(client)
elsif client_command_with_args == :wait_readable
# There's nothing to read from the client, we don't have to do anything
next
elsif client_command_with_args.empty?
@logger.debug "Empty request received from #{ socket }"
else
# ...
end
end
end
end
def disconnect_client(client)
@clients.delete(client.id)
@db.unblocked_clients.remove(1, client)
if client.blocked_state
@db.client_timeouts.delete(client.blocked_state)
client.blocked_state.keys.each do |key|
list = @db.blocking_keys[key]
if list
list.remove(1, client)
@db.blocking_keys.delete(key) if list.empty?
end
end
end
client.socket.close
end
# ...
end
end
listing 7.63 Updates to the Server class to handle clients disconnection
Handling blocked clients before timeout
We need another handler, for when a list that is being blocked on, for instance either a
or b
after receiving BLPOP a b 1
, receives a push and is created. New lists are always created from DB#lookup_list_for_write
, so let's add a condition there, that will notify that one key that is being blocked on can now be used to unblock one or more clients:
module BYORedis
class DB
def lookup_list_for_write(key)
list = lookup_list(key)
if list.nil?
list = List.new
@data_store[key] = list
if @blocking_keys[key]
@ready_keys[key] = nil
end
end
list
end
end
end
listing 7.64 Updates to the DB#lookup_list_for_write to notify for list creation
blocking_keys
is populated when we process the result of a blocking command. It is a dictionary that contains a list of clients blocked for that key. When a new list is created, we inspect blocking_keys
, if there's no entry, then no clients are blocked on this key, on the other hand, if there is one or more clients, then we know that the key is being blocked on and we add it to @ready_keys
. The value of the pair does not matter here, we only care about having an entry in the dictionary.
module BYORedis
class Server
# ...
def initialize
@logger = Logger.new(STDOUT)
@logger.level = LOG_LEVEL
@clients = Dict.new
@db = DB.new
@blocked_client_handler = BlockedClientHandler.new(self, @db)
@server = TCPServer.new 2000
@time_events = []
@logger.debug "Server started at: #{ Time.now }"
add_time_event(Time.now.to_f.truncate + 1) do
server_cron
end
start_event_loop
end
# ...
def handle_clients_blocked_on_keys
return if @db.ready_keys.used == 0
@db.ready_keys.each do |key, _|
unblocked_clients = @blocked_client_handler.handle(key)
unblocked_clients.each do |client|
unblock_client(client)
end
end
@db.ready_keys = Dict.new
end
end
end
listing 7.65 Blocked client handling in the Server class
handle_clients_blocked_on_keys
is called from process_poll_events
, after processing the commands from the clients. If the command did not result in the creation of a list, then nothing will happen, no clients can be unblocked. If the command did create a list, then @db.ready_keys
will have received elements and @blocked_client_handler.handle
will be called for each of these keys.
The code in charge of handling blocked client lives in its own class, BlockedClientHandler
.
module BYORedis
class BlockedClientHandler
def initialize(server, db)
@server = server
@db = db
@logger = Logger.new(STDOUT)
@logger.level = LOG_LEVEL
end
def handle(key)
clients = @db.blocking_keys[key]
unblocked_clients = []
list = @db.data_store[key]
if !list || !list.is_a?(List)
@logger.warn "Something weird happened, not a list: #{ key } / #{ list }"
raise "Unexpectedly found nothing or not a list: #{ key } / #{ list }"
end
raise "Unexpected empty list for #{ key }" if list.empty?
cursor = clients.left_pop
while cursor
client = cursor.value
if handle_client(client, key, list)
unblocked_clients << client
end
if list.empty?
break
else
cursor = clients.left_pop
end
end
@db.blocking_keys.delete(key) if clients.empty?
unblocked_clients
end
def handle_client(client, key, list)
blocked_state = client.blocked_state
# The client is expected to be blocked on a set of keys, we unblock it based on the key
# arg, which itself comes from @db.ready_keys, which is populated when a key that is
# blocked on receives a push
# So we pop (left or right) from the list at key, and send the response to the client
if client.blocked_state
response = pop_operation(key, list, blocked_state.operation, blocked_state.target)
serialized_response = response.serialize
@logger.debug "Writing '#{ serialized_response.inspect } to #{ client }"
unless Utils.safe_write(client.socket, serialized_response)
# If we failed to write the value back, we put the element back in the list
rollback_operation(key, response, blocked_state.operation, blocked_state.target)
@server.disconnect_client(client)
return
end
else
@logger.warn "Client was not blocked, weird!: #{ client }"
return
end
true
end
private
def pop_operation(key, list, operation, target)
case operation
when :lpop
RESPArray.new([ key, @db.left_pop_from(key, list) ])
when :rpop
RESPArray.new([ key, @db.right_pop_from(key, list) ])
when :rpoplpush
raise "Expected a target value for a brpoplpush handling: #{ key }" if target.nil?
ListUtils.common_rpoplpush(@db, key, target, list)
else
raise "Unknown pop operation #{ operation }"
end
end
def rollback_operation(key, response, operation, target_key)
list = @db.lookup_list_for_write(key)
case operation
when :lpop
element = response.underlying_array[1]
list.left_push(element)
when :rpop
element = response.underlying_array[1]
list.right_push(element)
when :rpoplpush
target_list = @db.lookup_list(target_key)
element = target_list.left_pop
@db.data_store.delete(target_key) if target_list.empty?
list.right_push(element.value)
else
raise "Unknown pop operation #{ operation }"
end
end
end
end
listing 7.66 The BlockedClientHandler class
This is the final step required to handle blocked clients. We are processing all the keys that can be used to unblock clients. A command can push more than one element, so we might be able to unblock more than one client. We start this process by getting the list of blocked clients for this key, in @db.blocked_clients
. Clients are added with right push operations, so we use left_pop
here to get them in order of insertions. The first client to block on this list will be processed first. We also get the recently created list that will pop elements from to unblock clients.
For each client, we call handle_client
. In handle_client
we pop an element from the list, according to the operation that the client blocked with, either left_pop
or right_pop
, and we send the response back to the client. In case of a failure, we return the element back to the list and disconnect the client.
Back in BlockedClientHandler#handler
, we accumulate a list of clients that were unblocked, which will in turn be returned to the Server
class. This list is used to call unblock_client
, which will flag the client to be processed in case there were accumulated commands sent while blocked.
We then check if the list is empty, if it is, we cannot unblock anymore clients and exit the iteration. Otherwise we continue and unblock the next client. unblock_client
also takes care of cleaning up the server state. Now that a client is unblocked, it should not be in any of the lists in @db.blocking_keys
. We also need remove the entry in the sorted array of timeouts.
If we were able to unblock all the clients, we remove the key from blocking_keys
.
With all these changes, we can implement the similar BRPopCommand
class:
module BYORedis
# ...
class BRPopCommand < BaseCommand
def call
ListUtils.common_bpop(@db, @args, :rpop) do |list_name, list|
@db.right_pop_from(list_name, list)
end
end
def self.describe
Describe.new('brpop', -3, [ 'write', 'noscript' ], 1, -2, 1,
[ '@write', '@list', '@slow', '@blocking' ])
end
end
end
The implementation is almost identical to BLPopCommand
. The last class to add is BRPopLPushCommand
:
module BYORedis
module ListUtils
# ...
def self.common_rpoplpush(db, source_key, destination_key, source)
if source_key == destination_key && source.size == 1
source_tail = source.head.value
else
destination = db.lookup_list_for_write(destination_key)
source_tail = db.right_pop_from(source_key, source)
destination.left_push(source_tail)
end
RESPBulkString.new(source_tail)
end
end
# ...
class RPopLPushCommand < BaseCommand
def call
Utils.assert_args_length(2, @args)
source_key = @args[0]
source = @db.lookup_list(source_key)
if source.nil?
NullBulkStringInstance
else
destination_key = @args[1]
ListUtils.common_rpoplpush(@db, source_key, destination_key, source)
end
end
# ...
end
end
# ...
class BRPopLPushCommand < BaseCommand
def call
Utils.assert_args_length(3, @args)
source_key = @args[0]
source = @db.lookup_list(source_key)
timeout = OptionUtils.validate_float(@args[2], 'timeout')
destination_key = @args[1]
if source.nil?
BYORedis::Server::BlockedState.new(ListUtils.timeout_timestamp_or_nil(timeout),
[ source_key ], :rpoplpush, destination_key)
else
ListUtils.common_rpoplpush(@db, source_key, destination_key, source)
end
end
def self.describe
Describe.new('brpoplpush', 4, [ 'write', 'denyoom', 'noscript' ], 1, 2, 1,
[ '@write', '@list', '@slow', '@blocking' ])
end
end
end
The code here is almost identical to the one in RPopLPushCommand
, with the difference that it returns BlockedState
instance if the source list does not exist. One difference with the previous two commands is that we need to store the target of the pop, the list in which we'll push the element to.
We also extracted some shared logic between RPopLPushCommand
& BRPopLPushCommand
to ListUtils.common_rpoplpush
to prevent code repetition. This shared code is important as it handles the edge case where both source
& destination
are the same list and only contains one element.
Tests
We added a few more test files in this chapter, and placed all of them in the test/
sub folder, we also added and a task in Rakefile
to run all the tests:
require 'rake/testtask'
Rake::TestTask.new do |t|
t.pattern = 'test/*test.rb'
end
listing 7.67 Rakefile content
The new test files allow us to easily run a sub section of the test suite, the following is a list of the new files:
list_test.rb
dict_unit_test.rb
command_test.rb
sorted_array_unit_test.rb
list_unit_test.rb
We can run all the tests with rake test
, or single files with ruby list_test.rb
for instance.
Conclusion
In this chapter we added full list support to our Server. We also added the TYPE
command which allows us to inspect the type of values added to the database. The current types are string
, list
and none
if the given key does not exist.
We also made a lot of changes to the server to support blocking commands.
In the next chapter we'll add support for another Redis data type, Hashes.
Code
You can find the code on GitHub
Appendix A: Ziplist Deep Dive
Ziplists are implemented in the ziplist.h
& ziplist.c
files. The ziplist.c
file contains a great explanation of how ziplists work. In this appendix we provide a few more details as well as some examples.
Ziplists were added to Redis in 2014 by Twitter engineers who apparently needed a more compact data structure to store Twitter timelines, essentially lists of ids. You can read more about it on the Pull Request page on GitHub and on the blog of one of the authors here and there.
A ziplist is a contiguous chunk of memory, that can store integers and strings, with the following layout:
<zlbytes> <zltail> <zllen> <entry> <entry> ... <entry> <zlend>
-
zlbytes
is an unsigned 32-bit integer (uint32_t), which describes the number of bytes used by the ziplist, including itself. -
ztail
is also an unsigned 32-bit integer, which is an offset to the last element in the list. It serves a purpose similar to@tail
in the implementation used in this chapter. -
zllen
is an unsigned 16-bit integer (uint16_t), it keeps count of the number of entries. Because it is an unsigned 16 bit integer, its maximum value is 65,535 (2^16 - 1). In order to allow ziplist to hold more elements, this ziplist implementation knows that the maximum value that can be described by this integer is 65,534 (2^16 - 2), and if the value of the integer is 65,535, it will scan the whole list to count the number of items. In practice, Redis caps the size of the ziplists it creates, the default is 2Kb, and this scenario is extremely unlikely to happen. -
zlend
is a single byte (uint8_t), and is set to 255, the maximum value of a byte, 2^8 - 1, or1111 1111
/FF
in binary and hexadecimal representations respectively.
The following elements are the actual entries in the list. Each entry has the following layout:
<prevlen> <encoding> <entry-data>
-
prevlen
represents the length of the previous entry, allowing for right to left. It serves a purpose similar to theprev_node
attribute in the list used in this chapter, but uses a dynamic size and will often be smaller. If the length of the previous entry in bytes is smaller than 254 bytes, prevlen will be stored in a single byte (uint8_t). The maximum value of a uint8_t is 255 (2^8 - 1). Otherwise, it will consumes 5 bytes, the first byte will be set to 254, the maximum value of a byte, and the following. As a reminder, 255 cannot be used since this value is used to flag the end of the list. This means that an entry cannot have a length greater than what can fit in a 32 bit integer, since the length is stored over 4 bytes, 32 bits. This sets the maximum length of a string stored by a ziplist to 4,294,967,295 (2^32 - 1). Which is, admittedly, a very large string. -
encoding
describes how the entry data is actually stored. It will take either one, two or five bytes. The first two bits of the encoding determines the type,11
means an integer, and anything else, that is either00
,01
or10
, means a string.
String encoding
- If the encoding starts with
00
, then the encoding itself will only occupy a single byte. This means that we can only use the remaining 6 bits of the encoding byte to encode the length of the string. The maximum value that can be encoded with 6 bits is 63 (2^6 - 1). This means that if the encoding byte is0000 0001
, the string that is stored has a length of 1. With0000 0100
, the string has a length of 4, and with0011 1111
, it has a length of 63. - If the encoding starts with
01
, then the encoding occupies two bytes, which leaves us with 14 bits, the 6 bits of the first byte, and the 8 bits of the second bytes. This encoding can describe a string with a length up to 16,383 (2^14 - 1). As an examples,0100 0001 0000 0000
describes a string of length 256. - If the encoding starts with
10
, then the encoding will use five bytes. The 6 extra bits of the first byte are not used since we can use the other four bytes to hold a 32-bit value, which is the maximum length of a string in a ziplist. This encoding will only be used if strings have a length greater than 16,383.
Integer encoding
There are six variants of the integer encoding:
- If the encoding is
1100 0000
, it describes a signed integer that occupies 2 bytes (int16_t), which value can go up to 32,767 (2^15 - 1) - If the encoding is
1101 0000
, it describes a signed integer that occupies 4 bytes (int32_t), which value can go up to 2,147,483,648 (2^31 - 1) - If the encoding is
1110 0000
, it describes a signed integer that occupies 8 bytes (int64_t), which value can go up to 18,446,744,073,709,551,616 (2^63 - 1) - If the encoding is
1111 0000
, it describes a signed integer that occupies 3 bytes (24 bits), which value can go up to 8,388,607 (2^23 - 1) - If the encoding is
1111 1110
, it describes a signed integer that occupies 1 byte (8 bits), which value can go up to 128 (2^7 - 1) - Finally, if the first four bits are
1111
, then the last four bits are used to hold an unsigned integer value itself. This means that technically we'd be bound to storing integers from 0 up to 15 (2^4 - 1), but the reality is actually more restrictive. We cannot store 0 because1111 0000
means the 3 byte encoding described above. We also cannot store 15, because1111
would make the whole byte1111 1111
and this value, 255, is reserved for the end of list marker. We also cannot store 14,1110
, because1111 1110
means the 1 byte encoding described above. This narrows the range of possible values from 1 (0001
) to 13 (1101
). Redis actually applies an offset here, to allow numbers from 0 to 12 to be stored with this encoding, so if you store 0 in a Ziplist, it actually stores 1, if you store 12, it actually stores 13. Storing -1 or 13 would require the next encoding type, the one using a full extra byte.
The DEBUG ZIPLIST
command
Redis provides a few commands for debug purposes. One of them prints details about the ziplist if the key is itself stored as a ziplist. As it turns out, a list is never stored as a ziplist, it is always a quicklist of ziplists. But other Redis data types can use ziplists, for instance, a hash with a few items, less than 512 entries and the values are shorter than 64 bytes. Note that depending on how you run Redis on your machine, things will be different. The DEBUG ZIPLIST
command does not print the results in the REPL but instead prints them on standard out. I often run a manually built from source Redis, which makes it easy to get the logs since they are by default printed to the terminal where you started the server, but if you run Redis through homebrew, the logs will be located somewhere else.
Let's try it:
127.0.0.1:6379> hset foo name pierre
(integer) 1
127.0.0.1:6379> DEBUG ZIPLIST foo
Ziplist structure printed on stdout
And on stdout:
{total bytes 25} {num entries 2}
{tail offset 16}
{
addr 0x7f8f1350d8ca,
index 0,
offset 10,
hdr+entry len: 6,
hdr len 2,
prevrawlen: 0,
prevrawlensize: 1,
payload 4
bytes: 00|04|6e|61|6d|65|
[str]name
}
{
addr 0x7f8f1350d8d0,
index 1,
offset 16,
hdr+entry len: 8,
hdr len 2,
prevrawlen: 6,
prevrawlensize: 1,
payload 6
bytes: 06|06|70|69|65|72|72|65|
[str]pierre
}
{end}
The previous shows an example of a ziplist containing two items, each is a string. The first entry occupies 6 bytes, and the second one eight. Let's look closer at all the bytes:
The first byte, 00
, represents the length of the previous item, which is zero since this is the head of the list. The second byte is 04
, which is the first encoding byte. The encoding can be between one and 5 bytes, so we need to look at the values, bit by bit, to determine which encoding it is, which will tell us how the data is stored. 04
has the following binary representation: 0000 0100
. Looking at the "String encoding" section above, this fits in the first bullet point, a string described with a single byte, because it starts with 00
. We then need to look at the other six bytes to determine the length of the string, 000100
is the number four, so the string has a length of four.
The last four bytes in the first entry are 00
, 04
, 6e
, 61
, 6d
& 65
. The string stored is name
, which holds four bytes: 6e
, 61
, 6d
& 65
. The values are the ascii representation of the letters n
, a
, m
& e
.
We need to continue as long as we don't encounter the FF
byte, or 255
. The next six bytes are 06
, 06
, 70
, 69
, 65
, 72
, 72
& 65
.
The first byte is the length of the previous item, 06
, the hex representation of the number 6, which is what we found when looking at the previous entry, so far so good. The second byte is the first byte of the encoding, 06
, or 0000 0110
in binary. Similarly as for the previous string, 00
tells us it's a string with a length lower than 63, and we can see it has a length of 6. The next six bytes represent the letters p
, i
, e
, r
, r
, & e
in ASCII and the whole entry has a length of 8.
We can now see why the total bytes
value is 25, 4 for the zlbytes
field, 4 for the zltail
field, 2 for the zllen
field, 6 for the first entry and 8 for the second entry and 1 for the end of list marker:
4 + 4 + 2 + 6 + 8 + 1 = 25
. A basic doubly linked list would have required 20 bytes for the first node, 2 8-byte pointers and 4 bytes of data, 22 bytes for the second node, 2 8-byte pointers and 6 bytes of data, as well as 16 bytes for the head and tail pointers, for a grand total of 62. In this small example a ziplist is less than half the size!
The problem with ziplists is that adding and removing element require a reallocation of memory, and insertions/deletions in the middle of the list become expensive as the list grows since the whole list needs to be rearranged.
The tail offset
field is 16 since the list header occupies 10 bytes, 4, 4 & 2, and the first item takes 6, that's 16.
Let's look at two more examples, one with an integer, and one with a string using a different encoding:
127.0.0.1:6379> hset foo age 30
(integer) 1
127.0.0.1:6379> DEBUG ZIPLIST foo
The following is the debug information, the first two entries are the same as previously and were omitted.
{total bytes 33} {num entries 4}
{tail offset 29}
...
{
addr 0x7f8681004338,
index 2,
offset 24,
hdr+entry len: 5,
hdr len 2,
prevrawlen: 8,
prevrawlensize: 1,
payload 3
bytes: 08|03|61|67|65|
[str]age
}
{
addr 0x7f868100433d,
index 3,
offset 29,
hdr+entry len: 3,
hdr len 2,
prevrawlen: 5,
prevrawlensize: 1,
payload 1
bytes: 05|fe|1e|
[int]30
}
{end}
Looking at the new bytes, 08
is the length of the previous entry, 8, 03
is the encoding for age
, a string of length 3, and the three following bytes are the three letters. For the last entry, the length of the previous entry is 5, and the encoding is fe
, or 1111 1110
in binary. This is the one byte integer encoding, which makes sense given that 30 is too big to use the "encoded within encoding header" approach, where the max value is 12 and is small enough to fit within a single byte, where the max value is 127. If we had set the age to 128, the bytes would have been 05
, c0
,80
& 00
. c0
is the hex representation of 192/1100 0000
, which represents the 2 byte signed integer encoding, which is why it is followed by two bytes, 80
& 00
. 80
is the hex representation of 1000 0000
, aka 128. Note that 128 is encoded as 80
00
and not 00
80
. This is because Redis represents integers as little endian in Ziplists, the least significant bytes come first. There are two exceptions where integers are stored in big endian, for two of the different string encoding, the one using two bytes and the one using five bytes.
Let's now look at a sixty-four byte long string:
127.0.0.1:6379> hset foo 64-char-string aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
(integer) 0
127.0.0.1:6379> DEBUG ZIPLIST foo
The following is the debug information:
{total bytes 116} {num entries 6}
{tail offset 48}
...
{
addr 0x7f867f705880,
index 4,
offset 32,
hdr+entry len: 16,
hdr len 2,
prevrawlen: 3,
prevrawlensize: 1,
payload 14
bytes: 03|0e|36|34|2d|63|68|61|72|2d|73|74|72|69|6e|67|
[str]64-char-string
}
{
addr 0x7f867f705890,
index 5,
offset 48,
hdr+entry len: 67,
hdr len 3,
prevrawlen: 16,
prevrawlensize: 1,
payload 64
bytes: 10|40|40|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|61|
[str]aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa...
}
{end}
Looking at the last entry, we see that the length of the previous entry is 10
, which the hexadecimal representation of the number 16. As a quick reminder, this is because the two characters represent 4 bits each, 1
represents 0001
and 0 is 0000
, and the byte 0001 0000
is the value 16, 2^4. The string "64-char-string"
has 14 characters, and there are two more bytes, one of the prevlen
field, and one for the encoding. That's 16. The encoding takes two bytes for the last entry, 40
& 40
. 40
is the representation of the number 64, or 0100 0000
in binary.
The first two bits of the encoding are 01
, so we know that the overall encoding will use two bytes, and the length of the string will use 14 of the 16 bytes. These 14 bits are: 00 0000 0100 000
, which is the number 64, the length of the string coming up next.
Finally, we can see that if we set a string with a length longer than 64, Redis will convert the hash from a Ziplist to a Hash table
127.0.0.1:6379> hset foo 64-char-string aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa11
(integer) 0
127.0.0.1:6379> DEBUG ZIPLIST foo
(error) ERR Not an sds encoded string.
127.0.0.1:6379> DEBUG OBJECT foo
Value at:0x7f867f62f8e0 refcount:1 encoding:hashtable serializedlength:47 lru:7660156 lru_seconds_idle:36
Note that the error message is incorrect, it should say: "Not a ziplist encoded object". This was fixed in a more recent version of Redis.
Appendix B: Sorted Array
The following is a sorted array implementation that provides O(logn) push/<<
& delete
. The implementation uses a plain Ruby array as the underlying storage, which it delegates most of its operations to.
Elements are kept sorted in the push/<<
method by using the built-in bsearch_index
method. Using this method, we compute the index at which the element should be added in the array and maintain the ordering.
The delete method also relies on bsearch_index
to find the index of the elements it needs to remove. Given that multiple elements could meet the conditions, it needs to find all the indices for the elements that should be removed. bsearch_index
will return the leftmost index of an element that should be deleted, and we keep looking to the right as look as we find matching elements and mark them for deletion.
The SortedArray
is meant to be used to hold classes holding multiple attributes, such as Struct
s, and order them based on one their fields. Its main use case in the BYORedis
codebase is to store BlockedState
instances, sorted by their timeouts.
module BYORedis
class SortedArray
def initialize(field)
@underlying = []
@field = field.to_sym
end
def push(new_element)
if @underlying.empty?
index = 0
else
index = @underlying.bsearch_index do |element|
element.send(@field) >= new_element.send(@field)
end
end
index = @underlying.size if index.nil?
@underlying.insert(index, new_element)
end
alias << push
def [](index)
@underlying[index]
end
def size
@underlying.size
end
def shift
@underlying.shift
end
def delete_if(&block)
@underlying.delete_if(&block)
end
def delete(element)
index = @underlying.bsearch_index { |x| x.send(@field) >= element.send(@field) }
return if index.nil?
element_at_index = @underlying[index]
indices_for_deletion = []
while element_at_index
if element_at_index == element
indices_for_deletion << index
end
index += 1
next_element = @underlying[index]
if next_element && next_element.send(@field) == element.send(@field)
element_at_index = next_element
else
break
end
end
indices_for_deletion.each { |i| @underlying.delete_at(i) }
end
end
end
listing 7.68 The SortedArray class
The following is a test suite for the two main methods of the SortedArray
class, push/<<
& delete
:
# coding: utf-8
require_relative './test_helper'
require_relative './sorted_array'
describe BYORedis::SortedArray do
TestStruct = Struct.new(:a, :timeout)
describe 'push/<<' do
it 'appends elements while keeping the array sorted' do
sorted_array = new_array(:timeout)
sorted_array << TestStruct.new('a', 1)
sorted_array << TestStruct.new('b', 2)
sorted_array << TestStruct.new('c', 10)
sorted_array << TestStruct.new('d', 20)
sorted_array << TestStruct.new('e', 15)
sorted_array << TestStruct.new('f', 8)
assert_equal(6, sorted_array.size)
assert_equal(1, sorted_array[0].timeout)
assert_equal(2, sorted_array[1].timeout)
assert_equal(8, sorted_array[2].timeout)
assert_equal(10, sorted_array[3].timeout)
assert_equal(15, sorted_array[4].timeout)
assert_equal(20, sorted_array[5].timeout)
end
end
describe 'delete' do
it 'deletes the element from the array' do
sorted_array = new_array(:timeout)
sorted_array << TestStruct.new('a', 10)
sorted_array << TestStruct.new('b1', 20)
sorted_array << TestStruct.new('b2', 20)
sorted_array << TestStruct.new('b3', 20)
sorted_array << TestStruct.new('c', 30) # array is now a, b3, b2, b1, c
sorted_array.delete(TestStruct.new('d', 40)) # no-op
sorted_array.delete(TestStruct.new('b1', 20))
assert_equal(4, sorted_array.size)
assert_equal(10, sorted_array[0].timeout)
assert_equal(TestStruct.new('b3', 20), sorted_array[1])
assert_equal(TestStruct.new('b2', 20), sorted_array[2])
assert_equal(30, sorted_array[3].timeout)
end
end
def new_array(field)
BYORedis::SortedArray.new(field)
end
end
listing 7.69 Basic unit tests for the SortedArray class
Top comments (0)