Ruby 3.5.0dev (2025-10-06 revision 704677257ecb01c7ee10aa0dfc55ca1d4fc4636d)
thread_sync.c (704677257ecb01c7ee10aa0dfc55ca1d4fc4636d)
1/* included by thread.c */
2#include "ccan/list/list.h"
3#include "builtin.h"
4
5static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
6static VALUE rb_eClosedQueueError;
7
8/* Mutex */
9typedef struct rb_mutex_struct {
10 rb_fiber_t *fiber;
11 VALUE thread; // even if the fiber is collected, we might need access to the thread in mutex_free
12 struct rb_mutex_struct *next_mutex;
13 struct ccan_list_head waitq; /* protected by GVL */
14} rb_mutex_t;
15
16/* sync_waiter is always on-stack */
17struct sync_waiter {
18 VALUE self;
19 rb_thread_t *th;
20 rb_fiber_t *fiber;
21 struct ccan_list_node node;
22};
23
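/* Returns +fiber+ only when it is non-blocking (i.e. managed by a fiber
 * scheduler); blocking fibers map to NULL, so wakeup code falls back to
 * interrupting the owning thread instead of calling the scheduler. */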
24static inline rb_fiber_t*
25nonblocking_fiber(rb_fiber_t *fiber)
26{
27 if (rb_fiberptr_blocking(fiber)) {
28 return NULL;
29 }
30
31 return fiber;
32}
33
34struct queue_sleep_arg {
35 VALUE self;
36 VALUE timeout;
37 rb_hrtime_t end;
38};
39
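/* Marks a mutex that may be locked from a trap (signal) handler context.
 * Set and cleared by rb_mutex_allow_trap() and checked in do_mutex_lock(). */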
40#define MUTEX_ALLOW_TRAP FL_USER1
41
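/* Wakes up to `max` waiters queued on `head`, skipping already-killed threads:
 * non-blocking fibers are resumed through their thread's fiber scheduler,
 * everything else is woken by interrupting the waiting thread. */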
42static void
43sync_wakeup(struct ccan_list_head *head, long max)
44{
45 RUBY_DEBUG_LOG("max:%ld", max);
46
47 struct sync_waiter *cur = 0, *next;
48
49 ccan_list_for_each_safe(head, cur, next, node) {
50 ccan_list_del_init(&cur->node);
51
52 if (cur->th->status != THREAD_KILLED) {
53 if (cur->th->scheduler != Qnil && cur->fiber) {
54 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
55 }
56 else {
57 RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
58 rb_threadptr_interrupt(cur->th);
59 cur->th->status = THREAD_RUNNABLE;
60 }
61
62 if (--max == 0) return;
63 }
64 }
65}
66
67static void
68wakeup_one(struct ccan_list_head *head)
69{
70 sync_wakeup(head, 1);
71}
72
73static void
74wakeup_all(struct ccan_list_head *head)
75{
76 sync_wakeup(head, LONG_MAX);
77}
78
79#if defined(HAVE_WORKING_FORK)
80static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
81static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
82static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
83#endif
84static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
85
86/*
87 * Document-class: Thread::Mutex
88 *
89 * Thread::Mutex implements a simple semaphore that can be used to
90 * coordinate access to shared data from multiple concurrent threads.
91 *
92 * Example:
93 *
94 * semaphore = Thread::Mutex.new
95 *
96 * a = Thread.new {
97 * semaphore.synchronize {
98 * # access shared resource
99 * }
100 * }
101 *
102 * b = Thread.new {
103 * semaphore.synchronize {
104 * # access shared resource
105 * }
106 * }
107 *
108 */
109
110static size_t
111rb_mutex_num_waiting(rb_mutex_t *mutex)
112{
113 struct sync_waiter *w = 0;
114 size_t n = 0;
115
116 ccan_list_for_each(&mutex->waitq, w, node) {
117 n++;
118 }
119
120 return n;
121}
122
123rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
124
125static bool
126locked_p(rb_mutex_t *mutex)
127{
128 return mutex->fiber != 0;
129}
130
131static void
132mutex_mark(void *ptr)
133{
134 rb_mutex_t *mutex = ptr;
135 VALUE fiber;
136 if (locked_p(mutex)) {
137 fiber = rb_fiberptr_self(mutex->fiber); // rb_fiber_t* doesn't move along with fiber object
138 if (fiber) rb_gc_mark_movable(fiber);
139 rb_gc_mark_movable(mutex->thread);
140 }
141}
142
143static void
144mutex_compact(void *ptr)
145{
146 rb_mutex_t *mutex = ptr;
147 if (locked_p(mutex)) {
148 mutex->thread = rb_gc_location(mutex->thread);
149 }
150}
151
152static void
153mutex_free(void *ptr)
154{
155 rb_mutex_t *mutex = ptr;
156 if (locked_p(mutex)) {
157 const char *err = rb_mutex_unlock_th(mutex, rb_thread_ptr(mutex->thread), mutex->fiber);
158 if (err) rb_bug("%s", err);
159 }
160 ruby_xfree(ptr);
161}
162
163static size_t
164mutex_memsize(const void *ptr)
165{
166 return sizeof(rb_mutex_t);
167}
168
169static const rb_data_type_t mutex_data_type = {
170 "mutex",
171 {mutex_mark, mutex_free, mutex_memsize, mutex_compact,},
172 0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
173};
174
175static rb_mutex_t *
176mutex_ptr(VALUE obj)
177{
178 rb_mutex_t *mutex;
179
180 TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);
181
182 return mutex;
183}
184
185VALUE
186rb_obj_is_mutex(VALUE obj)
187{
188 return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
189}
190
191static VALUE
192mutex_alloc(VALUE klass)
193{
194 VALUE obj;
195 rb_mutex_t *mutex;
196
197 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
198
199 ccan_list_head_init(&mutex->waitq);
200 return obj;
201}
202
203/*
204 * call-seq:
205 * Thread::Mutex.new -> mutex
206 *
207 * Creates a new Mutex
208 */
209static VALUE
210mutex_initialize(VALUE self)
211{
212 return self;
213}
214
215VALUE
216rb_mutex_new(void)
217{
218 return mutex_alloc(rb_cMutex);
219}
220
221/*
222 * call-seq:
223 * mutex.locked? -> true or false
224 *
225 * Returns +true+ if this lock is currently held by some thread.
226 */
227VALUE
228rb_mutex_locked_p(VALUE self)
229{
230 rb_mutex_t *mutex = mutex_ptr(self);
231
232 return RBOOL(locked_p(mutex));
233}
234
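/* Each thread tracks the mutexes it currently holds in th->keeping_mutexes,
 * a singly linked list threaded through rb_mutex_t::next_mutex; the two
 * helpers below push to and remove from that list. */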
235static void
236thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
237{
238 RUBY_ASSERT(!mutex->next_mutex);
239 if (thread->keeping_mutexes) {
240 mutex->next_mutex = thread->keeping_mutexes;
241 }
242
243 thread->keeping_mutexes = mutex;
244}
245
246static void
247thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
248{
249 rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;
250
251 while (*keeping_mutexes && *keeping_mutexes != mutex) {
252 // Move to the next mutex in the list:
253 keeping_mutexes = &(*keeping_mutexes)->next_mutex;
254 }
255
256 if (*keeping_mutexes) {
257 *keeping_mutexes = mutex->next_mutex;
258 mutex->next_mutex = NULL;
259 }
260}
261
262static void
263mutex_set_owner(VALUE self, rb_thread_t *th, rb_fiber_t *fiber)
264{
265 rb_mutex_t *mutex = mutex_ptr(self);
266
267 mutex->thread = th->self;
268 mutex->fiber = fiber;
269 RB_OBJ_WRITTEN(self, Qundef, th->self);
270 if (fiber) {
271 RB_OBJ_WRITTEN(self, Qundef, rb_fiberptr_self(fiber));
272 }
273}
274
275static void
276mutex_locked(rb_thread_t *th, rb_fiber_t *fiber, VALUE self)
277{
278 rb_mutex_t *mutex = mutex_ptr(self);
279
280 mutex_set_owner(self, th, fiber);
281 thread_mutex_insert(th, mutex);
282}
283
284/*
285 * call-seq:
286 * mutex.try_lock -> true or false
287 *
288 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
289 * lock was granted.
290 */
291VALUE
292rb_mutex_trylock(VALUE self)
293{
294 rb_mutex_t *mutex = mutex_ptr(self);
295
296 if (mutex->fiber == 0) {
297 RUBY_DEBUG_LOG("%p ok", mutex);
298
299 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
300 rb_thread_t *th = GET_THREAD();
301
302 mutex_locked(th, fiber, self);
303 return Qtrue;
304 }
305 else {
306 RUBY_DEBUG_LOG("%p ng", mutex);
307 return Qfalse;
308 }
309}
310
311static VALUE
312mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
313{
314 return RBOOL(mutex->fiber == fiber);
315}
316
317static VALUE
318call_rb_fiber_scheduler_block(VALUE mutex)
319{
320 return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
321}
322
323static VALUE
324delete_from_waitq(VALUE value)
325{
326 struct sync_waiter *sync_waiter = (void *)value;
327 ccan_list_del(&sync_waiter->node);
328
329 return Qnil;
330}
331
332static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
333
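/* Common implementation of Mutex#lock. The fast path is rb_mutex_trylock();
 * on contention the caller either blocks through the fiber scheduler
 * (call_rb_fiber_scheduler_block) or parks on mutex->waitq and native_sleep()s.
 * With interruptible_p == 0, pending interrupts are saved and restored instead
 * of being raised, which is what mutex_lock_uninterruptible() relies on when
 * Mutex#sleep reacquires the lock. */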
334static VALUE
335do_mutex_lock(VALUE self, int interruptible_p)
336{
337 rb_execution_context_t *ec = GET_EC();
338 rb_thread_t *th = ec->thread_ptr;
339 rb_fiber_t *fiber = ec->fiber_ptr;
340 rb_mutex_t *mutex = mutex_ptr(self);
341 rb_atomic_t saved_ints = 0;
342
343 /* When running trap handler */
344 if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
345 th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
346 rb_raise(rb_eThreadError, "can't be called from trap context");
347 }
348
349 if (rb_mutex_trylock(self) == Qfalse) {
350 if (mutex->fiber == fiber) {
351 rb_raise(rb_eThreadError, "deadlock; recursive locking");
352 }
353
354 while (mutex->fiber != fiber) {
355 VM_ASSERT(mutex->fiber != NULL);
356
357 VALUE scheduler = rb_fiber_scheduler_current();
358 if (scheduler != Qnil) {
359 struct sync_waiter sync_waiter = {
360 .self = self,
361 .th = th,
362 .fiber = nonblocking_fiber(fiber)
363 };
364
365 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
366
367 rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);
368
369 if (!mutex->fiber) {
370 mutex_set_owner(self, th, fiber);
371 }
372 }
373 else {
374 if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) {
375 rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
376 }
377
378 struct sync_waiter sync_waiter = {
379 .self = self,
380 .th = th,
381 .fiber = nonblocking_fiber(fiber),
382 };
383
384 RUBY_DEBUG_LOG("%p wait", mutex);
385
386 // Similar code to `sleep_forever`, but
387 // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
388 // An ensure clause like the following is needed, but `rb_ensure` is a bit slow.
389 //
390 // begin
391 // sleep_forever(th, SLEEP_DEADLOCKABLE);
392 // ensure
393 // ccan_list_del(&sync_waiter.node);
394 // end
395 enum rb_thread_status prev_status = th->status;
396 th->status = THREAD_STOPPED_FOREVER;
397 rb_ractor_sleeper_threads_inc(th->ractor);
398 rb_check_deadlock(th->ractor);
399
400 RUBY_ASSERT(!th->locking_mutex);
401 th->locking_mutex = self;
402
403 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
404 {
405 native_sleep(th, NULL);
406 }
407 ccan_list_del(&sync_waiter.node);
408
409 // unlocked by another thread while sleeping
410 if (!mutex->fiber) {
411 mutex_set_owner(self, th, fiber);
412 }
413
414 rb_ractor_sleeper_threads_dec(th->ractor);
415 th->status = prev_status;
416 th->locking_mutex = Qfalse;
417
418 RUBY_DEBUG_LOG("%p wakeup", mutex);
419 }
420
421 if (interruptible_p) {
422 /* release mutex before checking for interrupts...as interrupt checking
423 * code might call rb_raise() */
424 if (mutex->fiber == fiber) {
425 mutex->thread = Qfalse;
426 mutex->fiber = NULL;
427 }
428 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
429 if (!mutex->fiber) {
430 mutex_set_owner(self, th, fiber);
431 }
432 }
433 else {
434 // clear interrupt information
435 if (RUBY_VM_INTERRUPTED(th->ec)) {
436 // reset interrupts
437 if (saved_ints == 0) {
438 saved_ints = threadptr_get_interrupts(th);
439 }
440 else {
441 // ignore additional interrupts
442 threadptr_get_interrupts(th);
443 }
444 }
445 }
446 }
447
448 if (saved_ints) th->ec->interrupt_flag = saved_ints;
449 if (mutex->fiber == fiber) mutex_locked(th, fiber, self);
450 }
451
452 RUBY_DEBUG_LOG("%p locked", mutex);
453
454 // assertion
455 if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
456
457 return self;
458}
459
460static VALUE
461mutex_lock_uninterruptible(VALUE self)
462{
463 return do_mutex_lock(self, 0);
464}
465
466/*
467 * call-seq:
468 * mutex.lock -> self
469 *
470 * Attempts to grab the lock and waits if it isn't available.
471 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
472 */
473VALUE
474rb_mutex_lock(VALUE self)
475{
476 return do_mutex_lock(self, 1);
477}
478
479/*
480 * call-seq:
481 * mutex.owned? -> true or false
482 *
483 * Returns +true+ if this lock is currently held by the current thread.
484 */
485VALUE
486rb_mutex_owned_p(VALUE self)
487{
488 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
489 rb_mutex_t *mutex = mutex_ptr(self);
490
491 return mutex_owned_p(fiber, mutex);
492}
493
494static const char *
495rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
496{
497 RUBY_DEBUG_LOG("%p", mutex);
498
499 if (mutex->fiber == 0) {
500 return "Attempt to unlock a mutex which is not locked";
501 }
502 else if (mutex->fiber != fiber) {
503 return "Attempt to unlock a mutex which is locked by another thread/fiber";
504 }
505
506 struct sync_waiter *cur = 0, *next;
507
508 mutex->fiber = 0;
509 thread_mutex_remove(th, mutex);
510
511 ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
512 ccan_list_del_init(&cur->node);
513
514 if (cur->th->scheduler != Qnil && cur->fiber) {
515 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
516 return NULL;
517 }
518 else {
519 switch (cur->th->status) {
520 case THREAD_RUNNABLE: /* from someone else calling Thread#run */
521 case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
522 RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
523 rb_threadptr_interrupt(cur->th);
524 return NULL;
525 case THREAD_STOPPED: /* probably impossible */
526 rb_bug("unexpected THREAD_STOPPED");
527 case THREAD_KILLED:
528 /* not sure about this, possible in exit GC? */
529 rb_bug("unexpected THREAD_KILLED");
530 continue;
531 }
532 }
533 }
534
535 // We did not find any threads to wake up, so we can just return with no error:
536 return NULL;
537}
538
539/*
540 * call-seq:
541 * mutex.unlock -> self
542 *
543 * Releases the lock.
544 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
545 */
546VALUE
547rb_mutex_unlock(VALUE self)
548{
549 const char *err;
550 rb_mutex_t *mutex = mutex_ptr(self);
551 rb_thread_t *th = GET_THREAD();
552
553 err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
554 if (err) rb_raise(rb_eThreadError, "%s", err);
555
556 return self;
557}
558
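/* After fork(), waiters recorded on a mutex's waitq live on the stacks of
 * threads that no longer exist in the child, so the child abandons held
 * mutexes by clearing ownership and re-initializing the wait queues rather
 * than unlocking them normally. */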
559#if defined(HAVE_WORKING_FORK)
560static void
561rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
562{
563 rb_mutex_abandon_all(th->keeping_mutexes);
564 th->keeping_mutexes = NULL;
565}
566
567static void
568rb_mutex_abandon_locking_mutex(rb_thread_t *th)
569{
570 if (th->locking_mutex) {
571 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
572
573 ccan_list_head_init(&mutex->waitq);
574 th->locking_mutex = Qfalse;
575 }
576}
577
578static void
579rb_mutex_abandon_all(rb_mutex_t *mutexes)
580{
581 rb_mutex_t *mutex;
582
583 while (mutexes) {
584 mutex = mutexes;
585 mutexes = mutex->next_mutex;
586 mutex->fiber = 0;
587 mutex->next_mutex = 0;
588 ccan_list_head_init(&mutex->waitq);
589 }
590}
591#endif
592
593struct rb_mutex_sleep_arguments {
594 VALUE self;
595 VALUE timeout;
596};
597
598static VALUE
599mutex_sleep_begin(VALUE _arguments)
600{
601 struct rb_mutex_sleep_arguments *arguments = (struct rb_mutex_sleep_arguments *)_arguments;
602 VALUE timeout = arguments->timeout;
603 VALUE woken = Qtrue;
604
605 VALUE scheduler = rb_fiber_scheduler_current();
606 if (scheduler != Qnil) {
607 rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
608 }
609 else {
610 if (NIL_P(timeout)) {
611 rb_thread_sleep_deadly_allow_spurious_wakeup(arguments->self, Qnil, 0);
612 }
613 else {
614 struct timeval timeout_value = rb_time_interval(timeout);
615 rb_hrtime_t relative_timeout = rb_timeval2hrtime(&timeout_value);
616 /* permit spurious check */
617 woken = RBOOL(sleep_hrtime(GET_THREAD(), relative_timeout, 0));
618 }
619 }
620
621 return woken;
622}
623
624VALUE
625rb_mutex_sleep(VALUE self, VALUE timeout)
626{
627 if (!NIL_P(timeout)) {
628 // Validate the argument:
629 rb_time_interval(timeout);
630 }
631
632 rb_mutex_unlock(self);
633 time_t beg = time(0);
634
635 struct rb_mutex_sleep_arguments arguments = {
636 .self = self,
637 .timeout = timeout,
638 };
639
640 VALUE woken = rb_ensure(mutex_sleep_begin, (VALUE)&arguments, mutex_lock_uninterruptible, self);
641
642 RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
643 if (!woken) return Qnil;
644 time_t end = time(0) - beg;
645 return TIMET2NUM(end);
646}
647
648/*
649 * call-seq:
650 * mutex.sleep(timeout = nil) -> number or nil
651 *
652 * Releases the lock and sleeps for +timeout+ seconds if it is given and
653 * non-nil, or sleeps forever otherwise. Raises +ThreadError+ if +mutex+ wasn't locked by
654 * the current thread.
655 *
656 * When the thread is next woken up, it will attempt to reacquire
657 * the lock.
658 *
659 * Note that this method can wake up without an explicit Thread#wakeup call;
660 * for example, when the thread receives a signal.
661 *
662 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
663 */
664static VALUE
665mutex_sleep(int argc, VALUE *argv, VALUE self)
666{
667 VALUE timeout;
668
669 timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
670 return rb_mutex_sleep(self, timeout);
671}
672
673/*
674 * call-seq:
675 * mutex.synchronize { ... } -> result of the block
676 *
677 * Obtains a lock, runs the block, and releases the lock when the block
678 * completes. See the example under Thread::Mutex.
679 */
680
681VALUE
682rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
683{
684 rb_mutex_lock(mutex);
685 return rb_ensure(func, arg, rb_mutex_unlock, mutex);
686}
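/*
 * Illustrative use of rb_mutex_synchronize() from a C extension. The struct
 * and helper names below are hypothetical, not part of this file:
 *
 *   struct shared_state { long counter; } state = {0};
 *
 *   static VALUE
 *   increment_counter(VALUE arg)
 *   {
 *       struct shared_state *s = (struct shared_state *)arg;
 *       s->counter++;
 *       return Qnil;
 *   }
 *
 *   // Locks `mutex`, runs increment_counter(), and unlocks it even if the
 *   // callback raises, because the unlock is installed via rb_ensure().
 *   rb_mutex_synchronize(mutex, increment_counter, (VALUE)&state);
 */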
687
688/*
689 * call-seq:
690 * mutex.synchronize { ... } -> result of the block
691 *
692 * Obtains a lock, runs the block, and releases the lock when the block
693 * completes. See the example under Thread::Mutex.
694 */
695static VALUE
696rb_mutex_synchronize_m(VALUE self)
697{
698 if (!rb_block_given_p()) {
699 rb_raise(rb_eThreadError, "must be called with a block");
700 }
701
702 return rb_mutex_synchronize(self, rb_yield, Qundef);
703}
704
705void
706rb_mutex_allow_trap(VALUE self, int val)
707{
708 Check_TypedStruct(self, &mutex_data_type);
709
710 if (val)
711 FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
712 else
713 FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
714}
715
716/* Queue */
717
718#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
719#define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
720RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
721struct rb_queue {
722 struct ccan_list_head waitq;
723 rb_serial_t fork_gen;
724 const VALUE que;
725 int num_waiting;
726} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
727
728#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
729#define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
730#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
731RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
732struct rb_szqueue {
733 struct rb_queue q;
734 int num_waiting_push;
735 struct ccan_list_head pushq;
736 long max;
737} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
738
739static void
740queue_mark_and_move(void *ptr)
741{
742 struct rb_queue *q = ptr;
743
744 /* no need to mark threads in waitq, they are on stack */
745 rb_gc_mark_and_move((VALUE *)UNALIGNED_MEMBER_PTR(q, que));
746}
747
748static size_t
749queue_memsize(const void *ptr)
750{
751 return sizeof(struct rb_queue);
752}
753
754static const rb_data_type_t queue_data_type = {
755 "queue",
756 {queue_mark_and_move, RUBY_TYPED_DEFAULT_FREE, queue_memsize, queue_mark_and_move},
757 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
758};
759
760static VALUE
761queue_alloc(VALUE klass)
762{
763 VALUE obj;
764 struct rb_queue *q;
765
766 obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
767 ccan_list_head_init(queue_waitq(q));
768 return obj;
769}
770
771static int
772queue_fork_check(struct rb_queue *q)
773{
774 rb_serial_t fork_gen = GET_VM()->fork_gen;
775
776 if (q->fork_gen == fork_gen) {
777 return 0;
778 }
779 /* forked children can't reach into parent thread stacks */
780 q->fork_gen = fork_gen;
781 ccan_list_head_init(queue_waitq(q));
782 q->num_waiting = 0;
783 return 1;
784}
785
786static struct rb_queue *
787queue_ptr(VALUE obj)
788{
789 struct rb_queue *q;
790
791 TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
792 queue_fork_check(q);
793
794 return q;
795}
796
797#define QUEUE_CLOSED FL_USER5
798
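/* Converts +timeout+ (seconds as an Integer or Float, nil meaning "wait
 * forever") into an absolute rb_hrtime_t deadline measured from now; nil maps
 * to 0, which callers treat as "no deadline". */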
799static rb_hrtime_t
800queue_timeout2hrtime(VALUE timeout)
801{
802 if (NIL_P(timeout)) {
803 return (rb_hrtime_t)0;
804 }
805 rb_hrtime_t rel = 0;
806 if (FIXNUM_P(timeout)) {
807 rel = rb_sec2hrtime(NUM2TIMET(timeout));
808 }
809 else {
810 double2hrtime(&rel, rb_num2dbl(timeout));
811 }
812 return rb_hrtime_add(rel, rb_hrtime_now());
813}
814
815static void
816szqueue_mark_and_move(void *ptr)
817{
818 struct rb_szqueue *sq = ptr;
819
820 queue_mark_and_move(&sq->q);
821}
822
823static size_t
824szqueue_memsize(const void *ptr)
825{
826 return sizeof(struct rb_szqueue);
827}
828
829static const rb_data_type_t szqueue_data_type = {
830 "sized_queue",
831 {szqueue_mark_and_move, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize, szqueue_mark_and_move},
832 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
833};
834
835static VALUE
836szqueue_alloc(VALUE klass)
837{
838 struct rb_szqueue *sq;
839 VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
840 &szqueue_data_type, sq);
841 ccan_list_head_init(szqueue_waitq(sq));
842 ccan_list_head_init(szqueue_pushq(sq));
843 return obj;
844}
845
846static struct rb_szqueue *
847szqueue_ptr(VALUE obj)
848{
849 struct rb_szqueue *sq;
850
851 TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
852 if (queue_fork_check(&sq->q)) {
853 ccan_list_head_init(szqueue_pushq(sq));
854 sq->num_waiting_push = 0;
855 }
856
857 return sq;
858}
859
860static VALUE
861ary_buf_new(void)
862{
863 return rb_ary_hidden_new(1);
864}
865
866static VALUE
867check_array(VALUE obj, VALUE ary)
868{
869 if (!RB_TYPE_P(ary, T_ARRAY)) {
870 rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
871 }
872 return ary;
873}
874
875static long
876queue_length(VALUE self, struct rb_queue *q)
877{
878 return RARRAY_LEN(check_array(self, q->que));
879}
880
881static int
882queue_closed_p(VALUE self)
883{
884 return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
885}
886
887/*
888 * Document-class: ClosedQueueError
889 *
890 * The exception class which will be raised when pushing into a closed
891 * Queue. See Thread::Queue#close and Thread::SizedQueue#close.
892 */
893
894NORETURN(static void raise_closed_queue_error(VALUE self));
895
896static void
897raise_closed_queue_error(VALUE self)
898{
899 rb_raise(rb_eClosedQueueError, "queue closed");
900}
901
902static VALUE
903queue_closed_result(VALUE self, struct rb_queue *q)
904{
905 RUBY_ASSERT(queue_length(self, q) == 0);
906 return Qnil;
907}
908
909/*
910 * Document-class: Thread::Queue
911 *
912 * The Thread::Queue class implements multi-producer, multi-consumer
913 * queues. It is especially useful in threaded programming when
914 * information must be exchanged safely between multiple threads. The
915 * Thread::Queue class implements all the required locking semantics.
916 *
917 * The class implements a FIFO (first in, first out) type of queue.
918 * In a FIFO queue, the first tasks added are the first retrieved.
919 *
920 * Example:
921 *
922 * queue = Thread::Queue.new
923 *
924 * producer = Thread.new do
925 * 5.times do |i|
926 * sleep rand(i) # simulate expense
927 * queue << i
928 * puts "#{i} produced"
929 * end
930 * end
931 *
932 * consumer = Thread.new do
933 * 5.times do |i|
934 * value = queue.pop
935 * sleep rand(i/2) # simulate expense
936 * puts "consumed #{value}"
937 * end
938 * end
939 *
940 * consumer.join
941 *
942 */
943
944/*
945 * Document-method: Queue::new
946 *
947 * call-seq:
948 * Thread::Queue.new -> empty_queue
949 * Thread::Queue.new(enumerable) -> queue
950 *
951 * Creates a new queue instance, optionally using the contents of an +enumerable+
952 * for its initial state.
953 *
954 * Example:
955 *
956 * q = Thread::Queue.new
957 * #=> #<Thread::Queue:0x00007ff7501110d0>
958 * q.empty?
959 * #=> true
960 *
961 * q = Thread::Queue.new([1, 2, 3])
962 * #=> #<Thread::Queue:0x00007ff7500ec500>
963 * q.empty?
964 * #=> false
965 * q.pop
966 * #=> 1
967 */
968
969static VALUE
970rb_queue_initialize(int argc, VALUE *argv, VALUE self)
971{
972 VALUE initial;
973 struct rb_queue *q = queue_ptr(self);
974 if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
975 initial = rb_to_array(initial);
976 }
977 RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
978 ccan_list_head_init(queue_waitq(q));
979 if (argc == 1) {
980 rb_ary_concat(q->que, initial);
981 }
982 return self;
983}
984
985static VALUE
986queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
987{
988 if (queue_closed_p(self)) {
989 raise_closed_queue_error(self);
990 }
991 rb_ary_push(check_array(self, q->que), obj);
992 wakeup_one(queue_waitq(q));
993 return self;
994}
995
996/*
997 * Document-method: Thread::Queue#close
998 * call-seq:
999 * close
1000 *
1001 * Closes the queue. A closed queue cannot be re-opened.
1002 *
1003 * After the call to close completes, the following are true:
1004 *
1005 * - +closed?+ will return true
1006 *
1007 * - +close+ will be ignored.
1008 *
1009 * - calling enq/push/<< will raise a +ClosedQueueError+.
1010 *
1011 * - when +empty?+ is false, calling deq/pop/shift will return an object
1012 * from the queue as usual.
1013 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
1014 * deq(true) will raise a +ThreadError+.
1015 *
1016 * ClosedQueueError inherits from StopIteration, so you can break out of a loop block.
1017 *
1018 * Example:
1019 *
1020 * q = Thread::Queue.new
1021 * Thread.new{
1022 * while e = q.deq # wait for nil to break loop
1023 * # ...
1024 * end
1025 * }
1026 * q.close
1027 */
1028
1029static VALUE
1030rb_queue_close(VALUE self)
1031{
1032 struct rb_queue *q = queue_ptr(self);
1033
1034 if (!queue_closed_p(self)) {
1035 FL_SET(self, QUEUE_CLOSED);
1036
1037 wakeup_all(queue_waitq(q));
1038 }
1039
1040 return self;
1041}
1042
1043/*
1044 * Document-method: Thread::Queue#closed?
1045 * call-seq: closed?
1046 *
1047 * Returns +true+ if the queue is closed.
1048 */
1049
1050static VALUE
1051rb_queue_closed_p(VALUE self)
1052{
1053 return RBOOL(queue_closed_p(self));
1054}
1055
1056/*
1057 * Document-method: Thread::Queue#push
1058 * call-seq:
1059 * push(object)
1060 * enq(object)
1061 * <<(object)
1062 *
1063 * Pushes the given +object+ to the queue.
1064 */
1065
1066static VALUE
1067rb_queue_push(VALUE self, VALUE obj)
1068{
1069 return queue_do_push(self, queue_ptr(self), obj);
1070}
1071
1072static VALUE
1073queue_sleep(VALUE _args)
1074{
1075 struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
1076 rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
1077 return Qnil;
1078}
1079
1080struct queue_waiter {
1081 struct sync_waiter w;
1082 union {
1083 struct rb_queue *q;
1084 struct rb_szqueue *sq;
1085 } as;
1086};
1087
1088static VALUE
1089queue_sleep_done(VALUE p)
1090{
1091 struct queue_waiter *qw = (struct queue_waiter *)p;
1092
1093 ccan_list_del(&qw->w.node);
1094 qw->as.q->num_waiting--;
1095
1096 return Qfalse;
1097}
1098
1099static VALUE
1100szqueue_sleep_done(VALUE p)
1101{
1102 struct queue_waiter *qw = (struct queue_waiter *)p;
1103
1104 ccan_list_del(&qw->w.node);
1105 qw->as.sq->num_waiting_push--;
1106
1107 return Qfalse;
1108}
1109
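/* Common implementation of Queue#pop (and, via szqueue_do_pop, SizedQueue#pop).
 * With should_block == 0 an empty queue raises ThreadError, and a timeout of 0
 * returns nil immediately. Otherwise the caller parks on the queue's waitq
 * until an element arrives, the queue is closed (nil is returned), or the
 * deadline derived from +timeout+ passes. */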
1110static VALUE
1111queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
1112{
1113 check_array(self, q->que);
1114 if (RARRAY_LEN(q->que) == 0) {
1115 if (!should_block) {
1116 rb_raise(rb_eThreadError, "queue empty");
1117 }
1118
1119 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1120 return Qnil;
1121 }
1122 }
1123
1124 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1125 while (RARRAY_LEN(q->que) == 0) {
1126 if (queue_closed_p(self)) {
1127 return queue_closed_result(self, q);
1128 }
1129 else {
1130 rb_execution_context_t *ec = GET_EC();
1131
1132 RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
1133 RUBY_ASSERT(queue_closed_p(self) == 0);
1134
1135 struct queue_waiter queue_waiter = {
1136 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1137 .as = {.q = q}
1138 };
1139
1140 struct ccan_list_head *waitq = queue_waitq(q);
1141
1142 ccan_list_add_tail(waitq, &queue_waiter.w.node);
1143 queue_waiter.as.q->num_waiting++;
1144
1145 struct queue_sleep_arg queue_sleep_arg = {
1146 .self = self,
1147 .timeout = timeout,
1148 .end = end
1149 };
1150
1151 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
1152 if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
1153 break;
1154 }
1155 }
1156
1157 return rb_ary_shift(q->que);
1158}
1159
1160static VALUE
1161rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1162{
1163 return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout);
1164}
1165
1166/*
1167 * Document-method: Thread::Queue#empty?
1168 * call-seq: empty?
1169 *
1170 * Returns +true+ if the queue is empty.
1171 */
1172
1173static VALUE
1174rb_queue_empty_p(VALUE self)
1175{
1176 return RBOOL(queue_length(self, queue_ptr(self)) == 0);
1177}
1178
1179/*
1180 * Document-method: Thread::Queue#clear
1181 *
1182 * Removes all objects from the queue.
1183 */
1184
1185static VALUE
1186rb_queue_clear(VALUE self)
1187{
1188 struct rb_queue *q = queue_ptr(self);
1189
1190 rb_ary_clear(check_array(self, q->que));
1191 return self;
1192}
1193
1194/*
1195 * Document-method: Thread::Queue#length
1196 * call-seq:
1197 * length
1198 * size
1199 *
1200 * Returns the length of the queue.
1201 */
1202
1203static VALUE
1204rb_queue_length(VALUE self)
1205{
1206 return LONG2NUM(queue_length(self, queue_ptr(self)));
1207}
1208
1209NORETURN(static VALUE rb_queue_freeze(VALUE self));
1210/*
1211 * call-seq:
1212 * freeze
1213 *
1214 * The queue can't be frozen, so this method raises an exception:
1215 * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
1216 *
1217 */
1218static VALUE
1219rb_queue_freeze(VALUE self)
1220{
1221 rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
1222 UNREACHABLE_RETURN(self);
1223}
1224
1225/*
1226 * Document-method: Thread::Queue#num_waiting
1227 *
1228 * Returns the number of threads waiting on the queue.
1229 */
1230
1231static VALUE
1232rb_queue_num_waiting(VALUE self)
1233{
1234 struct rb_queue *q = queue_ptr(self);
1235
1236 return INT2NUM(q->num_waiting);
1237}
1238
1239/*
1240 * Document-class: Thread::SizedQueue
1241 *
1242 * This class represents queues with a specified maximum capacity. The push
1243 * operation may block if the queue is full.
1244 *
1245 * See Thread::Queue for an example of how a Thread::SizedQueue works.
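 *
 * A short illustrative example: the producer blocks whenever the queue
 * already holds +max+ items, until the consumer pops one.
 *
 *    q = Thread::SizedQueue.new(2)
 *
 *    producer = Thread.new do
 *      5.times { |i| q.push(i) } # push blocks while the queue is full
 *    end
 *
 *    consumer = Thread.new do
 *      5.times { puts q.pop }
 *    end
 *
 *    [producer, consumer].each(&:join)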
1246 */
1247
1248/*
1249 * Document-method: SizedQueue::new
1250 * call-seq: new(max)
1251 *
1252 * Creates a fixed-length queue with a maximum size of +max+.
1253 */
1254
1255static VALUE
1256rb_szqueue_initialize(VALUE self, VALUE vmax)
1257{
1258 long max;
1259 struct rb_szqueue *sq = szqueue_ptr(self);
1260
1261 max = NUM2LONG(vmax);
1262 if (max <= 0) {
1263 rb_raise(rb_eArgError, "queue size must be positive");
1264 }
1265
1266 RB_OBJ_WRITE(self, szqueue_list(sq), ary_buf_new());
1267 ccan_list_head_init(szqueue_waitq(sq));
1268 ccan_list_head_init(szqueue_pushq(sq));
1269 sq->max = max;
1270
1271 return self;
1272}
1273
1274/*
1275 * Document-method: Thread::SizedQueue#close
1276 * call-seq:
1277 * close
1278 *
1279 * Similar to Thread::Queue#close.
1280 *
1281 * The difference is behavior with waiting enqueuing threads.
1282 *
1283 * If there are waiting enqueuing threads, they are interrupted by
1284 * raising ClosedQueueError('queue closed').
1285 */
1286static VALUE
1287rb_szqueue_close(VALUE self)
1288{
1289 if (!queue_closed_p(self)) {
1290 struct rb_szqueue *sq = szqueue_ptr(self);
1291
1292 FL_SET(self, QUEUE_CLOSED);
1293 wakeup_all(szqueue_waitq(sq));
1294 wakeup_all(szqueue_pushq(sq));
1295 }
1296 return self;
1297}
1298
1299/*
1300 * Document-method: Thread::SizedQueue#max
1301 *
1302 * Returns the maximum size of the queue.
1303 */
1304
1305static VALUE
1306rb_szqueue_max_get(VALUE self)
1307{
1308 return LONG2NUM(szqueue_ptr(self)->max);
1309}
1310
1311/*
1312 * Document-method: Thread::SizedQueue#max=
1313 * call-seq: max=(number)
1314 *
1315 * Sets the maximum size of the queue to the given +number+.
1316 */
1317
1318static VALUE
1319rb_szqueue_max_set(VALUE self, VALUE vmax)
1320{
1321 long max = NUM2LONG(vmax);
1322 long diff = 0;
1323 struct rb_szqueue *sq = szqueue_ptr(self);
1324
1325 if (max <= 0) {
1326 rb_raise(rb_eArgError, "queue size must be positive");
1327 }
1328 if (max > sq->max) {
1329 diff = max - sq->max;
1330 }
1331 sq->max = max;
1332 sync_wakeup(szqueue_pushq(sq), diff);
1333 return vmax;
1334}
1335
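/* Implementation behind SizedQueue#push: when the queue is at capacity,
 * non-blocking callers get ThreadError and a zero timeout returns nil
 * immediately; otherwise the caller waits on the queue's pushq until space
 * frees up (raising ClosedQueueError if the queue is closed while waiting) or
 * the deadline passes, in which case nil is returned. */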
1336static VALUE
1337rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
1338{
1339 struct rb_szqueue *sq = szqueue_ptr(self);
1340
1341 if (queue_length(self, &sq->q) >= sq->max) {
1342 if (RTEST(non_block)) {
1343 rb_raise(rb_eThreadError, "queue full");
1344 }
1345
1346 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1347 return Qnil;
1348 }
1349 }
1350
1351 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1352 while (queue_length(self, &sq->q) >= sq->max) {
1353 if (queue_closed_p(self)) {
1354 raise_closed_queue_error(self);
1355 }
1356 else {
1357 rb_execution_context_t *ec = GET_EC();
1358 struct queue_waiter queue_waiter = {
1359 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1360 .as = {.sq = sq}
1361 };
1362
1363 struct ccan_list_head *pushq = szqueue_pushq(sq);
1364
1365 ccan_list_add_tail(pushq, &queue_waiter.w.node);
1366 sq->num_waiting_push++;
1367
1368 struct queue_sleep_arg queue_sleep_arg = {
1369 .self = self,
1370 .timeout = timeout,
1371 .end = end
1372 };
1373 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
1374 if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
1375 return Qnil;
1376 }
1377 }
1378 }
1379
1380 return queue_do_push(self, &sq->q, object);
1381}
1382
1383static VALUE
1384szqueue_do_pop(VALUE self, int should_block, VALUE timeout)
1385{
1386 struct rb_szqueue *sq = szqueue_ptr(self);
1387 VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout);
1388
1389 if (queue_length(self, &sq->q) < sq->max) {
1390 wakeup_one(szqueue_pushq(sq));
1391 }
1392
1393 return retval;
1394}
1395static VALUE
1396rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1397{
1398 return szqueue_do_pop(self, !RTEST(non_block), timeout);
1399}
1400
1401/*
1402 * Document-method: Thread::SizedQueue#clear
1403 *
1404 * Removes all objects from the queue.
1405 */
1406
1407static VALUE
1408rb_szqueue_clear(VALUE self)
1409{
1410 struct rb_szqueue *sq = szqueue_ptr(self);
1411
1412 rb_ary_clear(check_array(self, sq->q.que));
1413 wakeup_all(szqueue_pushq(sq));
1414 return self;
1415}
1416
1417/*
1418 * Document-method: Thread::SizedQueue#length
1419 * call-seq:
1420 * length
1421 * size
1422 *
1423 * Returns the length of the queue.
1424 */
1425
1426static VALUE
1427rb_szqueue_length(VALUE self)
1428{
1429 struct rb_szqueue *sq = szqueue_ptr(self);
1430
1431 return LONG2NUM(queue_length(self, &sq->q));
1432}
1433
1434/*
1435 * Document-method: Thread::SizedQueue#num_waiting
1436 *
1437 * Returns the number of threads waiting on the queue.
1438 */
1439
1440static VALUE
1441rb_szqueue_num_waiting(VALUE self)
1442{
1443 struct rb_szqueue *sq = szqueue_ptr(self);
1444
1445 return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1446}
1447
1448/*
1449 * Document-method: Thread::SizedQueue#empty?
1450 * call-seq: empty?
1451 *
1452 * Returns +true+ if the queue is empty.
1453 */
1454
1455static VALUE
1456rb_szqueue_empty_p(VALUE self)
1457{
1458 struct rb_szqueue *sq = szqueue_ptr(self);
1459
1460 return RBOOL(queue_length(self, &sq->q) == 0);
1461}
1462
1463
1464/* ConditionVariable */
1465struct rb_condvar {
1466 struct ccan_list_head waitq;
1467 rb_serial_t fork_gen;
1468};
1469
1470/*
1471 * Document-class: Thread::ConditionVariable
1472 *
1473 * ConditionVariable objects augment class Mutex. Using condition variables,
1474 * it is possible to suspend while in the middle of a critical section until a
1475 * condition is met, such as a resource becoming available.
1476 *
1477 * Due to non-deterministic scheduling and spurious wake-ups, users of
1478 * condition variables should always use a separate boolean predicate (such as
1479 * reading from a boolean variable) to check if the condition is actually met
1480 * before starting to wait, and should wait in a loop, re-checking the
1481 * condition every time the ConditionVariable is woken up. The idiomatic way
1482 * of using condition variables is calling the +wait+ method in an +until+
1483 * loop with the predicate as the loop condition.
1484 *
1485 * condvar.wait(mutex) until condition_is_met
1486 *
1487 * In the example below, we use the boolean variable +resource_available+
1488 * (which is protected by +mutex+) to indicate the availability of the
1489 * resource, and use +condvar+ to wait for that variable to become true. Note
1490 * that:
1491 *
1492 * 1. Thread +b+ may be scheduled before threads +a1+ and +a2+, and may run so
1493 * fast that it has already made the resource available before either
1494 * +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if
1495 * +resource_available+ is already true before starting to wait.
1496 * 2. The +wait+ method may spuriously wake up without signalling. Therefore,
1497 * threads +a1+ and +a2+ should recheck +resource_available+ after the
1498 * +wait+ method returns, and go back to wait if the condition is not
1499 * actually met.
1500 * 3. It is possible that thread +a2+ starts right after thread +a1+ is woken
1501 * up by +b+. Thread +a2+ may have acquired the +mutex+ and consumed the
1502 * resource before thread +a1+ acquires the +mutex+. This necessitates
1503 * rechecking after +wait+, too.
1504 *
1505 * Example:
1506 *
1507 * mutex = Thread::Mutex.new
1508 *
1509 * resource_available = false
1510 * condvar = Thread::ConditionVariable.new
1511 *
1512 * a1 = Thread.new {
1513 * # Thread 'a1' waits for the resource to become available and consumes
1514 * # the resource.
1515 * mutex.synchronize {
1516 * condvar.wait(mutex) until resource_available
1517 * # After the loop, 'resource_available' is guaranteed to be true.
1518 *
1519 * resource_available = false
1520 * puts "a1 consumed the resource"
1521 * }
1522 * }
1523 *
1524 * a2 = Thread.new {
1525 * # Thread 'a2' behaves like 'a1'.
1526 * mutex.synchronize {
1527 * condvar.wait(mutex) until resource_available
1528 * resource_available = false
1529 * puts "a2 consumed the resource"
1530 * }
1531 * }
1532 *
1533 * b = Thread.new {
1534 * # Thread 'b' periodically makes the resource available.
1535 * loop {
1536 * mutex.synchronize {
1537 * resource_available = true
1538 *
1539 * # Notify one waiting thread if any. It is possible that neither
1540 * # 'a1' nor 'a2' is waiting on 'condvar' at this moment. That's OK.
1541 * condvar.signal
1542 * }
1543 * sleep 1
1544 * }
1545 * }
1546 *
1547 * # Eventually both 'a1' and 'a2' will have their resources, albeit in an
1548 * # unspecified order.
1549 * [a1, a2].each {|th| th.join}
1550 */
1551
1552static size_t
1553condvar_memsize(const void *ptr)
1554{
1555 return sizeof(struct rb_condvar);
1556}
1557
1558static const rb_data_type_t cv_data_type = {
1559 "condvar",
1560 {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1561 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
1562};
1563
1564static struct rb_condvar *
1565condvar_ptr(VALUE self)
1566{
1567 struct rb_condvar *cv;
1568 rb_serial_t fork_gen = GET_VM()->fork_gen;
1569
1570 TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1571
1572 /* forked children can't reach into parent thread stacks */
1573 if (cv->fork_gen != fork_gen) {
1574 cv->fork_gen = fork_gen;
1575 ccan_list_head_init(&cv->waitq);
1576 }
1577
1578 return cv;
1579}
1580
1581static VALUE
1582condvar_alloc(VALUE klass)
1583{
1584 struct rb_condvar *cv;
1585 VALUE obj;
1586
1587 obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1588 ccan_list_head_init(&cv->waitq);
1589
1590 return obj;
1591}
1592
1593/*
1594 * Document-method: ConditionVariable::new
1595 *
1596 * Creates a new condition variable instance.
1597 */
1598
1599static VALUE
1600rb_condvar_initialize(VALUE self)
1601{
1602 struct rb_condvar *cv = condvar_ptr(self);
1603 ccan_list_head_init(&cv->waitq);
1604 return self;
1605}
1606
1607struct sleep_call {
1608 VALUE mutex;
1609 VALUE timeout;
1610};
1611
1612static ID id_sleep;
1613
1614static VALUE
1615do_sleep(VALUE args)
1616{
1617 struct sleep_call *p = (struct sleep_call *)args;
1618 return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1619}
1620
1621/*
1622 * Document-method: Thread::ConditionVariable#wait
1623 * call-seq: wait(mutex, timeout=nil)
1624 *
1625 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1626 *
1627 * If +timeout+ is given, this method returns after +timeout+ seconds have passed,
1628 * even if no other thread has signalled.
1629 *
1630 * This method may wake up spuriously due to underlying implementation details.
1631 *
1632 * Returns the slept result on +mutex+.
1633 */
1634
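/* ConditionVariable#wait: registers the caller on cv->waitq and then calls
 * mutex.sleep(timeout) via do_sleep(), which releases the mutex and reacquires
 * it on wakeup; delete_from_waitq() runs as the ensure part so the waiter is
 * removed from the queue even if sleep raises. */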
1635static VALUE
1636rb_condvar_wait(int argc, VALUE *argv, VALUE self)
1637{
1638 rb_execution_context_t *ec = GET_EC();
1639
1640 struct rb_condvar *cv = condvar_ptr(self);
1641 struct sleep_call args;
1642
1643 rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
1644
1645 struct sync_waiter sync_waiter = {
1646 .self = args.mutex,
1647 .th = ec->thread_ptr,
1648 .fiber = nonblocking_fiber(ec->fiber_ptr)
1649 };
1650
1651 ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
1652 return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
1653}
1654
1655/*
1656 * Document-method: Thread::ConditionVariable#signal
1657 *
1658 * Wakes up the first thread in line waiting on this condition variable.
1659 */
1660
1661static VALUE
1662rb_condvar_signal(VALUE self)
1663{
1664 struct rb_condvar *cv = condvar_ptr(self);
1665 wakeup_one(&cv->waitq);
1666 return self;
1667}
1668
1669/*
1670 * Document-method: Thread::ConditionVariable#broadcast
1671 *
1672 * Wakes up all threads waiting on this condition variable.
1673 */
1674
1675static VALUE
1676rb_condvar_broadcast(VALUE self)
1677{
1678 struct rb_condvar *cv = condvar_ptr(self);
1679 wakeup_all(&cv->waitq);
1680 return self;
1681}
1682
1683NORETURN(static VALUE undumpable(VALUE obj));
1684/* :nodoc: */
1685static VALUE
1686undumpable(VALUE obj)
1687{
1688 rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
1689 UNREACHABLE_RETURN(Qnil);
1690}
1691
1692static VALUE
1693define_thread_class(VALUE outer, const ID name, VALUE super)
1694{
1695 VALUE klass = rb_define_class_id_under(outer, name, super);
1696 rb_const_set(rb_cObject, name, klass);
1697 return klass;
1698}
1699
1700static void
1701Init_thread_sync(void)
1702{
1703#undef rb_intern
1704#if defined(TEACH_RDOC) && TEACH_RDOC == 42
1705 rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
1706 rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
1707 rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
1708 rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
1709#endif
1710
1711#define DEFINE_CLASS(name, super) \
1712 rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
1713
1714 /* Mutex */
1715 DEFINE_CLASS(Mutex, Object);
1716 rb_define_alloc_func(rb_cMutex, mutex_alloc);
1717 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
1718 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
1719 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
1720 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
1721 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
1722 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
1723 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
1724 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
1725
1726 /* Queue */
1727 DEFINE_CLASS(Queue, Object);
1728 rb_define_alloc_func(rb_cQueue, queue_alloc);
1729
1730 rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1731
1732 rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
1733 rb_undef_method(rb_cQueue, "initialize_copy");
1734 rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
1735 rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
1736 rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
1737 rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1738 rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1739 rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1740 rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1741 rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1742 rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);
1743
1744 rb_define_alias(rb_cQueue, "enq", "push");
1745 rb_define_alias(rb_cQueue, "<<", "push");
1746 rb_define_alias(rb_cQueue, "size", "length");
1747
1748 DEFINE_CLASS(SizedQueue, Queue);
1749 rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1750
1751 rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1752 rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
1753 rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
1754 rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1755 rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
1756 rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1757 rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
1758 rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1759 rb_define_alias(rb_cSizedQueue, "size", "length");
1760
1761 /* CVar */
1762 DEFINE_CLASS(ConditionVariable, Object);
1763 rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1764
1765 id_sleep = rb_intern("sleep");
1766
1767 rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
1768 rb_undef_method(rb_cConditionVariable, "initialize_copy");
1769 rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
1770 rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
1771 rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
1772 rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
1773
1774 rb_provide("thread.rb");
1775}
1776
1777#include "thread_sync.rbinc"