/* thread_sync.c: Mutex, Queue, SizedQueue and ConditionVariable (excerpted;
 * elided regions are marked with "..." comments). */

#include "ccan/list/list.h"

/* ... */

static VALUE rb_cMutex, rb_eClosedQueueError;

/* Mutex */

/* in rb_mutex_t: the list of waiters, protected by the GVL */
    struct ccan_list_head waitq;

/* in struct sync_waiter (always allocated on the waiting thread's stack): */
    struct ccan_list_node node;

/* in nonblocking_fiber(): a blocking fiber never goes through the scheduler */
    if (rb_fiberptr_blocking(fiber)) {
        return NULL;
    }

#define MUTEX_ALLOW_TRAP FL_USER1
static void
sync_wakeup(struct ccan_list_head *head, long max)
{
    RUBY_DEBUG_LOG("max:%ld", max);

    struct sync_waiter *cur = 0, *next;

    ccan_list_for_each_safe(head, cur, next, node) {
        ccan_list_del_init(&cur->node);

        if (cur->th->status != THREAD_KILLED) {
            if (cur->th->scheduler != Qnil && cur->fiber) {
                /* ... unblock via the fiber scheduler ... */
            }
            else {
                RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
                rb_threadptr_interrupt(cur->th);
                cur->th->status = THREAD_RUNNABLE;
            }

            if (--max == 0) return;
        }
    }
}
static void
wakeup_one(struct ccan_list_head *head)
{
    sync_wakeup(head, 1);
}

static void
wakeup_all(struct ccan_list_head *head)
{
    sync_wakeup(head, LONG_MAX);
}
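/*
 * sync_wakeup() is the single wakeup primitive shared by Mutex, Queue,
 * SizedQueue and ConditionVariable below.  Every blocking call enqueues an
 * on-stack struct sync_waiter and removes it on wakeup, so no per-wait heap
 * allocation is needed.  An illustrative sketch of the idiom (names here
 * are not from this file):
 *
 *     struct sync_waiter waiter = { .self = self, .th = th, .fiber = fiber };
 *     ccan_list_add_tail(&obj->waitq, &waiter.node);
 *     // ... sleep until woken ...
 *     ccan_list_del_init(&waiter.node);  // safe even if the waker already
 *                                        // unlinked us: del_init leaves the
 *                                        // node pointing at itself
 */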
#if defined(HAVE_WORKING_FORK)
static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
#endif
/* in rb_mutex_num_waiting(): count the waiters parked on the lock */
    ccan_list_for_each(&mutex->waitq, w, node) {
        n++;
    }

/* in mutex_locked_p(): ec_serial == 0 means "unowned" */
    return mutex->ec_serial != 0;

/* in mutex_free(): forcibly unlock a still-held mutex before reclaiming it */
    if (mutex_locked_p(mutex)) {
        const char *err = rb_mutex_unlock_th(mutex, mutex->th, 0);
        if (err) rb_bug("%s", err);
    }

static size_t
mutex_memsize(const void *ptr)
/* ... */

/* in mutex_data_type: */
    {NULL, mutex_free, mutex_memsize,},

VALUE
rb_obj_is_mutex(VALUE obj)
{
    return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type));
}

static VALUE
mutex_alloc(VALUE klass)
{
    /* ... */
    ccan_list_head_init(&mutex->waitq);
    /* ... */
}

/* in rb_mutex_new(): */
    return mutex_alloc(rb_cMutex);

/* in rb_mutex_locked_p(): */
    return RBOOL(mutex_locked_p(mutex));
/* thread_mutex_insert(): push onto the thread's keeping_mutexes list */
    if (thread->keeping_mutexes) {
        mutex->next_mutex = thread->keeping_mutexes;
    }
    thread->keeping_mutexes = mutex;

/* thread_mutex_remove(): unlink via pointer-to-pointer traversal */
    rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes;

    while (*keeping_mutexes && *keeping_mutexes != mutex) {
        keeping_mutexes = &(*keeping_mutexes)->next_mutex;
    }

    if (*keeping_mutexes) {
        *keeping_mutexes = mutex->next_mutex;
        mutex->next_mutex = NULL;
    }

/* mutex_set_owner(): record the owning execution context */
    mutex->ec_serial = ec_serial;

/* mutex_locked(): take ownership and track the lock on the owning thread */
    mutex_set_owner(mutex, th, ec_serial);
    thread_mutex_insert(th, mutex);
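/*
 * thread_mutex_remove() uses the classic pointer-to-pointer traversal to
 * unlink a node from a singly linked list without special-casing the head.
 * A standalone sketch of the same idiom (illustrative, not part of this
 * file):
 *
 *     struct node { struct node *next; };
 *
 *     void unlink(struct node **head, struct node *target) {
 *         struct node **pp = head;
 *         while (*pp && *pp != target) pp = &(*pp)->next;  // find the slot
 *         if (*pp) {                                       // found: splice out
 *             *pp = target->next;
 *             target->next = NULL;
 *         }
 *     }
 */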
/* do_mutex_trylock(): succeed only when nobody owns the lock */
    if (mutex->ec_serial == 0) {
        RUBY_DEBUG_LOG("%p ok", mutex);
        mutex_locked(mutex, th, ec_serial);
        return true;
    }
    else {
        RUBY_DEBUG_LOG("%p ng", mutex);
        return false;
    }

/* in rb_mut_trylock(): */
    return RBOOL(do_mutex_trylock(mutex_ptr(self), ec->thread_ptr, rb_ec_serial(ec)));

VALUE
rb_mutex_trylock(VALUE self)
{
    return rb_mut_trylock(GET_EC(), self);
}
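/*
 * Usage sketch from a C extension (illustrative; rb_mutex_trylock() and
 * rb_mutex_unlock() are the documented public C API):
 *
 *     VALUE m = rb_mutex_new();
 *     if (rb_mutex_trylock(m) == Qtrue) {  // acquired without blocking
 *         // ... critical section ...
 *         rb_mutex_unlock(m);
 *     }
 *     else {
 *         // lock held elsewhere; got Qfalse, nothing to release
 *     }
 */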
static VALUE
mutex_owned_p(rb_serial_t ec_serial, rb_mutex_t *mutex)
{
    return RBOOL(mutex->ec_serial == ec_serial);
}

static VALUE
call_rb_fiber_scheduler_block(VALUE mutex)
{
    return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil);
}

/* delete_from_waitq(): ensure-callback that unlinks an on-stack waiter */
static VALUE
delete_from_waitq(VALUE value)
/* ... */

/* in mutex_args_init(): */
    args->mutex = mutex_ptr(mutex);
static VALUE
do_mutex_lock(struct mutex_args *args, int interruptible_p)
{
    VALUE self = args->self;
    rb_execution_context_t *ec = args->ec;
    rb_thread_t *th = ec->thread_ptr;
    rb_fiber_t *fiber = ec->fiber_ptr;
    rb_mutex_t *mutex = args->mutex;
    rb_serial_t ec_serial = rb_ec_serial(ec);

    /* When running trap handler */
    if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
        th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) {
        rb_raise(rb_eThreadError, "can't be called from trap context");
    }

    if (!do_mutex_trylock(mutex, th, ec_serial)) {
        if (mutex->ec_serial == ec_serial) {
            rb_raise(rb_eThreadError, "deadlock; recursive locking");
        }

        rb_atomic_t saved_ints = 0;

        while (mutex->ec_serial != ec_serial) {
            VM_ASSERT(mutex->ec_serial != 0);

            VALUE scheduler = rb_fiber_scheduler_current();
            if (scheduler != Qnil) {
                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = nonblocking_fiber(fiber)
                };

                ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);

                /* ... block on the scheduler, then unlink the waiter ... */

                if (!mutex->ec_serial) {
                    mutex_set_owner(mutex, th, ec_serial);
                }
            }
            else {
                if (!th->vm->thread_ignore_deadlock && mutex->th == th) {
                    rb_raise(rb_eThreadError,
                             "deadlock; lock already owned by another fiber belonging to the same thread");
                }

                struct sync_waiter sync_waiter = {
                    .self = self,
                    .th = th,
                    .fiber = nonblocking_fiber(fiber),
                };

                RUBY_DEBUG_LOG("%p wait", mutex);

                {
                    enum rb_thread_status prev_status = th->status;
                    th->status = THREAD_STOPPED_FOREVER;
                    rb_ractor_sleeper_threads_inc(th->ractor);
                    rb_check_deadlock(th->ractor);

                    th->locking_mutex = self;

                    ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
                    native_sleep(th, NULL); /* release GVL */
                    ccan_list_del(&sync_waiter.node);

                    if (!mutex->ec_serial) {
                        mutex_set_owner(mutex, th, ec_serial);
                    }

                    rb_ractor_sleeper_threads_dec(th->ractor);
                    th->status = prev_status;
                    th->locking_mutex = Qfalse;
                }

                RUBY_DEBUG_LOG("%p wakeup", mutex);
            }

            if (interruptible_p) {
                /* release the mutex before checking for interrupts: the
                 * interrupt handler may try to lock this same mutex */
                if (mutex->ec_serial == ec_serial) {
                    mutex->ec_serial = 0;
                }
                RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
                if (!mutex->ec_serial) {
                    mutex_set_owner(mutex, th, ec_serial);
                }
            }
            else {
                if (RUBY_VM_INTERRUPTED(th->ec)) {
                    /* stash the interrupt request so it fires later */
                    if (saved_ints == 0) {
                        saved_ints = threadptr_get_interrupts(th);
                    }
                    else {
                        /* ignore additional interrupts while blocked */
                        threadptr_get_interrupts(th);
                    }
                }
            }
        }

        if (saved_ints) th->ec->interrupt_flag = saved_ints;
        if (mutex->ec_serial == ec_serial) mutex_locked(mutex, th, ec_serial);
    }

    RUBY_DEBUG_LOG("%p locked", mutex);

    /* mutex locked by current thread */
    if (mutex_owned_p(ec_serial, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");

    return self;
}
static VALUE
mutex_lock_uninterruptible(VALUE self)
{
    struct mutex_args args;
    mutex_args_init(&args, self);
    return do_mutex_lock(&args, 0);
}

/* one lock entry point builds mutex_args explicitly: */
    struct mutex_args args = {
        /* ... */
        .mutex = mutex_ptr(self),
    };
    return do_mutex_lock(&args, 1);

/* the other uses the mutex_args_init() helper: */
    struct mutex_args args;
    mutex_args_init(&args, self);
    return do_mutex_lock(&args, 1);
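/*
 * The wrappers above differ only in interruptible_p.
 * mutex_lock_uninterruptible() passes 0 so that Mutex#sleep can reacquire
 * the lock without processing interrupts mid-reacquire, while the public
 * lock paths pass 1 and run RUBY_VM_CHECK_INTS_BLOCKING() after each
 * wakeup, temporarily dropping ownership so an interrupt handler that
 * locks the same mutex cannot deadlock.
 */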
/* in rb_mut_owned_p() (Mutex#owned?): */
    return mutex_owned_p(rb_ec_serial(ec), mutex_ptr(self));

VALUE
rb_mutex_owned_p(VALUE self)
{
    return rb_mut_owned_p(GET_EC(), self);
}
/* rb_mutex_unlock_th(): returns NULL on success, or an error message */
    RUBY_DEBUG_LOG("%p", mutex);

    if (mutex->ec_serial == 0) {
        return "Attempt to unlock a mutex which is not locked";
    }
    else if (ec_serial && mutex->ec_serial != ec_serial) {
        return "Attempt to unlock a mutex which is locked by another thread/fiber";
    }

    mutex->ec_serial = 0;
    thread_mutex_remove(th, mutex);

    ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
        ccan_list_del_init(&cur->node);

        if (cur->th->scheduler != Qnil && cur->fiber) {
            /* ... unblock via the fiber scheduler ... */
        }
        else {
            switch (cur->th->status) {
              case THREAD_RUNNABLE:
              case THREAD_STOPPED_FOREVER:
                RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
                rb_threadptr_interrupt(cur->th);
                /* ... wake only the first eligible waiter ... */
              case THREAD_STOPPED: /* probably impossible */
                rb_bug("unexpected THREAD_STOPPED");
              case THREAD_KILLED:
                rb_bug("unexpected THREAD_KILLED");
              /* ... */
            }
        }
    }
/* in do_mutex_unlock(): raise on failure */
    err = rb_mutex_unlock_th(mutex, th, rb_ec_serial(args->ec));
    if (err) rb_raise(rb_eThreadError, "%s", err);

static VALUE
do_mutex_unlock_safe(VALUE args)
/* ... */

/* in one unlock entry point: */
    mutex_args_init(&args, self);
    do_mutex_unlock(&args);

/* and in the other: */
        .mutex = mutex_ptr(self),
    };
    do_mutex_unlock(&args);
#if defined(HAVE_WORKING_FORK)
/* rb_mutex_abandon_keeping_mutexes(): */
    rb_mutex_abandon_all(th->keeping_mutexes);
    th->keeping_mutexes = NULL;

/* rb_mutex_abandon_locking_mutex(): */
    if (th->locking_mutex) {
        rb_mutex_t *mutex = mutex_ptr(th->locking_mutex);

        ccan_list_head_init(&mutex->waitq);
        th->locking_mutex = Qfalse;
    }

/* rb_mutex_abandon_all(): walk the list, clearing ownership as we go */
    while (mutexes) {
        mutex = mutexes;
        mutexes = mutex->next_mutex;
        mutex->ec_serial = 0;
        mutex->next_mutex = 0;
        ccan_list_head_init(&mutex->waitq);
    }
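/*
 * After fork(2) only the forking thread survives, but the child inherits
 * mutexes whose owners and waiters no longer exist.  The abandon helpers
 * therefore clear ownership and reinitialize each waitq: the sync_waiter
 * nodes lived on the stacks of threads that are gone, so the lists must be
 * reset wholesale rather than drained entry by entry.
 */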
static VALUE
mutex_sleep_begin(VALUE _arguments)
{
    /* ... */
    VALUE timeout = arguments->timeout;
    VALUE woken = Qtrue;

    VALUE scheduler = rb_fiber_scheduler_current();
    if (scheduler != Qnil) {
        rb_fiber_scheduler_kernel_sleep(scheduler, timeout);
    }
    else {
        if (NIL_P(timeout)) {
            rb_thread_sleep_deadly_allow_spurious_wakeup(arguments->self, Qnil, 0);
        }
        else {
            struct timeval timeout_value = rb_time_interval(timeout);
            rb_hrtime_t relative_timeout = rb_timeval2hrtime(&timeout_value);
            woken = RBOOL(sleep_hrtime(GET_THREAD(), relative_timeout, 0));
        }
    }

    return woken;
}

/* rb_mut_sleep() (Mutex#sleep): unlock, sleep, then always reacquire */
    if (!NIL_P(timeout)) {
        /* ... validate the interval up front ... */
    }

    rb_mut_unlock(ec, self);
    time_t beg = time(0);

    /* ... */
    VALUE woken = rb_ec_ensure(ec, mutex_sleep_begin, (VALUE)&arguments,
                               mutex_lock_uninterruptible, self);

    RUBY_VM_CHECK_INTS_BLOCKING(ec);
    if (!woken) return Qnil;
    time_t end = time(0) - beg;
    return TIMET2NUM(end);

/* rb_mutex_sleep() (public C API): */
    return rb_mut_sleep(GET_EC(), self, timeout);
/* in the synchronize core (backing rb_mutex_synchronize()): lock, run
 * func, and always unlock via the ensure callback */
    mutex_args_init(&args, self);
    do_mutex_lock(&args, 1);
    return rb_ec_ensure(args.ec, func, arg, do_mutex_unlock_safe, (VALUE)&args);
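/*
 * rb_mutex_synchronize() pairs the lock with do_mutex_unlock_safe() via
 * rb_ec_ensure(), so the mutex is released even when func raises.  Usage
 * sketch from a C extension (illustrative; the callback name is invented,
 * rb_mutex_synchronize() itself is the documented public API):
 *
 *     static VALUE
 *     locked_body(VALUE arg)
 *     {
 *         // runs while the mutex is held
 *         return arg;
 *     }
 *
 *     VALUE result = rb_mutex_synchronize(mutex, locked_body, Qnil);
 */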
static VALUE
do_ec_yield(VALUE _ec)
/* ... */

/* in Mutex#synchronize's method entry, which yields the block under the
 * lock: */
        .mutex = mutex_ptr(self),
    };

    do_mutex_lock(&args, 1);
    return rb_ec_ensure(args.ec, do_ec_yield, (VALUE)ec,
                        do_mutex_unlock_safe, (VALUE)&args);

void
rb_mutex_allow_trap(VALUE self, int val)
/* ... */
/* Queue */

/* in struct rb_queue (alongside buffer, len, offset and capa): */
    struct ccan_list_head waitq;
    rb_serial_t fork_gen;

#define szqueue_waitq(sq) &sq->q.waitq
#define szqueue_pushq(sq) &sq->pushq

/* in struct rb_szqueue (which embeds a struct rb_queue as member q): */
    int num_waiting_push;
    struct ccan_list_head pushq;
static void
queue_mark_and_move(void *ptr)
{
    struct rb_queue *q = ptr;

    for (long index = 0; index < q->len; index++) {
        rb_gc_mark_and_move(&q->buffer[(q->offset + index) % q->capa]);
    }
}

/* in queue_free(): */
    ruby_sized_xfree(q->buffer, q->capa * sizeof(VALUE));

static size_t
queue_memsize(const void *ptr)
/* ... */

/* in queue_data_type: */
    .dmark = queue_mark_and_move,
    /* ... */
    .dsize = queue_memsize,
    .dcompact = queue_mark_and_move,
static VALUE
queue_alloc(VALUE klass)
{
    /* ... */
    ccan_list_head_init(&q->waitq);
    /* ... */
}

/* queue_fork_check(): reset the waitq in a forked child */
    rb_serial_t fork_gen = GET_VM()->fork_gen;

    if (RB_LIKELY(q->fork_gen == fork_gen)) {
        return 0;
    }
    /* forked children can't reach into parent thread stacks */
    q->fork_gen = fork_gen;
    ccan_list_head_init(&q->waitq);

static struct rb_queue *
raw_queue_ptr(VALUE obj)
/* ... */

/* check_queue(): reject queues whose buffer was never initialized */
    if (RB_UNLIKELY(q->buffer == NULL)) {
        rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
    }

/* in queue_ptr(): */
    struct rb_queue *q = raw_queue_ptr(obj);

#define QUEUE_CLOSED FL_USER5
static rb_hrtime_t
queue_timeout2hrtime(VALUE timeout)
{
    if (NIL_P(timeout)) {
        return (rb_hrtime_t)0;
    }
    rb_hrtime_t rel = 0;
    if (FIXNUM_P(timeout)) {
        rel = rb_sec2hrtime(NUM2TIMET(timeout));
    }
    else {
        /* ... fractional seconds via rb_num2dbl() ... */
    }
    return rb_hrtime_add(rel, rb_hrtime_now());
}
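/*
 * queue_timeout2hrtime() turns a user-supplied relative timeout in seconds
 * into an absolute deadline on the monotonic clock, in nanoseconds
 * (rb_hrtime_t).  Worked example: timeout = 3 yields
 * rb_hrtime_now() + 3,000,000,000.  A nil timeout maps to 0, and the wait
 * loops below guard every deadline comparison with !NIL_P(timeout), so nil
 * effectively means "wait forever".
 */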
static void
szqueue_mark_and_move(void *ptr)
{
    struct rb_szqueue *sq = ptr;
    queue_mark_and_move(&sq->q);
}

static void
szqueue_free(void *ptr)
/* ... */

static size_t
szqueue_memsize(const void *ptr)
/* ... */

/* in szqueue_data_type: */
    .dmark = szqueue_mark_and_move,
    .dfree = szqueue_free,
    .dsize = szqueue_memsize,
    .dcompact = szqueue_mark_and_move,
    /* ... */
    .parent = &queue_data_type,

static VALUE
szqueue_alloc(VALUE klass)
{
    struct rb_szqueue *sq;
    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
                                      &szqueue_data_type, sq);
    ccan_list_head_init(szqueue_waitq(sq));
    ccan_list_head_init(szqueue_pushq(sq));
    return obj;
}
static struct rb_szqueue *
raw_szqueue_ptr(VALUE obj)
{
    /* ... */
    if (RB_UNLIKELY(queue_fork_check(&sq->q))) {
        ccan_list_head_init(szqueue_pushq(sq));
        sq->num_waiting_push = 0;
    }
    /* ... */
}

static struct rb_szqueue *
szqueue_ptr(VALUE obj)
{
    struct rb_szqueue *sq = raw_szqueue_ptr(obj);
    check_queue(obj, &sq->q);
    return sq;
}

static int
queue_closed_p(VALUE self)
/* ... */

NORETURN(static void raise_closed_queue_error(VALUE self));

static void
raise_closed_queue_error(VALUE self)
{
    rb_raise(rb_eClosedQueueError, "queue closed");
}
#define QUEUE_INITIAL_CAPA 8

static void
ring_buffer_init(struct rb_queue *q, long initial_capa)
{
    q->buffer = ALLOC_N(VALUE, initial_capa);
    q->capa = initial_capa;
    /* ... */
}

static void
ring_buffer_expand(struct rb_queue *q)
{
    long new_capa = q->capa * 2;
    VALUE *new_buffer = ALLOC_N(VALUE, new_capa);

    /* copy the wrapped tail first, then the leading run: data is linear again */
    MEMCPY(new_buffer, q->buffer + q->offset, VALUE, q->capa - q->offset);
    MEMCPY(new_buffer + (q->capa - q->offset), q->buffer, VALUE, q->offset);

    VALUE *old_buffer = q->buffer;
    q->buffer = new_buffer;
    q->offset = 0;

    ruby_sized_xfree(old_buffer, q->capa * sizeof(VALUE));
    q->capa = new_capa;
}

/* ring_buffer_push(): */
    if (RB_UNLIKELY(q->len >= q->capa)) {
        ring_buffer_expand(q);
    }
    long index = (q->offset + q->len) % q->capa;
    /* ... store the object at index and bump len ... */

static VALUE
ring_buffer_shift(struct rb_queue *q)
{
    /* ... */
    VALUE obj = q->buffer[q->offset];
    /* ... */
    q->offset = (q->offset + 1) % q->capa;
    /* ... decrement len and return obj ... */
}
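/*
 * The queue is a circular buffer: capa slots, len live elements, offset
 * pointing at the oldest.  Worked example: with capa = 4, offset = 3 and
 * len = 2, the live elements sit at indexes 3 and 0; a push lands at
 * (3 + 2) % 4 = 1, and a shift returns buffer[3] and advances offset to
 * (3 + 1) % 4 = 0.  ring_buffer_expand() copies the two wrapped runs
 * back-to-back into the new buffer, which is why offset restarts at 0.
 */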
/* Queue#initialize: */
    struct rb_queue *q = raw_queue_ptr(self);
    ccan_list_head_init(&q->waitq);

    if (NIL_P(initial)) {
        ring_buffer_init(q, QUEUE_INITIAL_CAPA);
    }
    else {
        initial = rb_to_array(initial);
        long len = RARRAY_LEN(initial);
        long initial_capa = QUEUE_INITIAL_CAPA;

        while (initial_capa < len) {
            initial_capa *= 2; /* grow geometrically until the array fits */
        }

        ring_buffer_init(q, initial_capa);
        /* ... push each element of initial ... */
    }

/* in the push path: */
    check_queue(self, q);
    if (queue_closed_p(self)) {
        raise_closed_queue_error(self);
    }
    ring_buffer_push(self, q, obj);
    wakeup_one(&q->waitq); /* one popper can now make progress */
static VALUE
queue_sleep(VALUE _args)
{
    /* ... */
    rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
    /* ... */
}

/* queue_sleep_done(): ensure-callback run when a popper stops waiting */
static VALUE
queue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    ccan_list_del(&qw->w.node);
    qw->as.q->num_waiting--;

    return Qfalse;
}

/* szqueue_sleep_done(): the same, for a blocked pusher */
static VALUE
szqueue_sleep_done(VALUE p)
{
    struct queue_waiter *qw = (struct queue_waiter *)p;

    ccan_list_del(&qw->w.node);
    qw->as.sq->num_waiting_push--;

    return Qfalse;
}
/* queue_do_pop(): */
    if (RTEST(non_block)) {
        rb_raise(rb_eThreadError, "queue empty");
    }
    /* ... */
    rb_hrtime_t end = queue_timeout2hrtime(timeout);

    while (q->len == 0) {
        if (queue_closed_p(self)) {
            return queue_closed_result(self, q);
        }
        else {
            /* register an on-stack waiter, then sleep */
            struct queue_waiter queue_waiter = {
                .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                .as = {.q = q}
            };

            struct ccan_list_head *waitq = &q->waitq;
            /* ... sleep on waitq, with queue_sleep_done() as ensure ... */

            if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
                break;
        }
    }

    return ring_buffer_shift(q);
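/*
 * Pop protocol: the waiter loops on q->len == 0 rather than trusting a
 * single wakeup, because wakeups can be spurious and another consumer may
 * win the race for the element.  Closure is re-checked on every iteration,
 * so Queue#close reliably unblocks all pending poppers.
 */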
/* Queue#pop: */
    return queue_do_pop(ec, self, queue_ptr(self), non_block, timeout);
/* SizedQueue#initialize: */
    struct rb_szqueue *sq = raw_szqueue_ptr(self);
    /* ... */
    if (max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }
    /* ... */
    ring_buffer_init(&sq->q, QUEUE_INITIAL_CAPA);
    ccan_list_head_init(szqueue_waitq(sq));
    ccan_list_head_init(szqueue_pushq(sq));
/* SizedQueue push path: block while the queue is at capacity */
    if (sq->q.len >= sq->max) {
        if (RTEST(non_block)) {
            rb_raise(rb_eThreadError, "queue full");
        }
        /* ... */
        rb_hrtime_t end = queue_timeout2hrtime(timeout);

        while (sq->q.len >= sq->max) {
            if (queue_closed_p(self)) {
                raise_closed_queue_error(self);
            }
            else {
                struct queue_waiter queue_waiter = {
                    .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                    .as = {.sq = sq}
                };

                struct ccan_list_head *pushq = szqueue_pushq(sq);
                /* ... */
                sq->num_waiting_push++;
                /* ... sleep on pushq, with szqueue_sleep_done() as ensure ... */

                if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
                    break;
                }
            }
        }
    }

    return queue_do_push(self, &sq->q, object);

/* SizedQueue pop path: popping may unblock one pusher */
    VALUE retval = queue_do_pop(ec, self, &sq->q, non_block, timeout);

    if (sq->q.len < sq->max) {
        wakeup_one(szqueue_pushq(sq));
    }

    return retval;
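/*
 * A SizedQueue keeps two waiter lists: waitq (inherited from rb_queue) for
 * blocked poppers and pushq for blocked pushers.  Each pop that leaves the
 * queue below max wakes exactly one pusher, mirroring how each push wakes
 * one popper; waking everyone would only produce a thundering herd that
 * re-checks the capacity and goes back to sleep.
 */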
/* ConditionVariable */

/* in struct rb_condvar: */
    struct ccan_list_head waitq;
    rb_serial_t fork_gen;

static size_t
condvar_memsize(const void *ptr)
/* ... */

static struct rb_condvar *
condvar_ptr(VALUE self)
{
    /* ... */
    rb_serial_t fork_gen = GET_VM()->fork_gen;
    /* ... */
    /* forked children can't reach into parent thread stacks */
    if (cv->fork_gen != fork_gen) {
        cv->fork_gen = fork_gen;
        ccan_list_head_init(&cv->waitq);
    }
    /* ... */
}

static VALUE
condvar_alloc(VALUE klass)
{
    /* ... */
    ccan_list_head_init(&cv->waitq);
    /* ... */
}

/* in the sleep helper used by ConditionVariable#wait: take the fast path
 * for a real Mutex, otherwise fall back to calling #sleep */
    if (CLASS_OF(p->mutex) == rb_cMutex) {
        return rb_mut_sleep(p->ec, p->mutex, p->timeout);
    }
    /* ... */
    return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout);

/* ConditionVariable#wait: park an on-stack waiter, then sleep on the mutex */
    struct sync_waiter sync_waiter = {
        /* ... */
        .th = ec->thread_ptr,
        .fiber = nonblocking_fiber(ec->fiber_ptr)
    };

    ccan_list_add_tail(&cv->waitq, &sync_waiter.node);

/* ConditionVariable#signal: */
    wakeup_one(&cv->waitq);

/* ConditionVariable#broadcast: */
    wakeup_all(&cv->waitq);
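/*
 * ConditionVariable#signal maps to wakeup_one() and #broadcast to
 * wakeup_all() on the same waitq machinery the mutex code uses.  Because a
 * waiter can wake spuriously, Ruby-level callers are expected to re-check
 * their predicate in a loop around wait, the same pattern the queue code
 * applies internally above.
 */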
void
Init_thread_sync(void)
{
    /* ... */
    VALUE rb_cSizedQueue = rb_define_class_id_under_no_pin(rb_cThread,
                                                           rb_intern("SizedQueue"),
                                                           rb_cQueue);
    /* ... */
    id_sleep = rb_intern("sleep");
    /* ... */
}

#include "thread_sync.rbinc"