Ruby 4.0.0dev (2025-12-07 revision 941e70ab38d01d067b7bbbcdf8553893a9ca8b49)
thread_sync.c (941e70ab38d01d067b7bbbcdf8553893a9ca8b49)
1/* included by thread.c */
2#include "ccan/list/list.h"
3#include "builtin.h"
4
5static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
6static VALUE rb_eClosedQueueError;
7
8/* Mutex */
9typedef struct rb_mutex_struct {
10 rb_serial_t fiber_serial;
11 VALUE thread; // even if the fiber is collected, we might need access to the thread in mutex_free
12 struct rb_mutex_struct *next_mutex;
13 struct ccan_list_head waitq; /* protected by GVL */
14} rb_mutex_t;
15
16/* sync_waiter is always on-stack */
17struct sync_waiter {
18 VALUE self;
19 rb_thread_t *th;
20 rb_fiber_t *fiber;
21 struct ccan_list_node node;
22};
23
24static inline rb_fiber_t*
25nonblocking_fiber(rb_fiber_t *fiber)
26{
27 if (rb_fiberptr_blocking(fiber)) {
28 return NULL;
29 }
30
31 return fiber;
32}
33
34struct queue_sleep_arg {
35 VALUE self;
36 VALUE timeout;
37 rb_hrtime_t end;
38};
39
40#define MUTEX_ALLOW_TRAP FL_USER1
41
42static void
43sync_wakeup(struct ccan_list_head *head, long max)
44{
45 RUBY_DEBUG_LOG("max:%ld", max);
46
47 struct sync_waiter *cur = 0, *next;
48
49 ccan_list_for_each_safe(head, cur, next, node) {
50 ccan_list_del_init(&cur->node);
51
52 if (cur->th->status != THREAD_KILLED) {
53 if (cur->th->scheduler != Qnil && cur->fiber) {
54 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
55 }
56 else {
57 RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
58 rb_threadptr_interrupt(cur->th);
59 cur->th->status = THREAD_RUNNABLE;
60 }
61
62 if (--max == 0) return;
63 }
64 }
65}
66
67static void
68wakeup_one(struct ccan_list_head *head)
69{
70 sync_wakeup(head, 1);
71}
72
73static void
74wakeup_all(struct ccan_list_head *head)
75{
76 sync_wakeup(head, LONG_MAX);
77}
78
79#if defined(HAVE_WORKING_FORK)
80static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
81static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
82static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
83#endif
84static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
85
86/*
87 * Document-class: Thread::Mutex
88 *
89 * Thread::Mutex implements a simple semaphore that can be used to
90 * coordinate access to shared data from multiple concurrent threads.
91 *
92 * Example:
93 *
94 * semaphore = Thread::Mutex.new
95 *
96 * a = Thread.new {
97 * semaphore.synchronize {
98 * # access shared resource
99 * }
100 * }
101 *
102 * b = Thread.new {
103 * semaphore.synchronize {
104 * # access shared resource
105 * }
106 * }
107 *
108 */
109
110static size_t
111rb_mutex_num_waiting(rb_mutex_t *mutex)
112{
113 struct sync_waiter *w = 0;
114 size_t n = 0;
115
116 ccan_list_for_each(&mutex->waitq, w, node) {
117 n++;
118 }
119
120 return n;
121}
122
123rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
124
125static bool
126locked_p(rb_mutex_t *mutex)
127{
128 return mutex->fiber_serial != 0;
129}
130
131static void
132mutex_free(void *ptr)
133{
134 rb_mutex_t *mutex = ptr;
135 if (locked_p(mutex)) {
136 const char *err = rb_mutex_unlock_th(mutex, rb_thread_ptr(mutex->thread), NULL);
137 if (err) rb_bug("%s", err);
138 }
139 ruby_xfree(ptr);
140}
141
142static size_t
143mutex_memsize(const void *ptr)
144{
145 return sizeof(rb_mutex_t);
146}
147
148static const rb_data_type_t mutex_data_type = {
149 "mutex",
150 {NULL, mutex_free, mutex_memsize,},
151 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
152};
153
154static rb_mutex_t *
155mutex_ptr(VALUE obj)
156{
157 rb_mutex_t *mutex;
158
159 TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);
160
161 return mutex;
162}
163
164VALUE
165rb_obj_is_mutex(VALUE obj)
166{
167 return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
168}
169
170static VALUE
171mutex_alloc(VALUE klass)
172{
173 VALUE obj;
174 rb_mutex_t *mutex;
175
176 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
177
178 ccan_list_head_init(&mutex->waitq);
179 return obj;
180}
181
182/*
183 * call-seq:
184 * Thread::Mutex.new -> mutex
185 *
186 * Creates a new Mutex
187 */
188static VALUE
189mutex_initialize(VALUE self)
190{
191 return self;
192}
193
194VALUE
195rb_mutex_new(void)
196{
197 return mutex_alloc(rb_cMutex);
198}
199
200/*
201 * call-seq:
202 * mutex.locked? -> true or false
203 *
204 * Returns +true+ if this lock is currently held by some thread.
205 */
206VALUE
207rb_mutex_locked_p(VALUE self)
208{
209 rb_mutex_t *mutex = mutex_ptr(self);
210
211 return RBOOL(locked_p(mutex));
212}
213
214static void
215thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
216{
217 RUBY_ASSERT(!mutex->next_mutex);
218 if (thread->keeping_mutexes) {
219 mutex->next_mutex = thread->keeping_mutexes;
220 }
221
222 thread->keeping_mutexes = mutex;
223}
224
225static void
226thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
227{
228 rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;
229
230 while (*keeping_mutexes && *keeping_mutexes != mutex) {
231 // Move to the next mutex in the list:
232 keeping_mutexes = &(*keeping_mutexes)->next_mutex;
233 }
234
235 if (*keeping_mutexes) {
236 *keeping_mutexes = mutex->next_mutex;
237 mutex->next_mutex = NULL;
238 }
239}
240
241static void
242mutex_set_owner(VALUE self, rb_thread_t *th, rb_fiber_t *fiber)
243{
244 rb_mutex_t *mutex = mutex_ptr(self);
245
246 mutex->thread = th->self;
247 mutex->fiber_serial = rb_fiber_serial(fiber);
248}
249
250static void
251mutex_locked(rb_thread_t *th, rb_fiber_t *fiber, VALUE self)
252{
253 rb_mutex_t *mutex = mutex_ptr(self);
254
255 mutex_set_owner(self, th, fiber);
256 thread_mutex_insert(th, mutex);
257}
258
259/*
260 * call-seq:
261 * mutex.try_lock -> true or false
262 *
263 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
264 * lock was granted.
265 */
266VALUE
267rb_mutex_trylock(VALUE self)
268{
269 rb_mutex_t *mutex = mutex_ptr(self);
270
271 if (mutex->fiber_serial == 0) {
272 RUBY_DEBUG_LOG("%p ok", mutex);
273
274 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
275 rb_thread_t *th = GET_THREAD();
276
277 mutex_locked(th, fiber, self);
278 return Qtrue;
279 }
280 else {
281 RUBY_DEBUG_LOG("%p ng", mutex);
282 return Qfalse;
283 }
284}
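/*
 * A minimal usage sketch (illustrative names): +try_lock+ never blocks, so a
 * +false+ return simply means another thread or fiber holds the lock.
 *
 *    m = Thread::Mutex.new
 *    if m.try_lock
 *      begin
 *        # critical section
 *      ensure
 *        m.unlock
 *      end
 *    else
 *      # lock was busy; do something else instead of waiting
 *    end
 */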
285
286static VALUE
287mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
288{
289 return RBOOL(mutex->fiber_serial == rb_fiber_serial(fiber));
290}
291
292static VALUE
293call_rb_fiber_scheduler_block(VALUE mutex)
294{
295 return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
296}
297
298static VALUE
299delete_from_waitq(VALUE value)
300{
301 struct sync_waiter *sync_waiter = (void *)value;
302 ccan_list_del(&sync_waiter->node);
303
304 return Qnil;
305}
306
307static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
308
309static VALUE
310do_mutex_lock(VALUE self, int interruptible_p)
311{
312 rb_execution_context_t *ec = GET_EC();
313 rb_thread_t *th = ec->thread_ptr;
314 rb_fiber_t *fiber = ec->fiber_ptr;
315 rb_mutex_t *mutex = mutex_ptr(self);
316 rb_atomic_t saved_ints = 0;
317
318 /* When running trap handler */
319 if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
320 th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
321 rb_raise(rb_eThreadError, "can't be called from trap context");
322 }
323
324 if (rb_mutex_trylock(self) == Qfalse) {
325 if (mutex->fiber_serial == rb_fiber_serial(fiber)) {
326 rb_raise(rb_eThreadError, "deadlock; recursive locking");
327 }
328
329 while (mutex->fiber_serial != rb_fiber_serial(fiber)) {
330 VM_ASSERT(mutex->fiber_serial != 0);
331
332 VALUE scheduler = rb_fiber_scheduler_current();
333 if (scheduler != Qnil) {
334 struct sync_waiter sync_waiter = {
335 .self = self,
336 .th = th,
337 .fiber = nonblocking_fiber(fiber)
338 };
339
340 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
341
342 rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);
343
344 if (!mutex->fiber_serial) {
345 mutex_set_owner(self, th, fiber);
346 }
347 }
348 else {
349 if (!th->vm->thread_ignore_deadlock && rb_thread_ptr(mutex->thread) == th) {
350 rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
351 }
352
353 struct sync_waiter sync_waiter = {
354 .self = self,
355 .th = th,
356 .fiber = nonblocking_fiber(fiber),
357 };
358
359 RUBY_DEBUG_LOG("%p wait", mutex);
360
361 // Similar code to `sleep_forever`, but
362 // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
363 // An ensure clause would be needed here, but `rb_ensure` is a bit slow:
364 //
365 // begin
366 // sleep_forever(th, SLEEP_DEADLOCKABLE);
367 // ensure
368 // ccan_list_del(&sync_waiter.node);
369 // end
370 enum rb_thread_status prev_status = th->status;
371 th->status = THREAD_STOPPED_FOREVER;
372 rb_ractor_sleeper_threads_inc(th->ractor);
373 rb_check_deadlock(th->ractor);
374
375 RUBY_ASSERT(!th->locking_mutex);
376 th->locking_mutex = self;
377
378 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
379 {
380 native_sleep(th, NULL);
381 }
382 ccan_list_del(&sync_waiter.node);
383
384 // unlocked by another thread while sleeping
385 if (!mutex->fiber_serial) {
386 mutex_set_owner(self, th, fiber);
387 }
388
389 rb_ractor_sleeper_threads_dec(th->ractor);
390 th->status = prev_status;
391 th->locking_mutex = Qfalse;
392
393 RUBY_DEBUG_LOG("%p wakeup", mutex);
394 }
395
396 if (interruptible_p) {
397 /* release mutex before checking for interrupts...as interrupt checking
398 * code might call rb_raise() */
399 if (mutex->fiber_serial == rb_fiber_serial(fiber)) {
400 mutex->thread = Qfalse;
401 mutex->fiber_serial = 0;
402 }
403 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
404 if (!mutex->fiber_serial) {
405 mutex_set_owner(self, th, fiber);
406 }
407 }
408 else {
409 // clear interrupt information
410 if (RUBY_VM_INTERRUPTED(th->ec)) {
411 // reset interrupts
412 if (saved_ints == 0) {
413 saved_ints = threadptr_get_interrupts(th);
414 }
415 else {
416 // ignore additional interrupts
417 threadptr_get_interrupts(th);
418 }
419 }
420 }
421 }
422
423 if (saved_ints) th->ec->interrupt_flag = saved_ints;
424 if (mutex->fiber_serial == rb_fiber_serial(fiber)) mutex_locked(th, fiber, self);
425 }
426
427 RUBY_DEBUG_LOG("%p locked", mutex);
428
429 // assertion
430 if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
431
432 return self;
433}
434
435static VALUE
436mutex_lock_uninterruptible(VALUE self)
437{
438 return do_mutex_lock(self, 0);
439}
440
441/*
442 * call-seq:
443 * mutex.lock -> self
444 *
445 * Attempts to grab the lock and waits if it isn't available.
446 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
447 */
448VALUE
449rb_mutex_lock(VALUE self)
450{
451 return do_mutex_lock(self, 1);
452}
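/*
 * A minimal sketch of explicit lock/unlock pairing (illustrative; this is
 * essentially what Mutex#synchronize does on the caller's behalf):
 *
 *    m = Thread::Mutex.new
 *    m.lock
 *    begin
 *      # critical section
 *    ensure
 *      m.unlock
 *    end
 */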
453
454/*
455 * call-seq:
456 * mutex.owned? -> true or false
457 *
458 * Returns +true+ if this lock is currently held by the current thread.
459 */
460VALUE
461rb_mutex_owned_p(VALUE self)
462{
463 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
464 rb_mutex_t *mutex = mutex_ptr(self);
465
466 return mutex_owned_p(fiber, mutex);
467}
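/*
 * For example (a sketch; since the implementation above compares fiber
 * serials, ownership is tracked per fiber, not merely per thread):
 *
 *    m = Thread::Mutex.new
 *    m.owned?                      #=> false
 *    m.synchronize { m.owned? }    #=> true
 */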
468
469static const char *
470rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
471{
472 RUBY_DEBUG_LOG("%p", mutex);
473
474 if (mutex->fiber_serial == 0) {
475 return "Attempt to unlock a mutex which is not locked";
476 }
477 else if (fiber && mutex->fiber_serial != rb_fiber_serial(fiber)) {
478 return "Attempt to unlock a mutex which is locked by another thread/fiber";
479 }
480
481 struct sync_waiter *cur = 0, *next;
482
483 mutex->fiber_serial = 0;
484 thread_mutex_remove(th, mutex);
485
486 ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
487 ccan_list_del_init(&cur->node);
488
489 if (cur->th->scheduler != Qnil && cur->fiber) {
490 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
491 return NULL;
492 }
493 else {
494 switch (cur->th->status) {
495 case THREAD_RUNNABLE: /* from someone else calling Thread#run */
496 case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
497 RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
498 rb_threadptr_interrupt(cur->th);
499 return NULL;
500 case THREAD_STOPPED: /* probably impossible */
501 rb_bug("unexpected THREAD_STOPPED");
502 case THREAD_KILLED:
503 /* not sure about this, possible in exit GC? */
504 rb_bug("unexpected THREAD_KILLED");
505 continue;
506 }
507 }
508 }
509
510 // We did not find any threads to wake up, so we can just return with no error:
511 return NULL;
512}
513
514/*
515 * call-seq:
516 * mutex.unlock -> self
517 *
518 * Releases the lock.
519 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
520 */
521VALUE
522rb_mutex_unlock(VALUE self)
523{
524 const char *err;
525 rb_mutex_t *mutex = mutex_ptr(self);
526 rb_thread_t *th = GET_THREAD();
527
528 err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
529 if (err) rb_raise(rb_eThreadError, "%s", err);
530
531 return self;
532}
533
534#if defined(HAVE_WORKING_FORK)
535static void
536rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
537{
538 rb_mutex_abandon_all(th->keeping_mutexes);
539 th->keeping_mutexes = NULL;
540}
541
542static void
543rb_mutex_abandon_locking_mutex(rb_thread_t *th)
544{
545 if (th->locking_mutex) {
546 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
547
548 ccan_list_head_init(&mutex->waitq);
549 th->locking_mutex = Qfalse;
550 }
551}
552
553static void
554rb_mutex_abandon_all(rb_mutex_t *mutexes)
555{
556 rb_mutex_t *mutex;
557
558 while (mutexes) {
559 mutex = mutexes;
560 mutexes = mutex->next_mutex;
561 mutex->fiber_serial = 0;
562 mutex->next_mutex = 0;
563 ccan_list_head_init(&mutex->waitq);
564 }
565}
566#endif
567
568struct rb_mutex_sleep_arguments {
569 VALUE self;
570 VALUE timeout;
571};
572
573static VALUE
574mutex_sleep_begin(VALUE _arguments)
575{
576 struct rb_mutex_sleep_arguments *arguments = (struct rb_mutex_sleep_arguments *)_arguments;
577 VALUE timeout = arguments->timeout;
578 VALUE woken = Qtrue;
579
580 VALUE scheduler = rb_fiber_scheduler_current();
581 if (scheduler != Qnil) {
582 rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
583 }
584 else {
585 if (NIL_P(timeout)) {
586 rb_thread_sleep_deadly_allow_spurious_wakeup(arguments->self, Qnil, 0);
587 }
588 else {
589 struct timeval timeout_value = rb_time_interval(timeout);
590 rb_hrtime_t relative_timeout = rb_timeval2hrtime(&timeout_value);
591 /* permit spurious check */
592 woken = RBOOL(sleep_hrtime(GET_THREAD(), relative_timeout, 0));
593 }
594 }
595
596 return woken;
597}
598
599VALUE
600rb_mutex_sleep(VALUE self, VALUE timeout)
601{
602 if (!NIL_P(timeout)) {
603 // Validate the argument:
604 rb_time_interval(timeout);
605 }
606
607 rb_mutex_unlock(self);
608 time_t beg = time(0);
609
610 struct rb_mutex_sleep_arguments arguments = {
611 .self = self,
612 .timeout = timeout,
613 };
614
615 VALUE woken = rb_ensure(mutex_sleep_begin, (VALUE)&arguments, mutex_lock_uninterruptible, self);
616
617 RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
618 if (!woken) return Qnil;
619 time_t end = time(0) - beg;
620 return TIMET2NUM(end);
621}
622
623/*
624 * call-seq:
625 * mutex.sleep(timeout = nil) -> number or nil
626 *
627 * Releases the lock and sleeps for +timeout+ seconds if it is given and
628 * non-nil, or forever otherwise. Raises +ThreadError+ if +mutex+ wasn't
629 * locked by the current thread.
630 *
631 * When the thread is next woken up, it will attempt to reacquire
632 * the lock.
633 *
634 * Note that this method can wake up without an explicit Thread#wakeup
635 * call; for example, upon receiving a signal.
636 *
637 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
638 */
639static VALUE
640mutex_sleep(int argc, VALUE *argv, VALUE self)
641{
642 VALUE timeout;
643
644 timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
645 return rb_mutex_sleep(self, timeout);
646}
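/*
 * A minimal sketch of Mutex#sleep (illustrative): the lock is released for
 * the duration of the sleep and reacquired before the method returns.
 *
 *    m = Thread::Mutex.new
 *    m.synchronize do
 *      slept = m.sleep(0.5)   # releases m, sleeps, then relocks m
 *      # slept is the elapsed time in seconds, or nil if it timed out
 *    end
 */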
647
648/*
649 * call-seq:
650 * mutex.synchronize { ... } -> result of the block
651 *
652 * Obtains a lock, runs the block, and releases the lock when the block
653 * completes. See the example under Thread::Mutex.
654 */
655
656VALUE
657rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
658{
659 rb_mutex_lock(mutex);
660 return rb_ensure(func, arg, rb_mutex_unlock, mutex);
661}
662
663/*
664 * call-seq:
665 * mutex.synchronize { ... } -> result of the block
666 *
667 * Obtains a lock, runs the block, and releases the lock when the block
668 * completes. See the example under Thread::Mutex.
669 */
670static VALUE
671rb_mutex_synchronize_m(VALUE self)
672{
673 if (!rb_block_given_p()) {
674 rb_raise(rb_eThreadError, "must be called with a block");
675 }
676
677 return rb_mutex_synchronize(self, rb_yield, Qundef);
678}
679
680void
681rb_mutex_allow_trap(VALUE self, int val)
682{
683 Check_TypedStruct(self, &mutex_data_type);
684
685 if (val)
686 FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
687 else
688 FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
689}
690
691/* Queue */
692
693#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
694#define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
695RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
696struct rb_queue {
697 struct ccan_list_head waitq;
698 rb_serial_t fork_gen;
699 const VALUE que;
700 int num_waiting;
701} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
702
703#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
704#define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
705#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
706RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
707struct rb_szqueue {
708 struct rb_queue q;
709 int num_waiting_push;
710 struct ccan_list_head pushq;
711 long max;
712} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
713
714static void
715queue_mark_and_move(void *ptr)
716{
717 struct rb_queue *q = ptr;
718
719 /* no need to mark threads in waitq, they are on stack */
720 rb_gc_mark_and_move((VALUE *)UNALIGNED_MEMBER_PTR(q, que));
721}
722
723static size_t
724queue_memsize(const void *ptr)
725{
726 return sizeof(struct rb_queue);
727}
728
729static const rb_data_type_t queue_data_type = {
730 "queue",
731 {queue_mark_and_move, RUBY_TYPED_DEFAULT_FREE, queue_memsize, queue_mark_and_move},
732 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
733};
734
735static VALUE
736queue_alloc(VALUE klass)
737{
738 VALUE obj;
739 struct rb_queue *q;
740
741 obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
742 ccan_list_head_init(queue_waitq(q));
743 return obj;
744}
745
746static int
747queue_fork_check(struct rb_queue *q)
748{
749 rb_serial_t fork_gen = GET_VM()->fork_gen;
750
751 if (q->fork_gen == fork_gen) {
752 return 0;
753 }
754 /* forked children can't reach into parent thread stacks */
755 q->fork_gen = fork_gen;
756 ccan_list_head_init(queue_waitq(q));
757 q->num_waiting = 0;
758 return 1;
759}
760
761static struct rb_queue *
762queue_ptr(VALUE obj)
763{
764 struct rb_queue *q;
765
766 TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
767 queue_fork_check(q);
768
769 return q;
770}
771
772#define QUEUE_CLOSED FL_USER5
773
774static rb_hrtime_t
775queue_timeout2hrtime(VALUE timeout)
776{
777 if (NIL_P(timeout)) {
778 return (rb_hrtime_t)0;
779 }
780 rb_hrtime_t rel = 0;
781 if (FIXNUM_P(timeout)) {
782 rel = rb_sec2hrtime(NUM2TIMET(timeout));
783 }
784 else {
785 double2hrtime(&rel, rb_num2dbl(timeout));
786 }
787 return rb_hrtime_add(rel, rb_hrtime_now());
788}
789
790static void
791szqueue_mark_and_move(void *ptr)
792{
793 struct rb_szqueue *sq = ptr;
794
795 queue_mark_and_move(&sq->q);
796}
797
798static size_t
799szqueue_memsize(const void *ptr)
800{
801 return sizeof(struct rb_szqueue);
802}
803
804static const rb_data_type_t szqueue_data_type = {
805 "sized_queue",
806 {szqueue_mark_and_move, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize, szqueue_mark_and_move},
807 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
808};
809
810static VALUE
811szqueue_alloc(VALUE klass)
812{
813 struct rb_szqueue *sq;
814 VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
815 &szqueue_data_type, sq);
816 ccan_list_head_init(szqueue_waitq(sq));
817 ccan_list_head_init(szqueue_pushq(sq));
818 return obj;
819}
820
821static struct rb_szqueue *
822szqueue_ptr(VALUE obj)
823{
824 struct rb_szqueue *sq;
825
826 TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
827 if (queue_fork_check(&sq->q)) {
828 ccan_list_head_init(szqueue_pushq(sq));
829 sq->num_waiting_push = 0;
830 }
831
832 return sq;
833}
834
835static VALUE
836ary_buf_new(void)
837{
838 return rb_ary_hidden_new(1);
839}
840
841static VALUE
842check_array(VALUE obj, VALUE ary)
843{
844 if (!RB_TYPE_P(ary, T_ARRAY)) {
845 rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
846 }
847 return ary;
848}
849
850static long
851queue_length(VALUE self, struct rb_queue *q)
852{
853 return RARRAY_LEN(check_array(self, q->que));
854}
855
856static int
857queue_closed_p(VALUE self)
858{
859 return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
860}
861
862/*
863 * Document-class: ClosedQueueError
864 *
865 * The exception class which will be raised when pushing into a closed
866 * Queue. See Thread::Queue#close and Thread::SizedQueue#close.
867 */
868
869NORETURN(static void raise_closed_queue_error(VALUE self));
870
871static void
872raise_closed_queue_error(VALUE self)
873{
874 rb_raise(rb_eClosedQueueError, "queue closed");
875}
876
877static VALUE
878queue_closed_result(VALUE self, struct rb_queue *q)
879{
880 RUBY_ASSERT(queue_length(self, q) == 0);
881 return Qnil;
882}
883
884/*
885 * Document-class: Thread::Queue
886 *
887 * The Thread::Queue class implements multi-producer, multi-consumer
888 * queues. It is especially useful in threaded programming when
889 * information must be exchanged safely between multiple threads. The
890 * Thread::Queue class implements all the required locking semantics.
891 *
892 * The class implements a FIFO (first in, first out) queue: the first
893 * tasks added are the first retrieved.
894 *
895 * Example:
896 *
897 * queue = Thread::Queue.new
898 *
899 * producer = Thread.new do
900 * 5.times do |i|
901 * sleep rand(i) # simulate expense
902 * queue << i
903 * puts "#{i} produced"
904 * end
905 * end
906 *
907 * consumer = Thread.new do
908 * 5.times do |i|
909 * value = queue.pop
910 * sleep rand(i/2) # simulate expense
911 * puts "consumed #{value}"
912 * end
913 * end
914 *
915 * consumer.join
916 *
917 */
918
919/*
920 * Document-method: Queue::new
921 *
922 * call-seq:
923 * Thread::Queue.new -> empty_queue
924 * Thread::Queue.new(enumerable) -> queue
925 *
926 * Creates a new queue instance, optionally using the contents of an +enumerable+
927 * for its initial state.
928 *
929 * Example:
930 *
931 * q = Thread::Queue.new
932 * #=> #<Thread::Queue:0x00007ff7501110d0>
933 * q.empty?
934 * #=> true
935 *
936 * q = Thread::Queue.new([1, 2, 3])
937 * #=> #<Thread::Queue:0x00007ff7500ec500>
938 * q.empty?
939 * #=> false
940 * q.pop
941 * #=> 1
942 */
943
944static VALUE
945rb_queue_initialize(int argc, VALUE *argv, VALUE self)
946{
947 VALUE initial;
948 struct rb_queue *q = queue_ptr(self);
949 if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
950 initial = rb_to_array(initial);
951 }
952 RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
953 ccan_list_head_init(queue_waitq(q));
954 if (argc == 1) {
955 rb_ary_concat(q->que, initial);
956 }
957 return self;
958}
959
960static VALUE
961queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
962{
963 if (queue_closed_p(self)) {
964 raise_closed_queue_error(self);
965 }
966 rb_ary_push(check_array(self, q->que), obj);
967 wakeup_one(queue_waitq(q));
968 return self;
969}
970
971/*
972 * Document-method: Thread::Queue#close
973 * call-seq:
974 * close
975 *
976 * Closes the queue. A closed queue cannot be re-opened.
977 *
978 * After the call to close completes, the following are true:
979 *
980 * - +closed?+ will return true
981 *
982 * - +close+ will be ignored.
983 *
984 * - calling enq/push/<< will raise a +ClosedQueueError+.
985 *
986 * - when +empty?+ is false, calling deq/pop/shift will return an object
987 * from the queue as usual.
988 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
989 * deq(true) will raise a +ThreadError+.
990 *
991 * ClosedQueueError inherits from StopIteration, so you can use it to break out of a loop block.
992 *
993 * Example:
994 *
995 * q = Thread::Queue.new
996 * Thread.new{
997 * while e = q.deq # wait for nil to break loop
998 * # ...
999 * end
1000 * }
1001 * q.close
1002 */
1003
1004static VALUE
1005rb_queue_close(VALUE self)
1006{
1007 struct rb_queue *q = queue_ptr(self);
1008
1009 if (!queue_closed_p(self)) {
1010 FL_SET(self, QUEUE_CLOSED);
1011
1012 wakeup_all(queue_waitq(q));
1013 }
1014
1015 return self;
1016}
1017
1018/*
1019 * Document-method: Thread::Queue#closed?
1020 * call-seq: closed?
1021 *
1022 * Returns +true+ if the queue is closed.
1023 */
1024
1025static VALUE
1026rb_queue_closed_p(VALUE self)
1027{
1028 return RBOOL(queue_closed_p(self));
1029}
1030
1031/*
1032 * Document-method: Thread::Queue#push
1033 * call-seq:
1034 * push(object)
1035 * enq(object)
1036 * <<(object)
1037 *
1038 * Pushes the given +object+ to the queue.
1039 */
1040
1041static VALUE
1042rb_queue_push(VALUE self, VALUE obj)
1043{
1044 return queue_do_push(self, queue_ptr(self), obj);
1045}
1046
1047static VALUE
1048queue_sleep(VALUE _args)
1049{
1050 struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
1051 rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
1052 return Qnil;
1053}
1054
1055struct queue_waiter {
1056 struct sync_waiter w;
1057 union {
1058 struct rb_queue *q;
1059 struct rb_szqueue *sq;
1060 } as;
1061};
1062
1063static VALUE
1064queue_sleep_done(VALUE p)
1065{
1066 struct queue_waiter *qw = (struct queue_waiter *)p;
1067
1068 ccan_list_del(&qw->w.node);
1069 qw->as.q->num_waiting--;
1070
1071 return Qfalse;
1072}
1073
1074static VALUE
1075szqueue_sleep_done(VALUE p)
1076{
1077 struct queue_waiter *qw = (struct queue_waiter *)p;
1078
1079 ccan_list_del(&qw->w.node);
1080 qw->as.sq->num_waiting_push--;
1081
1082 return Qfalse;
1083}
1084
1085static VALUE
1086queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
1087{
1088 check_array(self, q->que);
1089 if (RARRAY_LEN(q->que) == 0) {
1090 if (!should_block) {
1091 rb_raise(rb_eThreadError, "queue empty");
1092 }
1093
1094 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1095 return Qnil;
1096 }
1097 }
1098
1099 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1100 while (RARRAY_LEN(q->que) == 0) {
1101 if (queue_closed_p(self)) {
1102 return queue_closed_result(self, q);
1103 }
1104 else {
1105 rb_execution_context_t *ec = GET_EC();
1106
1107 RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
1108 RUBY_ASSERT(queue_closed_p(self) == 0);
1109
1110 struct queue_waiter queue_waiter = {
1111 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1112 .as = {.q = q}
1113 };
1114
1115 struct ccan_list_head *waitq = queue_waitq(q);
1116
1117 ccan_list_add_tail(waitq, &queue_waiter.w.node);
1118 queue_waiter.as.q->num_waiting++;
1119
1120 struct queue_sleep_arg queue_sleep_arg = {
1121 .self = self,
1122 .timeout = timeout,
1123 .end = end
1124 };
1125
1126 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
1127 if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
1128 break;
1129 }
1130 }
1131
1132 return rb_ary_shift(q->que);
1133}
1134
1135static VALUE
1136rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1137{
1138 return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout);
1139}
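/*
 * Ruby-level behavior corresponding to this primitive (a sketch; the public
 * Queue#pop wrapper defined in thread_sync.rb forwards its +non_block+ and
 * +timeout+ arguments here):
 *
 *    q = Thread::Queue.new
 *    q << 1
 *    q.pop                  #=> 1 (blocks while the queue is empty)
 *    q.pop(timeout: 0.1)    #=> nil, after ~0.1s with nothing to pop
 *    q.pop(true)            # raises ThreadError (queue empty)
 */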
1140
1141/*
1142 * Document-method: Thread::Queue#empty?
1143 * call-seq: empty?
1144 *
1145 * Returns +true+ if the queue is empty.
1146 */
1147
1148static VALUE
1149rb_queue_empty_p(VALUE self)
1150{
1151 return RBOOL(queue_length(self, queue_ptr(self)) == 0);
1152}
1153
1154/*
1155 * Document-method: Thread::Queue#clear
1156 *
1157 * Removes all objects from the queue.
1158 */
1159
1160static VALUE
1161rb_queue_clear(VALUE self)
1162{
1163 struct rb_queue *q = queue_ptr(self);
1164
1165 rb_ary_clear(check_array(self, q->que));
1166 return self;
1167}
1168
1169/*
1170 * Document-method: Thread::Queue#length
1171 * call-seq:
1172 * length
1173 * size
1174 *
1175 * Returns the length of the queue.
1176 */
1177
1178static VALUE
1179rb_queue_length(VALUE self)
1180{
1181 return LONG2NUM(queue_length(self, queue_ptr(self)));
1182}
1183
1184NORETURN(static VALUE rb_queue_freeze(VALUE self));
1185/*
1186 * call-seq:
1187 * freeze
1188 *
1189 * The queue can't be frozen, so this method raises an exception:
1190 * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
1191 *
1192 */
1193static VALUE
1194rb_queue_freeze(VALUE self)
1195{
1196 rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
1197 UNREACHABLE_RETURN(self);
1198}
1199
1200/*
1201 * Document-method: Thread::Queue#num_waiting
1202 *
1203 * Returns the number of threads waiting on the queue.
1204 */
1205
1206static VALUE
1207rb_queue_num_waiting(VALUE self)
1208{
1209 struct rb_queue *q = queue_ptr(self);
1210
1211 return INT2NUM(q->num_waiting);
1212}
1213
1214/*
1215 * Document-class: Thread::SizedQueue
1216 *
1217 * This class represents queues of a specified size capacity. The push
1218 * operation may block while the queue is full.
1219 *
1220 * See Thread::Queue for an example of how a Thread::SizedQueue works.
1221 */
1222
1223/*
1224 * Document-method: SizedQueue::new
1225 * call-seq: new(max)
1226 *
1227 * Creates a fixed-length queue with a maximum size of +max+.
1228 */
1229
1230static VALUE
1231rb_szqueue_initialize(VALUE self, VALUE vmax)
1232{
1233 long max;
1234 struct rb_szqueue *sq = szqueue_ptr(self);
1235
1236 max = NUM2LONG(vmax);
1237 if (max <= 0) {
1238 rb_raise(rb_eArgError, "queue size must be positive");
1239 }
1240
1241 RB_OBJ_WRITE(self, szqueue_list(sq), ary_buf_new());
1242 ccan_list_head_init(szqueue_waitq(sq));
1243 ccan_list_head_init(szqueue_pushq(sq));
1244 sq->max = max;
1245
1246 return self;
1247}
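/*
 * For example (a sketch): a producer blocks once the queue holds +max+
 * items, and resumes as a consumer pops.
 *
 *    sq = Thread::SizedQueue.new(2)
 *    producer = Thread.new { 5.times { |i| sq << i } }
 *    consumer = Thread.new { 5.times { p sq.pop } }
 *    [producer, consumer].each(&:join)
 */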
1248
1249/*
1250 * Document-method: Thread::SizedQueue#close
1251 * call-seq:
1252 * close
1253 *
1254 * Similar to Thread::Queue#close.
1255 *
1256 * The difference is behavior with waiting enqueuing threads.
1257 *
1258 * If there are waiting enqueuing threads, they are interrupted by
1259 * raising ClosedQueueError('queue closed').
1260 */
1261static VALUE
1262rb_szqueue_close(VALUE self)
1263{
1264 if (!queue_closed_p(self)) {
1265 struct rb_szqueue *sq = szqueue_ptr(self);
1266
1267 FL_SET(self, QUEUE_CLOSED);
1268 wakeup_all(szqueue_waitq(sq));
1269 wakeup_all(szqueue_pushq(sq));
1270 }
1271 return self;
1272}
1273
1274/*
1275 * Document-method: Thread::SizedQueue#max
1276 *
1277 * Returns the maximum size of the queue.
1278 */
1279
1280static VALUE
1281rb_szqueue_max_get(VALUE self)
1282{
1283 return LONG2NUM(szqueue_ptr(self)->max);
1284}
1285
1286/*
1287 * Document-method: Thread::SizedQueue#max=
1288 * call-seq: max=(number)
1289 *
1290 * Sets the maximum size of the queue to the given +number+.
1291 */
1292
1293static VALUE
1294rb_szqueue_max_set(VALUE self, VALUE vmax)
1295{
1296 long max = NUM2LONG(vmax);
1297 long diff = 0;
1298 struct rb_szqueue *sq = szqueue_ptr(self);
1299
1300 if (max <= 0) {
1301 rb_raise(rb_eArgError, "queue size must be positive");
1302 }
1303 if (max > sq->max) {
1304 diff = max - sq->max;
1305 }
1306 sq->max = max;
1307 sync_wakeup(szqueue_pushq(sq), diff);
1308 return vmax;
1309}
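/*
 * For example (a sketch): growing +max+ wakes up to +diff+ blocked pushers,
 * via the sync_wakeup call above.
 *
 *    sq = Thread::SizedQueue.new(1)
 *    sq << :a
 *    t = Thread.new { sq << :b }   # blocks: the queue is full
 *    sq.max = 2                    # wakes the waiting pusher
 *    t.join
 */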
1310
1311static VALUE
1312rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
1313{
1314 struct rb_szqueue *sq = szqueue_ptr(self);
1315
1316 if (queue_length(self, &sq->q) >= sq->max) {
1317 if (RTEST(non_block)) {
1318 rb_raise(rb_eThreadError, "queue full");
1319 }
1320
1321 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1322 return Qnil;
1323 }
1324 }
1325
1326 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1327 while (queue_length(self, &sq->q) >= sq->max) {
1328 if (queue_closed_p(self)) {
1329 raise_closed_queue_error(self);
1330 }
1331 else {
1332 rb_execution_context_t *ec = GET_EC();
1333 struct queue_waiter queue_waiter = {
1334 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1335 .as = {.sq = sq}
1336 };
1337
1338 struct ccan_list_head *pushq = szqueue_pushq(sq);
1339
1340 ccan_list_add_tail(pushq, &queue_waiter.w.node);
1341 sq->num_waiting_push++;
1342
1343 struct queue_sleep_arg queue_sleep_arg = {
1344 .self = self,
1345 .timeout = timeout,
1346 .end = end
1347 };
1348 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
1349 if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
1350 return Qnil;
1351 }
1352 }
1353 }
1354
1355 return queue_do_push(self, &sq->q, object);
1356}
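/*
 * Ruby-level behavior corresponding to this primitive (a sketch; the public
 * SizedQueue#push wrapper in thread_sync.rb forwards its +non_block+ and
 * +timeout+ arguments here):
 *
 *    sq = Thread::SizedQueue.new(1)
 *    sq.push(:a)                   #=> sq
 *    sq.push(:b, timeout: 0.1)     #=> nil, after ~0.1s with the queue full
 *    sq.push(:c, true)             # raises ThreadError (queue full)
 */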
1357
1358static VALUE
1359szqueue_do_pop(VALUE self, int should_block, VALUE timeout)
1360{
1361 struct rb_szqueue *sq = szqueue_ptr(self);
1362 VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout);
1363
1364 if (queue_length(self, &sq->q) < sq->max) {
1365 wakeup_one(szqueue_pushq(sq));
1366 }
1367
1368 return retval;
1369}
1370static VALUE
1371rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1372{
1373 return szqueue_do_pop(self, !RTEST(non_block), timeout);
1374}
1375
1376/*
1377 * Document-method: Thread::SizedQueue#clear
1378 *
1379 * Removes all objects from the queue.
1380 */
1381
1382static VALUE
1383rb_szqueue_clear(VALUE self)
1384{
1385 struct rb_szqueue *sq = szqueue_ptr(self);
1386
1387 rb_ary_clear(check_array(self, sq->q.que));
1388 wakeup_all(szqueue_pushq(sq));
1389 return self;
1390}
1391
1392/*
1393 * Document-method: Thread::SizedQueue#length
1394 * call-seq:
1395 * length
1396 * size
1397 *
1398 * Returns the length of the queue.
1399 */
1400
1401static VALUE
1402rb_szqueue_length(VALUE self)
1403{
1404 struct rb_szqueue *sq = szqueue_ptr(self);
1405
1406 return LONG2NUM(queue_length(self, &sq->q));
1407}
1408
1409/*
1410 * Document-method: Thread::SizedQueue#num_waiting
1411 *
1412 * Returns the number of threads waiting on the queue.
1413 */
1414
1415static VALUE
1416rb_szqueue_num_waiting(VALUE self)
1417{
1418 struct rb_szqueue *sq = szqueue_ptr(self);
1419
1420 return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1421}
1422
1423/*
1424 * Document-method: Thread::SizedQueue#empty?
1425 * call-seq: empty?
1426 *
1427 * Returns +true+ if the queue is empty.
1428 */
1429
1430static VALUE
1431rb_szqueue_empty_p(VALUE self)
1432{
1433 struct rb_szqueue *sq = szqueue_ptr(self);
1434
1435 return RBOOL(queue_length(self, &sq->q) == 0);
1436}
1437
1438
1439/* ConditionVariable */
1440struct rb_condvar {
1441 struct ccan_list_head waitq;
1442 rb_serial_t fork_gen;
1443};
1444
1445/*
1446 * Document-class: Thread::ConditionVariable
1447 *
1448 * ConditionVariable objects augment class Mutex. Using condition variables,
1449 * it is possible to suspend while in the middle of a critical section until a
1450 * condition is met, such as a resource becoming available.
1451 *
1452 * Due to non-deterministic scheduling and spurious wake-ups, users of
1453 * condition variables should always use a separate boolean predicate (such as
1454 * reading from a boolean variable) to check if the condition is actually met
1455 * before starting to wait, and should wait in a loop, re-checking the
1456 * condition every time the ConditionVariable is woken up. The idiomatic way
1457 * of using condition variables is calling the +wait+ method in an +until+
1458 * loop with the predicate as the loop condition.
1459 *
1460 * condvar.wait(mutex) until condition_is_met
1461 *
1462 * In the example below, we use the boolean variable +resource_available+
1463 * (which is protected by +mutex+) to indicate the availability of the
1464 * resource, and use +condvar+ to wait for that variable to become true. Note
1465 * that:
1466 *
1467 * 1. Thread +b+ may be scheduled before threads +a1+ and +a2+, and may run so
1468 * fast that it has already made the resource available before either
1469 * +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if
1470 * +resource_available+ is already true before starting to wait.
1471 * 2. The +wait+ method may spuriously wake up without signalling. Therefore,
1472 * threads +a1+ and +a2+ should recheck +resource_available+ after the
1473 * +wait+ method returns, and go back to wait if the condition is not
1474 * actually met.
1475 * 3. It is possible that thread +a2+ starts right after thread +a1+ is woken
1476 * up by +b+. Thread +a2+ may have acquired the +mutex+ and consumed the
1477 * resource before thread +a1+ acquires the +mutex+. This necessitates
1478 * rechecking after +wait+, too.
1479 *
1480 * Example:
1481 *
1482 * mutex = Thread::Mutex.new
1483 *
1484 * resource_available = false
1485 * condvar = Thread::ConditionVariable.new
1486 *
1487 * a1 = Thread.new {
1488 * # Thread 'a1' waits for the resource to become available and consumes
1489 * # the resource.
1490 * mutex.synchronize {
1491 * condvar.wait(mutex) until resource_available
1492 * # After the loop, 'resource_available' is guaranteed to be true.
1493 *
1494 * resource_available = false
1495 * puts "a1 consumed the resource"
1496 * }
1497 * }
1498 *
1499 * a2 = Thread.new {
1500 * # Thread 'a2' behaves like 'a1'.
1501 * mutex.synchronize {
1502 * condvar.wait(mutex) until resource_available
1503 * resource_available = false
1504 * puts "a2 consumed the resource"
1505 * }
1506 * }
1507 *
1508 * b = Thread.new {
1509 * # Thread 'b' periodically makes the resource available.
1510 * loop {
1511 * mutex.synchronize {
1512 * resource_available = true
1513 *
1514 * # Notify one waiting thread if any. It is possible that neither
1515 * # 'a1' nor 'a2' is waiting on 'condvar' at this moment. That's OK.
1516 * condvar.signal
1517 * }
1518 * sleep 1
1519 * }
1520 * }
1521 *
1522 * # Eventually both 'a1' and 'a2' will have their resources, albeit in an
1523 * # unspecified order.
1524 * [a1, a2].each {|th| th.join}
1525 */
1526
1527static size_t
1528condvar_memsize(const void *ptr)
1529{
1530 return sizeof(struct rb_condvar);
1531}
1532
1533static const rb_data_type_t cv_data_type = {
1534 "condvar",
1535 {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1536 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
1537};
1538
1539static struct rb_condvar *
1540condvar_ptr(VALUE self)
1541{
1542 struct rb_condvar *cv;
1543 rb_serial_t fork_gen = GET_VM()->fork_gen;
1544
1545 TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1546
1547 /* forked children can't reach into parent thread stacks */
1548 if (cv->fork_gen != fork_gen) {
1549 cv->fork_gen = fork_gen;
1550 ccan_list_head_init(&cv->waitq);
1551 }
1552
1553 return cv;
1554}
1555
1556static VALUE
1557condvar_alloc(VALUE klass)
1558{
1559 struct rb_condvar *cv;
1560 VALUE obj;
1561
1562 obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1563 ccan_list_head_init(&cv->waitq);
1564
1565 return obj;
1566}
1567
1568/*
1569 * Document-method: ConditionVariable::new
1570 *
1571 * Creates a new condition variable instance.
1572 */
1573
1574static VALUE
1575rb_condvar_initialize(VALUE self)
1576{
1577 struct rb_condvar *cv = condvar_ptr(self);
1578 ccan_list_head_init(&cv->waitq);
1579 return self;
1580}
1581
1583 VALUE mutex;
1584 VALUE timeout;
1585};
1586
1587static ID id_sleep;
1588
1589static VALUE
1590do_sleep(VALUE args)
1591{
1592 struct sleep_call *p = (struct sleep_call *)args;
1593 return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1594}
1595
1596/*
1597 * Document-method: Thread::ConditionVariable#wait
1598 * call-seq: wait(mutex, timeout=nil)
1599 *
1600 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1601 *
1602 * If +timeout+ is given, this method returns after +timeout+ seconds have
1603 * passed, even if no other thread has signalled.
1604 *
1605 * This method may wake up spuriously due to underlying implementation details.
1606 *
1607 * Returns the result of sleeping on +mutex+ (see Mutex#sleep).
1608 */
1609
1610static VALUE
1611rb_condvar_wait(int argc, VALUE *argv, VALUE self)
1612{
1613 rb_execution_context_t *ec = GET_EC();
1614
1615 struct rb_condvar *cv = condvar_ptr(self);
1616 struct sleep_call args;
1617
1618 rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
1619
1620 struct sync_waiter sync_waiter = {
1621 .self = args.mutex,
1622 .th = ec->thread_ptr,
1623 .fiber = nonblocking_fiber(ec->fiber_ptr)
1624 };
1625
1626 ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
1627 return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
1628}
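/*
 * A minimal sketch of waiting with a timeout (using the +mutex+, +condvar+
 * and +condition_is_met+ names from the class-level example): the predicate
 * must be rechecked because the wait can time out or wake spuriously.
 *
 *    mutex.synchronize do
 *      condvar.wait(mutex, 1.0) until condition_is_met
 *    end
 */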
1629
1630/*
1631 * Document-method: Thread::ConditionVariable#signal
1632 *
1633 * Wakes up the first thread in line waiting for this lock.
1634 */
1635
1636static VALUE
1637rb_condvar_signal(VALUE self)
1638{
1639 struct rb_condvar *cv = condvar_ptr(self);
1640 wakeup_one(&cv->waitq);
1641 return self;
1642}
1643
1644/*
1645 * Document-method: Thread::ConditionVariable#broadcast
1646 *
1647 * Wakes up all threads waiting for this lock.
1648 */
1649
1650static VALUE
1651rb_condvar_broadcast(VALUE self)
1652{
1653 struct rb_condvar *cv = condvar_ptr(self);
1654 wakeup_all(&cv->waitq);
1655 return self;
1656}
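/*
 * For example (a sketch): +broadcast+ wakes every waiter, and each waiter
 * then reacquires the mutex in turn before rechecking its predicate.
 *
 *    mutex = Thread::Mutex.new
 *    condvar = Thread::ConditionVariable.new
 *    ready = false
 *
 *    workers = 3.times.map do
 *      Thread.new { mutex.synchronize { condvar.wait(mutex) until ready } }
 *    end
 *
 *    mutex.synchronize { ready = true; condvar.broadcast }
 *    workers.each(&:join)
 */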
1657
1658NORETURN(static VALUE undumpable(VALUE obj));
1659/* :nodoc: */
1660static VALUE
1661undumpable(VALUE obj)
1662{
1663 rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
1664 UNREACHABLE_RETURN(Qnil);
1665}
1666
1667static VALUE
1668define_thread_class(VALUE outer, const ID name, VALUE super)
1669{
1670 VALUE klass = rb_define_class_id_under(outer, name, super);
1671 rb_const_set(rb_cObject, name, klass);
1672 return klass;
1673}
1674
1675static void
1676Init_thread_sync(void)
1677{
1678#undef rb_intern
1679#if defined(TEACH_RDOC) && TEACH_RDOC == 42
1680 rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
1681 rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
1682 rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
1683 rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
1684#endif
1685
1686#define DEFINE_CLASS(name, super) \
1687 rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
1688
1689 /* Mutex */
1690 DEFINE_CLASS(Mutex, Object);
1691 rb_define_alloc_func(rb_cMutex, mutex_alloc);
1692 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
1693 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
1694 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
1695 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
1696 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
1697 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
1698 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
1699 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
1700
1701 /* Queue */
1702 DEFINE_CLASS(Queue, Object);
1703 rb_define_alloc_func(rb_cQueue, queue_alloc);
1704
1705 rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1706
1707 rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
1708 rb_undef_method(rb_cQueue, "initialize_copy");
1709 rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
1710 rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
1711 rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
1712 rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1713 rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1714 rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1715 rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1716 rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1717 rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);
1718
1719 rb_define_alias(rb_cQueue, "enq", "push");
1720 rb_define_alias(rb_cQueue, "<<", "push");
1721 rb_define_alias(rb_cQueue, "size", "length");
1722
1723 DEFINE_CLASS(SizedQueue, Queue);
1724 rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1725
1726 rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1727 rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
1728 rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
1729 rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1730 rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
1731 rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1732 rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
1733 rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1734 rb_define_alias(rb_cSizedQueue, "size", "length");
1735
1736 /* CVar */
1737 DEFINE_CLASS(ConditionVariable, Object);
1738 rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1739
1740 id_sleep = rb_intern("sleep");
1741
1742 rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
1743 rb_undef_method(rb_cConditionVariable, "initialize_copy");
1744 rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
1745 rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
1746 rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
1747 rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
1748
1749 rb_provide("thread.rb");
1750}
1751
1752#include "thread_sync.rbinc"