Managing threads with Queue and SizedQueue

Threads in Ruby

Ruby threads let an application work on several tasks at the same time. In a long-running application, reusing a pool of threads can improve performance: a thread pool keeps a fixed set of threads doing the work instead of creating a new thread for each task. The simple program below shows why threads matter.
def add_elements(group)
  sleep(4)
  sum = 0
  group.each do |item|
    sum += item
  end
  sum
end
@group1 = [22, 44, 55]
@group2 = [45, 59, 72]
@group3 = [99, 22, 33]
puts "sum of group1 = #{add_elements(@group1)}"
puts "sum of group2 = #{add_elements(@group2)}"
puts "sum of group3 = #{add_elements(@group3)}"
The program prints the sum of each array, but every call to add_elements first pauses for 4 seconds at sleep(4). The sum of group1 therefore appears after 4 seconds, the sum of group2 after 8 seconds, and the sum of group3 after 12 seconds, because each call has to wait for the previous one to finish. Threads let us run different parts of a program independently, so the three sums can be calculated concurrently. To use threads, replace the three puts lines with the following, after initializing the arrays:
threads = (1..3).map do |n|
  # Pass n into the thread so that each thread works with its own copy.
  Thread.new(n) do |i|
    group = instance_variable_get("@group#{i}")
    puts "sum of group#{i} = #{add_elements(group)}"
  end
end
threads.each(&:join)
The add_elements method definition is the same, but each call is now wrapped in a Thread.new block, and join makes the main program wait for every thread to finish. Instead of getting the sums after 4, 8 and 12 seconds respectively, you get all three sums after about 4 seconds, because the threads sleep concurrently. The order of the three output lines may vary between runs.
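The difference in running time can be measured directly. The following is a minimal sketch, not part of the original program: it assumes a shorter delay (the delay keyword argument is an addition made for this illustration) and uses Benchmark.realtime from the standard library to time both approaches.

```ruby
require 'benchmark'

# Same summing method as above, but with a shorter, configurable delay
# (the `delay` parameter is an assumption added for this sketch).
def add_elements(group, delay: 0.2)
  sleep(delay)
  group.sum
end

groups = [[22, 44, 55], [45, 59, 72], [99, 22, 33]]

# Sequential: each call waits for the previous one to finish.
sequential_time = Benchmark.realtime do
  groups.each { |group| add_elements(group) }
end

# Threaded: the three sleeps overlap. Thread#value joins the thread
# and returns the result of its block.
sums = nil
threaded_time = Benchmark.realtime do
  sums = groups.map { |group| Thread.new { add_elements(group) } }.map(&:value)
end

puts "sums = #{sums.inspect}"
puts format('sequential: %.2fs, threaded: %.2fs', sequential_time, threaded_time)
```

The gain here comes from overlapping the waiting time. On the standard Ruby interpreter (CRuby), threads overlap well when they sleep or wait on I/O, while purely CPU-bound work should not be expected to speed up in the same way.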

Queue for managing threads

To exchange information safely between threads, we can use the Queue class from Ruby's standard library. A queue is first in, first out: the items that are added first are retrieved first. Its two main methods are push (also written <<), which adds an item, and pop, which removes and returns one; pop blocks the calling thread while the queue is empty. To create a new queue instance, use Queue.new. Consider the following example, in which a producer thread adds four items.
require 'thread'
queue = Queue.new
Thread.new do
  4.times do |i|
    sleep(2)
    queue << i
    puts "Thread #{i} produced"
  end
end
Four items are inserted into the queue, one every 2 seconds. The output is as follows:
Thread 0 produced
Thread 1 produced
Thread 2 produced
Thread 3 produced
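Now, to pop items off the queue, add a consumer thread:
Thread.new do
  4.times do
    sleep(2)
    puts "consumed thread #{queue.pop}"
  end
end
# Wait for the other threads to finish; otherwise a script would exit first.
(Thread.list - [Thread.current]).each(&:join)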
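Running the producer and the consumer together produces output similar to the following. The number printed by the consumer is the item it took from the queue:
Thread 0 produced
consumed thread 0
Thread 1 produced
consumed thread 1
Thread 2 produced
consumed thread 2
Thread 3 produced
consumed thread 3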
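A queue is also a natural building block for the thread pool mentioned at the start: a fixed set of worker threads takes jobs from a shared queue instead of one thread being created per job. The following is a minimal sketch under a few assumptions: the names jobs, results and worker_count are hypothetical, the pool size of 2 is arbitrary, and Queue#close requires Ruby 2.3 or later.

```ruby
# Hypothetical sketch of a small thread pool built on Queue.
jobs    = Queue.new
results = Queue.new

# Enqueue the work first, then close the queue so that the workers know
# no more jobs will arrive (Queue#close requires Ruby 2.3 or later).
[[22, 44, 55], [45, 59, 72], [99, 22, 33], [1, 2, 3]].each { |group| jobs << group }
jobs.close

# worker_count is an arbitrary value chosen for this example.
worker_count = 2
workers = Array.new(worker_count) do
  Thread.new do
    # pop returns nil once the queue is closed and empty, which ends the loop.
    while (group = jobs.pop)
      results << group.sum
    end
  end
end
workers.each(&:join)

# Collect the results; their order depends on which worker finished first.
sums = Array.new(results.size) { results.pop }
puts "sums = #{sums.sort.inspect}"
```

Because both queues are thread-safe, the workers need no explicit locking. The results are sorted before printing only because the order in which the workers finish is not fixed.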

Sized queue for fixed-length queue

A sized queue is useful when items are produced faster than they are consumed. Consider the following example, which uses an ordinary Queue:
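require 'thread'
queue = Queue.new
producer = Thread.new do
  10.times do |i|
    sleep(2)
    queue << i
    puts "Thread #{i} produced"
  end
end
consumer = Thread.new do
  4.times do
    sleep(2)
    puts "consumed thread #{queue.pop}"
  end
end
# Wait for both threads so that a script does not exit early.
[producer, consumer].each(&:join)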
Here 10 items are produced but only 4 are consumed, so the remaining 6 accumulate in the queue. With a faster or longer-running producer, an unbounded queue like this can keep growing and waste memory. A sized queue avoids this: instead of Queue.new, we use SizedQueue.new(max), where the argument is the maximum number of items the queue may hold. When the queue is full, push blocks the producer until a consumer removes an item. Modifying our example to use a sized queue bounds the memory it uses:
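require 'thread'
queue = SizedQueue.new(4)
producer = Thread.new do
  10.times do |i|
    sleep(2)
    queue << i
    puts "Thread #{i} produced"
  end
end
consumer = Thread.new do
  4.times do
    sleep(2)
    puts "consumed thread #{queue.pop}"
  end
end
# The producer ends up blocked on the full queue, so wait with a time limit
# (in seconds) instead of a plain join.
producer.join(20)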
The first 4 items are produced and consumed in step. The consumer then stops, and the producer fills the queue with items 4 to 7. At that point the queue holds its maximum of 4 items, so the push of item 8 blocks: nothing more is added, even though the loop is written to run 10 times. Output:
Thread 0 produced 
consumed thread 0
Thread 1 produced 
consumed thread 1
Thread 2 produced 
consumed thread 2
Thread 3 produced 
consumed thread 3
Thread 4 produced 
Thread 5 produced 
Thread 6 produced 
Thread 7 produced
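The limit can also be observed without waiting for a blocked thread. The following is a minimal sketch, assuming a small queue of 2 items and placeholder symbols as data; it uses the optional second argument of SizedQueue#push, which requests a non-blocking push that raises ThreadError when the queue is full.

```ruby
queue = SizedQueue.new(2)

queue << :a
queue << :b
puts "size = #{queue.size}, max = #{queue.max}"

# A plain `queue << :c` would block here until a consumer pops an item.
# Passing true as the second argument asks for a non-blocking push,
# which raises ThreadError when the queue is full.
overflowed = false
begin
  queue.push(:c, true)
rescue ThreadError
  overflowed = true
end
puts "push on a full queue rejected: #{overflowed}"

# Once an item has been removed, there is room for another one.
first = queue.pop
queue << :c
puts "popped #{first.inspect}, size is back to #{queue.size}"
```

In the producer and consumer example above, the blocking form is usually what you want, because it slows the producer down to the consumer's pace instead of raising an error.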
To conclude, Queue lets threads exchange information safely, and SizedQueue additionally caps the number of waiting items, so a fast producer cannot grow the queue without bound.
