Ruby 3.5.0dev (2025-08-26 revision 8b7b58735aeed748764397875210765a4cd63661)
thread_sync.c (8b7b58735aeed748764397875210765a4cd63661)
/* included by thread.c */
#include "ccan/list/list.h"
#include "builtin.h"

static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;

/* Mutex */
typedef struct rb_mutex_struct {
    rb_fiber_t *fiber;
    struct rb_mutex_struct *next_mutex;
    struct ccan_list_head waitq; /* protected by GVL */
} rb_mutex_t;

/* sync_waiter is always on-stack */
struct sync_waiter {
    VALUE self;
    rb_thread_t *th;
    rb_fiber_t *fiber;
    struct ccan_list_node node;
};

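/* Return the fiber only if it is non-blocking (i.e. supervised by a fiber
 * scheduler); return NULL for blocking fibers so waiters record no fiber
 * and are woken up as plain threads. */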
static inline rb_fiber_t*
nonblocking_fiber(rb_fiber_t *fiber)
{
    if (rb_fiberptr_blocking(fiber)) {
        return NULL;
    }

    return fiber;
}

struct queue_sleep_arg {
    VALUE self;
    VALUE timeout;
    rb_hrtime_t end;
};

#define MUTEX_ALLOW_TRAP FL_USER1

static void
sync_wakeup(struct ccan_list_head *head, long max)
{
    RUBY_DEBUG_LOG("max:%ld", max);

    struct sync_waiter *cur = 0, *next;

    ccan_list_for_each_safe(head, cur, next, node) {
        ccan_list_del_init(&cur->node);

        if (cur->th->status != THREAD_KILLED) {
            if (cur->th->scheduler != Qnil && cur->fiber) {
                rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
            }
            else {
                RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
                rb_threadptr_interrupt(cur->th);
                cur->th->status = THREAD_RUNNABLE;
            }

            if (--max == 0) return;
        }
    }
}

static void
wakeup_one(struct ccan_list_head *head)
{
    sync_wakeup(head, 1);
}

static void
wakeup_all(struct ccan_list_head *head)
{
    sync_wakeup(head, LONG_MAX);
}

#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);

/*
 * Document-class: Thread::Mutex
 *
 * Thread::Mutex implements a simple semaphore that can be used to
 * coordinate access to shared data from multiple concurrent threads.
 *
 * Example:
 *
 *    semaphore = Thread::Mutex.new
 *
 *    a = Thread.new {
 *      semaphore.synchronize {
 *        # access shared resource
 *      }
 *    }
 *
 *    b = Thread.new {
 *      semaphore.synchronize {
 *        # access shared resource
 *      }
 *    }
 *
 */

#define mutex_mark ((void(*)(void*))0)

static size_t
rb_mutex_num_waiting(rb_mutex_t *mutex)
{
    struct sync_waiter *w = 0;
    size_t n = 0;

    ccan_list_for_each(&mutex->waitq, w, node) {
        n++;
    }

    return n;
}

rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);

static void
mutex_free(void *ptr)
{
    rb_mutex_t *mutex = ptr;
    if (mutex->fiber) {
        /* rb_warn("free locked mutex"); */
        const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
        if (err) rb_bug("%s", err);
    }
    ruby_xfree(ptr);
}

static size_t
mutex_memsize(const void *ptr)
{
    return sizeof(rb_mutex_t);
}

static const rb_data_type_t mutex_data_type = {
    "mutex",
    {mutex_mark, mutex_free, mutex_memsize,},
    0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
};

static rb_mutex_t *
mutex_ptr(VALUE obj)
{
    rb_mutex_t *mutex;

    TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);

    return mutex;
}

VALUE
rb_obj_is_mutex(VALUE obj)
{
    return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
}

static VALUE
mutex_alloc(VALUE klass)
{
    VALUE obj;
    rb_mutex_t *mutex;

    obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);

    ccan_list_head_init(&mutex->waitq);
    return obj;
}

/*
 * call-seq:
 *    Thread::Mutex.new   -> mutex
 *
 * Creates a new Mutex.
 */
static VALUE
mutex_initialize(VALUE self)
{
    return self;
}

VALUE
rb_mutex_new(void)
{
    return mutex_alloc(rb_cMutex);
}

/*
 * call-seq:
 *    mutex.locked?  -> true or false
 *
 * Returns +true+ if this lock is currently held by some thread.
 */
VALUE
rb_mutex_locked_p(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    return RBOOL(mutex->fiber);
}

static void
thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
{
    if (thread->keeping_mutexes) {
        mutex->next_mutex = thread->keeping_mutexes;
    }

    thread->keeping_mutexes = mutex;
}

static void
thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
{
    rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;

    while (*keeping_mutexes && *keeping_mutexes != mutex) {
        // Move to the next mutex in the list:
        keeping_mutexes = &(*keeping_mutexes)->next_mutex;
    }

    if (*keeping_mutexes) {
        *keeping_mutexes = mutex->next_mutex;
        mutex->next_mutex = NULL;
    }
}

static void
mutex_locked(rb_thread_t *th, VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    thread_mutex_insert(th, mutex);
}

/*
 * call-seq:
 *    mutex.try_lock  -> true or false
 *
 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
 * lock was granted.
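 *
 * For example, a non-blocking acquisition attempt that falls back when the
 * lock is contended:
 *
 *    m = Thread::Mutex.new
 *
 *    if m.try_lock
 *      begin
 *        # critical section
 *      ensure
 *        m.unlock
 *      end
 *    else
 *      # lock is held elsewhere; do something else
 *    end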
 */
VALUE
rb_mutex_trylock(VALUE self)
{
    rb_mutex_t *mutex = mutex_ptr(self);

    if (mutex->fiber == 0) {
        RUBY_DEBUG_LOG("%p ok", mutex);

        rb_fiber_t *fiber = GET_EC()->fiber_ptr;
        rb_thread_t *th = GET_THREAD();
        mutex->fiber = fiber;

        mutex_locked(th, self);
        return Qtrue;
    }
    else {
        RUBY_DEBUG_LOG("%p ng", mutex);
        return Qfalse;
    }
}

static VALUE
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
    return RBOOL(mutex->fiber == fiber);
}

static VALUE
call_rb_fiber_scheduler_block(VALUE mutex)
{
    return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
}

static VALUE
delete_from_waitq(VALUE value)
{
    struct sync_waiter *sync_waiter = (void *)value;
    ccan_list_del(&sync_waiter->node);

    return Qnil;
}

static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);

static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
    rb_execution_context_t *ec = GET_EC();
    rb_thread_t *th = ec->thread_ptr;
    rb_fiber_t *fiber = ec->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);
    rb_atomic_t saved_ints = 0;

    /* When running trap handler */
    if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
        th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
        rb_raise(rb_eThreadError, "can't be called from trap context");
    }

    if (rb_mutex_trylock(self) == Qfalse) {
        if (mutex->fiber == fiber) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        while (mutex->fiber != fiber) {
            VM_ASSERT(mutex->fiber != NULL);

            VALUE scheduler = rb_fiber_scheduler_current();
            if (scheduler != Qnil) {
                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = nonblocking_fiber(fiber)
                };

                ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);

                rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);

                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
            else {
                if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) {
                    rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
                }

                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = nonblocking_fiber(fiber),
                };

                RUBY_DEBUG_LOG("%p wait", mutex);

                // Similar code to `sleep_forever`, but
                // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
                // An ensure clause is needed, but `rb_ensure` is a bit slow.
                //
                // begin
                //   sleep_forever(th, SLEEP_DEADLOCKABLE);
                // ensure
                //   ccan_list_del(&sync_waiter.node);
                // end
                enum rb_thread_status prev_status = th->status;
                th->status = THREAD_STOPPED_FOREVER;
                rb_ractor_sleeper_threads_inc(th->ractor);
                rb_check_deadlock(th->ractor);

                th->locking_mutex = self;

                ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
                {
                    native_sleep(th, NULL);
                }
                ccan_list_del(&sync_waiter.node);

                // unlocked by another thread while sleeping
                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }

                rb_ractor_sleeper_threads_dec(th->ractor);
                th->status = prev_status;
                th->locking_mutex = Qfalse;

                RUBY_DEBUG_LOG("%p wakeup", mutex);
            }

            if (interruptible_p) {
                /* release mutex before checking for interrupts...as interrupt checking
                 * code might call rb_raise() */
                if (mutex->fiber == fiber) mutex->fiber = 0;
                RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
                if (!mutex->fiber) {
                    mutex->fiber = fiber;
                }
            }
            else {
                // clear interrupt information
                if (RUBY_VM_INTERRUPTED(th->ec)) {
                    // reset interrupts
                    if (saved_ints == 0) {
                        saved_ints = threadptr_get_interrupts(th);
                    }
                    else {
                        // ignore additional interrupts
                        threadptr_get_interrupts(th);
                    }
                }
            }
        }

        if (saved_ints) th->ec->interrupt_flag = saved_ints;
        if (mutex->fiber == fiber) mutex_locked(th, self);
    }

    RUBY_DEBUG_LOG("%p locked", mutex);

    // assertion
    if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");

    return self;
}

static VALUE
mutex_lock_uninterruptible(VALUE self)
{
    return do_mutex_lock(self, 0);
}

/*
 * call-seq:
 *    mutex.lock  -> self
 *
 * Attempts to grab the lock and waits if it isn't available.
 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
 */
VALUE
rb_mutex_lock(VALUE self)
{
    return do_mutex_lock(self, 1);
}

/*
 * call-seq:
 *    mutex.owned?  -> true or false
 *
 * Returns +true+ if this lock is currently held by the current thread.
 */
VALUE
rb_mutex_owned_p(VALUE self)
{
    rb_fiber_t *fiber = GET_EC()->fiber_ptr;
    rb_mutex_t *mutex = mutex_ptr(self);

    return mutex_owned_p(fiber, mutex);
}

static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
    RUBY_DEBUG_LOG("%p", mutex);

    if (mutex->fiber == 0) {
        return "Attempt to unlock a mutex which is not locked";
    }
    else if (mutex->fiber != fiber) {
        return "Attempt to unlock a mutex which is locked by another thread/fiber";
    }

    struct sync_waiter *cur = 0, *next;

    mutex->fiber = 0;
    thread_mutex_remove(th, mutex);

    ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
        ccan_list_del_init(&cur->node);

        if (cur->th->scheduler != Qnil && cur->fiber) {
            rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
            return NULL;
        }
        else {
            switch (cur->th->status) {
              case THREAD_RUNNABLE: /* from someone else calling Thread#run */
              case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
                RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
                rb_threadptr_interrupt(cur->th);
                return NULL;
              case THREAD_STOPPED: /* probably impossible */
                rb_bug("unexpected THREAD_STOPPED");
              case THREAD_KILLED:
                /* not sure about this, possible in exit GC? */
                rb_bug("unexpected THREAD_KILLED");
                continue;
            }
        }
    }

    // We did not find any threads to wake up, so we can just return with no error:
    return NULL;
}

/*
 * call-seq:
 *    mutex.unlock    -> self
 *
 * Releases the lock.
 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
 */
VALUE
rb_mutex_unlock(VALUE self)
{
    const char *err;
    rb_mutex_t *mutex = mutex_ptr(self);
    rb_thread_t *th = GET_THREAD();

    err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
    if (err) rb_raise(rb_eThreadError, "%s", err);

    return self;
}

#if defined(HAVE_WORKING_FORK)
static void
rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
{
    rb_mutex_abandon_all(th->keeping_mutexes);
    th->keeping_mutexes = NULL;
}

static void
rb_mutex_abandon_locking_mutex(rb_thread_t *th)
{
    if (th->locking_mutex) {
        rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);

        ccan_list_head_init(&mutex->waitq);
        th->locking_mutex = Qfalse;
    }
}

static void
rb_mutex_abandon_all(rb_mutex_t *mutexes)
{
    rb_mutex_t *mutex;

    while (mutexes) {
        mutex = mutexes;
        mutexes = mutex->next_mutex;
        mutex->fiber = 0;
        mutex->next_mutex = 0;
        ccan_list_head_init(&mutex->waitq);
    }
}
#endif

struct rb_mutex_sleep_arguments {
    VALUE self;
    VALUE timeout;
};

static VALUE
mutex_sleep_begin(VALUE _arguments)
{
    struct rb_mutex_sleep_arguments *arguments = (struct rb_mutex_sleep_arguments *)_arguments;
    VALUE timeout = arguments->timeout;
    VALUE woken = Qtrue;

    VALUE scheduler = rb_fiber_scheduler_current();
    if (scheduler != Qnil) {
        rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
    }
    else {
        if (NIL_P(timeout)) {
            rb_thread_sleep_deadly_allow_spurious_wakeup(arguments->self, Qnil, 0);
        }
        else {
            struct timeval timeout_value = rb_time_interval(timeout);
            rb_hrtime_t relative_timeout = rb_timeval2hrtime(&timeout_value);
            /* permit spurious check */
            woken = RBOOL(sleep_hrtime(GET_THREAD(), relative_timeout, 0));
        }
    }

    return woken;
}

VALUE
rb_mutex_sleep(VALUE self, VALUE timeout)
{
    if (!NIL_P(timeout)) {
        // Validate the argument:
        rb_time_interval(timeout);
    }

    rb_mutex_unlock(self);
    time_t beg = time(0);

    struct rb_mutex_sleep_arguments arguments = {
        .self = self,
        .timeout = timeout,
    };

    VALUE woken = rb_ensure(mutex_sleep_begin, (VALUE)&arguments, mutex_lock_uninterruptible, self);

    RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
    if (!woken) return Qnil;
    time_t end = time(0) - beg;
    return TIMET2NUM(end);
}

/*
 * call-seq:
 *    mutex.sleep(timeout = nil)    -> number or nil
 *
 * Releases the lock and sleeps +timeout+ seconds if it is given and
 * non-nil or forever.  Raises +ThreadError+ if +mutex+ wasn't locked by
 * the current thread.
 *
 * When the thread is next woken up, it will attempt to reacquire
 * the lock.
 *
 * Note that this method can wake up without an explicit Thread#wakeup
 * call; for example, when the thread receives a signal.
 *
 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
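 *
 * For example, a timed sleep that releases and then reacquires the lock:
 *
 *    m = Thread::Mutex.new
 *    m.synchronize do
 *      m.sleep(0.1) # unlocks, sleeps up to 0.1s, then relocks
 *    end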
 */
static VALUE
mutex_sleep(int argc, VALUE *argv, VALUE self)
{
    VALUE timeout;

    timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
    return rb_mutex_sleep(self, timeout);
}

/*
 * call-seq:
 *    mutex.synchronize { ... }    -> result of the block
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes.  See the example under Thread::Mutex.
 */

VALUE
rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
{
    rb_mutex_lock(mutex);
    return rb_ensure(func, arg, rb_mutex_unlock, mutex);
}

/*
 * call-seq:
 *    mutex.synchronize { ... }    -> result of the block
 *
 * Obtains a lock, runs the block, and releases the lock when the block
 * completes.  See the example under Thread::Mutex.
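 *
 * For instance, serializing updates to a shared counter:
 *
 *    m = Thread::Mutex.new
 *    counter = 0
 *    threads = 4.times.map {
 *      Thread.new { 1000.times { m.synchronize { counter += 1 } } }
 *    }
 *    threads.each(&:join)
 *    counter #=> 4000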
 */
static VALUE
rb_mutex_synchronize_m(VALUE self)
{
    if (!rb_block_given_p()) {
        rb_raise(rb_eThreadError, "must be called with a block");
    }

    return rb_mutex_synchronize(self, rb_yield, Qundef);
}

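/* Allow or forbid locking this mutex from a trap (signal) handler by
 * setting or clearing the MUTEX_ALLOW_TRAP flag checked in do_mutex_lock. */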
void
rb_mutex_allow_trap(VALUE self, int val)
{
    Check_TypedStruct(self, &mutex_data_type);

    if (val)
        FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
    else
        FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
}

/* Queue */

#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
#define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
struct rb_queue {
    struct ccan_list_head waitq;
    rb_serial_t fork_gen;
    const VALUE que;
    int num_waiting;
} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();

#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
#define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
struct rb_szqueue {
    struct rb_queue q;
    int num_waiting_push;
    struct ccan_list_head pushq;
    long max;
} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();

static void
queue_mark(void *ptr)
{
    struct rb_queue *q = ptr;

    /* no need to mark threads in waitq, they are on stack */
    rb_gc_mark(q->que);
}

static size_t
queue_memsize(const void *ptr)
{
    return sizeof(struct rb_queue);
}

static const rb_data_type_t queue_data_type = {
    "queue",
    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
queue_alloc(VALUE klass)
{
    VALUE obj;
    struct rb_queue *q;

    obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
    ccan_list_head_init(queue_waitq(q));
    return obj;
}

static int
queue_fork_check(struct rb_queue *q)
{
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    if (q->fork_gen == fork_gen) {
        return 0;
    }
    /* forked children can't reach into parent thread stacks */
    q->fork_gen = fork_gen;
    ccan_list_head_init(queue_waitq(q));
    q->num_waiting = 0;
    return 1;
}

static struct rb_queue *
queue_ptr(VALUE obj)
{
    struct rb_queue *q;

    TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
    queue_fork_check(q);

    return q;
}

#define QUEUE_CLOSED FL_USER5

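/* Convert a Ruby timeout (nil, an Integer, or another Numeric, in seconds)
 * into an absolute rb_hrtime_t deadline; nil yields 0, meaning "no deadline". */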
static rb_hrtime_t
queue_timeout2hrtime(VALUE timeout)
{
    if (NIL_P(timeout)) {
        return (rb_hrtime_t)0;
    }
    rb_hrtime_t rel = 0;
    if (FIXNUM_P(timeout)) {
        rel = rb_sec2hrtime(NUM2TIMET(timeout));
    }
    else {
        double2hrtime(&rel, rb_num2dbl(timeout));
    }
    return rb_hrtime_add(rel, rb_hrtime_now());
}

static void
szqueue_mark(void *ptr)
{
    struct rb_szqueue *sq = ptr;

    queue_mark(&sq->q);
}

static size_t
szqueue_memsize(const void *ptr)
{
    return sizeof(struct rb_szqueue);
}

static const rb_data_type_t szqueue_data_type = {
    "sized_queue",
    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static VALUE
szqueue_alloc(VALUE klass)
{
    struct rb_szqueue *sq;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
                                      &szqueue_data_type, sq);
    ccan_list_head_init(szqueue_waitq(sq));
    ccan_list_head_init(szqueue_pushq(sq));
    return obj;
}

static struct rb_szqueue *
szqueue_ptr(VALUE obj)
{
    struct rb_szqueue *sq;

    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
    if (queue_fork_check(&sq->q)) {
        ccan_list_head_init(szqueue_pushq(sq));
        sq->num_waiting_push = 0;
    }

    return sq;
}

static VALUE
ary_buf_new(void)
{
    return rb_ary_hidden_new(1);
}

static VALUE
check_array(VALUE obj, VALUE ary)
{
    if (!RB_TYPE_P(ary, T_ARRAY)) {
        rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
    }
    return ary;
}

static long
queue_length(VALUE self, struct rb_queue *q)
{
    return RARRAY_LEN(check_array(self, q->que));
}

static int
queue_closed_p(VALUE self)
{
    return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
}

/*
 * Document-class: ClosedQueueError
 *
 * The exception class which will be raised when pushing into a closed
 * Queue.  See Thread::Queue#close and Thread::SizedQueue#close.
 */

NORETURN(static void raise_closed_queue_error(VALUE self));

static void
raise_closed_queue_error(VALUE self)
{
    rb_raise(rb_eClosedQueueError, "queue closed");
}

static VALUE
queue_closed_result(VALUE self, struct rb_queue *q)
{
    RUBY_ASSERT(queue_length(self, q) == 0);
    return Qnil;
}

/*
 * Document-class: Thread::Queue
 *
 * The Thread::Queue class implements multi-producer, multi-consumer
 * queues.  It is especially useful in threaded programming when
 * information must be exchanged safely between multiple threads.  The
 * Thread::Queue class implements all the required locking semantics.
 *
 * The class implements a FIFO (first in, first out) type of queue.
 * In a FIFO queue, the first tasks added are the first retrieved.
 *
 * Example:
 *
 *    queue = Thread::Queue.new
 *
 *    producer = Thread.new do
 *      5.times do |i|
 *        sleep rand(i) # simulate expense
 *        queue << i
 *        puts "#{i} produced"
 *      end
 *    end
 *
 *    consumer = Thread.new do
 *      5.times do |i|
 *        value = queue.pop
 *        sleep rand(i/2) # simulate expense
 *        puts "consumed #{value}"
 *      end
 *    end
 *
 *    consumer.join
 *
 */

/*
 * Document-method: Queue::new
 *
 * call-seq:
 *   Thread::Queue.new -> empty_queue
 *   Thread::Queue.new(enumerable) -> queue
 *
 * Creates a new queue instance, optionally using the contents of an +enumerable+
 * for its initial state.
 *
 * Example:
 *
 *    q = Thread::Queue.new
 *    #=> #<Thread::Queue:0x00007ff7501110d0>
 *    q.empty?
 *    #=> true
 *
 *    q = Thread::Queue.new([1, 2, 3])
 *    #=> #<Thread::Queue:0x00007ff7500ec500>
 *    q.empty?
 *    #=> false
 *    q.pop
 *    #=> 1
 */

static VALUE
rb_queue_initialize(int argc, VALUE *argv, VALUE self)
{
    VALUE initial;
    struct rb_queue *q = queue_ptr(self);
    if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
        initial = rb_to_array(initial);
    }
    RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
    ccan_list_head_init(queue_waitq(q));
    if (argc == 1) {
        rb_ary_concat(q->que, initial);
    }
    return self;
}

static VALUE
queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
{
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }
    rb_ary_push(check_array(self, q->que), obj);
    wakeup_one(queue_waitq(q));
    return self;
}

/*
 * Document-method: Thread::Queue#close
 * call-seq:
 *   close
 *
 * Closes the queue. A closed queue cannot be re-opened.
 *
 * After the call to close completes, the following are true:
 *
 * - +closed?+ will return true
 *
 * - +close+ will be ignored.
 *
 * - calling enq/push/<< will raise a +ClosedQueueError+.
 *
 * - when +empty?+ is false, calling deq/pop/shift will return an object
 *   from the queue as usual.
 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
 *   deq(true) will raise a +ThreadError+.
 *
 * ClosedQueueError inherits from StopIteration, so you can use it to break
 * out of a loop block.
 *
 * Example:
 *
 *    q = Thread::Queue.new
 *    Thread.new{
 *      while e = q.deq # wait for nil to break loop
 *        # ...
 *      end
 *    }
 *    q.close
 */

static VALUE
rb_queue_close(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    if (!queue_closed_p(self)) {
        FL_SET(self, QUEUE_CLOSED);

        wakeup_all(queue_waitq(q));
    }

    return self;
}

/*
 * Document-method: Thread::Queue#closed?
 * call-seq: closed?
 *
 * Returns +true+ if the queue is closed.
 */

static VALUE
rb_queue_closed_p(VALUE self)
{
    return RBOOL(queue_closed_p(self));
}

/*
 * Document-method: Thread::Queue#push
 * call-seq:
 *   push(object)
 *   enq(object)
 *   <<(object)
 *
 * Pushes the given +object+ to the queue.
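 *
 * All three forms enqueue onto the same queue, for example:
 *
 *    q = Thread::Queue.new
 *    q.push(1)
 *    q << 2
 *    q.enq(3)
 *    q.length #=> 3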
 */

static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
    return queue_do_push(self, queue_ptr(self), obj);
}

static VALUE
queue_sleep(VALUE _args)
{
    struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
    rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
    return Qnil;
}

struct queue_waiter {
    struct sync_waiter w;
    union {
        struct rb_queue *q;
        struct rb_szqueue *sq;
    } as;
};

static VALUE
queue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    ccan_list_del(&qw->w.node);
    qw->as.q->num_waiting--;

    return Qfalse;
}

static VALUE
szqueue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    ccan_list_del(&qw->w.node);
    qw->as.sq->num_waiting_push--;

    return Qfalse;
}

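/* Pop one element. When the queue is empty: raise ThreadError unless
 * should_block, return nil immediately for a zero timeout, otherwise sleep
 * on the waitq until an element arrives, the queue is closed, or the
 * absolute deadline derived from `timeout` expires. */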
static VALUE
queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
{
    check_array(self, q->que);
    if (RARRAY_LEN(q->que) == 0) {
        if (!should_block) {
            rb_raise(rb_eThreadError, "queue empty");
        }

        if (RTEST(rb_equal(INT2FIX(0), timeout))) {
            return Qnil;
        }
    }

    rb_hrtime_t end = queue_timeout2hrtime(timeout);
    while (RARRAY_LEN(q->que) == 0) {
        if (queue_closed_p(self)) {
            return queue_closed_result(self, q);
        }
        else {
            rb_execution_context_t *ec = GET_EC();

            RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
            RUBY_ASSERT(queue_closed_p(self) == 0);

            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                .as = {.q = q}
            };

            struct ccan_list_head *waitq = queue_waitq(q);

            ccan_list_add_tail(waitq, &queue_waiter.w.node);
            queue_waiter.as.q->num_waiting++;

            struct queue_sleep_arg queue_sleep_arg = {
                .self = self,
                .timeout = timeout,
                .end = end
            };

            rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
            if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
                break;
        }
    }

    return rb_ary_shift(q->que);
}

static VALUE
rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
{
    return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout);
}

/*
 * Document-method: Thread::Queue#empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 */

static VALUE
rb_queue_empty_p(VALUE self)
{
    return RBOOL(queue_length(self, queue_ptr(self)) == 0);
}

/*
 * Document-method: Thread::Queue#clear
 *
 * Removes all objects from the queue.
 */

static VALUE
rb_queue_clear(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    rb_ary_clear(check_array(self, q->que));
    return self;
}

/*
 * Document-method: Thread::Queue#length
 * call-seq:
 *   length
 *   size
 *
 * Returns the length of the queue.
 */

static VALUE
rb_queue_length(VALUE self)
{
    return LONG2NUM(queue_length(self, queue_ptr(self)));
}

NORETURN(static VALUE rb_queue_freeze(VALUE self));
/*
 * call-seq:
 *   freeze
 *
 * The queue can't be frozen, so this method raises an exception:
 *   Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
 *
 */
static VALUE
rb_queue_freeze(VALUE self)
{
    rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
    UNREACHABLE_RETURN(self);
}

/*
 * Document-method: Thread::Queue#num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */

static VALUE
rb_queue_num_waiting(VALUE self)
{
    struct rb_queue *q = queue_ptr(self);

    return INT2NUM(q->num_waiting);
}

/*
 * Document-class: Thread::SizedQueue
 *
 * This class represents queues of specified size capacity.  The push
 * operation may block if the queue is full.
 *
 * See Thread::Queue for an example of how a Thread::SizedQueue works.
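 *
 * For example, a producer blocks once the capacity is reached:
 *
 *    q = Thread::SizedQueue.new(2)
 *    q.push(1)
 *    q.push(2)
 *    t = Thread.new { q.push(3) } # blocks while the queue is full
 *    q.pop                        #=> 1; frees a slot, so the push completes
 *    t.join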
 */

/*
 * Document-method: SizedQueue::new
 * call-seq: new(max)
 *
 * Creates a fixed-length queue with a maximum size of +max+.
 */

static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    long max;
    struct rb_szqueue *sq = szqueue_ptr(self);

    max = NUM2LONG(vmax);
    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    RB_OBJ_WRITE(self, szqueue_list(sq), ary_buf_new());
    ccan_list_head_init(szqueue_waitq(sq));
    ccan_list_head_init(szqueue_pushq(sq));
    sq->max = max;

    return self;
}

/*
 * Document-method: Thread::SizedQueue#close
 * call-seq:
 *   close
 *
 * Similar to Thread::Queue#close.
 *
 * The difference is the behavior toward waiting enqueuing threads.
 *
 * If there are waiting enqueuing threads, they are interrupted by
 * raising ClosedQueueError('queue closed').
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    if (!queue_closed_p(self)) {
        struct rb_szqueue *sq = szqueue_ptr(self);

        FL_SET(self, QUEUE_CLOSED);
        wakeup_all(szqueue_waitq(sq));
        wakeup_all(szqueue_pushq(sq));
    }
    return self;
}

/*
 * Document-method: Thread::SizedQueue#max
 *
 * Returns the maximum size of the queue.
 */

static VALUE
rb_szqueue_max_get(VALUE self)
{
    return LONG2NUM(szqueue_ptr(self)->max);
}

/*
 * Document-method: Thread::SizedQueue#max=
 * call-seq: max=(number)
 *
 * Sets the maximum size of the queue to the given +number+.
1274
1275static VALUE
1276rb_szqueue_max_set(VALUE self, VALUE vmax)
1277{
1278 long max = NUM2LONG(vmax);
1279 long diff = 0;
1280 struct rb_szqueue *sq = szqueue_ptr(self);
1281
1282 if (max <= 0) {
1283 rb_raise(rb_eArgError, "queue size must be positive");
1284 }
1285 if (max > sq->max) {
1286 diff = max - sq->max;
1287 }
1288 sq->max = max;
1289 sync_wakeup(szqueue_pushq(sq), diff);
1290 return vmax;
1291}
1292
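/* Push one element. When the queue is full: raise ThreadError if non_block,
 * return nil immediately for a zero timeout, otherwise sleep on pushq until
 * space opens up, the queue is closed (ClosedQueueError), or the deadline
 * expires. */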
static VALUE
rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    if (queue_length(self, &sq->q) >= sq->max) {
        if (RTEST(non_block)) {
            rb_raise(rb_eThreadError, "queue full");
        }

        if (RTEST(rb_equal(INT2FIX(0), timeout))) {
            return Qnil;
        }
    }

    rb_hrtime_t end = queue_timeout2hrtime(timeout);
    while (queue_length(self, &sq->q) >= sq->max) {
        if (queue_closed_p(self)) {
            raise_closed_queue_error(self);
        }
        else {
            rb_execution_context_t *ec = GET_EC();
            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                .as = {.sq = sq}
            };

            struct ccan_list_head *pushq = szqueue_pushq(sq);

            ccan_list_add_tail(pushq, &queue_waiter.w.node);
            sq->num_waiting_push++;

            struct queue_sleep_arg queue_sleep_arg = {
                .self = self,
                .timeout = timeout,
                .end = end
            };
            rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
            if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
                return Qnil;
            }
        }
    }

    return queue_do_push(self, &sq->q, object);
}

static VALUE
szqueue_do_pop(VALUE self, int should_block, VALUE timeout)
{
    struct rb_szqueue *sq = szqueue_ptr(self);
    VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout);

    if (queue_length(self, &sq->q) < sq->max) {
        wakeup_one(szqueue_pushq(sq));
    }

    return retval;
}

static VALUE
rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
{
    return szqueue_do_pop(self, !RTEST(non_block), timeout);
}

/*
 * Document-method: Thread::SizedQueue#clear
 *
 * Removes all objects from the queue.
 */

static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    rb_ary_clear(check_array(self, sq->q.que));
    wakeup_all(szqueue_pushq(sq));
    return self;
}

/*
 * Document-method: Thread::SizedQueue#length
 * call-seq:
 *   length
 *   size
 *
 * Returns the length of the queue.
 */

static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &sq->q));
}

/*
 * Document-method: Thread::SizedQueue#num_waiting
 *
 * Returns the number of threads waiting on the queue.
 */

static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}

/*
 * Document-method: Thread::SizedQueue#empty?
 * call-seq: empty?
 *
 * Returns +true+ if the queue is empty.
 */

static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *sq = szqueue_ptr(self);

    return RBOOL(queue_length(self, &sq->q) == 0);
}

/* ConditionVariable */
struct rb_condvar {
    struct ccan_list_head waitq;
    rb_serial_t fork_gen;
};

/*
 * Document-class: Thread::ConditionVariable
 *
 * ConditionVariable objects augment class Mutex. Using condition variables,
 * it is possible to suspend while in the middle of a critical section until
 * a condition is met, such as a resource becoming available.
 *
 * Due to non-deterministic scheduling and spurious wake-ups, users of
 * condition variables should always use a separate boolean predicate (such as
 * reading from a boolean variable) to check if the condition is actually met
 * before starting to wait, and should wait in a loop, re-checking the
 * condition every time the ConditionVariable is woken up.  The idiomatic way
 * of using condition variables is calling the +wait+ method in an +until+
 * loop with the predicate as the loop condition.
 *
 *   condvar.wait(mutex) until condition_is_met
 *
 * In the example below, we use the boolean variable +resource_available+
 * (which is protected by +mutex+) to indicate the availability of the
 * resource, and use +condvar+ to wait for that variable to become true.  Note
 * that:
 *
 * 1. Thread +b+ may be scheduled before threads +a1+ and +a2+, and may run so
 *    fast that it has already made the resource available before either
 *    +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if
 *    +resource_available+ is already true before starting to wait.
 * 2. The +wait+ method may spuriously wake up without signalling. Therefore,
 *    threads +a1+ and +a2+ should recheck +resource_available+ after the
 *    +wait+ method returns, and go back to waiting if the condition is not
 *    actually met.
 * 3. It is possible that thread +a2+ starts right after thread +a1+ is woken
 *    up by +b+.  Thread +a2+ may have acquired the +mutex+ and consumed the
 *    resource before thread +a1+ acquires the +mutex+.  This necessitates
 *    rechecking after +wait+, too.
 *
 * Example:
 *
 *   mutex = Thread::Mutex.new
 *
 *   resource_available = false
 *   condvar = Thread::ConditionVariable.new
 *
 *   a1 = Thread.new {
 *     # Thread 'a1' waits for the resource to become available and consumes
 *     # the resource.
 *     mutex.synchronize {
 *       condvar.wait(mutex) until resource_available
 *       # After the loop, 'resource_available' is guaranteed to be true.
 *
 *       resource_available = false
 *       puts "a1 consumed the resource"
 *     }
 *   }
 *
 *   a2 = Thread.new {
 *     # Thread 'a2' behaves like 'a1'.
 *     mutex.synchronize {
 *       condvar.wait(mutex) until resource_available
 *       resource_available = false
 *       puts "a2 consumed the resource"
 *     }
 *   }
 *
 *   b = Thread.new {
 *     # Thread 'b' periodically makes the resource available.
 *     loop {
 *       mutex.synchronize {
 *         resource_available = true
 *
 *         # Notify one waiting thread if any.  It is possible that neither
 *         # 'a1' nor 'a2' is waiting on 'condvar' at this moment.  That's OK.
 *         condvar.signal
 *       }
 *       sleep 1
 *     }
 *   }
 *
 *   # Eventually both 'a1' and 'a2' will have their resources, albeit in an
 *   # unspecified order.
 *   [a1, a2].each {|th| th.join}
 */

static size_t
condvar_memsize(const void *ptr)
{
    return sizeof(struct rb_condvar);
}

static const rb_data_type_t cv_data_type = {
    "condvar",
    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
};

static struct rb_condvar *
condvar_ptr(VALUE self)
{
    struct rb_condvar *cv;
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);

    /* forked children can't reach into parent thread stacks */
    if (cv->fork_gen != fork_gen) {
        cv->fork_gen = fork_gen;
        ccan_list_head_init(&cv->waitq);
    }

    return cv;
}

static VALUE
condvar_alloc(VALUE klass)
{
    struct rb_condvar *cv;
    VALUE obj;

    obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
    ccan_list_head_init(&cv->waitq);

    return obj;
}

/*
 * Document-method: ConditionVariable::new
 *
 * Creates a new condition variable instance.
 */

static VALUE
rb_condvar_initialize(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    ccan_list_head_init(&cv->waitq);
    return self;
}

struct sleep_call {
    VALUE mutex;
    VALUE timeout;
};

static ID id_sleep;

static VALUE
do_sleep(VALUE args)
{
    struct sleep_call *p = (struct sleep_call *)args;
    return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
}

/*
 * Document-method: Thread::ConditionVariable#wait
 * call-seq: wait(mutex, timeout=nil)
 *
 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
 *
 * If +timeout+ is given, this method returns after +timeout+ seconds have
 * passed, even if no other thread signals.
 *
 * This method may wake up spuriously due to underlying implementation details.
 *
 * Returns the slept result on +mutex+.
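 *
 * For example, a timed wait under the lock:
 *
 *   mutex = Thread::Mutex.new
 *   condvar = Thread::ConditionVariable.new
 *
 *   mutex.synchronize do
 *     condvar.wait(mutex, 1.0) # waits up to one second for a signal
 *   end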
 */

static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
    rb_execution_context_t *ec = GET_EC();

    struct rb_condvar *cv = condvar_ptr(self);
    struct sleep_call args;

    rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);

    struct sync_waiter sync_waiter = {
        .self = args.mutex,
        .th = ec->thread_ptr,
        .fiber = nonblocking_fiber(ec->fiber_ptr)
    };

    ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
    return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
}

/*
 * Document-method: Thread::ConditionVariable#signal
 *
 * Wakes up the first thread in line waiting for this lock.
 */

static VALUE
rb_condvar_signal(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_one(&cv->waitq);
    return self;
}

/*
 * Document-method: Thread::ConditionVariable#broadcast
 *
 * Wakes up all threads waiting for this lock.
 */

static VALUE
rb_condvar_broadcast(VALUE self)
{
    struct rb_condvar *cv = condvar_ptr(self);
    wakeup_all(&cv->waitq);
    return self;
}

NORETURN(static VALUE undumpable(VALUE obj));
/* :nodoc: */
static VALUE
undumpable(VALUE obj)
{
    rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
    UNREACHABLE_RETURN(Qnil);
}

static VALUE
define_thread_class(VALUE outer, const ID name, VALUE super)
{
    VALUE klass = rb_define_class_id_under(outer, name, super);
    rb_const_set(rb_cObject, name, klass);
    return klass;
}

static void
Init_thread_sync(void)
{
#undef rb_intern
#if defined(TEACH_RDOC) && TEACH_RDOC == 42
    rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
    rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
    rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
    rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
#endif

#define DEFINE_CLASS(name, super) \
    rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)

    /* Mutex */
    DEFINE_CLASS(Mutex, Object);
    rb_define_alloc_func(rb_cMutex, mutex_alloc);
    rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
    rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
    rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
    rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
    rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
    rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
    rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
    rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);

    /* Queue */
    DEFINE_CLASS(Queue, Object);
    rb_define_alloc_func(rb_cQueue, queue_alloc);

    rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);

    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
    rb_undef_method(rb_cQueue, "initialize_copy");
    rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
    rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
    rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);

    rb_define_alias(rb_cQueue, "enq", "push");
    rb_define_alias(rb_cQueue, "<<", "push");
    rb_define_alias(rb_cQueue, "size", "length");

    DEFINE_CLASS(SizedQueue, Queue);
    rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);

    rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
    rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
    rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
    rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
    rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
    rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
    rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
    rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
    rb_define_alias(rb_cSizedQueue, "size", "length");

    /* CVar */
    DEFINE_CLASS(ConditionVariable, Object);
    rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);

    id_sleep = rb_intern("sleep");

    rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
    rb_undef_method(rb_cConditionVariable, "initialize_copy");
    rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
    rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
    rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
    rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);

    rb_provide("thread.rb");
}

#include "thread_sync.rbinc"