A thread-safe queue is a queue which can be used in a multi-threaded environment without any loss in data. Before we build one, let us discuss more on why we need a thread-safe queue. Feel free to skip the explanation and head to the bottom of this blog to find the full implementation for the queue if you know the prerequisites π.
Why does one need a thread-safe queue?
Consider this go program with a queue implementation. This is the main
function of that program. It creates a queue with size 1 and inserts an element into it. Then we spawn two go routines which remove the inserted element.
func main() {
q := CreateQueue(1)
q.Insert(0)
// Start two go routines
for i := 0; i < 2; i++ {
// This go routine will try to call the Remove function of the queue.
go func(q *Queue) {
if removed, e := q.Remove(); e == nil {
fmt.Printf("Removed %d\n", removed)
} else {
fmt.Printf("got error %v\n", e)
}
}(q)
}
time.Sleep(100 * time.Second)
}
Also look at the Remove
function. I've added the time.Sleep
function to show what happens when things take too long (1ns=1x10-9s might not be too long for us but can't say the same about computers π) in a multi-threaded environment.
func (q *Queue) Remove() (int, error) {
if len(q.q) > 0 {
time.Sleep(1 * time.Nanosecond)
item := q.q[0]
q.q = q.q[1:]
return item, nil
}
return 0, errors.New("Queue is empty")
}
In a single-threaded environment, no problem would occur and everything would work as expected i.e two Remove
calls which would remove the element for the first call and return an error for the second call.
But for multi-threaded environments, something very interesting happens. If you were to run that program in the playground you would get a panic from Go. Something like this:
panic: runtime error: index out of range [0] with length 0
goroutine 6 [running]:
main.(*Queue).Remove(0xc00000c0a0, 0x0, 0x0, 0x0)
/tmp/sandbox828372841/prog.go:33 +0xf9
main.main.func1(0xc00000c0a0)
/tmp/sandbox828372841/prog.go:56 +0x32
created by main.main
/tmp/sandbox828372841/prog.go:55 +0xcd
Program exited: status 2.
So what happened there?
The two go routines start executing one after the other in parallel. The first go routine calls Remove
and then it checks the length in Remove
(which is 1) and then enters the if
block and sleeps for 1ns. Now the second go routine also calls Remove
and checks the length and gets 1 as length and enters the if
block and sleeps for 1ns. Why does it enter? It's because the first go routine is still sleeping, so it hasn't removed the element from the queue yet so the length was going to be 1 for sure. Now while second go routine is sleeping, the first one wakes up and removes the element. When the second one wakes up it will try to fetch the first element in the queue but, it doesn't exist! And that explains the panic we got i.e runtime error: index out of range [0] with length 0
.
What did we learn from the above experiment?
A small delay in the form of time.Sleep(1 * time.Nanosecond)
i.e sleeping for 1 ns is enough to cause problems. This tells how sensitive threads are to the code we write π€―, meaning, if I were to add some lines to that function which take >= 1ns to run then we would be seeing this issue. Now I think it's clear why we need to handle operations differently in a multi-threaded environment.
So what's the solution?
TLDR; The solution is to use a mutex.
Intuitively, we need a mechanism that makes sure only one thread can Remove
from the queue at any given instance. Particularly the if
block since, that caused the problem in the first place. Here the if
block is said to be the critical section of the Remove
function. So, we would somehow "lock" the critical section of the Remove
function when a thread approaches it and then "unlock" it when it's done with the operations it has to do. To do this "lock" and "unlock" we use a mutex
in Go.
The mutex
will make sure only one thread accesses the critical section at any given instance. So, when you Remove
, before the if
block, we need to Lock
the following operations and when we are done we can Unlock
. This is the updated snippet and below is the updated Remove
which is now thread-safe.
func (q *Queue) Remove() (int, error) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.q) > 0 {
time.Sleep(1 * time.Nanosecond)
item := q.q[0]
q.q = q.q[1:]
return item, nil
}
return 0, errors.New("Queue is empty")
}
I've included the mutex in the Queue struct
so that every instance of the queue has it's own mutex.
If you were to run the program now, you would see everything works fine as expected. No more panics β¨!
How did it work?
When we acquired the lock using q.mu.Lock()
we made sure that only one go routine can access the if
block. The defer q.mu.Unlock()
just makes sure we unlock it after the function returns. Try not calling Unlock
and ponder what happens then π.
Give me the whole code π‘
Alright I've heard you π
. Here it is:
import (
"errors"
"time"
"sync"
)
type Queue struct {
mu sync.Mutex
capacity int
q []int
}
// FifoQueue
type FifoQueue interface {
Insert()
Remove()
}
// Insert inserts the item into the queue
func (q *Queue) Insert(item int) error {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.q) < int(q.capacity) {
q.q = append(q.q, item)
return nil
}
return errors.New("Queue is full")
}
// Remove removes the oldest element from the queue
func (q *Queue) Remove() (int, error) {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.q) > 0 {
item := q.q[0]
q.q = q.q[1:]
return item, nil
}
return 0, errors.New("Queue is empty")
}
// CreateQueue creates an empty queue with desired capacity
func CreateQueue(capacity int) *Queue {
return &Queue{
capacity: capacity,
q: make([]int, 0, capacity),
}
}
Happy coding! Hope you learnt something new today π.
Top comments (3)
Great writeup! Another, and maybe slightly simpler, way of implementing thread-safe queues in Go is to use Goroutines that are thread-safe by default:
In my simple benchmark on my device, this implementation has a slightly better performance too:
This is cool. I come from a Java background so channels weren't that intuitive for me. Thank you for taking the time to put the code. Super helpful.
Glad to help.