Managing your threads with Queue

One of my favourite utilities in the Ruby standard library is the Queue class. Queue provides you with a delightfully simple way to synchronise communication between threads. Typically, Queue is used to manage the work assigned to a thread pool. We'll look at some of the ways it assists us in this endeavour.

Queue has two main methods in its API: push and pop. They do exactly what you'd expect: push adds an item to the queue, and pop retrieves one. By default, if you call pop in a thread and the queue is empty, it will block until an item becomes available. Multiple threads can safely pop items off the same queue. Let's have a look at a simple example:

require "thread" # Only needed on older Rubies; a no-op where Queue is already built in
queue = Queue.new
threads = []

# Start 2 threads to pop work off of the queue and print it, then sleep 3 seconds for effect
2.times do  
  threads << Thread.new do
    loop do
      queue_item = queue.pop
      puts queue_item.inspect
      sleep 3
    end
  end
end

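# Add 5 items to the queue to be processed by the threads above
5.times do |n|
  queue.push "Item #{n}"
end

# Keep the main thread alive; without this the script exits straight away and takes the workers with it.
# The workers loop forever, so stop the script with Ctrl-C. Any signal handlers need to be registered before this line.
threads.each(&:join)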

Running this example you'll see that each thread picks up an item off of the queue, prints it out and then sleeps for 3 seconds before picking up the next item from the queue. Once the queue is empty, they'll block on queue.pop until a new item becomes available for processing.
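Blocking is only the default. If you'd rather not wait, pop accepts a non-blocking flag and raises ThreadError when the queue is empty. Here's a minimal sketch of that; the try_pop helper is a hypothetical name of mine, not part of Queue:

```ruby
# Hypothetical helper: returns the next item, or nil if nothing is queued.
def try_pop(queue)
  queue.pop(true) # true = non-blocking; raises ThreadError when the queue is empty
rescue ThreadError
  nil
end

queue = Queue.new
p try_pop(queue) # Empty queue, so no item comes back

queue.push "Item 0"
p try_pop(queue) # An item is available, so it is returned without blocking
```

Bear in mind that this helper can't tell an empty queue apart from a queued nil, so it only suits queues that never carry nil as an item.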

Queue also comes with a few extra methods to help you manage the workload for your threads: #length tells you how many items are currently queued; #num_waiting tells you how many threads are currently blocked on pop, waiting for input; and #clear removes any items still waiting to be processed. In particular, you can use the last two to build a clean shutdown into your script, so that all of your threads exit gracefully once they've finished their current work.

trap "INT" do  
  # Remove any queued jobs from the list
  queue.clear

  # Wait until any currently running threads have finished their current work and returned to queue.pop
  while queue.num_waiting < threads.count
    pending_threads = threads.count - queue.num_waiting
    puts "Waiting for #{pending_threads} threads to finish"
    sleep 1
  end

  # Kill off each thread now that they're idle and exit
  threads.each(&:exit)
  Process.exit(0)
end  
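If you want to get a feel for what #length, #num_waiting and #clear report before wiring them into a signal handler, a small standalone sketch like this one may help. The variable names are just for illustration:

```ruby
queue = Queue.new

3.times { |n| queue.push "Item #{n}" }
queued_before = queue.length # 3 items queued, nothing consuming them yet

queue.clear
queued_after = queue.length # 0, the pending items have been thrown away

# Start a single worker; the queue is empty, so it blocks on pop
worker = Thread.new { queue.pop }
sleep 0.01 until queue.num_waiting == 1 # Wait until the worker is actually blocked
waiting_while_idle = queue.num_waiting

queue.push :done # Wake the worker up with an item
worker.join
waiting_after = queue.num_waiting # The worker has finished, so nobody is waiting

puts queued_before, queued_after, waiting_while_idle, waiting_after
```

The numbers printed should be 3, 0, 1 and 0, in that order.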

We use the Queue class in a couple of places throughout our apps: in Deploy, to send files to multiple servers simultaneously, and in some internal scripts where parallel processing is advantageous.
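To give a rough idea of what that kind of parallel processing can look like, here's a sketch of a small worker pool. It isn't how Deploy is implemented; process, run_pool and the :stop marker are all names I've made up for illustration, and process simply upcases a string in place of real work such as an upload.

```ruby
# Hypothetical stand-in for real work, such as sending a file to a server
def process(job)
  job.upcase
end

# Hypothetical helper: runs every job through process using worker_count threads
def run_pool(jobs, worker_count)
  queue = Queue.new
  results = Queue.new

  workers = worker_count.times.map do
    Thread.new do
      # :stop is a sentinel (an assumption of this sketch) that tells a worker to finish
      while (job = queue.pop) != :stop
        results.push process(job)
      end
    end
  end

  jobs.each { |job| queue.push job }
  worker_count.times { queue.push :stop } # One sentinel per worker
  workers.each(&:join)

  # Drain the results; their order depends on thread scheduling
  Array.new(results.length) { results.pop }
end

p run_pool(%w[a.txt b.txt c.txt], 2).sort
```

Because each worker stops itself when it pops a sentinel, this version needs no signal handler: the pool shuts down once all of the queued work has been processed.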
