aliases: ConditionVariable
要約
スレッドの同期機構の一つである状態変数を実現するクラスです。
以下も ConditionVariable を理解するのに参考になります。
https://ruby-doc.com/docs/ProgrammingRuby/html/tut_threads.html#UF
Condition Variable とは
あるスレッド A が排他領域で動いていたとします。スレッド A は現在空いていないリソースが必要になったので空くまで待つことにしたとします。これはうまくいきません。なぜなら、スレッド A は排他領域で動いているわけですから、他のスレッドは動くことができません。リソースを空けることもできません。スレッド A がリソースの空きを待っていても、いつまでも空くことはありません。
以上のような状況を解決するのが Condition Variable です。
スレッド a で条件(リソースが空いているかなど)が満たされるまで wait メソッドでスレッドを止めます。他のスレッド b において条件が満たされたなら signal メソッドでスレッド a に対して条件が成立したことを通知します。これが典型的な使用例です。
mutex = Mutex.new
cv = ConditionVariable.new
a = Thread.start {
mutex.synchronize {
...
while (条件が満たされない)
cv.wait(mutex)
end
...
}
}
b = Thread.start {
mutex.synchronize {
# 上の条件を満たすための操作
cv.signal
}
}
以下は [ruby-list:14445] で紹介されている例です。@q が空になった場合、あるいは満タンになった場合に Condition Variable を使って wait しています。
require 'thread'
class TinyQueue
def initialize(max=2)
@max = max
@full = ConditionVariable.new
@empty = ConditionVariable.new
@mutex = Mutex.new
@q = []
end
def count
@q.size
end
def enq(v)
@mutex.synchronize{
@full.wait(@mutex) if count == @max
@q.push v
@empty.signal if count == 1
}
end
def deq
@mutex.synchronize{
@empty.wait(@mutex) if count == 0
v = @q.shift
@full.signal if count == (@max - 1)
v
}
end
alias send enq
alias recv deq
end
if __FILE__ == $0
q = TinyQueue.new(1)
foods = 'Apple Banana Strawberry Udon Rice Milk'.split
l = []
th = Thread.new {
for obj in foods
q.send(obj)
print "sent ", obj, "\n"
end
q.send nil
}
l.push th
th = Thread.new {
while obj = q.recv
print "recv ", obj, "\n"
end
}
l.push th
l.each do |t|
t.join
end
end
実行すると以下のように出力します。
$ ruby condvar.rb sent Apple recv Apple sent Banana recv Banana sent Strawberry recv Strawberry sent Udon recv Udon sent Rice recv Rice sent Milk recv Milk
目次
特異メソッド
new -> Thread::ConditionVariable[permalink][rdoc][edit]-
状態変数を生成して返します。
インスタンスメソッド
broadcast -> self[permalink][rdoc][edit]-
状態変数を待っているスレッドをすべて再開します。再開されたスレッドは Thread::ConditionVariable#wait で指定した mutex のロックを試みます。
- [RETURN]
- 常に self を返します。
例
mutex = Mutex.new cv = ConditionVariable.new flg = true 3.times { Thread.start { mutex.synchronize { puts "a1" while (flg) cv.wait(mutex) end puts "a2" } } } Thread.start { mutex.synchronize { flg = false cv.broadcast } } sleep 1 # => a1 # => a1 # => a1 # => a2 # => a2 # => a2 signal -> self[permalink][rdoc][edit]-
状態変数を待っているスレッドを1つ再開します。再開されたスレッドは Thread::ConditionVariable#wait で指定した mutex のロックを試みます。
- [RETURN]
- 常に self を返します。
例
mutex = Mutex.new cv = ConditionVariable.new flg = true 3.times { Thread.start { mutex.synchronize { puts "a1" while (flg) cv.wait(mutex) end puts "a2" } } } Thread.start { mutex.synchronize { flg = false cv.signal } } sleep 1 # => a1 # => a1 # => a1 # => a2 wait(mutex, timeout = nil) -> self[permalink][rdoc][edit]-
mutex のロックを解放し、カレントスレッドを停止します。 Thread::ConditionVariable#signalまたは、 Thread::ConditionVariable#broadcastで送られたシグナルを受け取ると、mutexのロックを取得し、実行状態となります。
- [PARAM] mutex:
- Mutex オブジェクトを指定します。
- [PARAM] timeout:
- スリープする秒数を指定します。この場合はシグナルを受け取らなかった場合でも指定した秒数が経過するとスリープを終了します。省略するとスリープし続けます。
[SEE_ALSO] Thread::ConditionVariable#signal, Thread::ConditionVariable#broadcast