module MonitorMixin
In concurrent programming, a monitor is an object or module intended to be used safely by more than one thread. The defining characteristic of a monitor is that its methods are executed with mutual exclusion. That is, at each point in time, at most one thread may be executing any of its methods. This mutual exclusion greatly simplifies reasoning about the implementation of monitors compared to reasoning about parallel code that updates a data structure.
You can read more about the general principles on the Wikipedia page for Monitors.
Examples
Simple object.extend
  require 'monitor.rb'

  buf = []
  buf.extend(MonitorMixin)
  empty_cond = buf.new_cond

  # consumer
  Thread.start do
    loop do
      buf.synchronize do
        empty_cond.wait_while { buf.empty? }
        print buf.shift
      end
    end
  end

  # producer
  while line = ARGF.gets
    buf.synchronize do
      buf.push(line)
      empty_cond.signal
    end
  end
While buf.empty? is true, the consumer thread waits for the producer thread to push a line to buf. The producer thread (the main thread) reads a line from ARGF, pushes it into buf, and then calls empty_cond.signal to notify the consumer thread of new data.
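The example above runs until ARGF is exhausted and never stops the consumer. As a minimal sketch of the same handshake that terminates on its own, the variant below feeds a fixed list of lines and ends with a sentinel value. The names received, consumer, and the :done sentinel are assumptions introduced for illustration; they are not part of the library.

```ruby
require 'monitor'

buf = []
buf.extend(MonitorMixin)
empty_cond = buf.new_cond
received = []   # hypothetical sink, written only by the consumer thread

# consumer: takes one item per iteration, stops at the :done sentinel
consumer = Thread.start do
  loop do
    item = buf.synchronize do
      empty_cond.wait_while { buf.empty? }
      buf.shift
    end
    break if item == :done
    received << item
  end
end

# producer: pushes a fixed set of lines, then the sentinel
(%w[a b c] + [:done]).each do |line|
  buf.synchronize do
    buf.push(line)
    empty_cond.signal
  end
end

consumer.join
```

Because wait_while re-checks buf.empty? while holding the monitor, the consumer does not miss items that were pushed before it started waiting.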
Simple Class include
  require 'monitor'

  class SynchronizedArray < Array

    include MonitorMixin

    def initialize(*args)
      super(*args)
    end

    alias :old_shift :shift
    alias :old_unshift :unshift

    def shift(n=1)
      self.synchronize do
        self.old_shift(n)
      end
    end

    def unshift(item)
      self.synchronize do
        self.old_unshift(item)
      end
    end

    # other methods ...
  end
SynchronizedArray implements an Array with synchronized access to items. This class is implemented as a subclass of Array that includes the MonitorMixin module.
Constants
- EXCEPTION_IMMEDIATE
- EXCEPTION_NEVER
Public Class Methods
  # File lib/monitor.rb, line 165
  def self.extend_object(obj)
    super(obj)
    obj.__send__(:mon_initialize)
  end
Use extend MonitorMixin or include MonitorMixin instead of this constructor. Have a look at the examples above to understand how to use this module.
  # File lib/monitor.rb, line 255
  def initialize(*args)
    super
    mon_initialize
  end
Public Instance Methods
Enters exclusive section.
  # File lib/monitor.rb, line 190
  def mon_enter
    if @mon_owner != Thread.current
      @mon_mutex.lock
      @mon_owner = Thread.current
      @mon_count = 0
    end
    @mon_count += 1
  end
Leaves exclusive section.
  # File lib/monitor.rb, line 202
  def mon_exit
    mon_check_owner
    @mon_count -=1
    if @mon_count == 0
      @mon_owner = nil
      @mon_mutex.unlock
    end
  end
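As the listings show, mon_enter increments a per-owner count and mon_exit releases the underlying mutex only when that count returns to zero, so a thread may re-enter a monitor it already holds. The following is a minimal sketch of that pairing; the Counter class and its methods are hypothetical names used only for illustration.

```ruby
require 'monitor'

# Hypothetical class, not part of the library.
class Counter
  include MonitorMixin

  attr_reader :value

  def initialize
    super()       # runs MonitorMixin#initialize, which sets up the monitor
    @value = 0
  end

  def increment
    mon_enter
    begin
      @value += 1
    ensure
      mon_exit    # always paired with mon_enter, even if the body raises
    end
  end

  def increment_twice
    mon_enter
    begin
      increment   # re-enters the monitor this thread already holds
      increment
    ensure
      mon_exit
    end
  end
end

counter = Counter.new
threads = 4.times.map { Thread.new { 100.times { counter.increment_twice } } }
threads.each(&:join)
```

With a plain Thread::Mutex the nested call in increment_twice would raise ThreadError; the monitor's owner and count bookkeeping is what makes the nesting legal. In most code, mon_synchronize is the safer way to get the same pairing.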
Returns true if this monitor is locked by any thread.
  # File lib/monitor.rb, line 214
  def mon_locked?
    @mon_mutex.locked?
  end
Returns true if this monitor is locked by current thread.
  # File lib/monitor.rb, line 221
  def mon_owned?
    @mon_mutex.locked? && @mon_owner == Thread.current
  end
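The difference between the two predicates is easiest to see from a second thread: mon_locked? reports whether any thread holds the monitor, while mon_owned? reports whether the calling thread does. A small sketch, where lock, before, inside, and seen_by_other are illustrative names:

```ruby
require 'monitor'

lock = Object.new.extend(MonitorMixin)

# Nobody holds the monitor yet.
before = [lock.mon_locked?, lock.mon_owned?]

inside = nil
seen_by_other = nil
lock.synchronize do
  # The current thread holds the monitor.
  inside = [lock.mon_locked?, lock.mon_owned?]
  # Another thread sees it as locked, but not owned by itself.
  seen_by_other = Thread.new { [lock.mon_locked?, lock.mon_owned?] }.value
end
```

Both predicates only report a snapshot; the answer can be stale by the time the caller acts on it, so they are better suited to assertions and debugging than to control flow.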
Enters exclusive section and executes the block. Leaves the exclusive section automatically when the block exits. See example under MonitorMixin.
  # File lib/monitor.rb, line 230
  def mon_synchronize
    # Prevent interrupt on handling interrupts; for example timeout errors
    # it may break locking state.
    Thread.handle_interrupt(EXCEPTION_NEVER){ mon_enter }
    begin
      yield
    ensure
      Thread.handle_interrupt(EXCEPTION_NEVER){ mon_exit }
    end
  end
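Because mon_exit runs in an ensure clause, the monitor is released even when the block raises, and the block's value is passed through as the return value. A brief sketch, using illustrative variable names:

```ruby
require 'monitor'

lock = Object.new.extend(MonitorMixin)

message = nil
begin
  lock.mon_synchronize { raise ArgumentError, "boom" }
rescue ArgumentError => e
  message = e.message
end

# The ensure clause has already run mon_exit at this point.
released = !lock.mon_locked?

# synchronize is an alias for mon_synchronize; the block's value is returned.
value = lock.synchronize { 6 * 7 }
```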
Attempts to enter exclusive section. Returns false if lock fails.
  # File lib/monitor.rb, line 173
  def mon_try_enter
    if @mon_owner != Thread.current
      unless @mon_mutex.try_lock
        return false
      end
      @mon_owner = Thread.current
      @mon_count = 0
    end
    @mon_count += 1
    return true
  end
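Unlike mon_enter, mon_try_enter never blocks. The sketch below shows a failed attempt while another thread holds the monitor, followed by successful and nested attempts once it is free. The two queues are only there to sequence the threads deterministically; all variable names are illustrative.

```ruby
require 'monitor'

lock = Object.new.extend(MonitorMixin)
holding = Queue.new   # signals that the helper thread owns the monitor
release = Queue.new   # tells the helper thread to let go

holder = Thread.new do
  lock.synchronize do
    holding << true
    release.pop
  end
end

holding.pop
contended = lock.mon_try_enter     # another thread holds the monitor
release << true
holder.join

uncontended = lock.mon_try_enter   # the monitor is free now
nested = lock.mon_try_enter        # re-entry by the owning thread
2.times { lock.mon_exit }          # one mon_exit per successful entry
```

A failed attempt must not be followed by mon_exit, since the calling thread never became the owner.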
Creates a new MonitorMixin::ConditionVariable associated with the receiver.
  # File lib/monitor.rb, line 246
  def new_cond
    return ConditionVariable.new(self)
  end
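A condition variable created with new_cond is tied to the monitor that created it, so waiting and signalling both happen inside that monitor's synchronize block. As a rough sketch, the hypothetical Gate class below (not part of the library) lets several threads wait until another thread opens it, using wait_until and broadcast.

```ruby
require 'monitor'

# Hypothetical one-shot gate, used only to illustrate new_cond.
class Gate
  include MonitorMixin

  def initialize
    super()
    @open = false
    @opened = new_cond   # condition variable bound to this monitor
  end

  def wait_open
    synchronize { @opened.wait_until { @open } }
  end

  def open!
    synchronize do
      @open = true
      @opened.broadcast  # wake every thread waiting on the gate
    end
  end
end

gate = Gate.new
passed = Queue.new
waiters = 3.times.map do |i|
  Thread.new do
    gate.wait_open
    passed << i
  end
end

gate.open!
waiters.each(&:join)
```

wait_until checks the @open flag before waiting, so a thread that arrives after open! has run passes straight through instead of waiting for a broadcast that already happened.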
Private Instance Methods
  # File lib/monitor.rb, line 272
  def mon_check_owner
    if @mon_owner != Thread.current
      raise ThreadError, "current thread not owner"
    end
  end
  # File lib/monitor.rb, line 278
  def mon_enter_for_cond(count)
    @mon_owner = Thread.current
    @mon_count = count
  end
  # File lib/monitor.rb, line 283
  def mon_exit_for_cond
    count = @mon_count
    @mon_owner = nil
    @mon_count = 0
    return count
  end
Initializes the MonitorMixin after being included in a class or when an object has been extended with the MonitorMixin.
  # File lib/monitor.rb, line 262
  def mon_initialize
    if defined?(@mon_mutex) && @mon_mutex_owner_object_id == object_id
      raise ThreadError, "already initialized"
    end
    @mon_mutex = Thread::Mutex.new
    @mon_mutex_owner_object_id = object_id
    @mon_owner = nil
    @mon_count = 0
  end