We're back at it with another edition of Multithreaded Ruby where we'll continue to dive into concurrency using our beloved language!
Today, I'm going to introduce you to a famous multi-process synchronization problem called the Producer-Consumer problem and we're going to look at Ruby's ConditionVariable
class.
Back to Deadlock
A paragraph into the new article and we're at deadlocks again? Well yes, they're pretty prevalent and we did not actually touch on a solution to the problem last time.
Let's bring back the deadlock example we used in Part I, modified just a tiny bit.
require 'thwait'
item_accessories = {}
item = {}
item_acc_lock = Mutex.new
item_lock = Mutex.new
a = Thread.new {
item_acc_lock.synchronize {
sleep 1 # pretend to work on item_accessories
item_lock.synchronize {
# pretend to work on item
sleep 1
puts 'Worked on accessories, then on item'
}
}
}
b = Thread.new {
item_lock.synchronize {
sleep 1 # pretend to work on item
item_acc_lock.synchronize {
# pretend to work on item_accessories
sleep 1
puts 'Worked on item, then on accessories'
}
}
}
ThWait.all_waits(a, b)
> enether$ ruby item_worker.rb
/Users/enether/.rvm/rubies/ruby-2.4.1/lib/ruby/2.4.0/thwait.rb:112:in `pop': No live threads left. Deadlock? (fatal)
No surprise here, thread a
obviously takes a hold of item_acc_lock
, thread b
takes a hold of item_lock
and each of them waits for the opposite lock in an endless loop. So how could we avoid this?
What if we had a way to temporarily release one of the locks at a specific point in our program where we could afford doing so? That way, the other thread could take the lock, do its thing and return it back for the original one to finish its work.
Enter ConditionVariable
ConditionVariable is a Ruby class which lets you block a thread until another thread signals it OK to continue. It is a way to say - "I'm waiting for a lock and I can give up mine at this exact time". It is an ideal way to synchronize our a
and b
threads here:
require 'thwait'
item_accessories = {}
item = {}
item_acc_lock = Mutex.new
item_lock = Mutex.new
cv = ConditionVariable.new
a = Thread.new {
item_acc_lock.synchronize {
sleep 1 # pretend to work on item_accessories
# At this point, we've just finished work on item_accessories and we're at a window where we
# might not care if item_accessories changes. So: let somebody else take it and give it back
cv.wait(item_acc_lock) # Temporarily sleeps the thread and releases the lock
puts 'Gained back access to item_acc_lock' # on this line, item_acc_lock is re-acquired
item_lock.synchronize {
# pretend to work on item
sleep 1
puts 'Worked on accessories, then on item'
}
}
}
b = Thread.new {
item_lock.synchronize {
sleep 1 # pretend to work on item
item_acc_lock.synchronize {
# pretend to work on item_accessories
sleep 1
puts 'Worked on item, then on accessories'
}
cv.signal
puts "I'm still working, but I'm finished with item_acc_lock"
}
}
ThWait.all_waits(a, b)
> enether$ ruby synchronized_item_worker.rb
Worked on item, then on accessories
I'm still working, but I'm finished with item_acc_lock
Gained back access to item_acc_lock
Worked on accessories, then on item
What we achieved here is a sort of synchronization: we can now be sure that one b
thread will always reach its cv.signal
line before an a
thread starts working on item_accessories
.
Here is a picture visualizing the process:
You might want to open this in another tab - High-Resolution
It is worth noting that the ConditionVariable#signal
method will only wake up one thread which is waiting for the variable. This means that if we have two threads waiting on a ConditionVariable
and its signal
method is called only once, the thread that does not get called will end up waiting forever for the ConditionVariable
, resulting in a deadlock
require 'thwait'
lock = Mutex.new
cv = ConditionVariable.new
threads = []
2.times do
threads << Thread.new {
lock.synchronize { cv.wait(lock) }
}
end
threads << Thread.new {
lock.synchronize { sleep 1 }
cv.signal
}
ThWait.all_waits(*threads)
> enether$ ruby cv_pitfall.rb
/Users/enether/.rvm/rubies/ruby-2.4.1/lib/ruby/2.4.0/thwait.rb:112:in `pop': No live threads left. Deadlock? (fatal)
In such a scenario, you need to either call signal
as many times as there are waits
or use another method - ConditionVariable#broadcast
, which will wake up every thread that is waiting on the condition variable.
Producer-Consumer
The producer-consumer problem consists of at minimum two threads, one representing a producer and one representing a consumer.
- Producer - Sole job is to create an item and put it into the buffer
- Consumer - Sole job is to take the item from the buffer and process it
Here is a sample implementation in Ruby, where the tasks
array acts as the buffer with a fictional limitation of having at most 2 items in it at once:
require 'thwait'
threads = []
tasks = []
mutex = Mutex.new
# producer
2.times do
threads << Thread.new do
loop do
mutex.synchronize do
if tasks.length < 2
tasks << "Task :)"
end
end
end
end
end
# consumer
5.times do
threads << Thread.new do
loop do
task = nil
mutex.synchronize do
if tasks.length != 0
task = tasks.shift
end
end
unless task.nil?
100000.times do
# Simulating task execution's CPU work
# also doing it outside the mutex so we don't block the tasks array (other producer might want to take a task as well)
end
end
end
end
end
ThWait.all_waits(threads)
Producer-Consumer problem
As the name implies, something is not quite right with the code above. Finding problems in concurrent code is hard, so I'm going to give you a couple of minutes to figure out what is wrong.
...
...
...
Okay, if you managed to figure it out - great, if not - don't fret, multithreaded programming is unintuitive.
The problem with the code above is that it wastes time. You see, as we can't control when and to which thread the OS switches to, there is the possibility that we leave our consumer thread and enter the producer's when there is no reason to.
Imagine all our consumer threads are currently executing a task and our tasks
array is full (has two elements). If the OS decides to do a context switch and gives control to a producer thread it would only waste time. Since the tasks
array would be full, the producer would loop only to check that the array is full and not do anything else, only managing to take precious CPU time from our consumer threads.
Let's track exactly how much useless iterations this thing does:
require 'thwait'
threads = []
tasks = []
mutex = Mutex.new
to_exit = false
times_tasks_added = 0
times_time_wasted = 0
executed_tasks = 0
# consumer
2.times do
threads << Thread.new do
loop do
Thread.kill(Thread.current) if to_exit # a way to stop execution
mutex.synchronize do
if tasks.length < 2
tasks << "Task :)"
times_tasks_added += 1
else
# time here is absolutely wasted
times_time_wasted += 1
end
end
end
end
end
# producer
5.times do
threads << Thread.new do
loop do
Thread.kill(Thread.current) if to_exit # a way to stop execution
task = nil
mutex.synchronize do
if tasks.length != 0
task = tasks.shift
end
end
unless task.nil?
100000.times do
# Simulating CPU work
# also doing it outside the mutex so we don't block the tasks array (other producer might want to take a task as well)
end
executed_tasks += 1
if executed_tasks >= 100 # don't loop forever
to_exit = true
end
end
end
end
end
ThWait.all_waits(threads)
puts "Total tasks added: #{times_tasks_added}"
puts "Total times we branched out into the useless else statement: #{times_time_wasted}"
And here are the results:
> enether$ ruby squander_of_time.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 1633
> enether$ ruby squander_of_time.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 848282
> enether$ ruby squander_of_time.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 356418
We know that thread context-switching is non-deterministic and these results further prove it. Sometimes we do as little as 1633 useless executions of the else
branch and some - as much as 848k, 8316 times more than we need!
This is bad because it will cause your program to run slower at some times and behave seemingly normal in others.
It's good to note that I personally found it hard to figure out the problem in this code even though it's specifically made to illustrate it. Imagine how hard it would be to spot such a thing in an established codebase!
This slow-down is not acceptable, so let's fix it. Thinking the problem through, our problem seems to boil down to having a producer thread wake up when it doesn't make sense.
What would be perfect is if we had the ability to somehow control when we resume the producer thread - specifically when a task is removed from the tasks
buffer so we're sure there's room to add another one.
ConditionVariable to the rescue!
The fix is simple: We're just going to put a condition variable which is going to give up the mutex
lock whenever we detect that we do not need to continue looping in the producer. We're also going to need to tell the producer he can resume when we have an empty spot in the tasks
buffer.
require 'thwait'
threads = []
tasks = []
mutex = Mutex.new
to_exit = false
times_tasks_added = 0
times_time_wasted = 0
executed_tasks = 0
cv = ConditionVariable.new
# consumer
2.times do
threads << Thread.new do
loop do
Thread.kill(Thread.current) if to_exit # a way to stop execution
mutex.synchronize do
if tasks.length < 2
tasks << "Task :)"
times_tasks_added += 1
else
times_time_wasted += 1
cv.wait(mutex) # no need to continue looping in such a case, only continue after it makes sense
end
end
end
end
end
# producer
5.times do
threads << Thread.new do
loop do
Thread.kill(Thread.current) if to_exit # a way to stop execution
task = nil
mutex.synchronize do
if tasks.length != 0
task = tasks.shift
cv.signal # one new task can now be added
end
end
unless task.nil?
100000.times do
# Simulating CPU work
# also doing it outside the mutex so we don't block te tasks array (other producer might want to take a task as well)
end
executed_tasks += 1
if executed_tasks >= 100 # don't loop forever
to_exit = true
end
end
end
end
end
ThWait.all_waits(threads)
puts "Total tasks added: #{times_tasks_added}"
puts "Total times we branched out into the useless else statement: #{times_time_wasted}"
> enether$ ruby saver_of_time.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 50
> enether$ ruby saver_of_time.rb
Total tasks added: 100
Total times we branched out into the useless else statement: 45
> enether$ ruby saver_of_time.rb
Total tasks added: 100
Total times we branched out into the useless else statement: 42
Woo, performance!
In reality, it's worth noting that the previous example and this one actually run in about the same time. 400k useless iterations sound like a lot but our computers are fast enough to not let us notice this inefficiency. Regardless, I hope this example managed to clearly illustrate the problem.
Further optimization
Do we even need to enter the else
branch at all? Could we not put cv.wait
inside the block which adds the tasks and have it call wait
when the buffer is full? We can and that way it should never enter the else
block, as it would only be resumed to add a task and sleep if the buffer is full.
# ...
loop do
Thread.kill(Thread.current) if to_exit # a way to stop execution
mutex.synchronize do
if tasks.length < 2
tasks << "Task :)"
times_tasks_added += 1
if tasks.length >= 2
cv.wait(mutex) # no need to continue looping in such a case, only continue after it makes sense
end
else
times_time_wasted += 1
end
end
end
# ...
> enether$ ruby no_time_wasted.rb
Total tasks added: 100.
Total times we branched out into the useless else statement: 16
> enether$ ruby no_time_wasted.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 13
> enether$ ruby no_time_wasted.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 372021
What the hell? We still got entered the else
block and we even got back to the previous levels of needless execution!
Concurrent programming is hard. We're using two producer threads here and when one fills up the tasks
array it frees the mutex. The other producer thread seems to get resumed (remember we free the mutex only when tasks
is full) and enters the else
branch as the tasks
buffer is full.
You might want to open this in another tab - High-Resolution
Okay, well the simplest thing to do is put a wait
back where we had one. This should limit the useless calls as much as possible:
# ...
loop do
Thread.kill(Thread.current) if to_exit # a way to stop execution
mutex.synchronize do
if tasks.length < 2
tasks << "Task :)"
times_tasks_added += 1
if tasks.length >= 2
cv.wait(mutex)
end
else
times_time_wasted += 1
cv.wait(mutex)
end
end
end
# ...
> enether$ ruby no_time_wasted_fixed.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 5
> enether$ ruby no_time_wasted_fixed.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 8
> enether$ ruby no_time_wasted_fixed.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 3
Well I'm afraid this is as much as we can do with the ConditionVariable
. The reason we still enter the else
block a couple of times is most likely the one depicted in the image above.
Although there is one other possibility:
Spurious Wakeups
A spurious wakeup is when a ConditionVariable
gets woken up without getting signaled to. This might sound stupid but it makes sense, since it seems to boost performance in some cases. According to David R. Butenhof's Programming with POSIX Threads (ISBN 0-201-63392-2): "Spurious wakeups may sound strange, but on some multiprocessor systems, making condition wakeup completely predictable might substantially slow all condition variable operations.".
They also enforce robust multithreaded code, essentially enforcing you to take care of such cases. This is why it is strongly recommended that you always put your ConditionVariable
s inside a loop which always checks the appropriate condition (as we do with if tasks.length < 2
).
Here is an interesting discussion on the topic: comp.programming.threads
I personally could not identify when a producer thread was woken up spuriously or simply got scheduled when a previous producer went to sleep. I did dig through the MRI code to verify that cv.wait
is vulnerable to spurious wakeups.
Here is the way it gets called - rb_condvar_wait -> do_sleep -> mutex_sleep -> rb_mutex_sleep -> rb_mutex_sleep_forever -> rb_thread_sleep_deadly_allow_spurious_wakeup -> sleep_forever
It seems to boil down to calling the sleep_forever
function which calls native_sleep
in a loop. After the code exits from the sleep, Ruby checks if the thread was woken up on purpose (in RUBY_VM_CHECK_INTS_BLOCKING(th)
) and schedules it to be interrupted if so. Since ours isn't, it likely enters the if (!spurious_check)
block and break
s the loop, effectively stopping the sleep.
Summary
We touched on a couple of important topics in multithreaded programming.
We learned about the precious ConditionVariable
class and more specifically how it allows you to pause threads at will and schedule a resume when you decide to.
-
ConditionVariable#wait(mutex)
- puts the current thread to sleep, releases the given mutex for the time being and gets resumed strictly after a signal -
ConditionVariable#signal
- allows one thread that holds the given condition variable that to resume -
ConditionVariable#broadcast
- allows all threads that hold the given condition variable to resume
We dabbled into the producer-consumer problem, trying to optimize it on our own and further explored concurrency problems and in the end learned about spurious wakeups.
Top comments (2)
Hi, I really like your post which does me a favor a lot on comprehend multi-thread.
And I only get an question here to ask,
executed_tasks += 1
, don't we need to put that intomutex.synchronize
to be an atomic execution ?Great post, I loved it!
I found few typos: (?)
1.
In the code example under the "Let's track exactly how much useless iterations this thing does:" and in the next examples as well
2.