Stanislav Kozlovski

Working With Multithreaded Ruby Part II

We're back at it with another edition of Multithreaded Ruby where we'll continue to dive into concurrency using our beloved language!
Today, I'm going to introduce you to a famous multi-process synchronization problem called the Producer-Consumer problem and we're going to look at Ruby's ConditionVariable class.

Back to Deadlock

A paragraph into the new article and we're at deadlocks again? Well yes, they're pretty prevalent and we did not actually touch on a solution to the problem last time.
Let's bring back the deadlock example we used in Part I, modified just a tiny bit.

require 'thwait'

item_accessories = {}
item = {}
item_acc_lock = Mutex.new
item_lock = Mutex.new

a = Thread.new {
  item_acc_lock.synchronize {
    sleep 1  # pretend to work on item_accessories
    item_lock.synchronize {
      # pretend to work on item
      sleep 1
      puts 'Worked on accessories, then on item'
    }
  }
}

b = Thread.new {
  item_lock.synchronize {
    sleep 1  # pretend to work on item
    item_acc_lock.synchronize {
      # pretend to work on item_accessories
      sleep 1
      puts 'Worked on item, then on accessories'
    }
  }
}

ThWait.all_waits(a, b)
> enether$ ruby item_worker.rb
/Users/enether/.rvm/rubies/ruby-2.4.1/lib/ruby/2.4.0/thwait.rb:112:in `pop': No live threads left. Deadlock? (fatal)

No surprise here: thread a takes hold of item_acc_lock, thread b takes hold of item_lock, and each then waits forever for the lock the other one holds. So how could we avoid this?
What if we had a way to temporarily release one of the locks at a specific point in our program where we could afford doing so? That way, the other thread could take the lock, do its thing and return it back for the original one to finish its work.

Enter ConditionVariable

ConditionVariable is a Ruby class which lets you block a thread until another thread signals that it is OK to continue. It is a way to say: "I'm waiting for a lock and I can give up mine at this exact time". It is an ideal way to synchronize our a and b threads here:

require 'thwait'

item_accessories = {}
item = {}
item_acc_lock = Mutex.new
item_lock = Mutex.new
cv = ConditionVariable.new

a = Thread.new {
  item_acc_lock.synchronize {
    sleep 1  # pretend to work on item_accessories
    # At this point, we've just finished work on item_accessories and we're at a window where we
    #   might not care if item_accessories changes. So: let somebody else take it and give it back
    cv.wait(item_acc_lock)  # Temporarily sleeps the thread and releases the lock
    puts 'Gained back access to item_acc_lock'  # on this line, item_acc_lock is re-acquired 
    item_lock.synchronize {
      # pretend to work on item
      sleep 1
      puts 'Worked on accessories, then on item'
    }
  }
}

b = Thread.new {
  item_lock.synchronize {
    sleep 1  # pretend to work on item
    item_acc_lock.synchronize {
      # pretend to work on item_accessories
      sleep 1
      puts 'Worked on item, then on accessories'
    }
    cv.signal
    puts "I'm still working, but I'm finished with item_acc_lock"
  }
}

ThWait.all_waits(a, b)
> enether$ ruby synchronized_item_worker.rb
Worked on item, then on accessories
I'm still working, but I'm finished with item_acc_lock
Gained back access to item_acc_lock
Worked on accessories, then on item

What we achieved here is a sort of synchronization: we can now be sure that thread b always reaches its cv.signal line before thread a continues past cv.wait and starts working on item.
Here is a picture visualizing the process:

You might want to open this in another tab - High-Resolution
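
To make the "temporarily release the lock" part concrete, here is a minimal sketch (not taken from the example above) showing that cv.wait really hands the mutex over while the thread sleeps and owns it again once it resumes. The resume flag and the observations array are hypothetical names introduced only for this illustration, and plain Thread#join is used instead of thwait.

```ruby
lock = Mutex.new
cv = ConditionVariable.new
resume = false        # hypothetical flag: the waiter only continues once this is true
observations = []

waiter = Thread.new do
  lock.synchronize do
    # cv.wait releases lock while this thread sleeps and re-acquires it on wakeup
    cv.wait(lock) until resume
    observations << "waiter owns lock again: #{lock.owned?}"
  end
end

# Give the waiter time to park itself inside cv.wait
sleep 0.01 until waiter.status == 'sleep'

# This block can only run because cv.wait released the lock
lock.synchronize do
  observations << 'main thread got the lock while waiter sleeps'
  resume = true
  cv.signal
end

waiter.join
puts observations
```

Guarding the wait with a flag also means the sketch still terminates if the signal happens before the waiter goes to sleep.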

It is worth noting that the ConditionVariable#signal method wakes up only one of the threads waiting on the variable. This means that if two threads are waiting on a ConditionVariable and signal is called only once, the thread that is not woken up will wait forever, resulting in a deadlock:

require 'thwait'

lock = Mutex.new
cv = ConditionVariable.new
threads = []
2.times do
  threads << Thread.new {
    lock.synchronize { cv.wait(lock) }
  }
end

threads << Thread.new {
  lock.synchronize { sleep 1 }
  cv.signal
}

ThWait.all_waits(*threads)
> enether$ ruby cv_pitfall.rb
/Users/enether/.rvm/rubies/ruby-2.4.1/lib/ruby/2.4.0/thwait.rb:112:in `pop': No live threads left. Deadlock? (fatal)

In such a scenario, you need to either call signal as many times as there are waits or use another method - ConditionVariable#broadcast, which will wake up every thread that is waiting on the condition variable.
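
As a rough sketch of the second option, the pitfall above can be avoided with a single broadcast. The released flag and the woken array are assumptions added for this illustration; the flag keeps the waiters from sleeping forever if the broadcast happens before they start waiting.

```ruby
lock = Mutex.new
cv = ConditionVariable.new
released = false   # hypothetical flag checked by every waiter
woken = []

waiters = 2.times.map do |i|
  Thread.new do
    lock.synchronize do
      cv.wait(lock) until released
      woken << i   # safe: we hold the lock again at this point
    end
  end
end

lock.synchronize do
  released = true
  cv.broadcast     # wakes every thread currently waiting on cv
end

waiters.each(&:join)
puts "Woken threads: #{woken.sort.inspect}"
```

Replacing cv.broadcast with a single cv.signal here can leave one of the two waiters asleep for good, which is exactly the situation from cv_pitfall.rb.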

Producer-Consumer

The producer-consumer problem consists of at minimum two threads, one representing a producer and one representing a consumer.

  • Producer - Sole job is to create an item and put it into the buffer
  • Consumer - Sole job is to take the item from the buffer and process it

Here is a sample implementation in Ruby, where the tasks array acts as the buffer with a fictional limitation of having at most 2 items in it at once:

require 'thwait'

threads = []
tasks = []
mutex = Mutex.new

# producer
2.times do
  threads << Thread.new do
    loop do
      mutex.synchronize do
        if tasks.length < 2
          tasks << "Task :)"
        end
      end
    end
  end
end


# consumer
5.times do
  threads << Thread.new do
    loop do
      task = nil
      mutex.synchronize do
        if tasks.length != 0
          task = tasks.shift
        end
      end
      unless task.nil?
        100000.times do
          # Simulating task execution's CPU work
          # also doing it outside the mutex so we don't block the tasks array (other consumer might want to take a task as well)
        end
      end
    end
  end
end

ThWait.all_waits(threads)

Producer-Consumer problem

As the name implies, something is not quite right with the code above. Finding problems in concurrent code is hard, so I'm going to give you a couple of minutes to figure out what is wrong.

...
...
...

Okay, if you managed to figure it out - great, if not - don't fret, multithreaded programming is unintuitive.
The problem with the code above is that it wastes time. Since we can't control when the OS switches threads or which thread it switches to, it may leave a consumer thread and enter a producer when there is no reason to.
Imagine all our consumer threads are currently executing a task and our tasks array is full (has two elements). If the OS decides to do a context switch and gives control to a producer thread it would only waste time. Since the tasks array would be full, the producer would loop only to check that the array is full and not do anything else, only managing to take precious CPU time from our consumer threads.

Let's track exactly how many useless iterations this thing does:

require 'thwait'

threads = []
tasks = []
mutex = Mutex.new

to_exit = false
times_tasks_added = 0
times_time_wasted = 0
executed_tasks = 0

# producer
2.times do
  threads << Thread.new do
    loop do
      Thread.kill(Thread.current) if to_exit  # a way to stop execution

      mutex.synchronize do
        if tasks.length < 2
          tasks << "Task :)"
          times_tasks_added += 1
        else
          # time here is absolutely wasted
          times_time_wasted += 1
        end
      end
    end
  end
end

# consumer
5.times do
  threads << Thread.new do
    loop do
      Thread.kill(Thread.current) if to_exit  # a way to stop execution
      task = nil
      mutex.synchronize do
        if tasks.length != 0
          task = tasks.shift
        end
      end
      unless task.nil?
        100000.times do
          # Simulating CPU work
          # also doing it outside the mutex so we don't block the tasks array (other consumer might want to take a task as well)
        end
        executed_tasks += 1
        if executed_tasks >= 100  # don't loop forever
          to_exit = true
        end
      end
    end
  end
end

ThWait.all_waits(threads)

puts "Total tasks added: #{times_tasks_added}"
puts "Total times we branched out into the useless else statement: #{times_time_wasted}"

And here are the results:

> enether$ ruby squander_of_time.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 1633
> enether$ ruby squander_of_time.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 848282
> enether$ ruby squander_of_time.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 356418

We know that thread context-switching is non-deterministic, and these results illustrate it. Sometimes we do as few as 1633 useless executions of the else branch and sometimes as many as 848k, roughly 8316 times the 102 iterations that actually added a task!
This is bad because it will cause your program to run slower at some times and seemingly normally at others.
It's good to note that I personally found it hard to figure out the problem in this code even though it's specifically made to illustrate it. Imagine how hard it would be to spot such a thing in an established codebase!

This slow-down is not acceptable, so let's fix it. Thinking the problem through, our problem seems to boil down to having a producer thread wake up when it doesn't make sense.
What would be perfect is if we had the ability to somehow control when we resume the producer thread - specifically when a task is removed from the tasks buffer so we're sure there's room to add another one.

ConditionVariable to the rescue!

The fix is simple: we're going to add a condition variable and wait on it, giving up the mutex, whenever we detect that there is no point in continuing to loop in the producer. We also need to tell the producer it can resume once there is an empty spot in the tasks buffer.

require 'thwait'

threads = []
tasks = []
mutex = Mutex.new

to_exit = false
times_tasks_added = 0
times_time_wasted = 0
executed_tasks = 0
cv = ConditionVariable.new

# producer
2.times do
  threads << Thread.new do
    loop do
      Thread.kill(Thread.current) if to_exit  # a way to stop execution

      mutex.synchronize do
        if tasks.length < 2
          tasks << "Task :)"
          times_tasks_added += 1
        else
          times_time_wasted += 1
          cv.wait(mutex)  # no need to continue looping in such a case, only continue after it makes sense
        end
      end
    end
  end
end

# consumer
5.times do
  threads << Thread.new do
    loop do
      Thread.kill(Thread.current) if to_exit  # a way to stop execution
      task = nil
      mutex.synchronize do
        if tasks.length != 0
          task = tasks.shift
          cv.signal  # one new task can now be added
        end
      end
      unless task.nil?
        100000.times do
          # Simulating CPU work
          # also doing it outside the mutex so we don't block the tasks array (other consumer might want to take a task as well)
        end
        executed_tasks += 1
        if executed_tasks >= 100  # don't loop forever
          to_exit = true
        end
      end
    end
  end
end

ThWait.all_waits(threads)

puts "Total tasks added: #{times_tasks_added}"
puts "Total times we branched out into the useless else statement: #{times_time_wasted}"
> enether$ ruby saver_of_time.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 50
> enether$ ruby saver_of_time.rb
Total tasks added: 100
Total times we branched out into the useless else statement: 45
> enether$ ruby saver_of_time.rb
Total tasks added: 100
Total times we branched out into the useless else statement: 42

Woo, performance!

In reality, it's worth noting that the previous example and this one actually run in about the same time. 400k useless iterations sound like a lot but our computers are fast enough to not let us notice this inefficiency. Regardless, I hope this example managed to clearly illustrate the problem.

Further optimization

Do we even need to enter the else branch at all? Could we not put cv.wait inside the block which adds the tasks and have it call wait when the buffer is full? We can and that way it should never enter the else block, as it would only be resumed to add a task and sleep if the buffer is full.

# ...
loop do
  Thread.kill(Thread.current) if to_exit  # a way to stop execution

  mutex.synchronize do
    if tasks.length < 2
      tasks << "Task :)"
      times_tasks_added += 1
      if tasks.length >= 2
        cv.wait(mutex)  # no need to continue looping in such a case, only continue after it makes sense
      end
    else
      times_time_wasted += 1
    end
  end
end
# ...
> enether$ ruby no_time_wasted.rb
Total tasks added: 100
Total times we branched out into the useless else statement: 16
> enether$ ruby no_time_wasted.rb
Total tasks added: 102
Total times we branched out into the useless else statement: 13
> enether$ ruby no_time_wasted.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 372021

What the hell? We still entered the else block, and we even got back to the previous levels of needless execution!

Concurrent programming is hard. We're using two producer threads here, and when one fills up the tasks array it calls cv.wait, which frees the mutex. The other producer thread then seems to get resumed (remember, we only wait when tasks is full) and enters the else branch because the tasks buffer is full.

You might want to open this in another tab - High-Resolution

Okay, well the simplest thing to do is put a wait back where we had one. This should limit the useless calls as much as possible:

# ...
loop do
  Thread.kill(Thread.current) if to_exit  # a way to stop execution

  mutex.synchronize do
    if tasks.length < 2
      tasks << "Task :)"
      times_tasks_added += 1
      if tasks.length >= 2
        cv.wait(mutex)
      end
    else
      times_time_wasted += 1
      cv.wait(mutex)
    end
  end
end
# ...
> enether$ ruby no_time_wasted_fixed.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 5
> enether$ ruby no_time_wasted_fixed.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 8
> enether$ ruby no_time_wasted_fixed.rb
Total tasks added: 101
Total times we branched out into the useless else statement: 3

Well I'm afraid this is as much as we can do with the ConditionVariable. The reason we still enter the else block a couple of times is most likely the one depicted in the image above.
Although there is one other possibility:

Spurious Wakeups

A spurious wakeup is when a thread waiting on a ConditionVariable wakes up without having been signaled. This might sound stupid but it makes sense, since allowing it seems to boost performance in some cases. According to David R. Butenhof's Programming with POSIX Threads (ISBN 0-201-63392-2): "Spurious wakeups may sound strange, but on some multiprocessor systems, making condition wakeup completely predictable might substantially slow all condition variable operations."
They also encourage robust multithreaded code, essentially forcing you to handle such cases. This is why it is strongly recommended that you always call ConditionVariable#wait inside a loop which re-checks the appropriate condition (as we do with if tasks.length < 2 inside our loop).
Here is an interesting discussion on the topic: comp.programming.threads
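
The "wait inside a loop" advice can be sketched as follows. This is not the code from the examples above but a variation under a few assumptions: it uses two condition variables (not_full and not_empty, both hypothetical names), a fixed TOTAL_TASKS so the sketch terminates on its own, plain Thread#join instead of thwait, and it updates every counter while holding the mutex.

```ruby
BUFFER_LIMIT = 2    # same fictional limit as in the examples above
TOTAL_TASKS  = 20   # hypothetical amount of work, chosen for this sketch

tasks     = []
mutex     = Mutex.new
not_full  = ConditionVariable.new   # producers wait on this
not_empty = ConditionVariable.new   # consumers wait on this
produced  = 0
consumed  = 0

producers = 2.times.map do
  Thread.new do
    done = false
    until done
      mutex.synchronize do
        # Re-check the condition after every wakeup, spurious or not
        not_full.wait(mutex) while tasks.length >= BUFFER_LIMIT && produced < TOTAL_TASKS
        if produced < TOTAL_TASKS
          tasks << "Task :)"
          produced += 1
          if produced == TOTAL_TASKS
            # Nothing more will arrive: wake everybody so nobody sleeps forever
            not_empty.broadcast
            not_full.broadcast
          else
            not_empty.signal
          end
        else
          done = true
        end
      end
    end
  end
end

consumers = 5.times.map do
  Thread.new do
    done = false
    until done
      mutex.synchronize do
        not_empty.wait(mutex) while tasks.empty? && produced < TOTAL_TASKS
        if tasks.shift
          consumed += 1     # counters are only touched while holding the mutex
          not_full.signal   # one new task can now be added
        else
          done = true       # buffer is empty and production has finished
        end
      end
      # A real consumer would process the task here, outside the mutex
    end
  end
end

(producers + consumers).each(&:join)
puts "Produced: #{produced}, consumed: #{consumed}, left in buffer: #{tasks.length}"
```

Because every wait sits in a while loop, a thread that wakes up without a matching signal simply re-checks its condition and goes back to sleep instead of doing useless work.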

I personally could not identify when a producer thread was woken up spuriously or simply got scheduled when a previous producer went to sleep. I did dig through the MRI code to verify that cv.wait is vulnerable to spurious wakeups.
Here is the way it gets called - rb_condvar_wait -> do_sleep -> mutex_sleep -> rb_mutex_sleep -> rb_mutex_sleep_forever -> rb_thread_sleep_deadly_allow_spurious_wakeup -> sleep_forever
It seems to boil down to calling the sleep_forever function which calls native_sleep in a loop. After the code exits from the sleep, Ruby checks if the thread was woken up on purpose (in RUBY_VM_CHECK_INTS_BLOCKING(th)) and schedules it to be interrupted if so. Since ours isn't, it likely enters the if (!spurious_check) block and breaks the loop, effectively stopping the sleep.

Summary

We touched on a couple of important topics in multithreaded programming.
We learned about the precious ConditionVariable class and more specifically how it allows you to pause threads at will and schedule a resume when you decide to.

  • ConditionVariable#wait(mutex) - puts the current thread to sleep, releases the given mutex for the time being and re-acquires it on wakeup, which normally follows a signal or broadcast (but may also be spurious)
  • ConditionVariable#signal - wakes up one thread that is waiting on the given condition variable
  • ConditionVariable#broadcast - wakes up all threads that are waiting on the given condition variable

We dabbled in the producer-consumer problem, tried to optimize it on our own, explored further concurrency problems and in the end learned about spurious wakeups.
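
For comparison only, and not something the article itself uses: Ruby ships a SizedQueue class that blocks producers when it is full and consumers when it is empty. A minimal sketch of the same bounded buffer, assuming Ruby 2.3 or newer for Queue#close, with task counts and names picked arbitrarily for the illustration:

```ruby
queue = SizedQueue.new(2)   # push blocks while the queue already holds 2 items
processed = Queue.new       # thread-safe collector for finished tasks

producer = Thread.new do
  10.times { |i| queue << "Task #{i}" }
  queue.close               # once closed and drained, pop returns nil
end

consumers = 3.times.map do
  Thread.new do
    while (task = queue.pop)
      processed << task
    end
  end
end

[producer, *consumers].each(&:join)
puts "Processed #{processed.size} tasks"
```

This swaps the hand-written mutex and ConditionVariable for a ready-made structure, so it is an alternative to the approach explored above rather than a rewrite of it.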

Top comments (2)

Noel

Hi, I really like your post; it helped me a lot in understanding multithreading.
I have just one question: don't we need to put executed_tasks += 1 inside mutex.synchronize to make it atomic?

roma

Great post, I loved it!

I found a few typos (?):

1.
In the code example that counts the useless iterations, and in the following examples as well

# consumer -- should be a "producer"
2.times do

2.

      if tasks.length >= 2 -- why do we need to check whether it's bigger than 2, why not only equal to 2? How could it be more than 2?
        cv.wait(mutex)  # no need to continue looping in such a case, only continue after it makes sense
      end