/*
 * Block until one of the events watched by the selector is available and
 * return it as a two-element Ruby array [source, value].
 *
 * selv         - selector object; its state is obtained via RACTOR_SELECTOR_PTR.
 * do_receivev  - when truthy, the current ractor's recv_queue is watched too.
 * do_yieldv    - when truthy, yielding yield_value to a taker is attempted too.
 * yield_value,
 * move         - forwarded unchanged to ractor_try_yield.
 *
 * Result shapes produced by this function:
 *   [:receive, msg]   - ractor_try_receive returned a non-undef value
 *   [:yield,   nil]   - ractor_try_yield succeeded
 *   [sender,   value] - a basket was taken; value comes from
 *                       ractor_basket_accept on the local snapshot
 *
 * Raises rb_eRactorError ("no taking ractors") when there is nothing to wait
 * for: no registered take targets, and neither receive nor yield requested.
 */
static VALUE
ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
{
rb_execution_context_t *ec = GET_EC();
struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
// tb points at the selector-owned basket; taken_basket is a local snapshot of it.
struct rb_ractor_basket *tb = &s->take_basket;
struct rb_ractor_basket taken_basket;
rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
bool do_receive = !!RTEST(do_receivev);
bool do_yield = !!RTEST(do_yieldv);
VALUE ret_v, ret_r;
enum rb_ractor_wait_status wait_status;
struct rb_ractor_queue *rq = &cr->sync.recv_queue;
struct rb_ractor_queue *ts = &cr->sync.takers_queue;
RUBY_DEBUG_LOG("start");
// Full restart point: wait_status is recomputed because take targets may have
// been removed from the selector (see the switch below) since the last pass.
retry:
RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
// Build the set of conditions to wait on.
wait_status = wait_none;
if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
if (do_receive) wait_status |= wait_receiving;
if (do_yield) wait_status |= wait_yielding;
RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
if (wait_status == wait_none) {
rb_raise(rb_eRactorError, "no taking ractors");
}
// Fast path 1: a message can be received right now.
if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) {
ret_r = ID2SYM(rb_intern("receive"));
goto success;
}
// Fast path 2: the value can be yielded right now.
if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
ret_v = Qnil;
ret_r = ID2SYM(rb_intern("yield"));
goto success;
}
// The selector's basket is expected to be "reserved" here (it is put back into
// that state by the CAS below before any jump to retry). Reset it to "none"
// before notifying the take targets.
VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
s->take_basket.type.e = basket_type_none;
// Kick all take target ractors, handing each one the basket pointer.
// NOTE(review): presumably ractor_selector_wait_i registers tb with each
// target so it can be filled - confirm against that callback.
st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
RACTOR_LOCK_SELF(cr);
{
// Re-entered (with the lock held) when the basket could not be claimed below.
retry_waiting:
while (1) {
// Leave the loop as soon as any watched condition holds.
if (!basket_none_p(tb)) {
RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
break;
}
if (do_receive && !ractor_queue_empty_p(cr, rq)) {
RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
break;
}
if (do_yield && ractor_check_take_basket(cr, ts)) {
RUBY_DEBUG_LOG("can yield");
break;
}
// Nothing ready yet: sleep, passing ractor_selector_wait_cleaup and tb along.
// NOTE(review): when the cleanup callback runs is defined by
// ractor_sleep_with_cleanup, not visible here.
ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
}
// Snapshot the shared basket; all later decisions use this local copy.
taken_basket = *tb;
// ensure
// tb->type.e = basket_type_reserved # do it atomic in the following code
// Claim the basket by atomically switching its type from the snapshot's type
// to "reserved". Go back to waiting if the snapshot is still "yielding" or if
// the CAS fails because the type changed after the snapshot was taken.
if (taken_basket.type.e == basket_type_yielding ||
RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
if (basket_type_p(tb, basket_type_yielding)) {
// Drop the lock around rb_thread_sleep(0) while the basket is "yielding".
// NOTE(review): presumably this lets the yielding side finish filling
// the basket - confirm.
RACTOR_UNLOCK_SELF(cr);
{
rb_thread_sleep(0);
}
RACTOR_LOCK_SELF(cr);
}
goto retry_waiting;
}
}
RACTOR_UNLOCK_SELF(cr);
// Check the taken result.
switch (taken_basket.type.e) {
case basket_type_none:
// Woken because receive/yield became possible rather than by a take;
// restart so the fast paths above handle it.
VM_ASSERT(do_receive || do_yield);
goto retry;
case basket_type_yielding:
// A "yielding" snapshot never gets past the claim check above.
rb_bug("unreachable");
case basket_type_deleted: {
// The sender is dropped from the selector. If ractor_take_will_lock reports
// success, fall out of the switch and accept the basket; otherwise wait again.
// NOTE(review): ractor_take_will_lock is expected to update taken_basket on
// success - confirm against its definition.
ractor_selector_remove(selv, taken_basket.sender);
rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
if (ractor_take_will_lock(r, &taken_basket)) {
RUBY_DEBUG_LOG("has_will");
}
else {
RUBY_DEBUG_LOG("no will");
// rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
// remove and retry wait
goto retry;
}
break;
}
case basket_type_will:
// no more messages
ractor_selector_remove(selv, taken_basket.sender);
break;
default:
// Any other basket type is accepted as-is below.
break;
}
RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
// Convert the claimed basket into the returned value; the source is its sender.
ret_v = ractor_basket_accept(&taken_basket);
ret_r = taken_basket.sender;
success:
return rb_ary_new_from_args(2, ret_r, ret_v);
}