class ConditionVariable

ConditionVariable objects augment class Mutex. Using condition variables, it is possible to suspend while in the middle of a critical section until a resource becomes available.

Example:

require 'thread'

mutex = Mutex.new
resource = ConditionVariable.new
# Shared predicate, only read or written while holding +mutex+. Waiting on
# the predicate (rather than on the bare signal) means thread 'a' cannot
# miss the wakeup if 'b' happens to signal before 'a' starts waiting.
resource_available = false

a = Thread.new {
  mutex.synchronize {
    # Thread 'a' now needs the resource
    resource.wait(mutex) until resource_available
    # 'a' can now have the resource
  }
}

b = Thread.new {
  mutex.synchronize {
    # Thread 'b' has finished using the resource
    resource_available = true
    resource.signal
  }
}

# Wait for both threads so the program does not exit while they are running.
[a, b].each(&:join)

Public Class Methods

new() click to toggle source

Creates a new ConditionVariable

# File lib/thread.rb, line 54
# Sets up the bookkeeping for a fresh condition variable: an empty table of
# waiting threads and the mutex that guards access to that table.
def initialize
  # Lock protecting every read and write of @waiters.
  @waiters_mutex = Mutex.new
  # Waiting threads, stored as hash keys.
  @waiters = {}
end

Public Instance Methods

broadcast() click to toggle source

Wakes up all threads waiting on this condition variable.

# File lib/thread.rb, line 101
# Wakes every thread currently recorded as a waiter and returns +self+.
def broadcast
  Thread.handle_interrupt(StandardError => :on_blocking) do
    # Snapshot the waiters and empty the table inside one critical section;
    # the threads themselves are woken after the table lock is released.
    sleepers = @waiters_mutex.synchronize do
      snapshot = @waiters.keys
      @waiters.clear
      snapshot
    end
    sleepers.each do |sleeper|
      begin
        sleeper.run
      rescue ThreadError
        # Thread#run raised for this waiter; skip it and wake the rest.
      end
    end
  end
  self
end
signal() click to toggle source

Wakes up the first thread in line waiting on this condition variable.

# File lib/thread.rb, line 86
# Wakes the longest-registered waiter, if there is one, and returns +self+.
def signal
  Thread.handle_interrupt(StandardError => :on_blocking) do
    delivered = false
    until delivered
      begin
        # Remove the oldest entry from the waiter table; nil when it is empty.
        waiter, _flag = @waiters_mutex.synchronize { @waiters.shift }
        waiter.run if waiter
        delivered = true
      rescue ThreadError
        # Waking that waiter failed (it was probably already dead), so go
        # round again and try the next one in line.
      end
    end
  end
  self
end
wait(mutex, timeout=nil) click to toggle source

Releases the lock held in mutex and waits; reacquires the lock on wakeup.

If timeout is given, this method returns after timeout seconds have passed, even if no other thread signals.

# File lib/thread.rb, line 65
# Records the calling thread in the waiter table, then blocks by calling
# +mutex.sleep+ with +timeout+. The thread's entry is removed from the table
# again before this method returns, including when the sleep raises.
#
# mutex::   object whose +sleep+ method is used to block the caller.
# timeout:: passed straight through to +mutex.sleep+; defaults to +nil+.
#
# Returns +self+.
def wait(mutex, timeout=nil)
  # Outer mask: StandardError interrupts are set to :never around the whole
  # body, presumably so the cleanup in the ensure clause cannot be cut short
  # by one (see Thread.handle_interrupt).
  Thread.handle_interrupt(StandardError => :never) do
    begin
      # Inner mask: :on_blocking applies only to the registration and the
      # sleep itself.
      Thread.handle_interrupt(StandardError => :on_blocking) do
        # Register this thread; the thread object is the hash key.
        @waiters_mutex.synchronize do
          @waiters[Thread.current] = true
        end
        mutex.sleep timeout
      end
    ensure
      # Always deregister. This is a no-op if the entry was already removed
      # from the table elsewhere.
      @waiters_mutex.synchronize do
        @waiters.delete(Thread.current)
      end
    end
  end
  self
end