Ruby 4.1.0dev (2025-12-29 revision cb01b9023ec2007c03bddc992416c33f2c59a0e1)
thread_sync.c (cb01b9023ec2007c03bddc992416c33f2c59a0e1)
1/* included by thread.c */
2#include "ccan/list/list.h"
3#include "builtin.h"
4
5static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
6static VALUE rb_eClosedQueueError;
7
8/* Mutex */
9typedef struct rb_mutex_struct {
10 rb_serial_t ec_serial;
11 rb_thread_t *th; // even if the fiber is collected, we might need access to the thread in mutex_free
12 struct rb_mutex_struct *next_mutex;
13 struct ccan_list_head waitq; /* protected by GVL */
14} rb_mutex_t;
15
16/* sync_waiter is always on-stack */
17struct sync_waiter {
18 VALUE self;
19 rb_thread_t *th;
20 rb_fiber_t *fiber;
21 struct ccan_list_node node;
22};
23
24static inline rb_fiber_t*
25nonblocking_fiber(rb_fiber_t *fiber)
26{
27 if (rb_fiberptr_blocking(fiber)) {
28 return NULL;
29 }
30
31 return fiber;
32}
33
34struct queue_sleep_arg {
35 VALUE self;
36 VALUE timeout;
37 rb_hrtime_t end;
38};
39
40#define MUTEX_ALLOW_TRAP FL_USER1
41
42static void
43sync_wakeup(struct ccan_list_head *head, long max)
44{
45 RUBY_DEBUG_LOG("max:%ld", max);
46
47 struct sync_waiter *cur = 0, *next;
48
49 ccan_list_for_each_safe(head, cur, next, node) {
50 ccan_list_del_init(&cur->node);
51
52 if (cur->th->status != THREAD_KILLED) {
53 if (cur->th->scheduler != Qnil && cur->fiber) {
54 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
55 }
56 else {
57 RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
58 rb_threadptr_interrupt(cur->th);
59 cur->th->status = THREAD_RUNNABLE;
60 }
61
62 if (--max == 0) return;
63 }
64 }
65}
66
67static void
68wakeup_one(struct ccan_list_head *head)
69{
70 sync_wakeup(head, 1);
71}
72
73static void
74wakeup_all(struct ccan_list_head *head)
75{
76 sync_wakeup(head, LONG_MAX);
77}
78
79#if defined(HAVE_WORKING_FORK)
80static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
81static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
82static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
83#endif
84static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial);
85
86/*
87 * Document-class: Thread::Mutex
88 *
89 * Thread::Mutex implements a simple semaphore that can be used to
90 * coordinate access to shared data from multiple concurrent threads.
91 *
92 * Example:
93 *
94 * semaphore = Thread::Mutex.new
95 *
96 * a = Thread.new {
97 * semaphore.synchronize {
98 * # access shared resource
99 * }
100 * }
101 *
102 * b = Thread.new {
103 * semaphore.synchronize {
104 * # access shared resource
105 * }
106 * }
107 *
108 */
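/*
 * An illustrative sketch (complementing the example above) of the
 * non-blocking and error paths implemented below; the exact error message
 * comes from do_mutex_lock:
 *
 *   m = Thread::Mutex.new
 *   m.try_lock             #=> true  (acquired without blocking)
 *   m.owned?               #=> true
 *   begin
 *     m.lock               # re-locking by the owner is reported as a deadlock
 *   rescue ThreadError => e
 *     e.message            #=> "deadlock; recursive locking"
 *   end
 *   m.unlock
 */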
109
110static size_t
111rb_mutex_num_waiting(rb_mutex_t *mutex)
112{
113 struct sync_waiter *w = 0;
114 size_t n = 0;
115
116 ccan_list_for_each(&mutex->waitq, w, node) {
117 n++;
118 }
119
120 return n;
121}
122
123rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
124
125static bool
126mutex_locked_p(rb_mutex_t *mutex)
127{
128 return mutex->ec_serial != 0;
129}
130
131static void
132mutex_free(void *ptr)
133{
134 rb_mutex_t *mutex = ptr;
135 if (mutex_locked_p(mutex)) {
136 const char *err = rb_mutex_unlock_th(mutex, mutex->th, 0);
137 if (err) rb_bug("%s", err);
138 }
139 ruby_xfree(ptr);
140}
141
142static size_t
143mutex_memsize(const void *ptr)
144{
145 return sizeof(rb_mutex_t);
146}
147
148static const rb_data_type_t mutex_data_type = {
149 "mutex",
150 {NULL, mutex_free, mutex_memsize,},
151 0, 0, RUBY_TYPED_FREE_IMMEDIATELY
152};
153
154static rb_mutex_t *
155mutex_ptr(VALUE obj)
156{
157 rb_mutex_t *mutex;
158
159 TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);
160
161 return mutex;
162}
163
164VALUE
165rb_obj_is_mutex(VALUE obj)
166{
167 return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
168}
169
170static VALUE
171mutex_alloc(VALUE klass)
172{
173 VALUE obj;
174 rb_mutex_t *mutex;
175
176 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
177
178 ccan_list_head_init(&mutex->waitq);
179 return obj;
180}
181
182VALUE
183rb_mutex_new(void)
184{
185 return mutex_alloc(rb_cMutex);
186}
187
188VALUE
189rb_mutex_locked_p(VALUE self)
190{
191 rb_mutex_t *mutex = mutex_ptr(self);
192
193 return RBOOL(mutex_locked_p(mutex));
194}
195
196static void
197thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
198{
199 RUBY_ASSERT(!mutex->next_mutex);
200 if (thread->keeping_mutexes) {
201 mutex->next_mutex = thread->keeping_mutexes;
202 }
203
204 thread->keeping_mutexes = mutex;
205}
206
207static void
208thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
209{
210 rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;
211
212 while (*keeping_mutexes && *keeping_mutexes != mutex) {
213 // Move to the next mutex in the list:
214 keeping_mutexes = &(*keeping_mutexes)->next_mutex;
215 }
216
217 if (*keeping_mutexes) {
218 *keeping_mutexes = mutex->next_mutex;
219 mutex->next_mutex = NULL;
220 }
221}
222
223static void
224mutex_set_owner(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
225{
226 mutex->th = th;
227 mutex->ec_serial = ec_serial;
228}
229
230static void
231mutex_locked(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
232{
233 mutex_set_owner(mutex, th, ec_serial);
234 thread_mutex_insert(th, mutex);
235}
236
237static inline bool
238do_mutex_trylock(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
239{
240 if (mutex->ec_serial == 0) {
241 RUBY_DEBUG_LOG("%p ok", mutex);
242
243 mutex_locked(mutex, th, ec_serial);
244 return true;
245 }
246 else {
247 RUBY_DEBUG_LOG("%p ng", mutex);
248 return false;
249 }
250}
251
252static VALUE
253rb_mut_trylock(rb_execution_context_t *ec, VALUE self)
254{
255 return RBOOL(do_mutex_trylock(mutex_ptr(self), ec->thread_ptr, rb_ec_serial(ec)));
256}
257
258VALUE
259rb_mutex_trylock(VALUE self)
260{
261 return rb_mut_trylock(GET_EC(), self);
262}
263
264static VALUE
265mutex_owned_p(rb_serial_t ec_serial, rb_mutex_t *mutex)
266{
267 return RBOOL(mutex->ec_serial == ec_serial);
268}
269
270static VALUE
271call_rb_fiber_scheduler_block(VALUE mutex)
272{
273 return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
274}
275
276static VALUE
277delete_from_waitq(VALUE value)
278{
279 struct sync_waiter *sync_waiter = (void *)value;
280 ccan_list_del(&sync_waiter->node);
281
282 return Qnil;
283}
284
285static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
286
287struct mutex_args {
288 VALUE self;
289 rb_mutex_t *mutex;
290 rb_execution_context_t *ec;
291};
292
293static inline void
294mutex_args_init(struct mutex_args *args, VALUE mutex)
295{
296 args->self = mutex;
297 args->mutex = mutex_ptr(mutex);
298 args->ec = GET_EC();
299}
300
301static VALUE
302do_mutex_lock(struct mutex_args *args, int interruptible_p)
303{
304 VALUE self = args->self;
305 rb_execution_context_t *ec = args->ec;
306 rb_thread_t *th = ec->thread_ptr;
307 rb_fiber_t *fiber = ec->fiber_ptr;
308 rb_serial_t ec_serial = rb_ec_serial(ec);
309 rb_mutex_t *mutex = args->mutex;
310 rb_atomic_t saved_ints = 0;
311
312 /* When running trap handler */
313 if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
314 th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
315 rb_raise(rb_eThreadError, "can't be called from trap context");
316 }
317
318 if (!do_mutex_trylock(mutex, th, ec_serial)) {
319 if (mutex->ec_serial == ec_serial) {
320 rb_raise(rb_eThreadError, "deadlock; recursive locking");
321 }
322
323 while (mutex->ec_serial != ec_serial) {
324 VM_ASSERT(mutex->ec_serial != 0);
325
326 VALUE scheduler = rb_fiber_scheduler_current();
327 if (scheduler != Qnil) {
328 struct sync_waiter sync_waiter = {
329 .self = self,
330 .th = th,
331 .fiber = nonblocking_fiber(fiber)
332 };
333
334 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
335
336 rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);
337
338 if (!mutex->ec_serial) {
339 mutex_set_owner(mutex, th, ec_serial);
340 }
341 }
342 else {
343 if (!th->vm->thread_ignore_deadlock && mutex->th == th) {
344 rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
345 }
346
347 struct sync_waiter sync_waiter = {
348 .self = self,
349 .th = th,
350 .fiber = nonblocking_fiber(fiber),
351 };
352
353 RUBY_DEBUG_LOG("%p wait", mutex);
354
355 // Similar code to `sleep_forever`, but
356 // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
357 // An ensure clause like the following would be needed, but `rb_ensure` is a bit slow:
358 //
359 // begin
360 // sleep_forever(th, SLEEP_DEADLOCKABLE);
361 // ensure
362 // ccan_list_del(&sync_waiter.node);
363 // end
364 enum rb_thread_status prev_status = th->status;
365 th->status = THREAD_STOPPED_FOREVER;
366 rb_ractor_sleeper_threads_inc(th->ractor);
367 rb_check_deadlock(th->ractor);
368
369 RUBY_ASSERT(!th->locking_mutex);
370 th->locking_mutex = self;
371
372 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
373 {
374 native_sleep(th, NULL);
375 }
376 ccan_list_del(&sync_waiter.node);
377
378 // unlocked by another thread while sleeping
379 if (!mutex->ec_serial) {
380 mutex_set_owner(mutex, th, ec_serial);
381 }
382
383 rb_ractor_sleeper_threads_dec(th->ractor);
384 th->status = prev_status;
385 th->locking_mutex = Qfalse;
386
387 RUBY_DEBUG_LOG("%p wakeup", mutex);
388 }
389
390 if (interruptible_p) {
391 /* release mutex before checking for interrupts...as interrupt checking
392 * code might call rb_raise() */
393 if (mutex->ec_serial == ec_serial) {
394 mutex->th = NULL;
395 mutex->ec_serial = 0;
396 }
397 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
398 if (!mutex->ec_serial) {
399 mutex_set_owner(mutex, th, ec_serial);
400 }
401 }
402 else {
403 // clear interrupt information
404 if (RUBY_VM_INTERRUPTED(th->ec)) {
405 // reset interrupts
406 if (saved_ints == 0) {
407 saved_ints = threadptr_get_interrupts(th);
408 }
409 else {
410 // ignore additional interrupts
411 threadptr_get_interrupts(th);
412 }
413 }
414 }
415 }
416
417 if (saved_ints) th->ec->interrupt_flag = saved_ints;
418 if (mutex->ec_serial == ec_serial) mutex_locked(mutex, th, ec_serial);
419 }
420
421 RUBY_DEBUG_LOG("%p locked", mutex);
422
423 // assertion
424 if (mutex_owned_p(ec_serial, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
425
426 return self;
427}
428
429static VALUE
430mutex_lock_uninterruptible(VALUE self)
431{
432 struct mutex_args args;
433 mutex_args_init(&args, self);
434 return do_mutex_lock(&args, 0);
435}
436
437static VALUE
438rb_mut_lock(rb_execution_context_t *ec, VALUE self)
439{
440 struct mutex_args args = {
441 .self = self,
442 .mutex = mutex_ptr(self),
443 .ec = ec,
444 };
445 return do_mutex_lock(&args, 1);
446}
447
448VALUE
449rb_mutex_lock(VALUE self)
450{
451 struct mutex_args args;
452 mutex_args_init(&args, self);
453 return do_mutex_lock(&args, 1);
454}
455
456static VALUE
457rb_mut_owned_p(rb_execution_context_t *ec, VALUE self)
458{
459 return mutex_owned_p(rb_ec_serial(ec), mutex_ptr(self));
460}
461
462VALUE
463rb_mutex_owned_p(VALUE self)
464{
465 return rb_mut_owned_p(GET_EC(), self);
466}
467
468static const char *
469rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_serial_t ec_serial)
470{
471 RUBY_DEBUG_LOG("%p", mutex);
472
473 if (mutex->ec_serial == 0) {
474 return "Attempt to unlock a mutex which is not locked";
475 }
476 else if (ec_serial && mutex->ec_serial != ec_serial) {
477 return "Attempt to unlock a mutex which is locked by another thread/fiber";
478 }
479
480 struct sync_waiter *cur = 0, *next;
481
482 mutex->ec_serial = 0;
483 thread_mutex_remove(th, mutex);
484
485 ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
486 ccan_list_del_init(&cur->node);
487
488 if (cur->th->scheduler != Qnil && cur->fiber) {
489 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
490 return NULL;
491 }
492 else {
493 switch (cur->th->status) {
494 case THREAD_RUNNABLE: /* from someone else calling Thread#run */
495 case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
496 RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
497 rb_threadptr_interrupt(cur->th);
498 return NULL;
499 case THREAD_STOPPED: /* probably impossible */
500 rb_bug("unexpected THREAD_STOPPED");
501 case THREAD_KILLED:
502 /* not sure about this, possible in exit GC? */
503 rb_bug("unexpected THREAD_KILLED");
504 continue;
505 }
506 }
507 }
508
509 // We did not find any threads to wake up, so we can just return with no error:
510 return NULL;
511}
512
513static void
514do_mutex_unlock(struct mutex_args *args)
515{
516 const char *err;
517 rb_mutex_t *mutex = args->mutex;
518 rb_thread_t *th = rb_ec_thread_ptr(args->ec);
519
520 err = rb_mutex_unlock_th(mutex, th, rb_ec_serial(args->ec));
521 if (err) rb_raise(rb_eThreadError, "%s", err);
522}
523
524static VALUE
525do_mutex_unlock_safe(VALUE args)
526{
527 do_mutex_unlock((struct mutex_args *)args);
528 return Qnil;
529}
530
531/*
532 * call-seq:
533 * mutex.unlock -> self
534 *
535 * Releases the lock.
536 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
537 */
538VALUE
539rb_mutex_unlock(VALUE self)
540{
541 struct mutex_args args;
542 mutex_args_init(&args, self);
543 do_mutex_unlock(&args);
544 return self;
545}
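/*
 * A short sketch of the error paths reported by rb_mutex_unlock_th above;
 * only the owning thread/fiber may unlock:
 *
 *   m = Thread::Mutex.new
 *   begin
 *     m.unlock             # never locked
 *   rescue ThreadError => e
 *     e.message            #=> "Attempt to unlock a mutex which is not locked"
 *   end
 *
 *   m.lock
 *   begin
 *     Thread.new { m.unlock }.join
 *   rescue ThreadError => e
 *     e.message            #=> "Attempt to unlock a mutex which is locked by another thread/fiber"
 *   end
 *   m.unlock               #=> m
 */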
546
547static VALUE
548rb_mut_unlock(rb_execution_context_t *ec, VALUE self)
549{
550 struct mutex_args args = {
551 .self = self,
552 .mutex = mutex_ptr(self),
553 .ec = ec,
554 };
555 do_mutex_unlock(&args);
556 return self;
557}
558
559#if defined(HAVE_WORKING_FORK)
560static void
561rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
562{
563 rb_mutex_abandon_all(th->keeping_mutexes);
564 th->keeping_mutexes = NULL;
565}
566
567static void
568rb_mutex_abandon_locking_mutex(rb_thread_t *th)
569{
570 if (th->locking_mutex) {
571 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
572
573 ccan_list_head_init(&mutex->waitq);
574 th->locking_mutex = Qfalse;
575 }
576}
577
578static void
579rb_mutex_abandon_all(rb_mutex_t *mutexes)
580{
581 rb_mutex_t *mutex;
582
583 while (mutexes) {
584 mutex = mutexes;
585 mutexes = mutex->next_mutex;
586 mutex->ec_serial = 0;
587 mutex->next_mutex = 0;
588 ccan_list_head_init(&mutex->waitq);
589 }
590}
591#endif
592
593struct rb_mutex_sleep_arguments {
594 VALUE self;
595 VALUE timeout;
596};
597
598static VALUE
599mutex_sleep_begin(VALUE _arguments)
600{
601 struct rb_mutex_sleep_arguments *arguments = (struct rb_mutex_sleep_arguments *)_arguments;
602 VALUE timeout = arguments->timeout;
603 VALUE woken = Qtrue;
604
605 VALUE scheduler = rb_fiber_scheduler_current();
606 if (scheduler != Qnil) {
607 rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
608 }
609 else {
610 if (NIL_P(timeout)) {
611 rb_thread_sleep_deadly_allow_spurious_wakeup(arguments->self, Qnil, 0);
612 }
613 else {
614 struct timeval timeout_value = rb_time_interval(timeout);
615 rb_hrtime_t relative_timeout = rb_timeval2hrtime(&timeout_value);
616 /* permit spurious check */
617 woken = RBOOL(sleep_hrtime(GET_THREAD(), relative_timeout, 0));
618 }
619 }
620
621 return woken;
622}
623
624static VALUE
625rb_mut_sleep(rb_execution_context_t *ec, VALUE self, VALUE timeout)
626{
627 if (!NIL_P(timeout)) {
628 // Validate the argument:
629 rb_time_interval(timeout);
630 }
631
632 rb_mut_unlock(ec, self);
633 time_t beg = time(0);
634
635 struct rb_mutex_sleep_arguments arguments = {
636 .self = self,
637 .timeout = timeout,
638 };
639
640 VALUE woken = rb_ec_ensure(ec, mutex_sleep_begin, (VALUE)&arguments, mutex_lock_uninterruptible, self);
641
642 RUBY_VM_CHECK_INTS_BLOCKING(ec);
643 if (!woken) return Qnil;
644 time_t end = time(0) - beg;
645 return TIMET2NUM(end);
646}
647
648VALUE
649rb_mutex_sleep(VALUE self, VALUE timeout)
650{
651 return rb_mut_sleep(GET_EC(), self, timeout);
652}
653
654VALUE
655rb_mutex_synchronize(VALUE self, VALUE (*func)(VALUE arg), VALUE arg)
656{
657 struct mutex_args args;
658 mutex_args_init(&args, self);
659 do_mutex_lock(&args, 1);
660 return rb_ec_ensure(args.ec, func, arg, do_mutex_unlock_safe, (VALUE)&args);
661}
662
663static VALUE
664do_ec_yield(VALUE _ec)
665{
666 return rb_ec_yield((rb_execution_context_t *)_ec, Qundef);
667}
668
669VALUE
670rb_mut_synchronize(rb_execution_context_t *ec, VALUE self)
671{
672 struct mutex_args args = {
673 .self = self,
674 .mutex = mutex_ptr(self),
675 .ec = ec,
676 };
677 do_mutex_lock(&args, 1);
678 return rb_ec_ensure(args.ec, do_ec_yield, (VALUE)ec, do_mutex_unlock_safe, (VALUE)&args);
679}
680
681void
682rb_mutex_allow_trap(VALUE self, int val)
683{
684 Check_TypedStruct(self, &mutex_data_type);
685
686 if (val)
687 FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
688 else
689 FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
690}
691
692/* Queue */
693
694#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
695#define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
696RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
697struct rb_queue {
698 struct ccan_list_head waitq;
699 rb_serial_t fork_gen;
700 const VALUE que;
701 int num_waiting;
702} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
703
704#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
705#define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
706#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
707RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
708struct rb_szqueue {
709 struct rb_queue q;
710 int num_waiting_push;
711 struct ccan_list_head pushq;
712 long max;
713} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
714
715static void
716queue_mark_and_move(void *ptr)
717{
718 struct rb_queue *q = ptr;
719
720 /* no need to mark threads in waitq, they are on stack */
721 rb_gc_mark_and_move((VALUE *)UNALIGNED_MEMBER_PTR(q, que));
722}
723
724static size_t
725queue_memsize(const void *ptr)
726{
727 return sizeof(struct rb_queue);
728}
729
730static const rb_data_type_t queue_data_type = {
731 .wrap_struct_name = "Thread::Queue",
732 .function = {
733 .dmark = queue_mark_and_move,
734 .dfree = RUBY_TYPED_DEFAULT_FREE,
735 .dsize = queue_memsize,
736 .dcompact = queue_mark_and_move,
737 },
738 .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
739};
740
741static VALUE
742queue_alloc(VALUE klass)
743{
744 VALUE obj;
745 struct rb_queue *q;
746
747 obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
748 ccan_list_head_init(queue_waitq(q));
749 return obj;
750}
751
752static int
753queue_fork_check(struct rb_queue *q)
754{
755 rb_serial_t fork_gen = GET_VM()->fork_gen;
756
757 if (q->fork_gen == fork_gen) {
758 return 0;
759 }
760 /* forked children can't reach into parent thread stacks */
761 q->fork_gen = fork_gen;
762 ccan_list_head_init(queue_waitq(q));
763 q->num_waiting = 0;
764 return 1;
765}
766
767static struct rb_queue *
768queue_ptr(VALUE obj)
769{
770 struct rb_queue *q;
771
772 TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
773 queue_fork_check(q);
774
775 return q;
776}
777
778#define QUEUE_CLOSED FL_USER5
779
780static rb_hrtime_t
781queue_timeout2hrtime(VALUE timeout)
782{
783 if (NIL_P(timeout)) {
784 return (rb_hrtime_t)0;
785 }
786 rb_hrtime_t rel = 0;
787 if (FIXNUM_P(timeout)) {
788 rel = rb_sec2hrtime(NUM2TIMET(timeout));
789 }
790 else {
791 double2hrtime(&rel, rb_num2dbl(timeout));
792 }
793 return rb_hrtime_add(rel, rb_hrtime_now());
794}
795
796static void
797szqueue_mark_and_move(void *ptr)
798{
799 struct rb_szqueue *sq = ptr;
800
801 queue_mark_and_move(&sq->q);
802}
803
804static size_t
805szqueue_memsize(const void *ptr)
806{
807 return sizeof(struct rb_szqueue);
808}
809
810static const rb_data_type_t szqueue_data_type = {
811 .wrap_struct_name = "Thread::SizedQueue",
812 .function = {
813 .dmark = szqueue_mark_and_move,
814 .dfree = RUBY_TYPED_DEFAULT_FREE,
815 .dsize = szqueue_memsize,
816 .dcompact = szqueue_mark_and_move,
817 },
818 .parent = &queue_data_type,
819 .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
820};
821
822static VALUE
823szqueue_alloc(VALUE klass)
824{
825 struct rb_szqueue *sq;
826 VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
827 &szqueue_data_type, sq);
828 ccan_list_head_init(szqueue_waitq(sq));
829 ccan_list_head_init(szqueue_pushq(sq));
830 return obj;
831}
832
833static struct rb_szqueue *
834szqueue_ptr(VALUE obj)
835{
836 struct rb_szqueue *sq;
837
838 TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
839 if (queue_fork_check(&sq->q)) {
840 ccan_list_head_init(szqueue_pushq(sq));
841 sq->num_waiting_push = 0;
842 }
843
844 return sq;
845}
846
847static VALUE
848ary_buf_new(void)
849{
850 return rb_ary_hidden_new(1);
851}
852
853static inline VALUE
854check_array(VALUE obj, VALUE ary)
855{
856 if (RB_LIKELY(ary)) {
857 return ary;
858 }
859 rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
860}
861
862static long
863queue_length(VALUE self, struct rb_queue *q)
864{
865 return RARRAY_LEN(check_array(self, q->que));
866}
867
868static int
869queue_closed_p(VALUE self)
870{
871 return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
872}
873
874/*
875 * Document-class: ClosedQueueError
876 *
877 * The exception class which will be raised when pushing into a closed
878 * Queue. See Thread::Queue#close and Thread::SizedQueue#close.
879 */
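/*
 * A brief usage sketch: because ClosedQueueError inherits from
 * StopIteration (see Init_thread_sync below), Kernel#loop treats it like
 * the end of iteration:
 *
 *   q = Thread::Queue.new
 *   q.close
 *   begin
 *     q.push(1)
 *   rescue ClosedQueueError => e
 *     e.message            #=> "queue closed"
 *   end
 *
 *   loop { q.push(1) }     # terminates quietly instead of raising
 */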
880
881NORETURN(static void raise_closed_queue_error(VALUE self));
882
883static void
884raise_closed_queue_error(VALUE self)
885{
886 rb_raise(rb_eClosedQueueError, "queue closed");
887}
888
889static VALUE
890queue_closed_result(VALUE self, struct rb_queue *q)
891{
892 RUBY_ASSERT(queue_length(self, q) == 0);
893 return Qnil;
894}
895
896/*
897 * Document-class: Thread::Queue
898 *
899 * The Thread::Queue class implements multi-producer, multi-consumer
900 * queues. It is especially useful in threaded programming when
901 * information must be exchanged safely between multiple threads. The
902 * Thread::Queue class implements all the required locking semantics.
903 *
904 * The class implements a FIFO (first in, first out) type of queue.
905 * In a FIFO queue, the first tasks added are the first retrieved.
906 *
907 * Example:
908 *
909 * queue = Thread::Queue.new
910 *
911 * producer = Thread.new do
912 * 5.times do |i|
913 * sleep rand(i) # simulate expense
914 * queue << i
915 * puts "#{i} produced"
916 * end
917 * end
918 *
919 * consumer = Thread.new do
920 * 5.times do |i|
921 * value = queue.pop
922 * sleep rand(i/2) # simulate expense
923 * puts "consumed #{value}"
924 * end
925 * end
926 *
927 * consumer.join
928 *
929 */
930
931/*
932 * Document-method: Queue::new
933 *
934 * call-seq:
935 * Thread::Queue.new -> empty_queue
936 * Thread::Queue.new(enumerable) -> queue
937 *
938 * Creates a new queue instance, optionally using the contents of an +enumerable+
939 * for its initial state.
940 *
941 * Example:
942 *
943 * q = Thread::Queue.new
944 * #=> #<Thread::Queue:0x00007ff7501110d0>
945 * q.empty?
946 * #=> true
947 *
948 * q = Thread::Queue.new([1, 2, 3])
949 * #=> #<Thread::Queue:0x00007ff7500ec500>
950 * q.empty?
951 * #=> false
952 * q.pop
953 * #=> 1
954 */
955
956static VALUE
957rb_queue_initialize(int argc, VALUE *argv, VALUE self)
958{
959 VALUE initial;
960 struct rb_queue *q = queue_ptr(self);
961 if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
962 initial = rb_to_array(initial);
963 }
964 RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
965 ccan_list_head_init(queue_waitq(q));
966 if (argc == 1) {
967 rb_ary_concat(q->que, initial);
968 }
969 return self;
970}
971
972static VALUE
973queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
974{
975 if (queue_closed_p(self)) {
976 raise_closed_queue_error(self);
977 }
978 rb_ary_push(check_array(self, q->que), obj);
979 wakeup_one(queue_waitq(q));
980 return self;
981}
982
983/*
984 * Document-method: Thread::Queue#close
985 * call-seq:
986 * close
987 *
988 * Closes the queue. A closed queue cannot be re-opened.
989 *
990 * After the call to close completes, the following are true:
991 *
992 * - +closed?+ will return true
993 *
994 * - +close+ will be ignored.
995 *
996 * - calling enq/push/<< will raise a +ClosedQueueError+.
997 *
998 * - when +empty?+ is false, calling deq/pop/shift will return an object
999 * from the queue as usual.
1000 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
1001 * deq(true) will raise a +ThreadError+.
1002 *
1003 * ClosedQueueError inherits from StopIteration, so you can break out of a loop block.
1004 *
1005 * Example:
1006 *
1007 * q = Thread::Queue.new
1008 * Thread.new{
1009 * while e = q.deq # wait for nil to break loop
1010 * # ...
1011 * end
1012 * }
1013 * q.close
1014 */
1015
1016static VALUE
1017rb_queue_close(VALUE self)
1018{
1019 struct rb_queue *q = queue_ptr(self);
1020
1021 if (!queue_closed_p(self)) {
1022 FL_SET(self, QUEUE_CLOSED);
1023
1024 wakeup_all(queue_waitq(q));
1025 }
1026
1027 return self;
1028}
1029
1030/*
1031 * Document-method: Thread::Queue#closed?
1032 * call-seq: closed?
1033 *
1034 * Returns +true+ if the queue is closed.
1035 */
1036
1037static VALUE
1038rb_queue_closed_p(VALUE self)
1039{
1040 return RBOOL(queue_closed_p(self));
1041}
1042
1043/*
1044 * Document-method: Thread::Queue#push
1045 * call-seq:
1046 * push(object)
1047 * enq(object)
1048 * <<(object)
1049 *
1050 * Pushes the given +object+ to the queue.
1051 */
1052
1053static VALUE
1054rb_queue_push(VALUE self, VALUE obj)
1055{
1056 return queue_do_push(self, queue_ptr(self), obj);
1057}
1058
1059static VALUE
1060queue_sleep(VALUE _args)
1061{
1062 struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
1063 rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
1064 return Qnil;
1065}
1066
1067struct queue_waiter {
1068 struct sync_waiter w;
1069 union {
1070 struct rb_queue *q;
1071 struct rb_szqueue *sq;
1072 } as;
1073};
1074
1075static VALUE
1076queue_sleep_done(VALUE p)
1077{
1078 struct queue_waiter *qw = (struct queue_waiter *)p;
1079
1080 ccan_list_del(&qw->w.node);
1081 qw->as.q->num_waiting--;
1082
1083 return Qfalse;
1084}
1085
1086static VALUE
1087szqueue_sleep_done(VALUE p)
1088{
1089 struct queue_waiter *qw = (struct queue_waiter *)p;
1090
1091 ccan_list_del(&qw->w.node);
1092 qw->as.sq->num_waiting_push--;
1093
1094 return Qfalse;
1095}
1096
1097static inline VALUE
1098queue_do_pop(rb_execution_context_t *ec, VALUE self, struct rb_queue *q, VALUE non_block, VALUE timeout)
1099{
1100 check_array(self, q->que);
1101 if (RARRAY_LEN(q->que) == 0) {
1102 if (RTEST(non_block)) {
1103 rb_raise(rb_eThreadError, "queue empty");
1104 }
1105
1106 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1107 return Qnil;
1108 }
1109 }
1110
1111 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1112 while (RARRAY_LEN(q->que) == 0) {
1113 if (queue_closed_p(self)) {
1114 return queue_closed_result(self, q);
1115 }
1116 else {
1117 RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
1118 RUBY_ASSERT(queue_closed_p(self) == 0);
1119
1120 struct queue_waiter queue_waiter = {
1121 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1122 .as = {.q = q}
1123 };
1124
1125 struct ccan_list_head *waitq = queue_waitq(q);
1126
1127 ccan_list_add_tail(waitq, &queue_waiter.w.node);
1128 queue_waiter.as.q->num_waiting++;
1129
1130 struct queue_sleep_arg queue_sleep_arg = {
1131 .self = self,
1132 .timeout = timeout,
1133 .end = end
1134 };
1135
1136 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
1137 if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
1138 break;
1139 }
1140 }
1141
1142 return rb_ary_shift(q->que);
1143}
1144
1145static VALUE
1146rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1147{
1148 return queue_do_pop(ec, self, queue_ptr(self), non_block, timeout);
1149}
1150
1151/*
1152 * Document-method: Thread::Queue#empty?
1153 * call-seq: empty?
1154 *
1155 * Returns +true+ if the queue is empty.
1156 */
1157
1158static VALUE
1159rb_queue_empty_p(VALUE self)
1160{
1161 return RBOOL(queue_length(self, queue_ptr(self)) == 0);
1162}
1163
1164/*
1165 * Document-method: Thread::Queue#clear
1166 *
1167 * Removes all objects from the queue.
1168 */
1169
1170static VALUE
1171rb_queue_clear(VALUE self)
1172{
1173 struct rb_queue *q = queue_ptr(self);
1174
1175 rb_ary_clear(check_array(self, q->que));
1176 return self;
1177}
1178
1179/*
1180 * Document-method: Thread::Queue#length
1181 * call-seq:
1182 * length
1183 * size
1184 *
1185 * Returns the length of the queue.
1186 */
1187
1188static VALUE
1189rb_queue_length(VALUE self)
1190{
1191 return LONG2NUM(queue_length(self, queue_ptr(self)));
1192}
1193
1194NORETURN(static VALUE rb_queue_freeze(VALUE self));
1195/*
1196 * call-seq:
1197 * freeze
1198 *
1199 * The queue can't be frozen, so this method raises an exception:
1200 * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
1201 *
1202 */
1203static VALUE
1204rb_queue_freeze(VALUE self)
1205{
1206 rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
1207 UNREACHABLE_RETURN(self);
1208}
1209
1210/*
1211 * Document-method: Thread::Queue#num_waiting
1212 *
1213 * Returns the number of threads waiting on the queue.
1214 */
1215
1216static VALUE
1217rb_queue_num_waiting(VALUE self)
1218{
1219 struct rb_queue *q = queue_ptr(self);
1220
1221 return INT2NUM(q->num_waiting);
1222}
1223
1224/*
1225 * Document-class: Thread::SizedQueue
1226 *
1227 * This class represents queues with a specified maximum capacity. The push
1228 * operation may block when the queue is full.
1229 *
1230 * See Thread::Queue for an example of how a Thread::SizedQueue works.
1231 */
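/*
 * A small illustrative sketch of the blocking push (timing depends on the
 * thread scheduler, so treat it as a sketch rather than a test):
 *
 *   sq = Thread::SizedQueue.new(2)
 *   sq.push(1)
 *   sq.push(2)
 *   t = Thread.new { sq.push(3) }  # blocks: the queue is at capacity
 *   sq.pop                         # frees a slot; the pending push completes
 *   t.join
 *   sq.size                        #=> 2
 */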
1232
1233/*
1234 * Document-method: SizedQueue::new
1235 * call-seq: new(max)
1236 *
1237 * Creates a fixed-length queue with a maximum size of +max+.
1238 */
1239
1240static VALUE
1241rb_szqueue_initialize(VALUE self, VALUE vmax)
1242{
1243 long max;
1244 struct rb_szqueue *sq = szqueue_ptr(self);
1245
1246 max = NUM2LONG(vmax);
1247 if (max <= 0) {
1248 rb_raise(rb_eArgError, "queue size must be positive");
1249 }
1250
1251 RB_OBJ_WRITE(self, szqueue_list(sq), ary_buf_new());
1252 ccan_list_head_init(szqueue_waitq(sq));
1253 ccan_list_head_init(szqueue_pushq(sq));
1254 sq->max = max;
1255
1256 return self;
1257}
1258
1259/*
1260 * Document-method: Thread::SizedQueue#close
1261 * call-seq:
1262 * close
1263 *
1264 * Similar to Thread::Queue#close.
1265 *
1266 * The difference is the behavior toward threads waiting to enqueue.
1267 *
1268 * If there are waiting enqueuing threads, they are interrupted by
1269 * raising ClosedQueueError('queue closed').
1270 */
1271static VALUE
1272rb_szqueue_close(VALUE self)
1273{
1274 if (!queue_closed_p(self)) {
1275 struct rb_szqueue *sq = szqueue_ptr(self);
1276
1277 FL_SET(self, QUEUE_CLOSED);
1278 wakeup_all(szqueue_waitq(sq));
1279 wakeup_all(szqueue_pushq(sq));
1280 }
1281 return self;
1282}
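/*
 * A sketch of the behavior described in the close documentation above: a
 * pusher blocked on a full queue is woken by close and raises
 * ClosedQueueError:
 *
 *   sq = Thread::SizedQueue.new(1)
 *   sq.push(1)
 *   t = Thread.new { sq.push(2) }  # blocks: queue full
 *   sq.close
 *   begin
 *     t.join
 *   rescue ClosedQueueError => e
 *     e.message                    #=> "queue closed"
 *   end
 */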
1283
1284/*
1285 * Document-method: Thread::SizedQueue#max
1286 *
1287 * Returns the maximum size of the queue.
1288 */
1289
1290static VALUE
1291rb_szqueue_max_get(VALUE self)
1292{
1293 return LONG2NUM(szqueue_ptr(self)->max);
1294}
1295
1296/*
1297 * Document-method: Thread::SizedQueue#max=
1298 * call-seq: max=(number)
1299 *
1300 * Sets the maximum size of the queue to the given +number+.
1301 */
1302
1303static VALUE
1304rb_szqueue_max_set(VALUE self, VALUE vmax)
1305{
1306 long max = NUM2LONG(vmax);
1307 long diff = 0;
1308 struct rb_szqueue *sq = szqueue_ptr(self);
1309
1310 if (max <= 0) {
1311 rb_raise(rb_eArgError, "queue size must be positive");
1312 }
1313 if (max > sq->max) {
1314 diff = max - sq->max;
1315 }
1316 sq->max = max;
1317 sync_wakeup(szqueue_pushq(sq), diff);
1318 return vmax;
1319}
1320
1321static VALUE
1322rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
1323{
1324 struct rb_szqueue *sq = szqueue_ptr(self);
1325
1326 if (queue_length(self, &sq->q) >= sq->max) {
1327 if (RTEST(non_block)) {
1328 rb_raise(rb_eThreadError, "queue full");
1329 }
1330
1331 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1332 return Qnil;
1333 }
1334 }
1335
1336 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1337 while (queue_length(self, &sq->q) >= sq->max) {
1338 if (queue_closed_p(self)) {
1339 raise_closed_queue_error(self);
1340 }
1341 else {
1342 struct queue_waiter queue_waiter = {
1343 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1344 .as = {.sq = sq}
1345 };
1346
1347 struct ccan_list_head *pushq = szqueue_pushq(sq);
1348
1349 ccan_list_add_tail(pushq, &queue_waiter.w.node);
1350 sq->num_waiting_push++;
1351
1351
1352 struct queue_sleep_arg queue_sleep_arg = {
1353 .self = self,
1354 .timeout = timeout,
1355 .end = end
1356 };
1357 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
1358 if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
1359 return Qnil;
1360 }
1361 }
1362 }
1363
1364 return queue_do_push(self, &sq->q, object);
1365}
1366
1367static VALUE
1368rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1369{
1370 struct rb_szqueue *sq = szqueue_ptr(self);
1371 VALUE retval = queue_do_pop(ec, self, &sq->q, non_block, timeout);
1372
1373 if (queue_length(self, &sq->q) < sq->max) {
1374 wakeup_one(szqueue_pushq(sq));
1375 }
1376
1377 return retval;
1378}
1379
1380/*
1381 * Document-method: Thread::SizedQueue#clear
1382 *
1383 * Removes all objects from the queue.
1384 */
1385
1386static VALUE
1387rb_szqueue_clear(VALUE self)
1388{
1389 struct rb_szqueue *sq = szqueue_ptr(self);
1390
1391 rb_ary_clear(check_array(self, sq->q.que));
1392 wakeup_all(szqueue_pushq(sq));
1393 return self;
1394}
1395
1396/*
1397 * Document-method: Thread::SizedQueue#num_waiting
1398 *
1399 * Returns the number of threads waiting on the queue.
1400 */
1401
1402static VALUE
1403rb_szqueue_num_waiting(VALUE self)
1404{
1405 struct rb_szqueue *sq = szqueue_ptr(self);
1406
1407 return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1408}
1409
1410
1411/* ConditionVariable */
1412struct rb_condvar {
1413 struct ccan_list_head waitq;
1414 rb_serial_t fork_gen;
1415};
1416
1417/*
1418 * Document-class: Thread::ConditionVariable
1419 *
1420 * ConditionVariable objects augment class Mutex. Using condition variables,
1421 * it is possible to suspend while in the middle of a critical section until
1422 * a condition is met, such as a resource becoming available.
1423 *
1424 * Due to non-deterministic scheduling and spurious wake-ups, users of
1425 * condition variables should always use a separate boolean predicate (such as
1426 * reading from a boolean variable) to check if the condition is actually met
1427 * before starting to wait, and should wait in a loop, re-checking the
1428 * condition every time the ConditionVariable is woken up. The idiomatic way
1429 * of using condition variables is calling the +wait+ method in an +until+
1430 * loop with the predicate as the loop condition.
1431 *
1432 * condvar.wait(mutex) until condition_is_met
1433 *
1434 * In the example below, we use the boolean variable +resource_available+
1435 * (which is protected by +mutex+) to indicate the availability of the
1436 * resource, and use +condvar+ to wait for that variable to become true. Note
1437 * that:
1438 *
1439 * 1. Thread +b+ may be scheduled before threads +a1+ and +a2+, and may run so
1440 * fast that it has already made the resource available before either
1441 * +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if
1442 * +resource_available+ is already true before starting to wait.
1443 * 2. The +wait+ method may spuriously wake up without signalling. Therefore,
1444 * threads +a1+ and +a2+ should recheck +resource_available+ after the
1445 * +wait+ method returns, and go back to wait if the condition is not
1446 * actually met.
1447 * 3. It is possible that thread +a2+ starts right after thread +a1+ is woken
1448 * up by +b+. Thread +a2+ may have acquired the +mutex+ and consumed the
1449 * resource before thread +a1+ acquires the +mutex+. This necessitates
1450 * rechecking after +wait+, too.
1451 *
1452 * Example:
1453 *
1454 * mutex = Thread::Mutex.new
1455 *
1456 * resource_available = false
1457 * condvar = Thread::ConditionVariable.new
1458 *
1459 * a1 = Thread.new {
1460 * # Thread 'a1' waits for the resource to become available and consumes
1461 * # the resource.
1462 * mutex.synchronize {
1463 * condvar.wait(mutex) until resource_available
1464 * # After the loop, 'resource_available' is guaranteed to be true.
1465 *
1466 * resource_available = false
1467 * puts "a1 consumed the resource"
1468 * }
1469 * }
1470 *
1471 * a2 = Thread.new {
1472 * # Thread 'a2' behaves like 'a1'.
1473 * mutex.synchronize {
1474 * condvar.wait(mutex) until resource_available
1475 * resource_available = false
1476 * puts "a2 consumed the resource"
1477 * }
1478 * }
1479 *
1480 * b = Thread.new {
1481 * # Thread 'b' periodically makes the resource available.
1482 * loop {
1483 * mutex.synchronize {
1484 * resource_available = true
1485 *
1486 * # Notify one waiting thread if any. It is possible that neither
1487 * # 'a1' nor 'a2' is waiting on 'condvar' at this moment. That's OK.
1488 * condvar.signal
1489 * }
1490 * sleep 1
1491 * }
1492 * }
1493 *
1494 * # Eventually both 'a1' and 'a2' will have their resources, albeit in an
1495 * # unspecified order.
1496 * [a1, a2].each {|th| th.join}
1497 */
1498
1499static size_t
1500condvar_memsize(const void *ptr)
1501{
1502 return sizeof(struct rb_condvar);
1503}
1504
1505static const rb_data_type_t cv_data_type = {
1506 "condvar",
1507 {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1508 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
1509};
1510
1511static struct rb_condvar *
1512condvar_ptr(VALUE self)
1513{
1514 struct rb_condvar *cv;
1515 rb_serial_t fork_gen = GET_VM()->fork_gen;
1516
1517 TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1518
1519 /* forked children can't reach into parent thread stacks */
1520 if (cv->fork_gen != fork_gen) {
1521 cv->fork_gen = fork_gen;
1522 ccan_list_head_init(&cv->waitq);
1523 }
1524
1525 return cv;
1526}
1527
1528static VALUE
1529condvar_alloc(VALUE klass)
1530{
1531 struct rb_condvar *cv;
1532 VALUE obj;
1533
1534 obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1535 ccan_list_head_init(&cv->waitq);
1536
1537 return obj;
1538}
1539
1540struct sleep_call {
1541 rb_execution_context_t *ec;
1542 VALUE mutex;
1543 VALUE timeout;
1544};
1545
1546static ID id_sleep;
1547
1548static VALUE
1549do_sleep(VALUE args)
1550{
1551 struct sleep_call *p = (struct sleep_call *)args;
1552 if (CLASS_OF(p->mutex) == rb_cMutex) {
1553 return rb_mut_sleep(p->ec, p->mutex, p->timeout);
1554 }
1555 else {
1556 return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1557 }
1558}
1559
1560static VALUE
1561rb_condvar_wait(rb_execution_context_t *ec, VALUE self, VALUE mutex, VALUE timeout)
1562{
1563 struct rb_condvar *cv = condvar_ptr(self);
1564 struct sleep_call args = {
1565 .ec = ec,
1566 .mutex = mutex,
1567 .timeout = timeout,
1568 };
1569
1570 struct sync_waiter sync_waiter = {
1571 .self = mutex,
1572 .th = ec->thread_ptr,
1573 .fiber = nonblocking_fiber(ec->fiber_ptr)
1574 };
1575
1576 ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
1577 return rb_ec_ensure(ec, do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
1578}
1579
1580static VALUE
1581rb_condvar_signal(rb_execution_context_t *ec, VALUE self)
1582{
1583 struct rb_condvar *cv = condvar_ptr(self);
1584 wakeup_one(&cv->waitq);
1585 return self;
1586}
1587
1588static VALUE
1589rb_condvar_broadcast(rb_execution_context_t *ec, VALUE self)
1590{
1591 struct rb_condvar *cv = condvar_ptr(self);
1592 wakeup_all(&cv->waitq);
1593 return self;
1594}
1595
1596NORETURN(static VALUE undumpable(VALUE obj));
1597/* :nodoc: */
1598static VALUE
1599undumpable(VALUE obj)
1600{
1601 rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
1602 UNREACHABLE_RETURN(Qnil);
1603}
1604
1605static VALUE
1606define_thread_class(VALUE outer, const ID name, VALUE super)
1607{
1608 VALUE klass = rb_define_class_id_under(outer, name, super);
1609 rb_const_set(rb_cObject, name, klass);
1610 return klass;
1611}
1612
1613static void
1614Init_thread_sync(void)
1615{
1616#undef rb_intern
1617#if defined(TEACH_RDOC) && TEACH_RDOC == 42
1618 rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
1619 rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
1620 rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
1621 rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
1622#endif
1623
1624#define DEFINE_CLASS(name, super) \
1625 rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
1626
1627 /* Mutex */
1628 DEFINE_CLASS(Mutex, Object);
1629 rb_define_alloc_func(rb_cMutex, mutex_alloc);
1630
1631 /* Queue */
1632 DEFINE_CLASS(Queue, Object);
1633 rb_define_alloc_func(rb_cQueue, queue_alloc);
1634
1635 rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1636
1637 rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
1638 rb_undef_method(rb_cQueue, "initialize_copy");
1639 rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
1640 rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
1641 rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
1642 rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1643 rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1644 rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1645 rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1646 rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1647 rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);
1648
1649 rb_define_alias(rb_cQueue, "enq", "push");
1650 rb_define_alias(rb_cQueue, "<<", "push");
1651 rb_define_alias(rb_cQueue, "size", "length");
1652
1653 DEFINE_CLASS(SizedQueue, Queue);
1654 rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1655
1656 rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1657 rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
1658 rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
1659 rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1660 rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1661 rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1662
1663 /* CVar */
1664 DEFINE_CLASS(ConditionVariable, Object);
1665 rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1666
1667 id_sleep = rb_intern("sleep");
1668
1669 rb_provide("thread.rb");
1670}
1671
1672#include "thread_sync.rbinc"