Ruby 3.5.0dev (2025-02-22 revision 412997300569c1853c09813e4924b6df3d7e8669)
thread_sync.c (412997300569c1853c09813e4924b6df3d7e8669)
1/* included by thread.c */
2#include "ccan/list/list.h"
3#include "builtin.h"
4
5static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
6static VALUE rb_eClosedQueueError;
7
8/* Mutex */
9typedef struct rb_mutex_struct {
10 rb_fiber_t *fiber;
11 struct rb_mutex_struct *next_mutex;
12 struct ccan_list_head waitq; /* protected by GVL */
13} rb_mutex_t;
14
15/* sync_waiter is always on-stack */
16struct sync_waiter {
17 VALUE self;
18 rb_thread_t *th;
19 rb_fiber_t *fiber;
20 struct ccan_list_node node;
21};
22
23static inline rb_fiber_t*
24nonblocking_fiber(rb_fiber_t *fiber)
25{
26 if (rb_fiberptr_blocking(fiber)) {
27 return NULL;
28 }
29
30 return fiber;
31}
32
33struct queue_sleep_arg {
34 VALUE self;
35 VALUE timeout;
36 rb_hrtime_t end;
37};
38
39#define MUTEX_ALLOW_TRAP FL_USER1
40
41static void
42sync_wakeup(struct ccan_list_head *head, long max)
43{
44 RUBY_DEBUG_LOG("max:%ld", max);
45
46 struct sync_waiter *cur = 0, *next;
47
48 ccan_list_for_each_safe(head, cur, next, node) {
49 ccan_list_del_init(&cur->node);
50
51 if (cur->th->status != THREAD_KILLED) {
52 if (cur->th->scheduler != Qnil && cur->fiber) {
53 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
54 }
55 else {
56 RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
57 rb_threadptr_interrupt(cur->th);
58 cur->th->status = THREAD_RUNNABLE;
59 }
60
61 if (--max == 0) return;
62 }
63 }
64}
65
66static void
67wakeup_one(struct ccan_list_head *head)
68{
69 sync_wakeup(head, 1);
70}
71
72static void
73wakeup_all(struct ccan_list_head *head)
74{
75 sync_wakeup(head, LONG_MAX);
76}
77
78#if defined(HAVE_WORKING_FORK)
79static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
80static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
81static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
82#endif
83static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber);
84
85/*
86 * Document-class: Thread::Mutex
87 *
88 * Thread::Mutex implements a simple semaphore that can be used to
89 * coordinate access to shared data from multiple concurrent threads.
90 *
91 * Example:
92 *
93 * semaphore = Thread::Mutex.new
94 *
95 * a = Thread.new {
96 * semaphore.synchronize {
97 * # access shared resource
98 * }
99 * }
100 *
101 * b = Thread.new {
102 * semaphore.synchronize {
103 * # access shared resource
104 * }
105 * }
106 *
107 */
108
109#define mutex_mark ((void(*)(void*))0)
110
111static size_t
112rb_mutex_num_waiting(rb_mutex_t *mutex)
113{
114 struct sync_waiter *w = 0;
115 size_t n = 0;
116
117 ccan_list_for_each(&mutex->waitq, w, node) {
118 n++;
119 }
120
121 return n;
122}
123
124rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber);
125
126static void
127mutex_free(void *ptr)
128{
129 rb_mutex_t *mutex = ptr;
130 if (mutex->fiber) {
131 /* rb_warn("free locked mutex"); */
132 const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber);
133 if (err) rb_bug("%s", err);
134 }
135 ruby_xfree(ptr);
136}
137
138static size_t
139mutex_memsize(const void *ptr)
140{
141 return sizeof(rb_mutex_t);
142}
143
144static const rb_data_type_t mutex_data_type = {
145 "mutex",
146 {mutex_mark, mutex_free, mutex_memsize,},
147 0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY
148};
149
150static rb_mutex_t *
151mutex_ptr(VALUE obj)
152{
153 rb_mutex_t *mutex;
154
155 TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex);
156
157 return mutex;
158}
159
160VALUE
161rb_obj_is_mutex(VALUE obj)
162{
163 return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
164}
165
166static VALUE
167mutex_alloc(VALUE klass)
168{
169 VALUE obj;
170 rb_mutex_t *mutex;
171
172 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
173
174 ccan_list_head_init(&mutex->waitq);
175 return obj;
176}
177
178/*
179 * call-seq:
180 * Thread::Mutex.new -> mutex
181 *
182 * Creates a new Mutex
183 */
184static VALUE
185mutex_initialize(VALUE self)
186{
187 return self;
188}
189
190VALUE
191rb_mutex_new(void)
192{
193 return mutex_alloc(rb_cMutex);
194}
195
196/*
197 * call-seq:
198 * mutex.locked? -> true or false
199 *
200 * Returns +true+ if this lock is currently held by some thread.
201 */
202VALUE
203rb_mutex_locked_p(VALUE self)
204{
205 rb_mutex_t *mutex = mutex_ptr(self);
206
207 return RBOOL(mutex->fiber);
208}
209
210static void
211thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex)
212{
213 if (thread->keeping_mutexes) {
214 mutex->next_mutex = thread->keeping_mutexes;
215 }
216
217 thread->keeping_mutexes = mutex;
218}
219
220static void
221thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex)
222{
223 rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;
224
225 while (*keeping_mutexes && *keeping_mutexes != mutex) {
226 // Move to the next mutex in the list:
227 keeping_mutexes = &(*keeping_mutexes)->next_mutex;
228 }
229
230 if (*keeping_mutexes) {
231 *keeping_mutexes = mutex->next_mutex;
232 mutex->next_mutex = NULL;
233 }
234}
235
236static void
237mutex_locked(rb_thread_t *th, VALUE self)
238{
239 rb_mutex_t *mutex = mutex_ptr(self);
240
241 thread_mutex_insert(th, mutex);
242}
243
244/*
245 * call-seq:
246 * mutex.try_lock -> true or false
247 *
248 * Attempts to obtain the lock and returns immediately. Returns +true+ if the
249 * lock was granted.
250 */
251VALUE
252rb_mutex_trylock(VALUE self)
253{
254 rb_mutex_t *mutex = mutex_ptr(self);
255
256 if (mutex->fiber == 0) {
257 RUBY_DEBUG_LOG("%p ok", mutex);
258
259 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
260 rb_thread_t *th = GET_THREAD();
261 mutex->fiber = fiber;
262
263 mutex_locked(th, self);
264 return Qtrue;
265 }
266 else {
267 RUBY_DEBUG_LOG("%p ng", mutex);
268 return Qfalse;
269 }
270}
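/*
 * A minimal usage sketch for #try_lock (illustrative; uses only the
 * documented API above):
 *
 *    m = Thread::Mutex.new
 *    if m.try_lock
 *      begin
 *        # got the lock without blocking
 *      ensure
 *        m.unlock
 *      end
 *    else
 *      # some other thread/fiber holds the lock
 *    end
 */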
271
272static VALUE
273mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
274{
275 return RBOOL(mutex->fiber == fiber);
276}
277
278static VALUE
279call_rb_fiber_scheduler_block(VALUE mutex)
280{
281 return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
282}
283
284static VALUE
285delete_from_waitq(VALUE value)
286{
287 struct sync_waiter *sync_waiter = (void *)value;
288 ccan_list_del(&sync_waiter->node);
289
290 return Qnil;
291}
292
293static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
294
295static VALUE
296do_mutex_lock(VALUE self, int interruptible_p)
297{
298 rb_execution_context_t *ec = GET_EC();
299 rb_thread_t *th = ec->thread_ptr;
300 rb_fiber_t *fiber = ec->fiber_ptr;
301 rb_mutex_t *mutex = mutex_ptr(self);
302 rb_atomic_t saved_ints = 0;
303
304 /* When running trap handler */
305 if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
306 th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
307 rb_raise(rb_eThreadError, "can't be called from trap context");
308 }
309
310 if (rb_mutex_trylock(self) == Qfalse) {
311 if (mutex->fiber == fiber) {
312 rb_raise(rb_eThreadError, "deadlock; recursive locking");
313 }
314
315 while (mutex->fiber != fiber) {
316 VM_ASSERT(mutex->fiber != NULL);
317
318 VALUE scheduler = rb_fiber_scheduler_current();
319 if (scheduler != Qnil) {
320 struct sync_waiter sync_waiter = {
321 .self = self,
322 .th = th,
323 .fiber = nonblocking_fiber(fiber)
324 };
325
326 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
327
328 rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter);
329
330 if (!mutex->fiber) {
331 mutex->fiber = fiber;
332 }
333 }
334 else {
335 if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) {
336 rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
337 }
338
339 struct sync_waiter sync_waiter = {
340 .self = self,
341 .th = th,
342 .fiber = nonblocking_fiber(fiber),
343 };
344
345 RUBY_DEBUG_LOG("%p wait", mutex);
346
347 // Similar code to `sleep_forever`, but
348 // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
349 // An ensure clause is needed, like the following, but `rb_ensure` is a bit slow:
350 //
351 // begin
352 // sleep_forever(th, SLEEP_DEADLOCKABLE);
353 // ensure
354 // ccan_list_del(&sync_waiter.node);
355 // end
356 enum rb_thread_status prev_status = th->status;
357 th->status = THREAD_STOPPED_FOREVER;
358 rb_ractor_sleeper_threads_inc(th->ractor);
359 rb_check_deadlock(th->ractor);
360
361 th->locking_mutex = self;
362
363 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
364 {
365 native_sleep(th, NULL);
366 }
367 ccan_list_del(&sync_waiter.node);
368
369 // unlocked by another thread while sleeping
370 if (!mutex->fiber) {
371 mutex->fiber = fiber;
372 }
373
374 rb_ractor_sleeper_threads_dec(th->ractor);
375 th->status = prev_status;
376 th->locking_mutex = Qfalse;
378
379 RUBY_DEBUG_LOG("%p wakeup", mutex);
380 }
381
382 if (interruptible_p) {
383 /* release mutex before checking for interrupts...as interrupt checking
384 * code might call rb_raise() */
385 if (mutex->fiber == fiber) mutex->fiber = 0;
386 RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */
387 if (!mutex->fiber) {
388 mutex->fiber = fiber;
389 }
390 }
391 else {
392 // clear interrupt information
393 if (RUBY_VM_INTERRUPTED(th->ec)) {
394 // reset interrupts
395 if (saved_ints == 0) {
396 saved_ints = threadptr_get_interrupts(th);
397 }
398 else {
399 // ignore additional interrupts
400 threadptr_get_interrupts(th);
401 }
402 }
403 }
404 }
405
406 if (saved_ints) th->ec->interrupt_flag = saved_ints;
407 if (mutex->fiber == fiber) mutex_locked(th, self);
408 }
409
410 RUBY_DEBUG_LOG("%p locked", mutex);
411
412 // assertion
413 if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
414
415 return self;
416}
417
418static VALUE
419mutex_lock_uninterruptible(VALUE self)
420{
421 return do_mutex_lock(self, 0);
422}
423
424/*
425 * call-seq:
426 * mutex.lock -> self
427 *
428 * Attempts to grab the lock and waits if it isn't available.
429 * Raises +ThreadError+ if +mutex+ was locked by the current thread.
430 */
431VALUE
432rb_mutex_lock(VALUE self)
433{
434 return do_mutex_lock(self, 1);
435}
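/*
 * Sketch of explicit #lock/#unlock pairing (illustrative). Pairing the
 * unlock in an +ensure+ mirrors what rb_mutex_synchronize() does in C:
 *
 *    m = Thread::Mutex.new
 *    m.lock
 *    begin
 *      # critical section
 *    ensure
 *      m.unlock
 *    end
 *
 *    m.lock
 *    m.lock   # raises ThreadError: deadlock; recursive locking
 */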
436
437/*
438 * call-seq:
439 * mutex.owned? -> true or false
440 *
441 * Returns +true+ if this lock is currently held by the current thread.
442 */
443VALUE
444rb_mutex_owned_p(VALUE self)
445{
446 rb_fiber_t *fiber = GET_EC()->fiber_ptr;
447 rb_mutex_t *mutex = mutex_ptr(self);
448
449 return mutex_owned_p(fiber, mutex);
450}
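/*
 * Sketch showing that ownership is per-thread/fiber (illustrative):
 *
 *    m = Thread::Mutex.new
 *    m.owned?                        #=> false
 *    m.synchronize do
 *      m.owned?                      #=> true
 *      Thread.new { m.owned? }.value #=> false
 *    end
 */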
451
452static const char *
453rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
454{
455 RUBY_DEBUG_LOG("%p", mutex);
456
457 if (mutex->fiber == 0) {
458 return "Attempt to unlock a mutex which is not locked";
459 }
460 else if (mutex->fiber != fiber) {
461 return "Attempt to unlock a mutex which is locked by another thread/fiber";
462 }
463
464 struct sync_waiter *cur = 0, *next;
465
466 mutex->fiber = 0;
467 thread_mutex_remove(th, mutex);
468
469 ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
470 ccan_list_del_init(&cur->node);
471
472 if (cur->th->scheduler != Qnil && cur->fiber) {
473 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
474 return NULL;
475 }
476 else {
477 switch (cur->th->status) {
478 case THREAD_RUNNABLE: /* from someone else calling Thread#run */
479 case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
480 RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
481 rb_threadptr_interrupt(cur->th);
482 return NULL;
483 case THREAD_STOPPED: /* probably impossible */
484 rb_bug("unexpected THREAD_STOPPED");
485 case THREAD_KILLED:
486 /* not sure about this, possible in exit GC? */
487 rb_bug("unexpected THREAD_KILLED");
488 continue;
489 }
490 }
491 }
492
493 // We did not find any threads to wake up, so we can just return with no error:
494 return NULL;
495}
496
497/*
498 * call-seq:
499 * mutex.unlock -> self
500 *
501 * Releases the lock.
502 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread.
503 */
504VALUE
505rb_mutex_unlock(VALUE self)
506{
507 const char *err;
508 rb_mutex_t *mutex = mutex_ptr(self);
509 rb_thread_t *th = GET_THREAD();
510
511 err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr);
512 if (err) rb_raise(rb_eThreadError, "%s", err);
513
514 return self;
515}
516
517#if defined(HAVE_WORKING_FORK)
518static void
519rb_mutex_abandon_keeping_mutexes(rb_thread_t *th)
520{
521 rb_mutex_abandon_all(th->keeping_mutexes);
522 th->keeping_mutexes = NULL;
523}
524
525static void
526rb_mutex_abandon_locking_mutex(rb_thread_t *th)
527{
528 if (th->locking_mutex) {
529 rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);
530
531 ccan_list_head_init(&mutex->waitq);
532 th->locking_mutex = Qfalse;
533 }
534}
535
536static void
537rb_mutex_abandon_all(rb_mutex_t *mutexes)
538{
539 rb_mutex_t *mutex;
540
541 while (mutexes) {
542 mutex = mutexes;
543 mutexes = mutex->next_mutex;
544 mutex->fiber = 0;
545 mutex->next_mutex = 0;
546 ccan_list_head_init(&mutex->waitq);
547 }
548}
549#endif
550
551struct rb_mutex_sleep_arguments {
552 VALUE self;
553 VALUE timeout;
554};
555
556static VALUE
557mutex_sleep_begin(VALUE _arguments)
558{
559 struct rb_mutex_sleep_arguments *arguments = (struct rb_mutex_sleep_arguments *)_arguments;
560 VALUE timeout = arguments->timeout;
561 VALUE woken = Qtrue;
562
563 VALUE scheduler = rb_fiber_scheduler_current();
564 if (scheduler != Qnil) {
565 rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
566 }
567 else {
568 if (NIL_P(timeout)) {
569 rb_thread_sleep_deadly_allow_spurious_wakeup(arguments->self, Qnil, 0);
570 }
571 else {
572 struct timeval timeout_value = rb_time_interval(timeout);
573 rb_hrtime_t relative_timeout = rb_timeval2hrtime(&timeout_value);
574 /* permit spurious check */
575 woken = RBOOL(sleep_hrtime(GET_THREAD(), relative_timeout, 0));
576 }
577 }
578
579 return woken;
580}
581
582VALUE
583rb_mutex_sleep(VALUE self, VALUE timeout)
584{
585 if (!NIL_P(timeout)) {
586 // Validate the argument:
587 rb_time_interval(timeout);
588 }
589
590 rb_mutex_unlock(self);
591 time_t beg = time(0);
592
593 struct rb_mutex_sleep_arguments arguments = {
594 .self = self,
595 .timeout = timeout,
596 };
597
598 VALUE woken = rb_ensure(mutex_sleep_begin, (VALUE)&arguments, mutex_lock_uninterruptible, self);
599
600 RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
601 if (!woken) return Qnil;
602 time_t end = time(0) - beg;
603 return TIMET2NUM(end);
604}
605
606/*
607 * call-seq:
608 * mutex.sleep(timeout = nil) -> number or nil
609 *
610 * Releases the lock and sleeps for +timeout+ seconds if it is given and
611 * non-nil, or forever otherwise. Raises +ThreadError+ if +mutex+ wasn't locked by
612 * the current thread.
613 *
614 * When the thread is next woken up, it will attempt to reacquire
615 * the lock.
616 *
617 * Note that this method can wake up without an explicit Thread#wakeup
618 * call; for example, when the thread receives a signal.
619 *
620 * Returns the slept time in seconds if woken up, or +nil+ if timed out.
621 */
622static VALUE
623mutex_sleep(int argc, VALUE *argv, VALUE self)
624{
625 VALUE timeout;
626
627 timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil;
628 return rb_mutex_sleep(self, timeout);
629}
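/*
 * Sketch of #sleep with an early wakeup (illustrative; the timing values
 * are arbitrary). The sleeping thread releases the mutex, then reacquires
 * it before #sleep returns:
 *
 *    m = Thread::Mutex.new
 *    t = Thread.new do
 *      m.synchronize do
 *        m.sleep(5)   #=> whole seconds slept, or nil if it timed out
 *      end
 *    end
 *    sleep 0.1 until t.stop?
 *    t.wakeup          # wakes the sleeper before the 5s timeout
 *    t.join
 */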
630
631/*
632 * call-seq:
633 * mutex.synchronize { ... } -> result of the block
634 *
635 * Obtains a lock, runs the block, and releases the lock when the block
636 * completes. See the example under Thread::Mutex.
637 */
638
639VALUE
640rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg)
641{
642 rb_mutex_lock(mutex);
643 return rb_ensure(func, arg, rb_mutex_unlock, mutex);
644}
645
646/*
647 * call-seq:
648 * mutex.synchronize { ... } -> result of the block
649 *
650 * Obtains a lock, runs the block, and releases the lock when the block
651 * completes. See the example under Thread::Mutex.
652 */
653static VALUE
654rb_mutex_synchronize_m(VALUE self)
655{
656 if (!rb_block_given_p()) {
657 rb_raise(rb_eThreadError, "must be called with a block");
658 }
659
660 return rb_mutex_synchronize(self, rb_yield, Qundef);
661}
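/*
 * Sketch of #synchronize's block requirement and non-reentrancy
 * (illustrative):
 *
 *    m = Thread::Mutex.new
 *    m.synchronize { :done }  #=> :done (the block's value is returned)
 *    m.synchronize            # raises ThreadError: must be called with a block
 *    m.synchronize do
 *      m.synchronize { }      # raises ThreadError: deadlock; recursive locking
 *    end
 */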
662
663void
664rb_mutex_allow_trap(VALUE self, int val)
665{
666 Check_TypedStruct(self, &mutex_data_type);
667
668 if (val)
669 FL_SET_RAW(self, MUTEX_ALLOW_TRAP);
670 else
671 FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP);
672}
673
674/* Queue */
675
676#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
677#define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
678RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
679struct rb_queue {
680 struct ccan_list_head waitq;
681 rb_serial_t fork_gen;
682 const VALUE que;
683 int num_waiting;
684} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
685
686#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
687#define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
688#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
689RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
690struct rb_szqueue {
691 struct rb_queue q;
692 int num_waiting_push;
693 struct ccan_list_head pushq;
694 long max;
695} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
696
697static void
698queue_mark(void *ptr)
699{
700 struct rb_queue *q = ptr;
701
702 /* no need to mark threads in waitq, they are on stack */
703 rb_gc_mark(q->que);
704}
705
706static size_t
707queue_memsize(const void *ptr)
708{
709 return sizeof(struct rb_queue);
710}
711
712static const rb_data_type_t queue_data_type = {
713 "queue",
714 {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
715 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
716};
717
718static VALUE
719queue_alloc(VALUE klass)
720{
721 VALUE obj;
722 struct rb_queue *q;
723
724 obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
725 ccan_list_head_init(queue_waitq(q));
726 return obj;
727}
728
729static int
730queue_fork_check(struct rb_queue *q)
731{
732 rb_serial_t fork_gen = GET_VM()->fork_gen;
733
734 if (q->fork_gen == fork_gen) {
735 return 0;
736 }
737 /* forked children can't reach into parent thread stacks */
738 q->fork_gen = fork_gen;
739 ccan_list_head_init(queue_waitq(q));
740 q->num_waiting = 0;
741 return 1;
742}
743
744static struct rb_queue *
745queue_ptr(VALUE obj)
746{
747 struct rb_queue *q;
748
749 TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
750 queue_fork_check(q);
751
752 return q;
753}
754
755#define QUEUE_CLOSED FL_USER5
756
757static rb_hrtime_t
758queue_timeout2hrtime(VALUE timeout)
759{
760 if (NIL_P(timeout)) {
761 return (rb_hrtime_t)0;
762 }
763 rb_hrtime_t rel = 0;
764 if (FIXNUM_P(timeout)) {
765 rel = rb_sec2hrtime(NUM2TIMET(timeout));
766 }
767 else {
768 double2hrtime(&rel, rb_num2dbl(timeout));
769 }
770 return rb_hrtime_add(rel, rb_hrtime_now());
771}
772
773static void
774szqueue_mark(void *ptr)
775{
776 struct rb_szqueue *sq = ptr;
777
778 queue_mark(&sq->q);
779}
780
781static size_t
782szqueue_memsize(const void *ptr)
783{
784 return sizeof(struct rb_szqueue);
785}
786
787static const rb_data_type_t szqueue_data_type = {
788 "sized_queue",
789 {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
790 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
791};
792
793static VALUE
794szqueue_alloc(VALUE klass)
795{
796 struct rb_szqueue *sq;
797 VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
798 &szqueue_data_type, sq);
799 ccan_list_head_init(szqueue_waitq(sq));
800 ccan_list_head_init(szqueue_pushq(sq));
801 return obj;
802}
803
804static struct rb_szqueue *
805szqueue_ptr(VALUE obj)
806{
807 struct rb_szqueue *sq;
808
809 TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
810 if (queue_fork_check(&sq->q)) {
811 ccan_list_head_init(szqueue_pushq(sq));
812 sq->num_waiting_push = 0;
813 }
814
815 return sq;
816}
817
818static VALUE
819ary_buf_new(void)
820{
821 return rb_ary_hidden_new(1);
822}
823
824static VALUE
825check_array(VALUE obj, VALUE ary)
826{
827 if (!RB_TYPE_P(ary, T_ARRAY)) {
828 rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
829 }
830 return ary;
831}
832
833static long
834queue_length(VALUE self, struct rb_queue *q)
835{
836 return RARRAY_LEN(check_array(self, q->que));
837}
838
839static int
840queue_closed_p(VALUE self)
841{
842 return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
843}
844
845/*
846 * Document-class: ClosedQueueError
847 *
848 * The exception class which will be raised when pushing into a closed
849 * Queue. See Thread::Queue#close and Thread::SizedQueue#close.
850 */
851
852NORETURN(static void raise_closed_queue_error(VALUE self));
853
854static void
855raise_closed_queue_error(VALUE self)
856{
857 rb_raise(rb_eClosedQueueError, "queue closed");
858}
859
860static VALUE
861queue_closed_result(VALUE self, struct rb_queue *q)
862{
863 RUBY_ASSERT(queue_length(self, q) == 0);
864 return Qnil;
865}
866
867/*
868 * Document-class: Thread::Queue
869 *
870 * The Thread::Queue class implements multi-producer, multi-consumer
871 * queues. It is especially useful in threaded programming when
872 * information must be exchanged safely between multiple threads. The
873 * Thread::Queue class implements all the required locking semantics.
874 *
875 * The class implements a FIFO (first in, first out) type of queue.
876 * In a FIFO queue, the first tasks added are the first retrieved.
877 *
878 * Example:
879 *
880 * queue = Thread::Queue.new
881 *
882 * producer = Thread.new do
883 * 5.times do |i|
884 * sleep rand(i) # simulate expense
885 * queue << i
886 * puts "#{i} produced"
887 * end
888 * end
889 *
890 * consumer = Thread.new do
891 * 5.times do |i|
892 * value = queue.pop
893 * sleep rand(i/2) # simulate expense
894 * puts "consumed #{value}"
895 * end
896 * end
897 *
898 * consumer.join
899 *
900 */
901
902/*
903 * Document-method: Queue::new
904 *
905 * call-seq:
906 * Thread::Queue.new -> empty_queue
907 * Thread::Queue.new(enumerable) -> queue
908 *
909 * Creates a new queue instance, optionally using the contents of an +enumerable+
910 * for its initial state.
911 *
912 * Example:
913 *
914 * q = Thread::Queue.new
915 * #=> #<Thread::Queue:0x00007ff7501110d0>
916 * q.empty?
917 * #=> true
918 *
919 * q = Thread::Queue.new([1, 2, 3])
920 * #=> #<Thread::Queue:0x00007ff7500ec500>
921 * q.empty?
922 * #=> false
923 * q.pop
924 * #=> 1
925 */
926
927static VALUE
928rb_queue_initialize(int argc, VALUE *argv, VALUE self)
929{
930 VALUE initial;
931 struct rb_queue *q = queue_ptr(self);
932 if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) {
933 initial = rb_to_array(initial);
934 }
935 RB_OBJ_WRITE(self, queue_list(q), ary_buf_new());
936 ccan_list_head_init(queue_waitq(q));
937 if (argc == 1) {
938 rb_ary_concat(q->que, initial);
939 }
940 return self;
941}
942
943static VALUE
944queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
945{
946 if (queue_closed_p(self)) {
947 raise_closed_queue_error(self);
948 }
949 rb_ary_push(check_array(self, q->que), obj);
950 wakeup_one(queue_waitq(q));
951 return self;
952}
953
954/*
955 * Document-method: Thread::Queue#close
956 * call-seq:
957 * close
958 *
959 * Closes the queue. A closed queue cannot be re-opened.
960 *
961 * After the call to close completes, the following are true:
962 *
963 * - +closed?+ will return true
964 *
965 * - +close+ will be ignored.
966 *
967 * - calling enq/push/<< will raise a +ClosedQueueError+.
968 *
969 * - when +empty?+ is false, calling deq/pop/shift will return an object
970 * from the queue as usual.
971 * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil.
972 * deq(true) will raise a +ThreadError+.
973 *
974 * ClosedQueueError inherits from StopIteration, so you can break out of a loop block.
975 *
976 * Example:
977 *
978 * q = Thread::Queue.new
979 * Thread.new{
980 * while e = q.deq # wait for nil to break loop
981 * # ...
982 * end
983 * }
984 * q.close
985 */
986
987static VALUE
988rb_queue_close(VALUE self)
989{
990 struct rb_queue *q = queue_ptr(self);
991
992 if (!queue_closed_p(self)) {
993 FL_SET(self, QUEUE_CLOSED);
994
995 wakeup_all(queue_waitq(q));
996 }
997
998 return self;
999}
1000
1001/*
1002 * Document-method: Thread::Queue#closed?
1003 * call-seq: closed?
1004 *
1005 * Returns +true+ if the queue is closed.
1006 */
1007
1008static VALUE
1009rb_queue_closed_p(VALUE self)
1010{
1011 return RBOOL(queue_closed_p(self));
1012}
1013
1014/*
1015 * Document-method: Thread::Queue#push
1016 * call-seq:
1017 * push(object)
1018 * enq(object)
1019 * <<(object)
1020 *
1021 * Pushes the given +object+ to the queue.
1022 */
1023
1024static VALUE
1025rb_queue_push(VALUE self, VALUE obj)
1026{
1027 return queue_do_push(self, queue_ptr(self), obj);
1028}
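/*
 * Sketch of the push aliases and FIFO ordering (illustrative):
 *
 *    q = Thread::Queue.new
 *    q.push(1)
 *    q.enq(2)
 *    q << 3
 *    q.pop    #=> 1 (first in, first out)
 *    q.close
 *    q << 4   # raises ClosedQueueError
 */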
1029
1030static VALUE
1031queue_sleep(VALUE _args)
1032{
1033 struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
1034 rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
1035 return Qnil;
1036}
1037
1038struct queue_waiter {
1039 struct sync_waiter w;
1040 union {
1041 struct rb_queue *q;
1042 struct rb_szqueue *sq;
1043 } as;
1044};
1045
1046static VALUE
1047queue_sleep_done(VALUE p)
1048{
1049 struct queue_waiter *qw = (struct queue_waiter *)p;
1050
1051 ccan_list_del(&qw->w.node);
1052 qw->as.q->num_waiting--;
1053
1054 return Qfalse;
1055}
1056
1057static VALUE
1058szqueue_sleep_done(VALUE p)
1059{
1060 struct queue_waiter *qw = (struct queue_waiter *)p;
1061
1062 ccan_list_del(&qw->w.node);
1063 qw->as.sq->num_waiting_push--;
1064
1065 return Qfalse;
1066}
1067
1068static VALUE
1069queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
1070{
1071 check_array(self, q->que);
1072 if (RARRAY_LEN(q->que) == 0) {
1073 if (!should_block) {
1074 rb_raise(rb_eThreadError, "queue empty");
1075 }
1076
1077 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1078 return Qnil;
1079 }
1080 }
1081
1082 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1083 while (RARRAY_LEN(q->que) == 0) {
1084 if (queue_closed_p(self)) {
1085 return queue_closed_result(self, q);
1086 }
1087 else {
1088 rb_execution_context_t *ec = GET_EC();
1089
1090 RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
1091 RUBY_ASSERT(queue_closed_p(self) == 0);
1092
1093 struct queue_waiter queue_waiter = {
1094 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1095 .as = {.q = q}
1096 };
1097
1098 struct ccan_list_head *waitq = queue_waitq(q);
1099
1100 ccan_list_add_tail(waitq, &queue_waiter.w.node);
1101 queue_waiter.as.q->num_waiting++;
1102
1102
1103 struct queue_sleep_arg queue_sleep_arg = {
1104 .self = self,
1105 .timeout = timeout,
1106 .end = end
1107 };
1108
1109 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
1110 if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
1111 break;
1112 }
1113 }
1114
1115 return rb_ary_shift(q->que);
1116}
1117
1118static VALUE
1119rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1120{
1121 return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout);
1122}
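/*
 * Sketch of the non-blocking and timeout forms of #pop (illustrative;
 * assumes the Ruby-level signature pop(non_block = false, timeout: nil)
 * defined in thread_sync.rb):
 *
 *    q = Thread::Queue.new
 *    q.pop(true)          # raises ThreadError: queue empty
 *    q.pop(timeout: 0.1)  #=> nil once ~0.1s elapse with no push
 *    q << :job
 *    q.pop                #=> :job
 */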
1123
1124/*
1125 * Document-method: Thread::Queue#empty?
1126 * call-seq: empty?
1127 *
1128 * Returns +true+ if the queue is empty.
1129 */
1130
1131static VALUE
1132rb_queue_empty_p(VALUE self)
1133{
1134 return RBOOL(queue_length(self, queue_ptr(self)) == 0);
1135}
1136
1137/*
1138 * Document-method: Thread::Queue#clear
1139 *
1140 * Removes all objects from the queue.
1141 */
1142
1143static VALUE
1144rb_queue_clear(VALUE self)
1145{
1146 struct rb_queue *q = queue_ptr(self);
1147
1148 rb_ary_clear(check_array(self, q->que));
1149 return self;
1150}
1151
1152/*
1153 * Document-method: Thread::Queue#length
1154 * call-seq:
1155 * length
1156 * size
1157 *
1158 * Returns the length of the queue.
1159 */
1160
1161static VALUE
1162rb_queue_length(VALUE self)
1163{
1164 return LONG2NUM(queue_length(self, queue_ptr(self)));
1165}
1166
1167NORETURN(static VALUE rb_queue_freeze(VALUE self));
1168/*
1169 * call-seq:
1170 * freeze
1171 *
1172 * The queue can't be frozen, so this method raises an exception:
1173 * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
1174 *
1175 */
1176static VALUE
1177rb_queue_freeze(VALUE self)
1178{
1179 rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
1180 UNREACHABLE_RETURN(self);
1181}
1182
1183/*
1184 * Document-method: Thread::Queue#num_waiting
1185 *
1186 * Returns the number of threads waiting on the queue.
1187 */
1188
1189static VALUE
1190rb_queue_num_waiting(VALUE self)
1191{
1192 struct rb_queue *q = queue_ptr(self);
1193
1194 return INT2NUM(q->num_waiting);
1195}
1196
1197/*
1198 * Document-class: Thread::SizedQueue
1199 *
1200 * This class represents queues with a specified size capacity. The push
1201 * operation may block if the queue is full.
1202 *
1203 * See Thread::Queue for an example of how a Thread::SizedQueue works.
1204 */
1205
1206/*
1207 * Document-method: SizedQueue::new
1208 * call-seq: new(max)
1209 *
1210 * Creates a fixed-length queue with a maximum size of +max+.
1211 */
1212
1213static VALUE
1214rb_szqueue_initialize(VALUE self, VALUE vmax)
1215{
1216 long max;
1217 struct rb_szqueue *sq = szqueue_ptr(self);
1218
1219 max = NUM2LONG(vmax);
1220 if (max <= 0) {
1221 rb_raise(rb_eArgError, "queue size must be positive");
1222 }
1223
1224 RB_OBJ_WRITE(self, szqueue_list(sq), ary_buf_new());
1225 ccan_list_head_init(szqueue_waitq(sq));
1226 ccan_list_head_init(szqueue_pushq(sq));
1227 sq->max = max;
1228
1229 return self;
1230}
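/*
 * Sketch of a bounded producer/consumer hand-off (illustrative):
 *
 *    sq = Thread::SizedQueue.new(2)
 *    sq << 1
 *    sq << 2                     # queue is now at capacity
 *    t = Thread.new { sq << 3 }  # blocks until a slot frees up
 *    sq.pop                      #=> 1; wakes the blocked producer
 *    t.join
 *    sq.length                   #=> 2
 */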
1231
1232/*
1233 * Document-method: Thread::SizedQueue#close
1234 * call-seq:
1235 * close
1236 *
1237 * Similar to Thread::Queue#close.
1238 *
1239 * The difference is the behavior toward threads that are waiting to enqueue.
1240 *
1241 * If there are waiting enqueuing threads, they are interrupted by
1242 * raising ClosedQueueError('queue closed').
1243 */
1244static VALUE
1245rb_szqueue_close(VALUE self)
1246{
1247 if (!queue_closed_p(self)) {
1248 struct rb_szqueue *sq = szqueue_ptr(self);
1249
1250 FL_SET(self, QUEUE_CLOSED);
1251 wakeup_all(szqueue_waitq(sq));
1252 wakeup_all(szqueue_pushq(sq));
1253 }
1254 return self;
1255}
1256
1257/*
1258 * Document-method: Thread::SizedQueue#max
1259 *
1260 * Returns the maximum size of the queue.
1261 */
1262
1263static VALUE
1264rb_szqueue_max_get(VALUE self)
1265{
1266 return LONG2NUM(szqueue_ptr(self)->max);
1267}
1268
1269/*
1270 * Document-method: Thread::SizedQueue#max=
1271 * call-seq: max=(number)
1272 *
1273 * Sets the maximum size of the queue to the given +number+.
1274 */
1275
1276static VALUE
1277rb_szqueue_max_set(VALUE self, VALUE vmax)
1278{
1279 long max = NUM2LONG(vmax);
1280 long diff = 0;
1281 struct rb_szqueue *sq = szqueue_ptr(self);
1282
1283 if (max <= 0) {
1284 rb_raise(rb_eArgError, "queue size must be positive");
1285 }
1286 if (max > sq->max) {
1287 diff = max - sq->max;
1288 }
1289 sq->max = max;
1290 sync_wakeup(szqueue_pushq(sq), diff);
1291 return vmax;
1292}
1293
1294static VALUE
1295rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
1296{
1297 struct rb_szqueue *sq = szqueue_ptr(self);
1298
1299 if (queue_length(self, &sq->q) >= sq->max) {
1300 if (RTEST(non_block)) {
1301 rb_raise(rb_eThreadError, "queue full");
1302 }
1303
1304 if (RTEST(rb_equal(INT2FIX(0), timeout))) {
1305 return Qnil;
1306 }
1307 }
1308
1309 rb_hrtime_t end = queue_timeout2hrtime(timeout);
1310 while (queue_length(self, &sq->q) >= sq->max) {
1311 if (queue_closed_p(self)) {
1312 raise_closed_queue_error(self);
1313 }
1314 else {
1315 rb_execution_context_t *ec = GET_EC();
1316 struct queue_waiter queue_waiter = {
1317 .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
1318 .as = {.sq = sq}
1319 };
1320
1321 struct ccan_list_head *pushq = szqueue_pushq(sq);
1322
1323 ccan_list_add_tail(pushq, &queue_waiter.w.node);
1324 sq->num_waiting_push++;
1325
1326 struct queue_sleep_arg queue_sleep_arg = {
1327 .self = self,
1328 .timeout = timeout,
1329 .end = end
1330 };
1331 rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
1332 if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
1333 return Qnil;
1334 }
1335 }
1336 }
1337
1338 return queue_do_push(self, &sq->q, object);
1339}
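/*
 * Sketch of the non-blocking and timeout forms of #push (illustrative;
 * assumes the Ruby-level signature push(object, non_block = false,
 * timeout: nil) defined in thread_sync.rb):
 *
 *    sq = Thread::SizedQueue.new(1)
 *    sq << :a
 *    sq.push(:b, true)          # raises ThreadError: queue full
 *    sq.push(:b, timeout: 0.1)  #=> nil once ~0.1s elapse with no pop
 */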
1340
1341static VALUE
1342szqueue_do_pop(VALUE self, int should_block, VALUE timeout)
1343{
1344 struct rb_szqueue *sq = szqueue_ptr(self);
1345 VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout);
1346
1347 if (queue_length(self, &sq->q) < sq->max) {
1348 wakeup_one(szqueue_pushq(sq));
1349 }
1350
1351 return retval;
1352}
1353static VALUE
1354rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
1355{
1356 return szqueue_do_pop(self, !RTEST(non_block), timeout);
1357}
1358
1359/*
1360 * Document-method: Thread::SizedQueue#clear
1361 *
1362 * Removes all objects from the queue.
1363 */
1364
1365static VALUE
1366rb_szqueue_clear(VALUE self)
1367{
1368 struct rb_szqueue *sq = szqueue_ptr(self);
1369
1370 rb_ary_clear(check_array(self, sq->q.que));
1371 wakeup_all(szqueue_pushq(sq));
1372 return self;
1373}
1374
1375/*
1376 * Document-method: Thread::SizedQueue#length
1377 * call-seq:
1378 * length
1379 * size
1380 *
1381 * Returns the length of the queue.
1382 */
1383
1384static VALUE
1385rb_szqueue_length(VALUE self)
1386{
1387 struct rb_szqueue *sq = szqueue_ptr(self);
1388
1389 return LONG2NUM(queue_length(self, &sq->q));
1390}
1391
1392/*
1393 * Document-method: Thread::SizedQueue#num_waiting
1394 *
1395 * Returns the number of threads waiting on the queue.
1396 */
1397
1398static VALUE
1399rb_szqueue_num_waiting(VALUE self)
1400{
1401 struct rb_szqueue *sq = szqueue_ptr(self);
1402
1403 return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
1404}
1405
1406/*
1407 * Document-method: Thread::SizedQueue#empty?
1408 * call-seq: empty?
1409 *
1410 * Returns +true+ if the queue is empty.
1411 */
1412
1413static VALUE
1414rb_szqueue_empty_p(VALUE self)
1415{
1416 struct rb_szqueue *sq = szqueue_ptr(self);
1417
1418 return RBOOL(queue_length(self, &sq->q) == 0);
1419}
1420
1421
1422/* ConditionVariable */
1423struct rb_condvar {
1424 struct ccan_list_head waitq;
1425 rb_serial_t fork_gen;
1426};
1427
1428/*
1429 * Document-class: Thread::ConditionVariable
1430 *
1431 * ConditionVariable objects augment class Mutex. Using condition variables,
1432 * it is possible to suspend while in the middle of a critical section until a
1433 * resource becomes available.
1434 *
1435 * Example:
1436 *
1437 * mutex = Thread::Mutex.new
1438 * resource = Thread::ConditionVariable.new
1439 *
1440 * a = Thread.new {
1441 * mutex.synchronize {
1442 * # Thread 'a' now needs the resource
1443 * resource.wait(mutex)
1444 * # 'a' can now have the resource
1445 * }
1446 * }
1447 *
1448 * b = Thread.new {
1449 * mutex.synchronize {
1450 * # Thread 'b' has finished using the resource
1451 * resource.signal
1452 * }
1453 * }
1454 */
1455
1456static size_t
1457condvar_memsize(const void *ptr)
1458{
1459 return sizeof(struct rb_condvar);
1460}
1461
1462static const rb_data_type_t cv_data_type = {
1463 "condvar",
1464 {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
1465 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
1466};
1467
1468static struct rb_condvar *
1469condvar_ptr(VALUE self)
1470{
1471 struct rb_condvar *cv;
1472 rb_serial_t fork_gen = GET_VM()->fork_gen;
1473
1474 TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
1475
1476 /* forked children can't reach into parent thread stacks */
1477 if (cv->fork_gen != fork_gen) {
1478 cv->fork_gen = fork_gen;
1479 ccan_list_head_init(&cv->waitq);
1480 }
1481
1482 return cv;
1483}
1484
1485static VALUE
1486condvar_alloc(VALUE klass)
1487{
1488 struct rb_condvar *cv;
1489 VALUE obj;
1490
1491 obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
1492 ccan_list_head_init(&cv->waitq);
1493
1494 return obj;
1495}
1496
1497/*
1498 * Document-method: ConditionVariable::new
1499 *
1500 * Creates a new condition variable instance.
1501 */
1502
1503static VALUE
1504rb_condvar_initialize(VALUE self)
1505{
1506 struct rb_condvar *cv = condvar_ptr(self);
1507 ccan_list_head_init(&cv->waitq);
1508 return self;
1509}
1510
1511struct sleep_call {
1512 VALUE mutex;
1513 VALUE timeout;
1514};
1515
1516static ID id_sleep;
1517
1518static VALUE
1519do_sleep(VALUE args)
1520{
1521 struct sleep_call *p = (struct sleep_call *)args;
1522 return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);
1523}
1524
1525/*
1526 * Document-method: Thread::ConditionVariable#wait
1527 * call-seq: wait(mutex, timeout=nil)
1528 *
1529 * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
1530 *
1531 * If +timeout+ is given, this method returns after +timeout+ seconds have
1532 * passed, even if no other thread signals.
1533 *
1534 * Returns the slept result on +mutex+.
1535 */
1536
1537static VALUE
1538rb_condvar_wait(int argc, VALUE *argv, VALUE self)
1539{
1540 rb_execution_context_t *ec = GET_EC();
1541
1542 struct rb_condvar *cv = condvar_ptr(self);
1543 struct sleep_call args;
1544
1545 rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
1546
1547 struct sync_waiter sync_waiter = {
1548 .self = args.mutex,
1549 .th = ec->thread_ptr,
1550 .fiber = nonblocking_fiber(ec->fiber_ptr)
1551 };
1552
1553 ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
1554 return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter);
1555}
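/*
 * Sketch of the canonical wait-in-a-loop pattern (illustrative). Re-testing
 * the predicate guards against spurious wakeups, which #wait permits:
 *
 *    mutex = Thread::Mutex.new
 *    cond  = Thread::ConditionVariable.new
 *    ready = false
 *
 *    waiter = Thread.new do
 *      mutex.synchronize do
 *        cond.wait(mutex) until ready
 *        # ready is now true and mutex is held again
 *      end
 *    end
 *
 *    mutex.synchronize do
 *      ready = true
 *      cond.signal
 *    end
 *    waiter.join
 */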
1556
1557/*
1558 * Document-method: Thread::ConditionVariable#signal
1559 *
1560 * Wakes up the first thread in line waiting on this condition variable.
1561 */
1562
1563static VALUE
1564rb_condvar_signal(VALUE self)
1565{
1566 struct rb_condvar *cv = condvar_ptr(self);
1567 wakeup_one(&cv->waitq);
1568 return self;
1569}
1570
1571/*
1572 * Document-method: Thread::ConditionVariable#broadcast
1573 *
1574 * Wakes up all threads waiting on this condition variable.
1575 */
1576
1577static VALUE
1578rb_condvar_broadcast(VALUE self)
1579{
1580 struct rb_condvar *cv = condvar_ptr(self);
1581 wakeup_all(&cv->waitq);
1582 return self;
1583}
1584
1585NORETURN(static VALUE undumpable(VALUE obj));
1586/* :nodoc: */
1587static VALUE
1588undumpable(VALUE obj)
1589{
1590 rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj));
1591 UNREACHABLE_RETURN(Qnil);
1592}
1593
1594static VALUE
1595define_thread_class(VALUE outer, const ID name, VALUE super)
1596{
1597 VALUE klass = rb_define_class_id_under(outer, name, super);
1598 rb_const_set(rb_cObject, name, klass);
1599 return klass;
1600}
1601
1602static void
1603Init_thread_sync(void)
1604{
1605#undef rb_intern
1606#if defined(TEACH_RDOC) && TEACH_RDOC == 42
1607 rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject);
1608 rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject);
1609 rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
1610 rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject);
1611#endif
1612
1613#define DEFINE_CLASS(name, super) \
1614 rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super)
1615
1616 /* Mutex */
1617 DEFINE_CLASS(Mutex, Object);
1618 rb_define_alloc_func(rb_cMutex, mutex_alloc);
1619 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0);
1620 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0);
1621 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0);
1622 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0);
1623 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
1624 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
1625 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0);
1626 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
1627
1628 /* Queue */
1629 DEFINE_CLASS(Queue, Object);
1630 rb_define_alloc_func(rb_cQueue, queue_alloc);
1631
1632 rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
1633
1634 rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1);
1635 rb_undef_method(rb_cQueue, "initialize_copy");
1636 rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
1637 rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
1638 rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
1639 rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
1640 rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
1641 rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
1642 rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
1643 rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
1644 rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);
1645
1646 rb_define_alias(rb_cQueue, "enq", "push");
1647 rb_define_alias(rb_cQueue, "<<", "push");
1648 rb_define_alias(rb_cQueue, "size", "length");
1649
1650 DEFINE_CLASS(SizedQueue, Queue);
1651 rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
1652
1653 rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
1654 rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
1655 rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
1656 rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
1657 rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
1658 rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
1659 rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
1660 rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
1661 rb_define_alias(rb_cSizedQueue, "size", "length");
1662
1663 /* CVar */
1664 DEFINE_CLASS(ConditionVariable, Object);
1665 rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
1666
1667 id_sleep = rb_intern("sleep");
1668
1669 rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0);
1670 rb_undef_method(rb_cConditionVariable, "initialize_copy");
1671 rb_define_method(rb_cConditionVariable, "marshal_dump", undumpable, 0);
1672 rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1);
1673 rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0);
1674 rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0);
1675
1676 rb_provide("thread.rb");
1677}
1678
1679#include "thread_sync.rbinc"