Ruby 3.5.0dev (2025-05-15 revision 87261c2d95f93f8738557cfb6f93ed14f1b483dd)
ractor.c (87261c2d95f93f8738557cfb6f93ed14f1b483dd)
1// Ractor implementation
2
3#include "ruby/ruby.h"
4#include "ruby/thread.h"
5#include "ruby/ractor.h"
7#include "vm_core.h"
8#include "eval_intern.h"
9#include "vm_sync.h"
10#include "ractor_core.h"
11#include "internal/complex.h"
12#include "internal/error.h"
13#include "internal/gc.h"
14#include "internal/hash.h"
15#include "internal/object.h"
16#include "internal/ractor.h"
17#include "internal/rational.h"
18#include "internal/struct.h"
19#include "internal/thread.h"
20#include "variable.h"
21#include "yjit.h"
22
23VALUE rb_cRactor;
24static VALUE rb_cRactorSelector;
25
26VALUE rb_eRactorUnsafeError;
27VALUE rb_eRactorIsolationError;
28static VALUE rb_eRactorError;
29static VALUE rb_eRactorRemoteError;
30static VALUE rb_eRactorMovedError;
31static VALUE rb_eRactorClosedError;
32static VALUE rb_cRactorMovedObject;
33
34static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
35
36// Ractor locking
37
38static void
39ASSERT_ractor_unlocking(rb_ractor_t *r)
40{
41#if RACTOR_CHECK_MODE > 0
42 const rb_execution_context_t *ec = rb_current_ec_noinline();
43 if (ec != NULL && r->sync.locked_by == rb_ractor_self(rb_ec_ractor_ptr(ec))) {
44 rb_bug("recursive ractor locking");
45 }
46#endif
47}
48
49static void
50ASSERT_ractor_locking(rb_ractor_t *r)
51{
52#if RACTOR_CHECK_MODE > 0
53 const rb_execution_context_t *ec = rb_current_ec_noinline();
54 if (ec != NULL && r->sync.locked_by != rb_ractor_self(rb_ec_ractor_ptr(ec))) {
55 rp(r->sync.locked_by);
56 rb_bug("ractor lock is not acquired.");
57 }
58#endif
59}
60
61static void
62ractor_lock(rb_ractor_t *r, const char *file, int line)
63{
64 RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
65
66 ASSERT_ractor_unlocking(r);
67 rb_native_mutex_lock(&r->sync.lock);
68
69#if RACTOR_CHECK_MODE > 0
70 if (rb_current_execution_context(false) != NULL) {
71 rb_ractor_t *cr = rb_current_ractor_raw(false);
72 r->sync.locked_by = cr ? rb_ractor_self(cr) : Qundef;
73 }
74#endif
75
76 RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
77}
78
79static void
80ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
81{
82 VM_ASSERT(cr == rb_ec_ractor_ptr(rb_current_ec_noinline()));
83#if RACTOR_CHECK_MODE > 0
84 VM_ASSERT(cr->sync.locked_by != cr->pub.self);
85#endif
86 ractor_lock(cr, file, line);
87}
88
89static void
90ractor_unlock(rb_ractor_t *r, const char *file, int line)
91{
92 ASSERT_ractor_locking(r);
93#if RACTOR_CHECK_MODE > 0
94 r->sync.locked_by = Qnil;
95#endif
96 rb_native_mutex_unlock(&r->sync.lock);
97
98 RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, rb_current_ractor_raw(false) == r ? " (self)" : "");
99}
100
101static void
102ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
103{
104 VM_ASSERT(cr == rb_ec_ractor_ptr(rb_current_ec_noinline()));
105#if RACTOR_CHECK_MODE > 0
106 VM_ASSERT(cr->sync.locked_by == cr->pub.self);
107#endif
108 ractor_unlock(cr, file, line);
109}
110
111#define RACTOR_LOCK(r) ractor_lock(r, __FILE__, __LINE__)
112#define RACTOR_UNLOCK(r) ractor_unlock(r, __FILE__, __LINE__)
113#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
114#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
115
116void
117rb_ractor_lock_self(rb_ractor_t *r)
118{
119 RACTOR_LOCK_SELF(r);
120}
121
122void
123rb_ractor_unlock_self(rb_ractor_t *r)
124{
125 RACTOR_UNLOCK_SELF(r);
126}
127
128// Ractor status
129
130static const char *
131ractor_status_str(enum ractor_status status)
132{
133 switch (status) {
134 case ractor_created: return "created";
135 case ractor_running: return "running";
136 case ractor_blocking: return "blocking";
137 case ractor_terminated: return "terminated";
138 }
139 rb_bug("unreachable");
140}
141
142static void
143ractor_status_set(rb_ractor_t *r, enum ractor_status status)
144{
145 RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->pub.id, ractor_status_str(r->status_), ractor_status_str(status));
146
147 // check 1
148 if (r->status_ != ractor_created) {
149 VM_ASSERT(r == GET_RACTOR()); // only self-modification is allowed.
150 ASSERT_vm_locking();
151 }
152
153 // check 2: transition check. Assume it will vanish (be optimized away) in non-debug builds.
154 switch (r->status_) {
155 case ractor_created:
156 VM_ASSERT(status == ractor_blocking);
157 break;
158 case ractor_running:
159 VM_ASSERT(status == ractor_blocking ||
160 status == ractor_terminated);
161 break;
162 case ractor_blocking:
163 VM_ASSERT(status == ractor_running);
164 break;
165 case ractor_terminated:
166 rb_bug("unreachable");
167 break;
168 }
169
170 r->status_ = status;
171}
172
173static bool
174ractor_status_p(rb_ractor_t *r, enum ractor_status status)
175{
176 return rb_ractor_status_p(r, status);
177}
178
179// Ractor data/mark/free
180
181static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i);
182static void ractor_local_storage_mark(rb_ractor_t *r);
183static void ractor_local_storage_free(rb_ractor_t *r);
184
185static void
186ractor_queue_mark(struct rb_ractor_queue *rq)
187{
188 for (int i=0; i<rq->cnt; i++) {
189 struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
190 rb_gc_mark(b->sender);
191
192 switch (b->type.e) {
193 case basket_type_yielding:
194 case basket_type_take_basket:
195 case basket_type_deleted:
196 case basket_type_reserved:
197 // ignore
198 break;
199 default:
200 rb_gc_mark(b->p.send.v);
201 }
202 }
203}
204
205static void
206ractor_mark(void *ptr)
207{
208 rb_ractor_t *r = (rb_ractor_t *)ptr;
209
210 ractor_queue_mark(&r->sync.recv_queue);
211 ractor_queue_mark(&r->sync.takers_queue);
212
213 rb_gc_mark(r->loc);
214 rb_gc_mark(r->name);
215 rb_gc_mark(r->r_stdin);
216 rb_gc_mark(r->r_stdout);
217 rb_gc_mark(r->r_stderr);
218 rb_hook_list_mark(&r->pub.hooks);
219
220 if (r->threads.cnt > 0) {
221 rb_thread_t *th = 0;
222 ccan_list_for_each(&r->threads.set, th, lt_node) {
223 VM_ASSERT(th != NULL);
224 rb_gc_mark(th->self);
225 }
226 }
227
228 ractor_local_storage_mark(r);
229}
230
231static void
232ractor_queue_free(struct rb_ractor_queue *rq)
233{
234 free(rq->baskets);
235}
236
237static void
238ractor_free(void *ptr)
239{
240 rb_ractor_t *r = (rb_ractor_t *)ptr;
241 RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
242 rb_native_mutex_destroy(&r->sync.lock);
243 ractor_queue_free(&r->sync.recv_queue);
244 ractor_queue_free(&r->sync.takers_queue);
245 ractor_local_storage_free(r);
246 rb_hook_list_free(&r->pub.hooks);
247
248 if (r->newobj_cache) {
249 RUBY_ASSERT(r == ruby_single_main_ractor);
250
251 rb_gc_ractor_cache_free(r->newobj_cache);
252 r->newobj_cache = NULL;
253 }
254
255 ruby_xfree(r);
256}
257
258static size_t
259ractor_queue_memsize(const struct rb_ractor_queue *rq)
260{
261 return sizeof(struct rb_ractor_basket) * rq->size;
262}
263
264static size_t
265ractor_memsize(const void *ptr)
266{
267 rb_ractor_t *r = (rb_ractor_t *)ptr;
268
269 // TODO: more correct?
270 return sizeof(rb_ractor_t) +
271 ractor_queue_memsize(&r->sync.recv_queue) +
272 ractor_queue_memsize(&r->sync.takers_queue);
273}
274
275static const rb_data_type_t ractor_data_type = {
276 "ractor",
277 {
278 ractor_mark,
279 ractor_free,
280 ractor_memsize,
281 NULL, // update
282 },
283 0, 0, RUBY_TYPED_FREE_IMMEDIATELY /* | RUBY_TYPED_WB_PROTECTED */
284};
285
286bool
287rb_ractor_p(VALUE gv)
288{
289 if (rb_typeddata_is_kind_of(gv, &ractor_data_type)) {
290 return true;
291 }
292 else {
293 return false;
294 }
295}
296
297static inline rb_ractor_t *
298RACTOR_PTR(VALUE self)
299{
300 VM_ASSERT(rb_ractor_p(self));
301 rb_ractor_t *r = DATA_PTR(self);
302 return r;
303}
304
305static rb_atomic_t ractor_last_id;
306
307#if RACTOR_CHECK_MODE > 0
308uint32_t
309rb_ractor_current_id(void)
310{
311 if (GET_THREAD()->ractor == NULL) {
312 return 1; // main ractor
313 }
314 else {
315 return rb_ractor_id(GET_RACTOR());
316 }
317}
318#endif
319
320// Ractor queue
321
322static void
323ractor_queue_setup(struct rb_ractor_queue *rq)
324{
325 rq->size = 2;
326 rq->cnt = 0;
327 rq->start = 0;
328 rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
329}
330
331static struct rb_ractor_basket *
332ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq)
333{
334 if (r != NULL) ASSERT_ractor_locking(r);
335 return &rq->baskets[rq->start];
336}
337
338static struct rb_ractor_basket *
339ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
340{
341 if (r != NULL) ASSERT_ractor_locking(r);
342 return &rq->baskets[(rq->start + i) % rq->size];
343}
344
345static void
346ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq)
347{
348 ASSERT_ractor_locking(r);
349
350 if (rq->reserved_cnt == 0) {
351 rq->cnt--;
352 rq->start = (rq->start + 1) % rq->size;
353 rq->serial++;
354 }
355 else {
356 ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted;
357 }
358}
359
360static bool
361ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
362{
363 struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
364 return basket_type_p(b, basket_type_deleted) ||
365 basket_type_p(b, basket_type_reserved);
366}
367
368static void
369ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
370{
371 ASSERT_ractor_locking(r);
372
373 while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) {
374 ractor_queue_advance(r, rq);
375 }
376}
377
378static bool
379ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
380{
381 ASSERT_ractor_locking(r);
382
383 if (rq->cnt == 0) {
384 return true;
385 }
386
387 ractor_queue_compact(r, rq);
388
389 for (int i=0; i<rq->cnt; i++) {
390 if (!ractor_queue_skip_p(r, rq, i)) {
391 return false;
392 }
393 }
394
395 return true;
396}
397
398static bool
399ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
400{
401 ASSERT_ractor_locking(r);
402
403 for (int i=0; i<rq->cnt; i++) {
404 if (!ractor_queue_skip_p(r, rq, i)) {
405 struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
406 *basket = *b;
407
408 // remove from queue
409 b->type.e = basket_type_deleted;
410 ractor_queue_compact(r, rq);
411 return true;
412 }
413 }
414
415 return false;
416}
417
418static void
419ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
420{
421 ASSERT_ractor_locking(r);
422
423 if (rq->size <= rq->cnt) {
424 rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2);
425 for (int i=rq->size - rq->start; i<rq->cnt; i++) {
426 rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size];
427 }
428 rq->size *= 2;
429 }
430 // copy basket into queue
431 rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
432 // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
433}
434
435static void
436ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
437{
438 basket->type.e = basket_type_deleted;
439}
440
441// Ractor basket
442
443static VALUE ractor_reset_belonging(VALUE obj); // in this file
444
445static VALUE
446ractor_basket_value(struct rb_ractor_basket *b)
447{
448 switch (b->type.e) {
449 case basket_type_ref:
450 break;
451 case basket_type_copy:
452 case basket_type_move:
453 case basket_type_will:
454 b->type.e = basket_type_ref;
455 b->p.send.v = ractor_reset_belonging(b->p.send.v);
456 break;
457 default:
458 rb_bug("unreachable");
459 }
460
461 return b->p.send.v;
462}
463
464static VALUE
465ractor_basket_accept(struct rb_ractor_basket *b)
466{
467 VALUE v = ractor_basket_value(b);
468
469 // a ractor's main thread had an error and yielded us this exception during its dying moments
470 if (b->p.send.exception) {
471 VALUE cause = v;
472 VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
473 rb_ivar_set(err, rb_intern("@ractor"), b->sender);
474 rb_ec_setup_exception(NULL, err, cause);
475 rb_exc_raise(err);
476 }
477
478 return v;
479}
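
// Example (illustrative, Ruby level): when the remote Ractor's block raised,
// the taker receives the error wrapped in Ractor::RemoteError; the original
// exception becomes #cause and the sender is recorded in @ractor (see above):
//
//   r = Ractor.new { raise "boom" }
//   begin
//     r.take
//   rescue Ractor::RemoteError => e
//     e.cause.message #=> "boom"
//     e.ractor        #=> r
//   end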
480
481// Ractor synchronizations
482
483#if USE_RUBY_DEBUG_LOG
484static const char *
485wait_status_str(enum rb_ractor_wait_status wait_status)
486{
487 switch ((int)wait_status) {
488 case wait_none: return "none";
489 case wait_receiving: return "receiving";
490 case wait_taking: return "taking";
491 case wait_yielding: return "yielding";
492 case wait_receiving|wait_taking: return "receiving|taking";
493 case wait_receiving|wait_yielding: return "receiving|yielding";
494 case wait_taking|wait_yielding: return "taking|yielding";
495 case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
496 }
497 rb_bug("unreachable");
498}
499
500static const char *
501wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status)
502{
503 switch (wakeup_status) {
504 case wakeup_none: return "none";
505 case wakeup_by_send: return "by_send";
506 case wakeup_by_yield: return "by_yield";
507 case wakeup_by_take: return "by_take";
508 case wakeup_by_close: return "by_close";
509 case wakeup_by_interrupt: return "by_interrupt";
510 case wakeup_by_retry: return "by_retry";
511 }
512 rb_bug("unreachable");
513}
514
515static const char *
516basket_type_name(enum rb_ractor_basket_type type)
517{
518 switch (type) {
519 case basket_type_none: return "none";
520 case basket_type_ref: return "ref";
521 case basket_type_copy: return "copy";
522 case basket_type_move: return "move";
523 case basket_type_will: return "will";
524 case basket_type_deleted: return "deleted";
525 case basket_type_reserved: return "reserved";
526 case basket_type_take_basket: return "take_basket";
527 case basket_type_yielding: return "yielding";
528 }
529 VM_ASSERT(0);
530 return NULL;
531}
532#endif // USE_RUBY_DEBUG_LOG
533
534static rb_thread_t *
535ractor_sleeping_by(const rb_ractor_t *r, rb_thread_t *th, enum rb_ractor_wait_status wait_status)
536{
537 if (th) {
538 if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) {
539 return th;
540 }
541 } else {
542 // find any thread that has this ractor wait status that is blocked
543 ccan_list_for_each(&r->sync.wait.waiting_threads, th, ractor_waiting.waiting_node) {
544 if ((th->ractor_waiting.wait_status & wait_status) && th->ractor_waiting.wakeup_status == wakeup_none) {
545 return th;
546 }
547 }
548 }
549 return NULL;
550}
551
552#ifdef RUBY_THREAD_PTHREAD_H
553// thread_*.c
554void rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th);
555#else
556
557// win32
558static void
559rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th)
560{
561 (void)r;
562 ASSERT_ractor_locking(r);
563 rb_native_cond_signal(&th->ractor_waiting.cond);
564
565}
566#endif
567
568
569/*
570 * Wakeup `r` if the given `th` is blocked and has the given ractor `wait_status`.
571 * Wakeup any blocked thread in `r` with the given ractor `wait_status` if `th` is NULL.
572 */
573static bool
574ractor_wakeup(rb_ractor_t *r, rb_thread_t *th /* can be NULL */, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
575{
576 ASSERT_ractor_locking(r);
577
578 RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s",
579 rb_ractor_id(r),
580 wait_status_str(th->ractor_waiting.wait_status),
581 wait_status_str(wait_status),
582 wakeup_status_str(wakeup_status));
583
584 if ((th = ractor_sleeping_by(r, th, wait_status)) != NULL) {
585 th->ractor_waiting.wakeup_status = wakeup_status;
586 rb_ractor_sched_wakeup(r, th);
587 return true;
588 }
589 else {
590 return false;
591 }
592}
593
594// unblock function (UBF). This gets called when another thread on this or another ractor sets our thread's interrupt flag.
595// This is not async-safe.
596static void
597ractor_sleep_interrupt(void *ptr)
598{
599 rb_execution_context_t *ec = ptr;
600 rb_ractor_t *r = rb_ec_ractor_ptr(ec);
601 rb_thread_t *th = rb_ec_thread_ptr(ec);
602
603 RACTOR_LOCK(r);
604 {
605 ractor_wakeup(r, th, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
606 }
607 RACTOR_UNLOCK(r);
608}
609
610typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
611
612// Checks the current thread for ruby interrupts and runs the cleanup function `cf_func` with `cf_data` if
613// `rb_ec_check_ints` is going to raise. See the `rb_threadptr_execute_interrupts` for info on when it can raise.
614static void
615ractor_check_ints(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, ractor_sleep_cleanup_function cf_func, void *cf_data)
616{
617 if (cur_th->ractor_waiting.wait_status != wait_none) {
618 enum rb_ractor_wait_status prev_wait_status = cur_th->ractor_waiting.wait_status;
619 cur_th->ractor_waiting.wait_status = wait_none;
620 cur_th->ractor_waiting.wakeup_status = wakeup_by_interrupt;
621
622 RACTOR_UNLOCK(cr);
623 {
624 if (cf_func) {
625 enum ruby_tag_type state;
626 EC_PUSH_TAG(ec);
627 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
628 rb_ec_check_ints(ec);
629 }
630 EC_POP_TAG();
631
632 if (state) {
633 (*cf_func)(cr, cf_data); // cleanup function is run after the ubf, if it had ubf
634 EC_JUMP_TAG(ec, state);
635 }
636 }
637 else {
638 rb_ec_check_ints(ec);
639 }
640 }
641
642 RACTOR_LOCK(cr);
643 cur_th->ractor_waiting.wait_status = prev_wait_status;
644 }
645}
646
647#ifdef RUBY_THREAD_PTHREAD_H
648void rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf);
649#else
650
651static void
652ractor_cond_wait(rb_ractor_t *r, rb_thread_t *th)
653{
654#if RACTOR_CHECK_MODE > 0
655 VALUE locked_by = r->sync.locked_by;
656 r->sync.locked_by = Qnil;
657#endif
658 rb_native_cond_wait(&th->ractor_waiting.cond, &r->sync.lock);
659
660#if RACTOR_CHECK_MODE > 0
661 r->sync.locked_by = locked_by;
662#endif
663}
664
665static void *
666ractor_sleep_wo_gvl(void *ptr)
667{
668 rb_ractor_t *cr = ptr;
669 rb_execution_context_t *ec = cr->threads.running_ec;
670 VM_ASSERT(GET_EC() == ec);
671 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
672 RACTOR_LOCK_SELF(cr);
673 {
674 VM_ASSERT(cur_th->ractor_waiting.wait_status != wait_none);
675 // it's possible that another ractor has woken us up (ractor_wakeup),
676 // so check this condition
677 if (cur_th->ractor_waiting.wakeup_status == wakeup_none) {
678 cur_th->status = THREAD_STOPPED_FOREVER;
679 ractor_cond_wait(cr, cur_th);
680 cur_th->status = THREAD_RUNNABLE;
681 VM_ASSERT(cur_th->ractor_waiting.wakeup_status != wakeup_none);
682 } else {
683 RUBY_DEBUG_LOG("rare timing, no cond wait");
684 }
685 cur_th->ractor_waiting.wait_status = wait_none;
686 }
687 RACTOR_UNLOCK_SELF(cr);
688 return NULL;
689}
690
691static void
692rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_ractor_sleep_interrupt)
693{
694 ASSERT_ractor_locking(cr);
695 rb_thread_t *th = rb_ec_thread_ptr(ec);
696 struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node;
697 VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked
698 ccan_list_add(&cr->sync.wait.waiting_threads, waitn);
699 RACTOR_UNLOCK(cr);
700 {
701 rb_nogvl(ractor_sleep_wo_gvl, cr, ubf_ractor_sleep_interrupt, ec, RB_NOGVL_INTR_FAIL);
702 }
703 RACTOR_LOCK(cr);
704 ccan_list_del_init(waitn);
705}
706#endif
707
708/*
709 * Sleep the current ractor's current thread until another ractor wakes us up or another thread calls our unblock function.
710 * The following ractor actions can cause this function to be called:
711 * Ractor#take (wait_taking)
712 * Ractor.yield (wait_yielding)
713 * Ractor.receive (wait_receiving)
714 * Ractor.select (can be a combination of the above wait states, depending on the states of the ractors passed to Ractor.select)
715 */
716static enum rb_ractor_wakeup_status
717ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status,
718 ractor_sleep_cleanup_function cf_func, void *cf_data)
719{
720 ASSERT_ractor_locking(cr);
721 enum rb_ractor_wakeup_status wakeup_status;
722 VM_ASSERT(GET_RACTOR() == cr);
723
724 VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none);
725 VM_ASSERT(wait_status != wait_none);
726 cur_th->ractor_waiting.wait_status = wait_status;
727 cur_th->ractor_waiting.wakeup_status = wakeup_none;
728
729 // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
730 // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
731
732 RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
733
734 while (cur_th->ractor_waiting.wakeup_status == wakeup_none) {
735 rb_ractor_sched_sleep(ec, cr, ractor_sleep_interrupt);
736 ractor_check_ints(ec, cr, cur_th, cf_func, cf_data);
737 }
738
739 cur_th->ractor_waiting.wait_status = wait_none;
740
741 wakeup_status = cur_th->ractor_waiting.wakeup_status;
742 cur_th->ractor_waiting.wakeup_status = wakeup_none;
743
744 RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status));
745
746 ASSERT_ractor_locking(cr);
747 return wakeup_status;
748}
749
750static enum rb_ractor_wakeup_status
751ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, enum rb_ractor_wait_status wait_status)
752{
753 return ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, 0, NULL);
754}
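
// Example (illustrative, Ruby level): each blocking operation listed above
// ends up in ractor_sleep()/ractor_sleep_with_cleanup() when no counterpart
// is ready:
//
//   r = Ractor.new { Ractor.receive }  # r's thread sleeps in wait_receiving
//   r.send(:msg)                       # wakes it up (wakeup_by_send)
//   r.take                             # caller may sleep in wait_taking until r terminates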
755
756// Ractor.receive
757
758static void
759ractor_recursive_receive_if(rb_thread_t *th)
760{
761 if (th->ractor_waiting.receiving_mutex && rb_mutex_owned_p(th->ractor_waiting.receiving_mutex)) {
762 rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
763 }
764}
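
// Example (illustrative, Ruby level): the guard above rejects re-entrant
// receives issued from inside a receive_if block:
//
//   r = Ractor.new do
//     Ractor.receive_if { |msg| Ractor.receive } # raises Ractor::Error once a message arrives
//   end
//   r.send(:m)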
765
766static VALUE
767ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
768{
769 struct rb_ractor_basket basket;
770 ractor_recursive_receive_if(rb_ec_thread_ptr(ec));
771 bool received = false;
772
773 RACTOR_LOCK_SELF(cr);
774 {
775 RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt);
776 received = ractor_queue_deq(cr, rq, &basket);
777 }
778 RACTOR_UNLOCK_SELF(cr);
779
780 if (!received) {
781 if (cr->sync.incoming_port_closed) {
782 rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
783 }
784 return Qundef;
785 }
786 else {
787 return ractor_basket_accept(&basket);
788 }
789}
790
791static void
792ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
793{
794 VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
795 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
796 ractor_recursive_receive_if(cur_th);
797
798 RACTOR_LOCK(cr);
799 {
800 while (ractor_queue_empty_p(cr, rq) && !cr->sync.incoming_port_closed) {
801 ractor_sleep(ec, cr, cur_th, wait_receiving);
802 }
803 }
804 RACTOR_UNLOCK(cr);
805}
806
807static VALUE
808ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
809{
810 VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
811 VALUE v;
812 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
813
814 while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) {
815 ractor_wait_receive(ec, cr, rq);
816 }
817
818 return v;
819}
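
// Example (illustrative, Ruby level): Ractor.receive pops from the calling
// Ractor's own recv_queue, blocking until something is sent:
//
//   r = Ractor.new do
//     msg = Ractor.receive  # blocks until the send below
//     msg * 2
//   end
//   r.send(21)
//   r.take #=> 42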
820
821#if 0
822static void
823rq_dump(struct rb_ractor_queue *rq)
824{
825 bool bug = false;
826 for (int i=0; i<rq->cnt; i++) {
827 struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
828 fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type),
829 (void *)b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
830 if (basket_type_p(b, basket_type_reserved)) bug = true;
831 }
832 if (bug) rb_bug("!!");
833}
834#endif
835
836struct receive_block_data {
837 rb_ractor_t *cr;
838 rb_thread_t *th;
839 struct rb_ractor_queue *rq;
840 VALUE v;
841 int index;
842 bool success;
843};
844
845static void
846ractor_receive_if_lock(rb_thread_t *th)
847{
848 VALUE m = th->ractor_waiting.receiving_mutex;
849 if (m == Qfalse) {
850 m = th->ractor_waiting.receiving_mutex = rb_mutex_new();
851 }
852 rb_mutex_lock(m);
853}
854
855static VALUE
856receive_if_body(VALUE ptr)
857{
858 struct receive_block_data *data = (struct receive_block_data *)ptr;
859
860 ractor_receive_if_lock(data->th);
861 VALUE block_result = rb_yield(data->v);
862 rb_ractor_t *cr = data->cr;
863
864 RACTOR_LOCK_SELF(cr);
865 {
866 struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
867 VM_ASSERT(basket_type_p(b, basket_type_reserved));
868 data->rq->reserved_cnt--;
869
870 if (RTEST(block_result)) {
871 ractor_queue_delete(cr, data->rq, b);
872 ractor_queue_compact(cr, data->rq);
873 }
874 else {
875 b->type.e = basket_type_ref;
876 }
877 }
878 RACTOR_UNLOCK_SELF(cr);
879
880 data->success = true;
881
882 if (RTEST(block_result)) {
883 return data->v;
884 }
885 else {
886 return Qundef;
887 }
888}
889
890static VALUE
891receive_if_ensure(VALUE v)
892{
893 struct receive_block_data *data = (struct receive_block_data *)v;
894 rb_ractor_t *cr = data->cr;
895 rb_thread_t *cur_th = data->th;
896
897 if (!data->success) {
898 RACTOR_LOCK_SELF(cr);
899 {
900 struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
901 VM_ASSERT(basket_type_p(b, basket_type_reserved));
902 b->type.e = basket_type_deleted;
903 data->rq->reserved_cnt--;
904 }
905 RACTOR_UNLOCK_SELF(cr);
906 }
907
908 rb_mutex_unlock(cur_th->ractor_waiting.receiving_mutex);
909 return Qnil;
910}
911
912static VALUE
913ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
914{
915 if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
916
917 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
918 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
919 unsigned int serial = (unsigned int)-1;
920 int index = 0;
921 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
922
923 while (1) {
924 VALUE v = Qundef;
925
926 ractor_wait_receive(ec, cr, rq);
927
928 RACTOR_LOCK_SELF(cr);
929 {
930 if (serial != rq->serial) {
931 serial = rq->serial;
932 index = 0;
933 }
934
935 // check newer version
936 for (int i=index; i<rq->cnt; i++) {
937 if (!ractor_queue_skip_p(cr, rq, i)) {
938 struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i);
939 v = ractor_basket_value(b);
940 b->type.e = basket_type_reserved;
941 rq->reserved_cnt++;
942 index = i;
943 break;
944 }
945 }
946 }
947 RACTOR_UNLOCK_SELF(cr);
948
949 if (!UNDEF_P(v)) {
950 struct receive_block_data data = {
951 .cr = cr,
952 .th = cur_th,
953 .rq = rq,
954 .v = v,
955 .index = index,
956 .success = false,
957 };
958
959 VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
960 receive_if_ensure, (VALUE)&data);
961
962 if (!UNDEF_P(result)) return result;
963 index++;
964 }
965
966 RUBY_VM_CHECK_INTS(ec);
967 }
968}
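
// Example (illustrative, Ruby level): receive_if scans the queue, reserving a
// candidate basket while the block decides whether to accept it:
//
//   r = Ractor.new do
//     Ractor.receive_if { |msg| msg.is_a?(Integer) } #=> 3
//   end
//   r.send("skipped")  # block returns false, message stays in the queue
//   r.send(3)          # accepted and returned by receive_if
//   r.take #=> 3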
969
970static void
971ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
972{
973 bool closed = false;
974
975 RACTOR_LOCK(r);
976 {
977 if (r->sync.incoming_port_closed) {
978 closed = true;
979 }
980 else {
981 ractor_queue_enq(r, &r->sync.recv_queue, b);
982 // wakeup any receiving thread in `r`
983 ractor_wakeup(r, NULL, wait_receiving, wakeup_by_send);
984 }
985 }
986 RACTOR_UNLOCK(r);
987
988 if (closed) {
989 rb_raise(rb_eRactorClosedError, "The incoming-port is already closed");
990 }
991}
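
// Example (illustrative, Ruby level): sending to a Ractor whose incoming port
// is closed raises Ractor::ClosedError in the sender:
//
//   r = Ractor.new { Ractor.receive }
//   r.close_incoming
//   r.send(1) # raises Ractor::ClosedError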
992
993// Ractor#send
994
995static VALUE ractor_move(VALUE obj); // in this file
996static VALUE ractor_copy(VALUE obj); // in this file
997
998static void
999ractor_basket_prepare_contents(VALUE obj, VALUE move, volatile VALUE *pobj, enum rb_ractor_basket_type *ptype)
1000{
1001 VALUE v;
1002 enum rb_ractor_basket_type type;
1003
1004 if (rb_ractor_shareable_p(obj)) {
1005 type = basket_type_ref;
1006 v = obj;
1007 }
1008 else if (!RTEST(move)) {
1009 v = ractor_copy(obj);
1010 type = basket_type_copy;
1011 }
1012 else {
1013 type = basket_type_move;
1014 v = ractor_move(obj);
1015 }
1016
1017 *pobj = v;
1018 *ptype = type;
1019}
1020
1021static void
1022ractor_basket_fill_(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc)
1023{
1024 VM_ASSERT(cr == GET_RACTOR());
1025
1026 basket->sender = cr->pub.self;
1027 basket->sending_th = cur_th;
1028 basket->p.send.exception = exc;
1029 basket->p.send.v = obj;
1030}
1031
1032static void
1033ractor_basket_fill(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
1034{
1035 VALUE v;
1036 enum rb_ractor_basket_type type;
1037 ractor_basket_prepare_contents(obj, move, &v, &type);
1038 ractor_basket_fill_(cr, cur_th, basket, v, exc);
1039 basket->type.e = type;
1040}
1041
1042static void
1043ractor_basket_fill_will(rb_ractor_t *cr, rb_thread_t *cur_th, struct rb_ractor_basket *basket, VALUE obj, bool exc)
1044{
1045 ractor_basket_fill_(cr, cur_th, basket, obj, exc);
1046 basket->type.e = basket_type_will;
1047}
1048
1049static VALUE
1050ractor_send(rb_execution_context_t *ec, rb_ractor_t *recv_r, VALUE obj, VALUE move)
1051{
1052 struct rb_ractor_basket basket;
1053 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
1054 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
1055 // TODO: Ractor local GC
1056 ractor_basket_fill(cr, cur_th, &basket, obj, move, false);
1057 ractor_send_basket(ec, recv_r, &basket);
1058 return recv_r->pub.self;
1059}
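
// Example (illustrative, Ruby level): unshareable objects are deep-copied by
// default and moved with move: true; a moved source object becomes unusable
// in the sending Ractor:
//
//   r1 = Ractor.new { Ractor.receive }
//   r2 = Ractor.new { Ractor.receive }
//   s = "hello"
//   r1.send(s)              # copy: the local string is left untouched
//   r2.send(s, move: true)  # move: ownership is transferred
//   s.size                  # raises Ractor::MovedError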
1060
1061// Ractor#take
1062
1063static bool
1064ractor_take_has_will(rb_ractor_t *r)
1065{
1066 ASSERT_ractor_locking(r);
1067
1068 return basket_type_p(&r->sync.will_basket, basket_type_will);
1069}
1070
1071static bool
1072ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b)
1073{
1074 ASSERT_ractor_locking(r);
1075
1076 if (ractor_take_has_will(r)) {
1077 *b = r->sync.will_basket;
1078 r->sync.will_basket.type.e = basket_type_none;
1079 return true;
1080 }
1081 else {
1082 VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none));
1083 return false;
1084 }
1085}
1086
1087static bool
1088ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b)
1089{
1090 ASSERT_ractor_unlocking(r);
1091 bool taken;
1092
1093 RACTOR_LOCK(r);
1094 {
1095 taken = ractor_take_will(r, b);
1096 }
1097 RACTOR_UNLOCK(r);
1098
1099 return taken;
1100}
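
// Example (illustrative, Ruby level): the "will" basket holds the block's
// final value, handed to a pending or later #take once the Ractor terminates:
//
//   r = Ractor.new { :done }
//   r.take #=> :done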
1101
1102static bool
1103ractor_register_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
1104 bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error)
1105{
1106 struct rb_ractor_basket b = {
1107 .type.e = basket_type_take_basket,
1108 .sender = cr->pub.self,
1109 .sending_th = cur_th,
1110 .p = {
1111 .take = {
1112 .basket = take_basket, // pointer to our stack value saved in ractor `r` queue
1113 .config = config,
1114 },
1115 },
1116 };
1117 bool closed = false;
1118
1119 RACTOR_LOCK(r);
1120 {
1121 if (is_take && ractor_take_will(r, take_basket)) {
1122 RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r));
1123 }
1124 else if (!is_take && ractor_take_has_will(r)) {
1125 RUBY_DEBUG_LOG("has_will");
1126 VM_ASSERT(config != NULL);
1127 config->closed = true;
1128 }
1129 else if (r->sync.outgoing_port_closed) {
1130 closed = true;
1131 }
1132 else {
1133 RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r));
1134 ractor_queue_enq(r, &r->sync.takers_queue, &b);
1135
1136 if (basket_none_p(take_basket)) {
1137 // wakeup any thread in `r` that has yielded, if there is any.
1138 ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take);
1139 }
1140 }
1141 }
1142 RACTOR_UNLOCK(r);
1143
1144 if (closed) {
1145 if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1146 return false;
1147 }
1148 else {
1149 return true;
1150 }
1151}
1152
1153static bool
1154ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1155{
1156 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1157 bool deleted = false;
1158
1159 RACTOR_LOCK(r);
1160 {
1161 if (r->sync.outgoing_port_closed) {
1162 // ok
1163 }
1164 else {
1165 for (int i=0; i<ts->cnt; i++) {
1166 struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
1167 if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) {
1168 ractor_queue_delete(r, ts, b);
1169 deleted = true;
1170 }
1171 }
1172 if (deleted) {
1173 ractor_queue_compact(r, ts);
1174 }
1175 }
1176 }
1177 RACTOR_UNLOCK(r);
1178
1179 return deleted;
1180}
1181
1182static VALUE
1183ractor_try_take(rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *recv_r, struct rb_ractor_basket *take_basket)
1184{
1185 bool taken;
1186
1187 RACTOR_LOCK_SELF(cr);
1188 {
1189 // If it hasn't yielded yet or is currently in the process of yielding, sleep more
1190 if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
1191 taken = false;
1192 }
1193 else {
1194 taken = true; // basket type might be, for ex, basket_type_copy if value was copied during yield
1195 }
1196 }
1197 RACTOR_UNLOCK_SELF(cr);
1198
1199 if (taken) {
1200 RUBY_DEBUG_LOG("taken");
1201 if (basket_type_p(take_basket, basket_type_deleted)) {
1202 VM_ASSERT(recv_r->sync.outgoing_port_closed);
1203 rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1204 }
1205 return ractor_basket_accept(take_basket);
1206 }
1207 else {
1208 RUBY_DEBUG_LOG("not taken");
1209 return Qundef;
1210 }
1211}
1212
1213
1214#if VM_CHECK_MODE > 0
1215static bool
1216ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb)
1217{
1218 bool ret = false;
1219 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1220
1221 RACTOR_LOCK(r);
1222 {
1223 for (int i=0; i<ts->cnt; i++) {
1224 struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
1225 if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) {
1226 ret = true;
1227 break;
1228 }
1229 }
1230 }
1231 RACTOR_UNLOCK(r);
1232
1233 return ret;
1234}
1235#endif
1236
1237// cleanup function, cr is unlocked
1238static void
1239ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb)
1240{
1241 retry:
1242 if (basket_none_p(tb)) { // not yielded yet
1243 if (!ractor_deregister_take(r, tb)) {
1244 // not in r's takers queue
1245 rb_thread_sleep(0);
1246 goto retry;
1247 }
1248 }
1249 else {
1250 VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb));
1251 }
1252}
1253
1254struct take_wait_take_cleanup_data {
1255 rb_ractor_t *r;
1256 struct rb_ractor_basket *tb;
1257};
1258
1259static void
1260ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr)
1261{
1262 struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr;
1263 ractor_take_cleanup(cr, data->r, data->tb);
1264}
1265
1266static void
1267ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_thread_t *cur_th, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1268{
1269 struct take_wait_take_cleanup_data data = {
1270 .r = r,
1271 .tb = take_basket,
1272 };
1273
1274 RACTOR_LOCK_SELF(cr);
1275 {
1276 if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
1277 ractor_sleep_with_cleanup(ec, cr, cur_th, wait_taking, ractor_wait_take_cleanup, &data);
1278 }
1279 }
1280 RACTOR_UNLOCK_SELF(cr);
1281}
1282
1283static VALUE
1284ractor_take(rb_execution_context_t *ec, rb_ractor_t *recv_r)
1285{
1286 RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(recv_r));
1287 VALUE v;
1288 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
1289 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
1290
1291 struct rb_ractor_basket take_basket = {
1292 .type.e = basket_type_none,
1293 .sender = 0,
1294 };
1295
1296 ractor_register_take(cr, cur_th, recv_r, &take_basket, true, NULL, false);
1297
1298 while (UNDEF_P(v = ractor_try_take(cr, cur_th, recv_r, &take_basket))) {
1299 ractor_wait_take(ec, cr, cur_th, recv_r, &take_basket);
1300 }
1301
1302 VM_ASSERT(!basket_none_p(&take_basket)); // might be, for ex, basket_type_copy
1303 VM_ASSERT(!ractor_check_specific_take_basket_lock(recv_r, &take_basket));
1304
1305 return v;
1306}
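
// Example (illustrative, Ruby level): #take consumes values produced by
// Ractor.yield first, then the terminating block value (the "will"):
//
//   r = Ractor.new do
//     Ractor.yield :a
//     :b
//   end
//   r.take #=> :a
//   r.take #=> :b
//   r.take # raises Ractor::ClosedError (outgoing port closed after the will)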
1307
1308// Ractor.yield
1309
1310static bool
1311ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs)
1312{
1313 ASSERT_ractor_locking(cr);
1314
1315 for (int i=0; i<rs->cnt; i++) {
1316 struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i);
1317 if (basket_type_p(b, basket_type_take_basket) &&
1318 basket_none_p(b->p.take.basket)) {
1319 return true;
1320 }
1321 }
1322
1323 return false;
1324}
1325
1326// Find another ractor that is taking from this ractor, so we can yield to it
1327static bool
1328ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b)
1329{
1330 ASSERT_ractor_unlocking(cr);
1331 struct rb_ractor_basket *first_tb = NULL;
1332 bool found = false;
1333
1334 RACTOR_LOCK_SELF(cr);
1335 {
1336 while (ractor_queue_deq(cr, rs, b)) {
1337 if (basket_type_p(b, basket_type_take_basket)) { // some other ractor is taking
1338 struct rb_ractor_basket *tb = b->p.take.basket;
1339
1340 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
1341 found = true; // payload basket is now "yielding" type
1342 break;
1343 }
1344 else {
1345 ractor_queue_enq(cr, rs, b);
1346 if (first_tb == NULL) first_tb = tb;
1347 struct rb_ractor_basket *head = ractor_queue_head(cr, rs);
1348 VM_ASSERT(head != NULL);
1349 if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) {
1350 break; // loop detected
1351 }
1352 }
1353 }
1354 else {
1355 VM_ASSERT(basket_none_p(b));
1356 }
1357 }
1358
1359 if (found && b->p.take.config && !b->p.take.config->oneshot) {
1360 ractor_queue_enq(cr, rs, b);
1361 }
1362 }
1363 RACTOR_UNLOCK_SELF(cr);
1364
1365 return found;
1366}
1367
1368// Try yielding to a taking ractor
1369static bool
1370ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, volatile VALUE obj, VALUE move, bool exc, bool is_will)
1371{
1372 // Don't lock yielding ractor at same time as taking ractor. This could deadlock due to timing
1373 // issue because we don't have a lock hierarchy.
1374 ASSERT_ractor_unlocking(cr);
1375 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
1376
1377 struct rb_ractor_basket b;
1378
1379 if (ractor_deq_take_basket(cr, ts, &b)) { // deq a take basket from takers queue of `cr` into `b`
1380 VM_ASSERT(basket_type_p(&b, basket_type_take_basket));
1381 VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding));
1382
1383 rb_ractor_t *tr = RACTOR_PTR(b.sender); // taking ractor
1384 rb_thread_t *tr_th = b.sending_th; // taking thread
1385 struct rb_ractor_basket *tb = b.p.take.basket; // payload basket
1386 enum rb_ractor_basket_type type;
1387
1388 RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr));
1389
1390 if (is_will) {
1391 type = basket_type_will; // last message
1392 }
1393 else {
1394 enum ruby_tag_type state;
1395
1396 // begin
1397 EC_PUSH_TAG(ec);
1398 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1399 // TODO: Ractor local GC
1400 ractor_basket_prepare_contents(obj, move, &obj, &type);
1401 }
1402 EC_POP_TAG();
1403 // rescue ractor copy/move error, then re-raise
1404 if (state) {
1405 RACTOR_LOCK_SELF(cr);
1406 {
1407 b.p.take.basket->type.e = basket_type_none;
1408 ractor_queue_enq(cr, ts, &b);
1409 }
1410 RACTOR_UNLOCK_SELF(cr);
1411 EC_JUMP_TAG(ec, state);
1412 }
1413 }
1414
1415 RACTOR_LOCK(tr);
1416 {
1417 VM_ASSERT(basket_type_p(tb, basket_type_yielding));
1418 // fill atomic
1419 RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr));
1420 ractor_basket_fill_(cr, cur_th, tb, obj, exc); // fill the take basket payload
1421 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) {
1422 rb_bug("unreachable");
1423 }
1424 ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_yield);
1425 }
1426 RACTOR_UNLOCK(tr);
1427
1428 return true;
1429 }
1430 else if (cr->sync.outgoing_port_closed) {
1431 rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1432 }
1433 else {
1434 RUBY_DEBUG_LOG("no take basket");
1435 return false;
1436 }
1437}
1438
1439static void
1440ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts)
1441{
1442 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
1443 RACTOR_LOCK_SELF(cr);
1444 {
1445 while (!ractor_check_take_basket(cr, ts) && !cr->sync.outgoing_port_closed) {
1446 ractor_sleep(ec, cr, cur_th, wait_yielding);
1447 }
1448 }
1449 RACTOR_UNLOCK_SELF(cr);
1450}
1451
1452// In order to yield, we wait until our takers queue has at least one element. Then, we wakeup a taker.
1453static VALUE
1454ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move)
1455{
1456 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
1457
1458 while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) {
1459 ractor_wait_yield(ec, cr, ts);
1460 }
1461
1462 return Qnil;
1463}
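
// Example (illustrative, Ruby level): Ractor.yield blocks the yielding Ractor
// until some other Ractor takes the value:
//
//   r = Ractor.new do
//     Ractor.yield :ready  # sleeps in wait_yielding until the take below
//     :finished
//   end
//   r.take #=> :ready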
1464
1465// Ractor::Selector
1466
1467struct rb_ractor_selector {
1468 rb_ractor_t *r;
1469 struct rb_ractor_basket take_basket;
1470 st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *)
1471};
1472
1473static int
1474ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data)
1475{
1476 const rb_ractor_t *r = (rb_ractor_t *)key;
1477 rb_gc_mark(r->pub.self);
1478 return ST_CONTINUE;
1479}
1480
1481static void
1482ractor_selector_mark(void *ptr)
1483{
1484 struct rb_ractor_selector *s = ptr;
1485
1486 if (s->take_ractors) {
1487 st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0);
1488 }
1489
1490 switch (s->take_basket.type.e) {
1491 case basket_type_ref:
1492 case basket_type_copy:
1493 case basket_type_move:
1494 case basket_type_will:
1495 rb_gc_mark(s->take_basket.sender);
1496 rb_gc_mark(s->take_basket.p.send.v);
1497 break;
1498 default:
1499 break;
1500 }
1501}
1502
1503static int
1504ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data)
1505{
1506 struct rb_ractor_selector *s = (struct rb_ractor_selector *)data;
1507 struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val;
1508
1509 if (!config->closed) {
1510 ractor_deregister_take((rb_ractor_t *)key, &s->take_basket);
1511 }
1512 free(config);
1513 return ST_CONTINUE;
1514}
1515
1516static void
1517ractor_selector_free(void *ptr)
1518{
1519 struct rb_ractor_selector *s = ptr;
1520 st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s);
1521 st_free_table(s->take_ractors);
1522 ruby_xfree(ptr);
1523}
1524
1525static size_t
1526ractor_selector_memsize(const void *ptr)
1527{
1528 const struct rb_ractor_selector *s = ptr;
1529 return sizeof(struct rb_ractor_selector) +
1530 st_memsize(s->take_ractors) +
1531 s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config);
1532}
1533
1534static const rb_data_type_t ractor_selector_data_type = {
1535 "ractor/selector",
1536 {
1537 ractor_selector_mark,
1538 ractor_selector_free,
1539 ractor_selector_memsize,
1540 NULL, // update
1541 },
1542 0, 0, RUBY_TYPED_FREE_IMMEDIATELY,
1543};
1544
1545static struct rb_ractor_selector *
1546RACTOR_SELECTOR_PTR(VALUE selv)
1547{
1548 VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
1549
1550 return (struct rb_ractor_selector *)DATA_PTR(selv);
1551}
1552
1553// Ractor::Selector.new
1554
1555static VALUE
1556ractor_selector_create(VALUE klass)
1557{
1558 struct rb_ractor_selector *s;
1559 VALUE selv = TypedData_Make_Struct(klass, struct rb_ractor_selector, &ractor_selector_data_type, s);
1560 s->take_basket.type.e = basket_type_reserved;
1561 s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config
1562 return selv;
1563}
1564
1565// Ractor::Selector#add(r)
1566
1567/*
1568 * call-seq:
1569 * add(ractor) -> ractor
1570 *
1571 * Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added.
1572 * Returns _ractor_.
1573 */
1574static VALUE
1575ractor_selector_add(VALUE selv, VALUE rv)
1576{
1577 if (!rb_ractor_p(rv)) {
1578 rb_raise(rb_eArgError, "Not a ractor object");
1579 }
1580
1581 rb_ractor_t *r = RACTOR_PTR(rv);
1582 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1583
1584 if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
1585 rb_raise(rb_eArgError, "already added");
1586 }
1587
1588 struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config));
1589 VM_ASSERT(config != NULL);
1590 config->closed = false;
1591 config->oneshot = false;
1592
1593 if (ractor_register_take(GET_RACTOR(), GET_THREAD(), r, &s->take_basket, false, config, true)) {
1594 st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
1595 }
1596
1597 return rv;
1598}
1599
1600// Ractor::Selector#remove(r)
1601
1602/* call-seq:
1603 * remove(ractor) -> ractor
1604 *
1605 * Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added.
1606 * Returns the removed _ractor_.
1607 */
1608static VALUE
1609ractor_selector_remove(VALUE selv, VALUE rv)
1610{
1611 if (!rb_ractor_p(rv)) {
1612 rb_raise(rb_eArgError, "Not a ractor object");
1613 }
1614
1615 rb_ractor_t *r = RACTOR_PTR(rv);
1616 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1617
1618 RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));
1619
1620 if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
1621 rb_raise(rb_eArgError, "not added yet");
1622 }
1623
1624 ractor_deregister_take(r, &s->take_basket);
1625 struct rb_ractor_selector_take_config *config;
1626 st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config);
1627 free(config);
1628
1629 return rv;
1630}
1631
1632// Ractor::Selector#clear
1633
1634struct ractor_selector_clear_data {
1635 VALUE selv;
1637};
1638
1639static int
1640ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data)
1641{
1642 VALUE selv = (VALUE)data;
1643 rb_ractor_t *r = (rb_ractor_t *)key;
1644 ractor_selector_remove(selv, r->pub.self);
1645 return ST_CONTINUE;
1646}
1647
1648/*
1649 * call-seq:
1650 * clear -> self
1651 *
1652 * Removes all ractors from +self+. Returns +self+.
1653 */
1654static VALUE
1655ractor_selector_clear(VALUE selv)
1656{
1657 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1658
1659 st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)selv);
1660 st_clear(s->take_ractors);
1661 return selv;
1662}
1663
1664/*
1665 * call-seq:
1666 * empty? -> true or false
1667 *
1668 * Returns +true+ if no ractor is added.
1669 */
1670static VALUE
1671ractor_selector_empty_p(VALUE selv)
1672{
1673 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1674 return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse;
1675}
1676
1677static int
1678ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
1679{
1680 rb_ractor_t *r = (rb_ractor_t *)key;
1681 struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat;
1682 int ret;
1683
1684 if (!basket_none_p(tb)) {
1685 RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e));
1686 return ST_STOP;
1687 }
1688
1689 RACTOR_LOCK(r);
1690 {
1691 if (basket_type_p(&r->sync.will_basket, basket_type_will)) {
1692 RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r));
1693
1694 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) {
1695 ractor_take_will(r, tb);
1696 ret = ST_STOP;
1697 }
1698 else {
1699 RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e));
1700 ret = ST_CONTINUE;
1701 }
1702 }
1703 else if (r->sync.outgoing_port_closed) {
1704 RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r));
1705
1706 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) {
1707 tb->sender = r->pub.self;
1708 ret = ST_STOP;
1709 }
1710 else {
1711 RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e));
1712 ret = ST_CONTINUE;
1713 }
1714 }
1715 else {
1716 RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r));
1717 ractor_wakeup(r, NULL, wait_yielding, wakeup_by_take);
1718 ret = ST_CONTINUE;
1719 }
1720 }
1721 RACTOR_UNLOCK(r);
1722
1723 return ret;
1724}
1725
1726// Ractor::Selector#wait
1727
1728// cleanup function, cr is unlocked
1729static void
1730ractor_selector_wait_cleanup(rb_ractor_t *cr, void *ptr)
1731{
1732 struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr;
1733
1734 RACTOR_LOCK_SELF(cr);
1735 {
1736 while (basket_type_p(tb, basket_type_yielding)) {
1737 RACTOR_UNLOCK_SELF(cr);
1738 {
1739 rb_thread_sleep(0);
1740 }
1741 RACTOR_LOCK_SELF(cr);
1742 }
1743 // if tb->type is not none, the take succeeded, but an interruption unfortunately ignores it.
1744 tb->type.e = basket_type_reserved;
1745 }
1746 RACTOR_UNLOCK_SELF(cr);
1747}
1748
1749/* :nodoc: */
1750static VALUE
1751ractor_selector__wait(VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
1752{
1753 rb_execution_context_t *ec = GET_EC();
1754 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1755 struct rb_ractor_basket *tb = &s->take_basket;
1756 struct rb_ractor_basket taken_basket;
1757 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
1758 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
1759 bool do_receive = !!RTEST(do_receivev);
1760 bool do_yield = !!RTEST(do_yieldv);
1761 VALUE ret_v, ret_r;
1762 enum rb_ractor_wait_status wait_status;
1763 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
1764 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
1765
1766 RUBY_DEBUG_LOG("start");
1767
1768 retry:
1769 RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
1770
1771 // setup wait_status
1772 wait_status = wait_none;
1773 if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
1774 if (do_receive) wait_status |= wait_receiving;
1775 if (do_yield) wait_status |= wait_yielding;
1776
1777 RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
1778
1779 if (wait_status == wait_none) {
1780 rb_raise(rb_eRactorError, "no taking ractors");
1781 }
1782
1783 // check recv_queue
1784 if (do_receive && !UNDEF_P(ret_v = ractor_try_receive(ec, cr, rq))) {
1785 ret_r = ID2SYM(rb_intern("receive"));
1786 goto success;
1787 }
1788
1789 // check takers
1790 if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
1791 ret_v = Qnil;
1792 ret_r = ID2SYM(rb_intern("yield"));
1793 goto success;
1794 }
1795
1796 // check take_basket
1797 VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
1798 s->take_basket.type.e = basket_type_none;
1799 // kick all take target ractors
1800 st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
1801
1802 RACTOR_LOCK_SELF(cr);
1803 {
1804 retry_waiting:
1805 while (1) {
1806 if (!basket_none_p(tb)) {
1807 RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
1808 tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
1809 break;
1810 }
1811 if (do_receive && !ractor_queue_empty_p(cr, rq)) {
1812 RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
1813 break;
1814 }
1815 if (do_yield && ractor_check_take_basket(cr, ts)) {
1816 RUBY_DEBUG_LOG("can yield");
1817 break;
1818 }
1819
1820 ractor_sleep_with_cleanup(ec, cr, cur_th, wait_status, ractor_selector_wait_cleanup, tb);
1821 }
1822
1823 taken_basket = *tb;
1824
1825 // ensure
1826 // tb->type.e = basket_type_reserved # do it atomic in the following code
1827 if (taken_basket.type.e == basket_type_yielding ||
1828 RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
1829
1830 if (basket_type_p(tb, basket_type_yielding)) {
1831 RACTOR_UNLOCK_SELF(cr);
1832 {
1833 rb_thread_sleep(0);
1834 }
1835 RACTOR_LOCK_SELF(cr);
1836 }
1837 goto retry_waiting;
1838 }
1839 }
1840 RACTOR_UNLOCK_SELF(cr);
1841
1842 // check the taken result
1843 switch (taken_basket.type.e) {
1844 case basket_type_none:
1845 VM_ASSERT(do_receive || do_yield);
1846 goto retry;
1847 case basket_type_yielding:
1848 rb_bug("unreachable");
1849 case basket_type_deleted: {
1850 ractor_selector_remove(selv, taken_basket.sender);
1851
1852 rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
1853 if (ractor_take_will_lock(r, &taken_basket)) {
1854 RUBY_DEBUG_LOG("has_will");
1855 }
1856 else {
1857 RUBY_DEBUG_LOG("no will");
1858 // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1859 // remove and retry wait
1860 goto retry;
1861 }
1862 break;
1863 }
1864 case basket_type_will:
1865 // no more messages
1866 ractor_selector_remove(selv, taken_basket.sender);
1867 break;
1868 default:
1869 break;
1870 }
1871
1872 RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
1873
1874 ret_v = ractor_basket_accept(&taken_basket);
1875 ret_r = taken_basket.sender;
1876 success:
1877 return rb_ary_new_from_args(2, ret_r, ret_v);
1878}
1879
1880/*
1881 * call-seq:
1882 * wait(receive: false, yield_value: undef, move: false) -> [ractor, value]
1883 *
1884 * Waits until any ractor in _selector_ can be active.
1885 */
1886static VALUE
1887ractor_selector_wait(int argc, VALUE *argv, VALUE selector)
1888{
1889 VALUE options;
1890 ID keywords[3];
1891 VALUE values[3];
1892
1893 keywords[0] = rb_intern("receive");
1894 keywords[1] = rb_intern("yield_value");
1895 keywords[2] = rb_intern("move");
1896
1897 rb_scan_args(argc, argv, "0:", &options);
1898 rb_get_kwargs(options, keywords, 0, numberof(values), values);
1899 return ractor_selector__wait(selector,
1900 values[0] == Qundef ? Qfalse : RTEST(values[0]),
1901 values[1] != Qundef, values[1], values[2]);
1902}
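
// Example (illustrative, Ruby level): Ractor::Selector (experimental) wraps
// the register/wait machinery above to wait on several Ractors at once:
//
//   r1 = Ractor.new { Ractor.yield :from_r1 }
//   r2 = Ractor.new { sleep }
//   s = Ractor::Selector.new(r1, r2)
//   ractor, value = s.wait #=> [r1, :from_r1]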
1903
1904static VALUE
1905ractor_selector_new(int argc, VALUE *ractors, VALUE klass)
1906{
1907 VALUE selector = ractor_selector_create(klass);
1908
1909 for (int i=0; i<argc; i++) {
1910 ractor_selector_add(selector, ractors[i]);
1911 }
1912
1913 return selector;
1914}
1915
1916static VALUE
1917ractor_select_internal(rb_execution_context_t *ec, VALUE self, VALUE ractors, VALUE do_receive, VALUE do_yield, VALUE yield_value, VALUE move)
1918{
1919 VALUE selector = ractor_selector_new(RARRAY_LENINT(ractors), (VALUE *)RARRAY_CONST_PTR(ractors), rb_cRactorSelector);
1920 VALUE result;
1921 int state;
1922
1923 EC_PUSH_TAG(ec);
1924 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1925 result = ractor_selector__wait(selector, do_receive, do_yield, yield_value, move);
1926 }
1927 EC_POP_TAG();
1928 if (state != TAG_NONE) {
1929 // ensure
1930 ractor_selector_clear(selector);
1931
1932 // jump
1933 EC_JUMP_TAG(ec, state);
1934 }
1935
1936 RB_GC_GUARD(ractors);
1937 return result;
1938}
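
// Example (illustrative, Ruby level): Ractor.select builds a temporary
// Selector over the given Ractors and returns the first ready [ractor, value]
// pair:
//
//   r1 = Ractor.new { :one }
//   r2 = Ractor.new { sleep 10; :two }
//   ractor, value = Ractor.select(r1, r2)
//   ractor.equal?(r1) #=> true
//   value             #=> :one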
1939
1940// Ractor#close_incoming
1941
1942static VALUE
1943ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
1944{
1945 VALUE prev;
1946 rb_thread_t *r_th = NULL;
1947 if (r == rb_ec_ractor_ptr(ec)) {
1948 r_th = rb_ec_thread_ptr(ec);
1949 }
1950
1951 RACTOR_LOCK(r);
1952 {
1953 if (!r->sync.incoming_port_closed) {
1954 prev = Qfalse;
1955 r->sync.incoming_port_closed = true;
1956 if (ractor_wakeup(r, r_th, wait_receiving, wakeup_by_close)) {
1957 VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue));
1958 RUBY_DEBUG_LOG("cancel receiving");
1959 }
1960 }
1961 else {
1962 prev = Qtrue;
1963 }
1964 }
1965 RACTOR_UNLOCK(r);
1966 return prev;
1967}
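
// Example (illustrative, Ruby level): #close_incoming returns whether the port
// was already closed and wakes up a receiver blocked on the empty queue, whose
// Ractor.receive then raises Ractor::ClosedError:
//
//   r = Ractor.new { Ractor.receive rescue :closed }
//   r.close_incoming #=> false (it was open)
//   r.take           #=> :closed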
1968
1969// Ractor#close_outgoing
1970
1971static VALUE
1972ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
1973{
1974 VALUE prev;
1975
1976 RACTOR_LOCK(r);
1977 {
1978 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1979 rb_ractor_t *tr;
1980 struct rb_ractor_basket b;
1981
1982 if (!r->sync.outgoing_port_closed) {
1983 prev = Qfalse;
1984 r->sync.outgoing_port_closed = true;
1985 }
1986 else {
1987 VM_ASSERT(ractor_queue_empty_p(r, ts));
1988 prev = Qtrue;
1989 }
1990
1991 // wakeup all taking ractors
1992 while (ractor_queue_deq(r, ts, &b)) {
1993 if (basket_type_p(&b, basket_type_take_basket)) {
1994 tr = RACTOR_PTR(b.sender);
1995 rb_thread_t *tr_th = b.sending_th;
1996 struct rb_ractor_basket *tb = b.p.take.basket;
1997
1998 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
1999 b.p.take.basket->sender = r->pub.self;
2000 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) {
2001 rb_bug("unreachable");
2002 }
2003 RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender)));
2004 }
2005
2006 if (b.p.take.config) {
2007 b.p.take.config->closed = true;
2008 }
2009
2010 // TODO: deadlock-able?
2011 RACTOR_LOCK(tr);
2012 {
2013 ractor_wakeup(tr, tr_th, wait_taking, wakeup_by_close);
2014 }
2015 RACTOR_UNLOCK(tr);
2016 }
2017 }
2018
2019 // raising yielding Ractor
2020 ractor_wakeup(r, NULL, wait_yielding, wakeup_by_close);
2021
2022 VM_ASSERT(ractor_queue_empty_p(r, ts));
2023 }
2024 RACTOR_UNLOCK(r);
2025 return prev;
2026}
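
// Example (illustrative, Ruby level): #close_outgoing invalidates pending and
// future takes on the Ractor:
//
//   r = Ractor.new { sleep 0.1; Ractor.yield :never_taken }
//   r.close_outgoing #=> false (it was open)
//   r.take           # raises Ractor::ClosedError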
2027
2028// creation/termination
2029
2030static uint32_t
2031ractor_next_id(void)
2032{
2033 uint32_t id;
2034
2035 id = (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id, 1) + 1);
2036
2037 return id;
2038}
2039
2040static void
2041vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode)
2042{
2043 RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt);
2044 VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P());
2045
2046 ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node);
2047 vm->ractor.cnt++;
2048
2049 if (r->newobj_cache) {
2050 VM_ASSERT(r == ruby_single_main_ractor);
2051 }
2052 else {
2053 r->newobj_cache = rb_gc_ractor_cache_alloc(r);
2054 }
2055}
2056
2057static void
2058cancel_single_ractor_mode(void)
2059{
2060 // enable multi-ractor mode
2061 RUBY_DEBUG_LOG("enable multi-ractor mode");
2062
2063 ruby_single_main_ractor = NULL;
2064 rb_funcall(rb_cRactor, rb_intern("_activated"), 0);
2065}
2066
2067static void
2068vm_insert_ractor(rb_vm_t *vm, rb_ractor_t *r)
2069{
2070 VM_ASSERT(ractor_status_p(r, ractor_created));
2071
2072 if (rb_multi_ractor_p()) {
2073 RB_VM_LOCK();
2074 {
2075 vm_insert_ractor0(vm, r, false);
2076 vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
2077 }
2078 RB_VM_UNLOCK();
2079 }
2080 else {
2081 if (vm->ractor.cnt == 0) {
2082 // main ractor
2083 vm_insert_ractor0(vm, r, true);
2084 ractor_status_set(r, ractor_blocking);
2085 ractor_status_set(r, ractor_running);
2086 }
2087 else {
2088 cancel_single_ractor_mode();
2089 vm_insert_ractor0(vm, r, true);
2090 vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
2091 }
2092 }
2093}
2094
2095static void
2096vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr)
2097{
2098 VM_ASSERT(ractor_status_p(cr, ractor_running));
2099 VM_ASSERT(vm->ractor.cnt > 1);
2100 VM_ASSERT(cr->threads.cnt == 1);
2101
2102 RB_VM_LOCK();
2103 {
2104 RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d",
2105 vm->ractor.cnt, vm->ractor.sync.terminate_waiting);
2106
2107 VM_ASSERT(vm->ractor.cnt > 0);
2108 ccan_list_del(&cr->vmlr_node);
2109
2110 if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) {
2111 rb_native_cond_signal(&vm->ractor.sync.terminate_cond);
2112 }
2113 vm->ractor.cnt--;
2114
2115 rb_gc_ractor_cache_free(cr->newobj_cache);
2116 cr->newobj_cache = NULL;
2117
2118 ractor_status_set(cr, ractor_terminated);
2119 }
2120 RB_VM_UNLOCK();
2121}
2122
2123static VALUE
2124ractor_alloc(VALUE klass)
2125{
2126 rb_ractor_t *r;
2127 VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r);
2128 FL_SET_RAW(rv, RUBY_FL_SHAREABLE);
2129 r->pub.self = rv;
2130 VM_ASSERT(ractor_status_p(r, ractor_created));
2131 return rv;
2132}
2133
2134rb_ractor_t *
2135rb_ractor_main_alloc(void)
2136{
2137 rb_ractor_t *r = ruby_mimcalloc(1, sizeof(rb_ractor_t));
2138 if (r == NULL) {
2139 fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
2140 exit(EXIT_FAILURE);
2141 }
2142 r->pub.id = ++ractor_last_id;
2143 r->loc = Qnil;
2144 r->name = Qnil;
2145 r->pub.self = Qnil;
2146 r->newobj_cache = rb_gc_ractor_cache_alloc(r);
2147 ruby_single_main_ractor = r;
2148
2149 return r;
2150}
2151
2152#if defined(HAVE_WORKING_FORK)
2153// Set up the main Ractor for the VM after fork.
2154// Puts us in "single Ractor mode"
2155void
2156rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
2157{
2158 // initialize as a main ractor
2159 vm->ractor.cnt = 0;
2160 vm->ractor.blocking_cnt = 0;
2161 ruby_single_main_ractor = th->ractor;
2162 th->ractor->status_ = ractor_created;
2163
2164 rb_ractor_living_threads_init(th->ractor);
2165 rb_ractor_living_threads_insert(th->ractor, th);
2166
2167 VM_ASSERT(vm->ractor.blocking_cnt == 0);
2168 VM_ASSERT(vm->ractor.cnt == 1);
2169}
2170
2171void
2172rb_ractor_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r)
2173{
2174 rb_gc_ractor_cache_free(r->newobj_cache);
2175 r->newobj_cache = NULL;
2176 r->status_ = ractor_terminated;
2177 r->sync.outgoing_port_closed = true;
2178 r->sync.incoming_port_closed = true;
2179 r->sync.will_basket.type.e = basket_type_none;
2180}
2181#endif
2182
2183void rb_thread_sched_init(struct rb_thread_sched *, bool atfork);
2184
2185void
2186rb_ractor_living_threads_init(rb_ractor_t *r)
2187{
2188 ccan_list_head_init(&r->threads.set);
2189 r->threads.cnt = 0;
2190 r->threads.blocking_cnt = 0;
2191}
2192
2193static void
2194ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
2195{
2196 ractor_queue_setup(&r->sync.recv_queue);
2197 ractor_queue_setup(&r->sync.takers_queue);
2198 rb_native_mutex_initialize(&r->sync.lock);
2199 rb_native_cond_initialize(&r->barrier_wait_cond);
2200
2201#ifdef RUBY_THREAD_WIN32_H
2202 rb_native_cond_initialize(&r->barrier_wait_cond);
2203#endif
2204 ccan_list_head_init(&r->sync.wait.waiting_threads);
2205
2206 // thread management
2207 rb_thread_sched_init(&r->threads.sched, false);
2208 rb_ractor_living_threads_init(r);
2209
2210 // naming
2211 if (!NIL_P(name)) {
2212 rb_encoding *enc;
2213 StringValueCStr(name);
2214 enc = rb_enc_get(name);
2215 if (!rb_enc_asciicompat(enc)) {
2216 rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
2217 rb_enc_name(enc));
2218 }
2219 name = rb_str_new_frozen(name);
2220 }
2221 r->name = name;
2222 r->loc = loc;
2223}
2224
2225void
2226rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th)
2227{
2228 r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r);
2229 FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE);
2230 ractor_init(r, Qnil, Qnil);
2231 r->threads.main = th;
2232 rb_ractor_living_threads_insert(r, th);
2233}
2234
2235static VALUE
2236ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VALUE args, VALUE block)
2237{
2238 VALUE rv = ractor_alloc(self);
2239 rb_ractor_t *r = RACTOR_PTR(rv);
2240 ractor_init(r, name, loc);
2241
2242 // can block here
2243 r->pub.id = ractor_next_id();
2244 RUBY_DEBUG_LOG("r:%u", r->pub.id);
2245
2246 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2247 r->verbose = cr->verbose;
2248 r->debug = cr->debug;
2249
2250 rb_yjit_before_ractor_spawn();
2251 rb_thread_create_ractor(r, args, block);
2252
2253 RB_GC_GUARD(rv);
2254 return rv;
2255}
2256
2257static VALUE
2258ractor_create_func(VALUE klass, VALUE loc, VALUE name, VALUE args, rb_block_call_func_t func)
2259{
2260 VALUE block = rb_proc_new(func, Qnil);
2261 return ractor_create(rb_current_ec_noinline(), klass, loc, name, args, block);
2262}
2263
2264static void
2265ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
2266{
2267 if (cr->sync.outgoing_port_closed) {
2268 return;
2269 }
2270
2271 ASSERT_ractor_unlocking(cr);
2272
2273 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
2274 rb_thread_t *cur_th = rb_ec_thread_ptr(ec);
2275
2276 retry:
2277 if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) {
2278 // OK.
2279 }
2280 else {
2281 bool retry = false;
2282 RACTOR_LOCK(cr);
2283 {
2284 if (!ractor_check_take_basket(cr, ts)) {
2285 VM_ASSERT(cur_th->ractor_waiting.wait_status == wait_none);
2286 RUBY_DEBUG_LOG("leave a will");
2287 ractor_basket_fill_will(cr, cur_th, &cr->sync.will_basket, v, exc);
2288 }
2289 else {
2290 RUBY_DEBUG_LOG("rare timing!");
2291 retry = true; // another ractor is waiting for the yield.
2292 }
2293 }
2294 RACTOR_UNLOCK(cr);
2295
2296 if (retry) goto retry;
2297 }
2298}
2299
2300void
2301rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
2302{
2303 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2304 ractor_yield_atexit(ec, cr, result, false);
2305}
2306
2307void
2308rb_ractor_atexit_exception(rb_execution_context_t *ec)
2309{
2310 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2311 ractor_yield_atexit(ec, cr, ec->errinfo, true);
2312}
2313
2314void
2315rb_ractor_teardown(rb_execution_context_t *ec)
2316{
2317 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2318 ractor_close_incoming(ec, cr);
2319 ractor_close_outgoing(ec, cr);
2320
2321 // sync with rb_ractor_terminate_interrupt_main_thread()
2322 RB_VM_LOCK_ENTER();
2323 {
2324 VM_ASSERT(cr->threads.main != NULL);
2325 cr->threads.main = NULL;
2326 }
2327 RB_VM_LOCK_LEAVE();
2328}
2329
2330void
2331rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
2332{
2333 for (int i=0; i<len; i++) {
2334 ptr[i] = ractor_receive(ec, r);
2335 }
2336}
2337
2338void
2339rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args)
2340{
2341 int len = RARRAY_LENINT(args);
2342 for (int i=0; i<len; i++) {
2343 ractor_send(ec, r, RARRAY_AREF(args, i), false);
2344 }
2345}
2346
2347bool
2348rb_ractor_main_p_(void)
2349{
2350 VM_ASSERT(rb_multi_ractor_p());
2351 rb_execution_context_t *ec = GET_EC();
2352 return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor;
2353}
2354
2355bool
2356rb_obj_is_main_ractor(VALUE gv)
2357{
2358 if (!rb_ractor_p(gv)) return false;
2359 rb_ractor_t *r = DATA_PTR(gv);
2360 return r == GET_VM()->ractor.main_ractor;
2361}
2362
2363int
2364rb_ractor_living_thread_num(const rb_ractor_t *r)
2365{
2366 return r->threads.cnt;
2367}
2368
2369// only for current ractor
2370VALUE
2371rb_ractor_thread_list(void)
2372{
2373 rb_ractor_t *r = GET_RACTOR();
2374 rb_thread_t *th = 0;
2375 VALUE ary = rb_ary_new();
2376
2377 ccan_list_for_each(&r->threads.set, th, lt_node) {
2378 switch (th->status) {
2379 case THREAD_RUNNABLE:
2380 case THREAD_STOPPED:
2381 case THREAD_STOPPED_FOREVER:
2382 rb_ary_push(ary, th->self);
2383 default:
2384 break;
2385 }
2386 }
2387
2388 return ary;
2389}
2390
2391void
2392rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th)
2393{
2394 VM_ASSERT(th != NULL);
2395
2396 RACTOR_LOCK(r);
2397 {
2398 RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt);
2399 ccan_list_add_tail(&r->threads.set, &th->lt_node);
2400 r->threads.cnt++;
2401 }
2402 RACTOR_UNLOCK(r);
2403
2404 // first thread for a ractor
2405 if (r->threads.cnt == 1) {
2406 VM_ASSERT(ractor_status_p(r, ractor_created));
2407 vm_insert_ractor(th->vm, r);
2408 }
2409}
2410
2411static void
2412vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line)
2413{
2414 ractor_status_set(r, ractor_blocking);
2415
2416 RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d++", vm->ractor.blocking_cnt);
2417 vm->ractor.blocking_cnt++;
2418 VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt);
2419}
2420
2421void
2422rb_vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
2423{
2424 ASSERT_vm_locking();
2425 VM_ASSERT(GET_RACTOR() == cr);
2426 vm_ractor_blocking_cnt_inc(vm, cr, file, line);
2427}
2428
2429void
2430rb_vm_ractor_blocking_cnt_dec(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
2431{
2432 ASSERT_vm_locking();
2433 VM_ASSERT(GET_RACTOR() == cr);
2434
2435 RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d--", vm->ractor.blocking_cnt);
2436 VM_ASSERT(vm->ractor.blocking_cnt > 0);
2437 vm->ractor.blocking_cnt--;
2438
2439 ractor_status_set(cr, ractor_running);
2440}
2441
2442static void
2443ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const char *file, int line)
2444{
2445 VM_ASSERT(cr == GET_RACTOR());
2446
2447 RUBY_DEBUG_LOG2(file, line,
2448 "cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u",
2449 cr->threads.cnt, cr->threads.blocking_cnt,
2450 GET_VM()->ractor.cnt, GET_VM()->ractor.blocking_cnt);
2451
2452 VM_ASSERT(cr->threads.cnt >= cr->threads.blocking_cnt + 1);
2453
2454 if (remained_thread_cnt > 0 &&
2455 // the current thread is about to block
2456 cr->threads.cnt == cr->threads.blocking_cnt + 1) {
2457 // change ractor status: running -> blocking
2458 rb_vm_t *vm = GET_VM();
2459
2460 RB_VM_LOCK_ENTER();
2461 {
2462 rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line);
2463 }
2464 RB_VM_LOCK_LEAVE();
2465 }
2466}
2467
2468void rb_threadptr_remove(rb_thread_t *th);
2469
2470void
2471rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
2472{
2473 VM_ASSERT(cr == GET_RACTOR());
2474 RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
2475 ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
2476
2477 rb_threadptr_remove(th);
2478
2479 if (cr->threads.cnt == 1) {
2480 vm_remove_ractor(th->vm, cr);
2481 }
2482 else {
2483 RACTOR_LOCK(cr);
2484 {
2485 ccan_list_del(&th->lt_node);
2486 cr->threads.cnt--;
2487 }
2488 RACTOR_UNLOCK(cr);
2489 }
2490}
2491
2492void
2493rb_ractor_blocking_threads_inc(rb_ractor_t *cr, const char *file, int line)
2494{
2495 RUBY_DEBUG_LOG2(file, line, "cr->threads.blocking_cnt:%d++", cr->threads.blocking_cnt);
2496
2497 VM_ASSERT(cr->threads.cnt > 0);
2498 VM_ASSERT(cr == GET_RACTOR());
2499
2500 ractor_check_blocking(cr, cr->threads.cnt, __FILE__, __LINE__);
2501 cr->threads.blocking_cnt++;
2502}
2503
2504void
2505rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line)
2506{
2507 RUBY_DEBUG_LOG2(file, line,
2508 "r->threads.blocking_cnt:%d--, r->threads.cnt:%u",
2509 cr->threads.blocking_cnt, cr->threads.cnt);
2510
2511 VM_ASSERT(cr == GET_RACTOR());
2512
2513 if (cr->threads.cnt == cr->threads.blocking_cnt) {
2514 rb_vm_t *vm = GET_VM();
2515
2516 RB_VM_LOCK_ENTER();
2517 {
2518 rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
2519 }
2520 RB_VM_LOCK_LEAVE();
2521 }
2522
2523 cr->threads.blocking_cnt--;
2524}
2525
2526void
2527rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t *r)
2528{
2529 VM_ASSERT(r != GET_RACTOR());
2530 ASSERT_ractor_unlocking(r);
2531 ASSERT_vm_locking();
2532
2533 RACTOR_LOCK(r);
2534 {
2535 if (ractor_status_p(r, ractor_running)) {
2536 rb_execution_context_t *ec = r->threads.running_ec;
2537 if (ec) {
2538 RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec);
2539 }
2540 }
2541 }
2542 RACTOR_UNLOCK(r);
2543}
2544
2545void
2546rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r)
2547{
2548 VM_ASSERT(r != GET_RACTOR());
2549 ASSERT_ractor_unlocking(r);
2550 ASSERT_vm_locking();
2551
2552 rb_thread_t *main_th = r->threads.main;
2553 if (main_th) {
2554 if (main_th->status != THREAD_KILLED) {
2555 RUBY_VM_SET_TERMINATE_INTERRUPT(main_th->ec);
2556 rb_threadptr_interrupt(main_th);
2557 }
2558 else {
2559 RUBY_DEBUG_LOG("killed (%p)", (void *)main_th);
2560 }
2561 }
2562}
2563
2564void rb_thread_terminate_all(rb_thread_t *th); // thread.c
2565
2566static void
2567ractor_terminal_interrupt_all(rb_vm_t *vm)
2568{
2569 if (vm->ractor.cnt > 1) {
2570 // send terminate notification to all ractors
2571 rb_ractor_t *r = 0;
2572 ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
2573 if (r != vm->ractor.main_ractor) {
2574 RUBY_DEBUG_LOG("r:%d", rb_ractor_id(r));
2575 rb_ractor_terminate_interrupt_main_thread(r);
2576 }
2577 }
2578 }
2579}
2580
2581void rb_add_running_thread(rb_thread_t *th);
2582void rb_del_running_thread(rb_thread_t *th);
2583
2584void
2585rb_ractor_terminate_all(void)
2586{
2587 rb_vm_t *vm = GET_VM();
2588 rb_ractor_t *cr = vm->ractor.main_ractor;
2589
2590 RUBY_DEBUG_LOG("ractor.cnt:%d", (int)vm->ractor.cnt);
2591
2592 VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it.
2593
2594 if (vm->ractor.cnt > 1) {
2595 RB_VM_LOCK();
2596 {
2597 ractor_terminal_interrupt_all(vm); // kill all ractors
2598 }
2599 RB_VM_UNLOCK();
2600 }
2601 rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
2602
2603 RB_VM_LOCK();
2604 {
2605 while (vm->ractor.cnt > 1) {
2606 RUBY_DEBUG_LOG("terminate_waiting:%d", vm->ractor.sync.terminate_waiting);
2607 vm->ractor.sync.terminate_waiting = true;
2608
2609 // wait for 1sec
2610 rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
2611 rb_del_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
2612 rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
2613 rb_add_running_thread(rb_ec_thread_ptr(cr->threads.running_ec));
2614 rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
2615
2616 ractor_terminal_interrupt_all(vm);
2617 }
2618 }
2619 RB_VM_UNLOCK();
2620}
2621
2622rb_execution_context_t *
2623rb_vm_main_ractor_ec(rb_vm_t *vm)
2624{
2625 /* This code needs to carefully work around two bugs:
2626 * - Bug #20016: When M:N threading is enabled, running_ec is NULL if no thread is
2627 * actually currently running (as opposed to without M:N threading, when
2628 * running_ec will still point to the _last_ thread which ran)
2629 * - Bug #20197: If the main thread is sleeping, setting its postponed job
2630 * interrupt flag is pointless; it won't look at the flag until it stops sleeping
2631 * for some reason. It would be better to set the flag on the running ec, which
2632 * will presumably look at it soon.
2633 *
2634 * Solution: use running_ec if it's set, otherwise fall back to the main thread ec.
2635 * This is still susceptible to some rare race conditions (what if the last thread
2636 * to run just entered a long-running sleep?), but seems like the best balance of
2637 * robustness and complexity.
2638 */
2639 rb_execution_context_t *running_ec = vm->ractor.main_ractor->threads.running_ec;
2640 if (running_ec) { return running_ec; }
2641 return vm->ractor.main_thread->ec;
2642}
2643
2644static VALUE
2645ractor_moved_missing(int argc, VALUE *argv, VALUE self)
2646{
2647 rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object");
2648}
2649
2650#ifndef USE_RACTOR_SELECTOR
2651#define USE_RACTOR_SELECTOR 0
2652#endif
2653
2654RUBY_SYMBOL_EXPORT_BEGIN
2655void rb_init_ractor_selector(void);
2656RUBY_SYMBOL_EXPORT_END
2657
2658/*
2659 * Document-class: Ractor::Selector
2660 * :nodoc: currently
2661 *
2662 * Waits on a set of Ractors and reports which one becomes ready first (used by Ractor.select).
2663 */
2664void
2665rb_init_ractor_selector(void)
2666{
2667 rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
2668 rb_undef_alloc_func(rb_cRactorSelector);
2669
2670 rb_define_singleton_method(rb_cRactorSelector, "new", ractor_selector_new , -1);
2671 rb_define_method(rb_cRactorSelector, "add", ractor_selector_add, 1);
2672 rb_define_method(rb_cRactorSelector, "remove", ractor_selector_remove, 1);
2673 rb_define_method(rb_cRactorSelector, "clear", ractor_selector_clear, 0);
2674 rb_define_method(rb_cRactorSelector, "empty?", ractor_selector_empty_p, 0);
2675 rb_define_method(rb_cRactorSelector, "wait", ractor_selector_wait, -1);
2676 rb_define_method(rb_cRactorSelector, "_wait", ractor_selector__wait, 4);
2677}
2678
2679/*
2680 * Document-class: Ractor::ClosedError
2681 *
2682 * Raised when an attempt is made to send a message to a closed port,
2683 * or to retrieve a message from a closed and empty port.
2684 * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
2685 * and are closed implicitly when a Ractor terminates.
2686 *
2687 * r = Ractor.new { sleep(500) }
2688 * r.close_outgoing
2689 * r.take # Ractor::ClosedError
2690 *
2691 * ClosedError is a descendant of StopIteration, so closing the ractor breaks out of
2692 * the loops without propagating the error:
2693 *
2694 * r = Ractor.new do
2695 * loop do
2696 * msg = receive # raises ClosedError and loop traps it
2697 * puts "Received: #{msg}"
2698 * end
2699 * puts "loop exited"
2700 * end
2701 *
2702 * 3.times{|i| r << i}
2703 * r.close_incoming
2704 * r.take
2705 * puts "Continue successfully"
2706 *
2707 * This will print:
2708 *
2709 * Received: 0
2710 * Received: 1
2711 * Received: 2
2712 * loop exited
2713 * Continue successfully
2714 */
2715
2716/*
2717 * Document-class: Ractor::RemoteError
2718 *
2719 * Raised on an attempt to Ractor#take if there was an uncaught exception in the Ractor.
2720 * Its +cause+ contains the original exception, and +ractor+ is the Ractor
2721 * it was raised in.
2722 *
2723 * r = Ractor.new { raise "Something weird happened" }
2724 *
2725 * begin
2726 * r.take
2727 * rescue => e
2728 * p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
2729 * p e.ractor == r # => true
2730 * p e.cause # => #<RuntimeError: Something weird happened>
2731 * end
2732 *
2733 */
2734
2735/*
2736 * Document-class: Ractor::MovedError
2737 *
2738 * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
2739 *
2740 * r = Ractor.new { sleep }
2741 *
2742 * ary = [1, 2, 3]
2743 * r.send(ary, move: true)
2744 * ary.inspect
2745 * # Ractor::MovedError (can not send any methods to a moved object)
2746 *
2747 */
2748
2749/*
2750 * Document-class: Ractor::MovedObject
2751 *
2752 * A special object which replaces any value that was moved to another ractor in Ractor#send
2753 * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
2754 *
2755 * r = Ractor.new { receive }
2756 *
2757 * ary = [1, 2, 3]
2758 * r.send(ary, move: true)
2759 * p Ractor::MovedObject === ary
2760 * # => true
2761 * ary.inspect
2762 * # Ractor::MovedError (can not send any methods to a moved object)
2763 */
2764
2765// Main docs are in ractor.rb, but without this clause there are weird artifacts
2766// in their rendering.
2767/*
2768 * Document-class: Ractor
2769 *
2770 */
2771
2772void
2773Init_Ractor(void)
2774{
2775 rb_cRactor = rb_define_class("Ractor", rb_cObject);
2776 rb_undef_alloc_func(rb_cRactor);
2777
2778 rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError);
2779 rb_eRactorIsolationError = rb_define_class_under(rb_cRactor, "IsolationError", rb_eRactorError);
2780 rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError);
2781 rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError);
2782 rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration);
2783 rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError);
2784
2785 rb_cRactorMovedObject = rb_define_class_under(rb_cRactor, "MovedObject", rb_cBasicObject);
2786 rb_undef_alloc_func(rb_cRactorMovedObject);
2787 rb_define_method(rb_cRactorMovedObject, "method_missing", ractor_moved_missing, -1);
2788
2789 // override methods defined in BasicObject
2790 rb_define_method(rb_cRactorMovedObject, "__send__", ractor_moved_missing, -1);
2791 rb_define_method(rb_cRactorMovedObject, "!", ractor_moved_missing, -1);
2792 rb_define_method(rb_cRactorMovedObject, "==", ractor_moved_missing, -1);
2793 rb_define_method(rb_cRactorMovedObject, "!=", ractor_moved_missing, -1);
2794 rb_define_method(rb_cRactorMovedObject, "__id__", ractor_moved_missing, -1);
2795 rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1);
2796 rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
2797 rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
2798
2799 // internal
2800
2801#if USE_RACTOR_SELECTOR
2802 rb_init_ractor_selector();
2803#endif
2804}
2805
2806void
2807rb_ractor_dump(void)
2808{
2809 rb_vm_t *vm = GET_VM();
2810 rb_ractor_t *r = 0;
2811
2812 ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
2813 if (r != vm->ractor.main_ractor) {
2814 fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_));
2815 }
2816 }
2817}
2818
2819VALUE
2820rb_ractor_stdin(void)
2821{
2822 if (rb_ractor_main_p()) {
2823 return rb_stdin;
2824 }
2825 else {
2826 rb_ractor_t *cr = GET_RACTOR();
2827 return cr->r_stdin;
2828 }
2829}
2830
2831VALUE
2832rb_ractor_stdout(void)
2833{
2834 if (rb_ractor_main_p()) {
2835 return rb_stdout;
2836 }
2837 else {
2838 rb_ractor_t *cr = GET_RACTOR();
2839 return cr->r_stdout;
2840 }
2841}
2842
2843VALUE
2844rb_ractor_stderr(void)
2845{
2846 if (rb_ractor_main_p()) {
2847 return rb_stderr;
2848 }
2849 else {
2850 rb_ractor_t *cr = GET_RACTOR();
2851 return cr->r_stderr;
2852 }
2853}
2854
2855void
2856rb_ractor_stdin_set(VALUE in)
2857{
2858 if (rb_ractor_main_p()) {
2859 rb_stdin = in;
2860 }
2861 else {
2862 rb_ractor_t *cr = GET_RACTOR();
2863 RB_OBJ_WRITE(cr->pub.self, &cr->r_stdin, in);
2864 }
2865}
2866
2867void
2868rb_ractor_stdout_set(VALUE out)
2869{
2870 if (rb_ractor_main_p()) {
2871 rb_stdout = out;
2872 }
2873 else {
2874 rb_ractor_t *cr = GET_RACTOR();
2875 RB_OBJ_WRITE(cr->pub.self, &cr->r_stdout, out);
2876 }
2877}
2878
2879void
2880rb_ractor_stderr_set(VALUE err)
2881{
2882 if (rb_ractor_main_p()) {
2883 rb_stderr = err;
2884 }
2885 else {
2886 rb_ractor_t *cr = GET_RACTOR();
2887 RB_OBJ_WRITE(cr->pub.self, &cr->r_stderr, err);
2888 }
2889}
2890
2891rb_hook_list_t *
2892rb_ractor_hooks(rb_ractor_t *cr)
2893{
2894 return &cr->pub.hooks;
2895}
2896
2898
2899// 2: stop search
2900// 1: skip child
2901// 0: continue
2902
2903enum obj_traverse_iterator_result {
2904 traverse_cont,
2905 traverse_skip,
2906 traverse_stop,
2907};
2908
2909typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func)(VALUE obj);
2910typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func)(VALUE obj);
2911typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func)(VALUE obj);
2912
2913static enum obj_traverse_iterator_result null_leave(VALUE obj);
2914
2915struct obj_traverse_data {
2916 rb_obj_traverse_enter_func enter_func;
2917 rb_obj_traverse_leave_func leave_func;
2918
2919 st_table *rec;
2920 VALUE rec_hash;
2921};
2922
2923
2924struct obj_traverse_callback_data {
2925 bool stop;
2926 struct obj_traverse_data *data;
2927};
2928
2929static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data);
2930
2931static int
2932obj_hash_traverse_i(VALUE key, VALUE val, VALUE ptr)
2933{
2934 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2935
2936 if (obj_traverse_i(key, d->data)) {
2937 d->stop = true;
2938 return ST_STOP;
2939 }
2940
2941 if (obj_traverse_i(val, d->data)) {
2942 d->stop = true;
2943 return ST_STOP;
2944 }
2945
2946 return ST_CONTINUE;
2947}
2948
2949static void
2950obj_traverse_reachable_i(VALUE obj, void *ptr)
2951{
2952 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2953
2954 if (obj_traverse_i(obj, d->data)) {
2955 d->stop = true;
2956 }
2957}
2958
2959static struct st_table *
2960obj_traverse_rec(struct obj_traverse_data *data)
2961{
2962 if (UNLIKELY(!data->rec)) {
2963 data->rec_hash = rb_ident_hash_new();
2964 data->rec = RHASH_ST_TABLE(data->rec_hash);
2965 }
2966 return data->rec;
2967}
2968
2969static int
2970obj_traverse_ivar_foreach_i(ID key, VALUE val, st_data_t ptr)
2971{
2972 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2973
2974 if (obj_traverse_i(val, d->data)) {
2975 d->stop = true;
2976 return ST_STOP;
2977 }
2978
2979 return ST_CONTINUE;
2980}
2981
2982static int
2983obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
2984{
2985 if (RB_SPECIAL_CONST_P(obj)) return 0;
2986
2987 switch (data->enter_func(obj)) {
2988 case traverse_cont: break;
2989 case traverse_skip: return 0; // skip children
2990 case traverse_stop: return 1; // stop search
2991 }
2992
2993 if (UNLIKELY(st_insert(obj_traverse_rec(data), obj, 1))) {
2994 // already traversed
2995 return 0;
2996 }
2997
2998 struct obj_traverse_callback_data d = {
2999 .stop = false,
3000 .data = data,
3001 };
3002 rb_ivar_foreach(obj, obj_traverse_ivar_foreach_i, (st_data_t)&d);
3003 if (d.stop) return 1;
3004
3005 switch (BUILTIN_TYPE(obj)) {
3006 // no child node
3007 case T_STRING:
3008 case T_FLOAT:
3009 case T_BIGNUM:
3010 case T_REGEXP:
3011 case T_FILE:
3012 case T_SYMBOL:
3013 case T_MATCH:
3014 break;
3015
3016 case T_OBJECT:
3017 /* Instance variables already traversed. */
3018 break;
3019
3020 case T_ARRAY:
3021 {
3022 for (int i = 0; i < RARRAY_LENINT(obj); i++) {
3023 VALUE e = rb_ary_entry(obj, i);
3024 if (obj_traverse_i(e, data)) return 1;
3025 }
3026 }
3027 break;
3028
3029 case T_HASH:
3030 {
3031 if (obj_traverse_i(RHASH_IFNONE(obj), data)) return 1;
3032
3033 struct obj_traverse_callback_data d = {
3034 .stop = false,
3035 .data = data,
3036 };
3037 rb_hash_foreach(obj, obj_hash_traverse_i, (VALUE)&d);
3038 if (d.stop) return 1;
3039 }
3040 break;
3041
3042 case T_STRUCT:
3043 {
3044 long len = RSTRUCT_LEN(obj);
3045 const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
3046
3047 for (long i=0; i<len; i++) {
3048 if (obj_traverse_i(ptr[i], data)) return 1;
3049 }
3050 }
3051 break;
3052
3053 case T_RATIONAL:
3054 if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1;
3055 if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1;
3056 break;
3057 case T_COMPLEX:
3058 if (obj_traverse_i(RCOMPLEX(obj)->real, data)) return 1;
3059 if (obj_traverse_i(RCOMPLEX(obj)->imag, data)) return 1;
3060 break;
3061
3062 case T_DATA:
3063 case T_IMEMO:
3064 {
3065 struct obj_traverse_callback_data d = {
3066 .stop = false,
3067 .data = data,
3068 };
3069 RB_VM_LOCK_ENTER_NO_BARRIER();
3070 {
3071 rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d);
3072 }
3073 RB_VM_LOCK_LEAVE_NO_BARRIER();
3074 if (d.stop) return 1;
3075 }
3076 break;
3077
3078 // unreachable
3079 case T_CLASS:
3080 case T_MODULE:
3081 case T_ICLASS:
3082 default:
3083 rp(obj);
3084 rb_bug("unreachable");
3085 }
3086
3087 if (data->leave_func(obj) == traverse_stop) {
3088 return 1;
3089 }
3090 else {
3091 return 0;
3092 }
3093}
3094
3095struct rb_obj_traverse_final_data {
3096 rb_obj_traverse_final_func final_func;
3097 int stopped;
3098};
3099
3100static int
3101obj_traverse_final_i(st_data_t key, st_data_t val, st_data_t arg)
3102{
3103 struct rb_obj_traverse_final_data *data = (void *)arg;
3104 if (data->final_func(key)) {
3105 data->stopped = 1;
3106 return ST_STOP;
3107 }
3108 return ST_CONTINUE;
3109}
3110
3111// 0: traverse all
3112// 1: stopped
3113static int
3114rb_obj_traverse(VALUE obj,
3115 rb_obj_traverse_enter_func enter_func,
3116 rb_obj_traverse_leave_func leave_func,
3117 rb_obj_traverse_final_func final_func)
3118{
3119 struct obj_traverse_data data = {
3120 .enter_func = enter_func,
3121 .leave_func = leave_func,
3122 .rec = NULL,
3123 };
3124
3125 if (obj_traverse_i(obj, &data)) return 1;
3126 if (final_func && data.rec) {
3127 struct rb_obj_traverse_final_data f = {final_func, 0};
3128 st_foreach(data.rec, obj_traverse_final_i, (st_data_t)&f);
3129 return f.stopped;
3130 }
3131 return 0;
3132}
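// Callback-protocol sketch (illustrative; `count_enter`, `count_leave` and the
// counter are hypothetical, not part of this file): visit every reachable,
// non-special object exactly once and count the visits.
//
//   static int visited_count;
//
//   static enum obj_traverse_iterator_result
//   count_enter(VALUE obj) { visited_count++; return traverse_cont; }
//
//   static enum obj_traverse_iterator_result
//   count_leave(VALUE obj) { return traverse_cont; }
//
//   // rb_obj_traverse(obj, count_enter, count_leave, NULL) returns 0 when the
//   // whole graph was walked and 1 if some callback returned traverse_stop.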
3133
3134static int
3135allow_frozen_shareable_p(VALUE obj)
3136{
3137 if (!RB_TYPE_P(obj, T_DATA)) {
3138 return true;
3139 }
3140 else if (RTYPEDDATA_P(obj)) {
3141 const rb_data_type_t *type = RTYPEDDATA_TYPE(obj);
3142 if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) {
3143 return true;
3144 }
3145 }
3146
3147 return false;
3148}
3149
3150static enum obj_traverse_iterator_result
3151make_shareable_check_shareable(VALUE obj)
3152{
3153 VM_ASSERT(!SPECIAL_CONST_P(obj));
3154
3155 if (rb_ractor_shareable_p(obj)) {
3156 return traverse_skip;
3157 }
3158 else if (!allow_frozen_shareable_p(obj)) {
3159 if (rb_obj_is_proc(obj)) {
3160 rb_proc_ractor_make_shareable(obj);
3161 return traverse_cont;
3162 }
3163 else {
3164 rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj);
3165 }
3166 }
3167
3168 if (RB_TYPE_P(obj, T_IMEMO)) {
3169 return traverse_skip;
3170 }
3171
3172 if (!RB_OBJ_FROZEN_RAW(obj)) {
3173 rb_funcall(obj, idFreeze, 0);
3174
3175 if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
3176 rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
3177 }
3178
3179 if (RB_OBJ_SHAREABLE_P(obj)) {
3180 return traverse_skip;
3181 }
3182 }
3183
3184 return traverse_cont;
3185}
3186
3187static enum obj_traverse_iterator_result
3188mark_shareable(VALUE obj)
3189{
3190 FL_SET_RAW(obj, RUBY_FL_SHAREABLE);
3191 return traverse_cont;
3192}
3193
3194VALUE
3195rb_ractor_make_shareable(VALUE obj)
3196{
3197 rb_obj_traverse(obj,
3198 make_shareable_check_shareable,
3199 null_leave, mark_shareable);
3200 return obj;
3201}
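// Usage sketch (illustrative; `mod` and the constant name are hypothetical):
// extension code can deep-freeze a value with rb_ractor_make_shareable() so it
// may be read from any Ractor, e.g. before exposing it as a constant.
//
//   VALUE config = rb_hash_new();
//   rb_hash_aset(config, ID2SYM(rb_intern("retries")), INT2FIX(3));
//   rb_ractor_make_shareable(config);        // freezes config and its contents
//   rb_define_const(mod, "CONFIG", config);  // `mod` is a hypothetical module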
3202
3203VALUE
3204rb_ractor_make_shareable_copy(VALUE obj)
3205{
3206 VALUE copy = ractor_copy(obj);
3207 return rb_ractor_make_shareable(copy);
3208}
3209
3210VALUE
3211rb_ractor_ensure_shareable(VALUE obj, VALUE name)
3212{
3213 if (!rb_ractor_shareable_p(obj)) {
3214 VALUE message = rb_sprintf("cannot assign unshareable object to %"PRIsVALUE,
3215 name);
3216 rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError, message));
3217 }
3218 return obj;
3219}
3220
3221void
3222rb_ractor_ensure_main_ractor(const char *msg)
3223{
3224 if (!rb_ractor_main_p()) {
3225 rb_raise(rb_eRactorIsolationError, "%s", msg);
3226 }
3227}
3228
3229static enum obj_traverse_iterator_result
3230shareable_p_enter(VALUE obj)
3231{
3232 if (RB_OBJ_SHAREABLE_P(obj)) {
3233 return traverse_skip;
3234 }
3235 else if (RB_TYPE_P(obj, T_CLASS) ||
3236 RB_TYPE_P(obj, T_MODULE) ||
3237 RB_TYPE_P(obj, T_ICLASS)) {
3238 // TODO: remove it
3239 mark_shareable(obj);
3240 return traverse_skip;
3241 }
3242 else if (RB_OBJ_FROZEN_RAW(obj) &&
3243 allow_frozen_shareable_p(obj)) {
3244 return traverse_cont;
3245 }
3246
3247 return traverse_stop; // fail
3248}
3249
3250bool
3251rb_ractor_shareable_p_continue(VALUE obj)
3252{
3253 if (rb_obj_traverse(obj,
3254 shareable_p_enter, null_leave,
3255 mark_shareable)) {
3256 return false;
3257 }
3258 else {
3259 return true;
3260 }
3261}
3262
3263#if RACTOR_CHECK_MODE > 0
3264void
3265rb_ractor_setup_belonging(VALUE obj)
3266{
3267 rb_ractor_setup_belonging_to(obj, rb_ractor_current_id());
3268}
3269
3270static enum obj_traverse_iterator_result
3271reset_belonging_enter(VALUE obj)
3272{
3273 if (rb_ractor_shareable_p(obj)) {
3274 return traverse_skip;
3275 }
3276 else {
3277 rb_ractor_setup_belonging(obj);
3278 return traverse_cont;
3279 }
3280}
3281#endif
3282
3283static enum obj_traverse_iterator_result
3284null_leave(VALUE obj)
3285{
3286 return traverse_cont;
3287}
3288
3289static VALUE
3290ractor_reset_belonging(VALUE obj)
3291{
3292#if RACTOR_CHECK_MODE > 0
3293 rb_obj_traverse(obj, reset_belonging_enter, null_leave, NULL);
3294#endif
3295 return obj;
3296}
3297
3298
3300
3301// 2: stop search
3302// 1: skip child
3303// 0: continue
3304
3305struct obj_traverse_replace_data;
3306static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data);
3307typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func)(VALUE obj, struct obj_traverse_replace_data *data);
3308typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func)(VALUE obj, struct obj_traverse_replace_data *data);
3309
3310struct obj_traverse_replace_data {
3311 rb_obj_traverse_replace_enter_func enter_func;
3312 rb_obj_traverse_replace_leave_func leave_func;
3313
3314 st_table *rec;
3315 VALUE rec_hash;
3316
3317 VALUE replacement;
3318 bool move;
3319};
3320
3321struct obj_traverse_replace_callback_data {
3322 bool stop;
3323 VALUE src;
3324 struct obj_traverse_replace_data *data;
3325};
3326
3327static int
3328obj_hash_traverse_replace_foreach_i(st_data_t key, st_data_t value, st_data_t argp, int error)
3329{
3330 return ST_REPLACE;
3331}
3332
3333static int
3334obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int exists)
3335{
3336 struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
3337 struct obj_traverse_replace_data *data = d->data;
3338
3339 if (obj_traverse_replace_i(*key, data)) {
3340 d->stop = true;
3341 return ST_STOP;
3342 }
3343 else if (*key != data->replacement) {
3344 VALUE v = *key = data->replacement;
3345 RB_OBJ_WRITTEN(d->src, Qundef, v);
3346 }
3347
3348 if (obj_traverse_replace_i(*val, data)) {
3349 d->stop = true;
3350 return ST_STOP;
3351 }
3352 else if (*val != data->replacement) {
3353 VALUE v = *val = data->replacement;
3354 RB_OBJ_WRITTEN(d->src, Qundef, v);
3355 }
3356
3357 return ST_CONTINUE;
3358}
3359
3360static int
3361obj_iv_hash_traverse_replace_foreach_i(st_data_t _key, st_data_t _val, st_data_t _data, int _x)
3362{
3363 return ST_REPLACE;
3364}
3365
3366static int
3367obj_iv_hash_traverse_replace_i(st_data_t * _key, st_data_t * val, st_data_t ptr, int exists)
3368{
3369 struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
3370 struct obj_traverse_replace_data *data = d->data;
3371
3372 if (obj_traverse_replace_i(*(VALUE *)val, data)) {
3373 d->stop = true;
3374 return ST_STOP;
3375 }
3376 else if (*(VALUE *)val != data->replacement) {
3377 VALUE v = *(VALUE *)val = data->replacement;
3378 RB_OBJ_WRITTEN(d->src, Qundef, v);
3379 }
3380
3381 return ST_CONTINUE;
3382}
3383
3384static struct st_table *
3385obj_traverse_replace_rec(struct obj_traverse_replace_data *data)
3386{
3387 if (UNLIKELY(!data->rec)) {
3388 data->rec_hash = rb_ident_hash_new();
3389 data->rec = RHASH_ST_TABLE(data->rec_hash);
3390 }
3391 return data->rec;
3392}
3393
3394static void
3395obj_refer_only_shareables_p_i(VALUE obj, void *ptr)
3396{
3397 int *pcnt = (int *)ptr;
3398
3399 if (!rb_ractor_shareable_p(obj)) {
3400 ++*pcnt;
3401 }
3402}
3403
3404static int
3405obj_refer_only_shareables_p(VALUE obj)
3406{
3407 int cnt = 0;
3408 RB_VM_LOCK_ENTER_NO_BARRIER();
3409 {
3410 rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt);
3411 }
3412 RB_VM_LOCK_LEAVE_NO_BARRIER();
3413 return cnt == 0;
3414}
3415
3416static int
3417obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
3418{
3419 st_data_t replacement;
3420
3421 if (RB_SPECIAL_CONST_P(obj)) {
3422 data->replacement = obj;
3423 return 0;
3424 }
3425
3426 switch (data->enter_func(obj, data)) {
3427 case traverse_cont: break;
3428 case traverse_skip: return 0; // skip children
3429 case traverse_stop: return 1; // stop search
3430 }
3431
3432 replacement = (st_data_t)data->replacement;
3433
3434 if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, &replacement))) {
3435 data->replacement = (VALUE)replacement;
3436 return 0;
3437 }
3438 else {
3439 st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement);
3440 }
3441
3442 if (!data->move) {
3443 obj = replacement;
3444 }
3445
3446#define CHECK_AND_REPLACE(v) do { \
3447 VALUE _val = (v); \
3448 if (obj_traverse_replace_i(_val, data)) { return 1; } \
3449 else if (data->replacement != _val) { RB_OBJ_WRITE(obj, &v, data->replacement); } \
3450} while (0)
3451
3452 if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
3453 struct gen_fields_tbl *fields_tbl;
3454 rb_ivar_generic_fields_tbl_lookup(obj, &fields_tbl);
3455
3456 if (UNLIKELY(rb_shape_obj_too_complex_p(obj))) {
3457 struct obj_traverse_replace_callback_data d = {
3458 .stop = false,
3459 .data = data,
3460 .src = obj,
3461 };
3462 rb_st_foreach_with_replace(
3463 fields_tbl->as.complex.table,
3464 obj_iv_hash_traverse_replace_foreach_i,
3465 obj_iv_hash_traverse_replace_i,
3466 (st_data_t)&d
3467 );
3468 if (d.stop) return 1;
3469 }
3470 else {
3471 for (uint32_t i = 0; i < fields_tbl->as.shape.fields_count; i++) {
3472 if (!UNDEF_P(fields_tbl->as.shape.fields[i])) {
3473 CHECK_AND_REPLACE(fields_tbl->as.shape.fields[i]);
3474 }
3475 }
3476 }
3477 }
3478
3479 switch (BUILTIN_TYPE(obj)) {
3480 // no child node
3481 case T_FLOAT:
3482 case T_BIGNUM:
3483 case T_REGEXP:
3484 case T_FILE:
3485 case T_SYMBOL:
3486 case T_MATCH:
3487 break;
3488 case T_STRING:
3489 rb_str_make_independent(obj);
3490 break;
3491
3492 case T_OBJECT:
3493 {
3494 if (rb_shape_obj_too_complex_p(obj)) {
3495 struct obj_traverse_replace_callback_data d = {
3496 .stop = false,
3497 .data = data,
3498 .src = obj,
3499 };
3500 rb_st_foreach_with_replace(
3501 ROBJECT_FIELDS_HASH(obj),
3502 obj_iv_hash_traverse_replace_foreach_i,
3503 obj_iv_hash_traverse_replace_i,
3504 (st_data_t)&d
3505 );
3506 if (d.stop) return 1;
3507 }
3508 else {
3509 uint32_t len = ROBJECT_FIELDS_COUNT(obj);
3510 VALUE *ptr = ROBJECT_FIELDS(obj);
3511
3512 for (uint32_t i = 0; i < len; i++) {
3513 CHECK_AND_REPLACE(ptr[i]);
3514 }
3515 }
3516 }
3517 break;
3518
3519 case T_ARRAY:
3520 {
3521 rb_ary_cancel_sharing(obj);
3522
3523 for (int i = 0; i < RARRAY_LENINT(obj); i++) {
3524 VALUE e = rb_ary_entry(obj, i);
3525
3526 if (obj_traverse_replace_i(e, data)) {
3527 return 1;
3528 }
3529 else if (e != data->replacement) {
3530 RARRAY_ASET(obj, i, data->replacement);
3531 }
3532 }
3533 RB_GC_GUARD(obj);
3534 }
3535 break;
3536 case T_HASH:
3537 {
3538 struct obj_traverse_replace_callback_data d = {
3539 .stop = false,
3540 .data = data,
3541 .src = obj,
3542 };
3543 rb_hash_stlike_foreach_with_replace(obj,
3544 obj_hash_traverse_replace_foreach_i,
3545 obj_hash_traverse_replace_i,
3546 (VALUE)&d);
3547 if (d.stop) return 1;
3548 // TODO: rehash here?
3549
3550 VALUE ifnone = RHASH_IFNONE(obj);
3551 if (obj_traverse_replace_i(ifnone, data)) {
3552 return 1;
3553 }
3554 else if (ifnone != data->replacement) {
3555 RHASH_SET_IFNONE(obj, data->replacement);
3556 }
3557 }
3558 break;
3559
3560 case T_STRUCT:
3561 {
3562 long len = RSTRUCT_LEN(obj);
3563 const VALUE *ptr = RSTRUCT_CONST_PTR(obj);
3564
3565 for (long i=0; i<len; i++) {
3566 CHECK_AND_REPLACE(ptr[i]);
3567 }
3568 }
3569 break;
3570
3571 case T_RATIONAL:
3572 CHECK_AND_REPLACE(RRATIONAL(obj)->num);
3573 CHECK_AND_REPLACE(RRATIONAL(obj)->den);
3574 break;
3575 case T_COMPLEX:
3576 CHECK_AND_REPLACE(RCOMPLEX(obj)->real);
3577 CHECK_AND_REPLACE(RCOMPLEX(obj)->imag);
3578 break;
3579
3580 case T_DATA:
3581 if (!data->move && obj_refer_only_shareables_p(obj)) {
3582 break;
3583 }
3584 else {
3585 rb_raise(rb_eRactorError, "can not %s %"PRIsVALUE" object.",
3586 data->move ? "move" : "copy", rb_class_of(obj));
3587 }
3588
3589 case T_IMEMO:
3590 // not supported yet
3591 return 1;
3592
3593 // unreachable
3594 case T_CLASS:
3595 case T_MODULE:
3596 case T_ICLASS:
3597 default:
3598 rp(obj);
3599 rb_bug("unreachable");
3600 }
3601
3602 data->replacement = (VALUE)replacement;
3603
3604 if (data->leave_func(obj, data) == traverse_stop) {
3605 return 1;
3606 }
3607 else {
3608 return 0;
3609 }
3610}
3611
3612// 0: traverse all
3613// 1: stopped
3614static VALUE
3615rb_obj_traverse_replace(VALUE obj,
3616 rb_obj_traverse_replace_enter_func enter_func,
3617 rb_obj_traverse_replace_leave_func leave_func,
3618 bool move)
3619{
3620 struct obj_traverse_replace_data data = {
3621 .enter_func = enter_func,
3622 .leave_func = leave_func,
3623 .rec = NULL,
3624 .replacement = Qundef,
3625 .move = move,
3626 };
3627
3628 if (obj_traverse_replace_i(obj, &data)) {
3629 return Qundef;
3630 }
3631 else {
3632 return data.replacement;
3633 }
3634}
3635
3636static const bool wb_protected_types[RUBY_T_MASK] = {
3647};
3648
3649static enum obj_traverse_iterator_result
3650move_enter(VALUE obj, struct obj_traverse_replace_data *data)
3651{
3652 if (rb_ractor_shareable_p(obj)) {
3653 data->replacement = obj;
3654 return traverse_skip;
3655 }
3656 else {
3657 VALUE type = RB_BUILTIN_TYPE(obj);
3658 type |= wb_protected_types[type] ? FL_WB_PROTECTED : 0;
3659 NEWOBJ_OF(moved, struct RBasic, 0, type, rb_gc_obj_slot_size(obj), 0);
3660 data->replacement = (VALUE)moved;
3661 return traverse_cont;
3662 }
3663}
3664
3665static enum obj_traverse_iterator_result
3666move_leave(VALUE obj, struct obj_traverse_replace_data *data)
3667{
3668 size_t size = rb_gc_obj_slot_size(obj);
3669 memcpy((void *)data->replacement, (void *)obj, size);
3670
3671 void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c
3672
3673 rb_gc_obj_id_moved(data->replacement);
3674
3675 if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
3676 rb_replace_generic_ivar(data->replacement, obj);
3677 }
3678
3679 // Avoid mutations using bind_call, etc.
3680 MEMZERO((char *)obj + sizeof(struct RBasic), char, size - sizeof(struct RBasic));
3681 RBASIC(obj)->flags = T_OBJECT | FL_FREEZE;
3682 RBASIC_SET_CLASS_RAW(obj, rb_cRactorMovedObject);
3683 return traverse_cont;
3684}
3685
3686static VALUE
3687ractor_move(VALUE obj)
3688{
3689 VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true);
3690 if (!UNDEF_P(val)) {
3691 return val;
3692 }
3693 else {
3694 rb_raise(rb_eRactorError, "can not move the object");
3695 }
3696}
3697
3698static enum obj_traverse_iterator_result
3699copy_enter(VALUE obj, struct obj_traverse_replace_data *data)
3700{
3701 if (rb_ractor_shareable_p(obj)) {
3702 data->replacement = obj;
3703 return traverse_skip;
3704 }
3705 else {
3706 data->replacement = rb_obj_clone(obj);
3707 return traverse_cont;
3708 }
3709}
3710
3711static enum obj_traverse_iterator_result
3712copy_leave(VALUE obj, struct obj_traverse_replace_data *data)
3713{
3714 return traverse_cont;
3715}
3716
3717static VALUE
3718ractor_copy(VALUE obj)
3719{
3720 VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false);
3721 if (!UNDEF_P(val)) {
3722 return val;
3723 }
3724 else {
3725 rb_raise(rb_eRactorError, "can not copy the object");
3726 }
3727}
3728
3729// Ractor local storage
3730
3731struct rb_ractor_local_key_struct {
3732 const struct rb_ractor_local_storage_type *type;
3733 void *main_cache;
3734};
3735
3736static struct freed_ractor_local_keys_struct {
3737 int cnt;
3738 int capa;
3739 rb_ractor_local_key_t *keys;
3740} freed_ractor_local_keys;
3741
3742static int
3743ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
3744{
3745 rb_ractor_local_key_t k = (rb_ractor_local_key_t)key;
3746 if (k->type->mark) (*k->type->mark)((void *)val);
3747 return ST_CONTINUE;
3748}
3749
3750static enum rb_id_table_iterator_result
3751idkey_local_storage_mark_i(VALUE val, void *dmy)
3752{
3753 rb_gc_mark(val);
3754 return ID_TABLE_CONTINUE;
3755}
3756
3757static void
3758ractor_local_storage_mark(rb_ractor_t *r)
3759{
3760 if (r->local_storage) {
3761 st_foreach(r->local_storage, ractor_local_storage_mark_i, 0);
3762
3763 for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
3764 rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i];
3765 st_data_t val, k = (st_data_t)key;
3766 if (st_delete(r->local_storage, &k, &val) &&
3767 (key = (rb_ractor_local_key_t)k)->type->free) {
3768 (*key->type->free)((void *)val);
3769 }
3770 }
3771 }
3772
3773 if (r->idkey_local_storage) {
3774 rb_id_table_foreach_values(r->idkey_local_storage, idkey_local_storage_mark_i, NULL);
3775 }
3776
3777 rb_gc_mark(r->local_storage_store_lock);
3778}
3779
3780static int
3781ractor_local_storage_free_i(st_data_t key, st_data_t val, st_data_t dmy)
3782{
3783 rb_ractor_local_key_t k = (rb_ractor_local_key_t)key;
3784 if (k->type->free) (*k->type->free)((void *)val);
3785 return ST_CONTINUE;
3786}
3787
3788static void
3789ractor_local_storage_free(rb_ractor_t *r)
3790{
3791 if (r->local_storage) {
3792 st_foreach(r->local_storage, ractor_local_storage_free_i, 0);
3793 st_free_table(r->local_storage);
3794 }
3795
3796 if (r->idkey_local_storage) {
3797 rb_id_table_free(r->idkey_local_storage);
3798 }
3799}
3800
3801static void
3802rb_ractor_local_storage_value_mark(void *ptr)
3803{
3804 rb_gc_mark((VALUE)ptr);
3805}
3806
3807static const struct rb_ractor_local_storage_type ractor_local_storage_type_null = {
3808 NULL,
3809 NULL,
3810};
3811
3812static const struct rb_ractor_local_storage_type ractor_local_storage_type_free = {
3813 NULL,
3814 ruby_xfree,
3815};
3816
3817static const struct rb_ractor_local_storage_type ractor_local_storage_type_value = {
3818 rb_ractor_local_storage_value_mark,
3819 NULL,
3820};
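// Sketch of a custom storage type (illustrative; the `my_cache*` names are
// hypothetical): the mark/free callbacks are invoked for each Ractor's stored
// pointer, so a payload registered under such a key is GC-marked and freed
// automatically per Ractor.
//
//   struct my_cache { VALUE table; };
//
//   static void my_cache_mark(void *p) { rb_gc_mark(((struct my_cache *)p)->table); }
//   static void my_cache_free(void *p) { ruby_xfree(p); }
//
//   static const struct rb_ractor_local_storage_type my_cache_type = {
//       my_cache_mark,  // mark
//       my_cache_free,  // free
//   };
//
//   // rb_ractor_local_key_t key = rb_ractor_local_storage_ptr_newkey(&my_cache_type);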
3821
3822rb_ractor_local_key_t
3823rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type *type)
3824{
3825 rb_ractor_local_key_t key = ALLOC(struct rb_ractor_local_key_struct);
3826 key->type = type ? type : &ractor_local_storage_type_null;
3827 key->main_cache = (void *)Qundef;
3828 return key;
3829}
3830
3831rb_ractor_local_key_t
3832rb_ractor_local_storage_value_newkey(void)
3833{
3834 return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value);
3835}
3836
3837void
3838rb_ractor_local_storage_delkey(rb_ractor_local_key_t key)
3839{
3840 RB_VM_LOCK_ENTER();
3841 {
3842 if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) {
3843 freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4;
3844 REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa);
3845 }
3846 freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key;
3847 }
3848 RB_VM_LOCK_LEAVE();
3849}
3850
3851static bool
3852ractor_local_ref(rb_ractor_local_key_t key, void **pret)
3853{
3854 if (rb_ractor_main_p()) {
3855 if (!UNDEF_P((VALUE)key->main_cache)) {
3856 *pret = key->main_cache;
3857 return true;
3858 }
3859 else {
3860 return false;
3861 }
3862 }
3863 else {
3864 rb_ractor_t *cr = GET_RACTOR();
3865
3866 if (cr->local_storage && st_lookup(cr->local_storage, (st_data_t)key, (st_data_t *)pret)) {
3867 return true;
3868 }
3869 else {
3870 return false;
3871 }
3872 }
3873}
3874
3875static void
3876ractor_local_set(rb_ractor_local_key_t key, void *ptr)
3877{
3878 rb_ractor_t *cr = GET_RACTOR();
3879
3880 if (cr->local_storage == NULL) {
3881 cr->local_storage = st_init_numtable();
3882 }
3883
3884 st_insert(cr->local_storage, (st_data_t)key, (st_data_t)ptr);
3885
3886 if (rb_ractor_main_p()) {
3887 key->main_cache = ptr;
3888 }
3889}
3890
3891VALUE
3892rb_ractor_local_storage_value(rb_ractor_local_key_t key)
3893{
3894 void *val;
3895 if (ractor_local_ref(key, &val)) {
3896 return (VALUE)val;
3897 }
3898 else {
3899 return Qnil;
3900 }
3901}
3902
3903bool
3904rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val)
3905{
3906 if (ractor_local_ref(key, (void **)val)) {
3907 return true;
3908 }
3909 else {
3910 return false;
3911 }
3912}
3913
3914void
3916{
3917 ractor_local_set(key, (void *)val);
3918}
3919
3920void *
3921rb_ractor_local_storage_ptr(rb_ractor_local_key_t key)
3922{
3923 void *ret;
3924 if (ractor_local_ref(key, &ret)) {
3925 return ret;
3926 }
3927 else {
3928 return NULL;
3929 }
3930}
3931
3932void
3934{
3935 ractor_local_set(key, ptr);
3936}
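// Usage sketch for the value-key API above (illustrative; `counter_key` and
// `counter_bump` are hypothetical): each Ractor sees its own slot under the
// same process-wide key.
//
//   static rb_ractor_local_key_t counter_key; // created once, e.g. in Init_*():
//                                             // counter_key = rb_ractor_local_storage_value_newkey();
//
//   static VALUE
//   counter_bump(VALUE self)
//   {
//       VALUE c = rb_ractor_local_storage_value(counter_key);  // Qnil if unset
//       c = NIL_P(c) ? INT2FIX(1) : INT2FIX(FIX2INT(c) + 1);
//       rb_ractor_local_storage_value_set(counter_key, c);     // per-Ractor slot
//       return c;
//   }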
3937
3938#define DEFAULT_KEYS_CAPA 0x10
3939
3940void
3941rb_ractor_finish_marking(void)
3942{
3943 for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
3944 ruby_xfree(freed_ractor_local_keys.keys[i]);
3945 }
3946 freed_ractor_local_keys.cnt = 0;
3947 if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) {
3948 freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA;
3949 REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA);
3950 }
3951}
3952
3953static VALUE
3954ractor_local_value(rb_execution_context_t *ec, VALUE self, VALUE sym)
3955{
3956 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
3957 ID id = rb_check_id(&sym);
3958 struct rb_id_table *tbl = cr->idkey_local_storage;
3959 VALUE val;
3960
3961 if (id && tbl && rb_id_table_lookup(tbl, id, &val)) {
3962 return val;
3963 }
3964 else {
3965 return Qnil;
3966 }
3967}
3968
3969static VALUE
3970ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE val)
3971{
3972 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
3973 ID id = SYM2ID(rb_to_symbol(sym));
3974 struct rb_id_table *tbl = cr->idkey_local_storage;
3975
3976 if (tbl == NULL) {
3977 tbl = cr->idkey_local_storage = rb_id_table_create(2);
3978 }
3979 rb_id_table_insert(tbl, id, val);
3980 return val;
3981}
3982
3983struct ractor_local_storage_store_data {
3984 rb_execution_context_t *ec;
3985 struct rb_id_table *tbl;
3986 ID id;
3987 VALUE sym;
3988};
3989
3990static VALUE
3991ractor_local_value_store_i(VALUE ptr)
3992{
3993 VALUE val;
3994 struct ractor_local_storage_store_data *data = (struct ractor_local_storage_store_data *)ptr;
3995
3996 if (rb_id_table_lookup(data->tbl, data->id, &val)) {
3997 // after synchronization, an entry registered by another thread was found
3998 }
3999 else {
4000 val = rb_yield(Qnil);
4001 ractor_local_value_set(data->ec, Qnil, data->sym, val);
4002 }
4003 return val;
4004}
4005
4006static VALUE
4007ractor_local_value_store_if_absent(rb_execution_context_t *ec, VALUE self, VALUE sym)
4008{
4009 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
4010 struct ractor_local_storage_store_data data = {
4011 .ec = ec,
4012 .sym = sym,
4013 .id = SYM2ID(rb_to_symbol(sym)),
4014 .tbl = cr->idkey_local_storage,
4015 };
4016 VALUE val;
4017
4018 if (data.tbl == NULL) {
4019 data.tbl = cr->idkey_local_storage = rb_id_table_create(2);
4020 }
4021 else if (rb_id_table_lookup(data.tbl, data.id, &val)) {
4022 // already set
4023 return val;
4024 }
4025
4026 if (!cr->local_storage_store_lock) {
4027 cr->local_storage_store_lock = rb_mutex_new();
4028 }
4029
4030 return rb_mutex_synchronize(cr->local_storage_store_lock, ractor_local_value_store_i, (VALUE)&data);
4031}
4032
4033// Ractor::Channel (emulate with Ractor)
4034
4035typedef rb_ractor_t rb_ractor_channel_t;
4036
4037static VALUE
4038ractor_channel_func(RB_BLOCK_CALL_FUNC_ARGLIST(y, c))
4039{
4040 rb_execution_context_t *ec = GET_EC();
4041 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
4042
4043 while (1) {
4044 int state;
4045
4046 EC_PUSH_TAG(ec);
4047 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
4048 VALUE obj = ractor_receive(ec, cr);
4049 ractor_yield(ec, cr, obj, Qfalse);
4050 }
4051 EC_POP_TAG();
4052
4053 if (state) {
4054 // ignore the error
4055 break;
4056 }
4057 }
4058
4059 return Qnil;
4060}
4061
4062static VALUE
4063rb_ractor_channel_new(void)
4064{
4065#if 0
4066 return rb_funcall(rb_const_get(rb_cRactor, rb_intern("Channel")), rb_intern("new"), 0);
4067#else
4068 // class Channel
4069 // def self.new
4070 // Ractor.new do # func body
4071 // while true
4072 // obj = Ractor.receive
4073 // Ractor.yield obj
4074 // end
4075 // rescue Ractor::ClosedError
4076 // nil
4077 // end
4078 // end
4079 // end
4080
4081 return ractor_create_func(rb_cRactor, Qnil, rb_str_new2("Ractor/channel"), rb_ary_new(), ractor_channel_func);
4082#endif
4083}
4084
4085static VALUE
4086rb_ractor_channel_yield(rb_execution_context_t *ec, VALUE vch, VALUE obj)
4087{
4088 VM_ASSERT(ec == rb_current_ec_noinline());
4089 rb_ractor_channel_t *ch = RACTOR_PTR(vch);
4090
4091 ractor_send(ec, (rb_ractor_t *)ch, obj, Qfalse);
4092 return Qnil;
4093}
4094
4095static VALUE
4096rb_ractor_channel_take(rb_execution_context_t *ec, VALUE vch)
4097{
4098 VM_ASSERT(ec == rb_current_ec_noinline());
4099 rb_ractor_channel_t *ch = RACTOR_PTR(vch);
4100
4101 return ractor_take(ec, (rb_ractor_t *)ch);
4102}
4103
4104static VALUE
4105rb_ractor_channel_close(rb_execution_context_t *ec, VALUE vch)
4106{
4107 VM_ASSERT(ec == rb_current_ec_noinline());
4108 rb_ractor_channel_t *ch = RACTOR_PTR(vch);
4109
4110 ractor_close_incoming(ec, (rb_ractor_t *)ch);
4111 return ractor_close_outgoing(ec, (rb_ractor_t *)ch);
4112}
4113
4114// Ractor#require
4115
4116struct cross_ractor_require {
4117 VALUE ch;
4118 VALUE result;
4119 VALUE exception;
4120
4121 // require
4122 VALUE feature;
4123
4124 // autoload
4125 VALUE module;
4126 ID name;
4127};
4128
4129static VALUE
4130require_body(VALUE data)
4131{
4132 struct cross_ractor_require *crr = (struct cross_ractor_require *)data;
4133
4134 ID require;
4135 CONST_ID(require, "require");
4136 crr->result = rb_funcallv(Qnil, require, 1, &crr->feature);
4137
4138 return Qnil;
4139}
4140
4141static VALUE
4142require_rescue(VALUE data, VALUE errinfo)
4143{
4144 struct cross_ractor_require *crr = (struct cross_ractor_require *)data;
4145 crr->exception = errinfo;
4146 return Qundef;
4147}
4148
4149static VALUE
4150require_result_copy_body(VALUE data)
4151{
4152 struct cross_ractor_require *crr = (struct cross_ractor_require *)data;
4153
4154 if (crr->exception != Qundef) {
4155 VM_ASSERT(crr->result == Qundef);
4156 crr->exception = ractor_copy(crr->exception);
4157 }
4158 else{
4159 VM_ASSERT(crr->result != Qundef);
4160 crr->result = ractor_copy(crr->result);
4161 }
4162
4163 return Qnil;
4164}
4165
4166static VALUE
4167require_result_copy_resuce(VALUE data, VALUE errinfo)
4168{
4169 struct cross_ractor_require *crr = (struct cross_ractor_require *)data;
4170 crr->exception = errinfo; // ractor_move(crr->exception);
4171 return Qnil;
4172}
4173
4174static VALUE
4175ractor_require_protect(struct cross_ractor_require *crr, VALUE (*func)(VALUE))
4176{
4177 // catch any error
4178 rb_rescue2(func, (VALUE)crr,
4179 require_rescue, (VALUE)crr, rb_eException, 0);
4180
4181 rb_rescue2(require_result_copy_body, (VALUE)crr,
4182 require_result_copy_resuce, (VALUE)crr, rb_eException, 0);
4183
4184 rb_ractor_channel_yield(GET_EC(), crr->ch, Qtrue);
4185 return Qnil;
4186
4187}
4188
4189static VALUE
4190ractore_require_func(void *data)
4191{
4192 struct cross_ractor_require *crr = (struct cross_ractor_require *)data;
4193 return ractor_require_protect(crr, require_body);
4194}
4195
4196VALUE
4197rb_ractor_require(VALUE feature)
4198{
4199 // TODO: make feature shareable
4200 struct cross_ractor_require crr = {
4201 .feature = feature, // TODO: ractor
4202 .ch = rb_ractor_channel_new(),
4203 .result = Qundef,
4204 .exception = Qundef,
4205 };
4206
4207 rb_execution_context_t *ec = GET_EC();
4208 rb_ractor_t *main_r = GET_VM()->ractor.main_ractor;
4209 rb_ractor_interrupt_exec(main_r, ractore_require_func, &crr, 0);
4210
4211 // wait for require done
4212 rb_ractor_channel_take(ec, crr.ch);
4213 rb_ractor_channel_close(ec, crr.ch);
4214
4215 if (crr.exception != Qundef) {
4216 rb_exc_raise(crr.exception);
4217 }
4218 else {
4219 return crr.result;
4220 }
4221}
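// Usage sketch (illustrative): C code running on a non-main Ractor can load a
// feature through the main Ractor with the function above; the result, or the
// raised exception, is copied back to the calling Ractor.
//
//   VALUE loaded = rb_ractor_require(rb_str_new_cstr("csv")); // Qtrue/Qfalse, or raises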
4222
4223static VALUE
4224ractor_require(rb_execution_context_t *ec, VALUE self, VALUE feature)
4225{
4226 return rb_ractor_require(feature);
4227}
4228
4229static VALUE
4230autoload_load_body(VALUE data)
4231{
4232 struct cross_ractor_require *crr = (struct cross_ractor_require *)data;
4233 crr->result = rb_autoload_load(crr->module, crr->name);
4234 return Qnil;
4235}
4236
4237static VALUE
4238ractor_autoload_load_func(void *data)
4239{
4240 struct cross_ractor_require *crr = (struct cross_ractor_require *)data;
4241 return ractor_require_protect(crr, autoload_load_body);
4242}
4243
4244VALUE
4245rb_ractor_autoload_load(VALUE module, ID name)
4246{
4247 struct cross_ractor_require crr = {
4248 .module = module,
4249 .name = name,
4250 .ch = rb_ractor_channel_new(),
4251 .result = Qundef,
4252 .exception = Qundef,
4253 };
4254
4255 rb_execution_context_t *ec = GET_EC();
4256 rb_ractor_t *main_r = GET_VM()->ractor.main_ractor;
4257 rb_ractor_interrupt_exec(main_r, ractor_autoload_load_func, &crr, 0);
4258
4259 // wait for require done
4260 rb_ractor_channel_take(ec, crr.ch);
4261 rb_ractor_channel_close(ec, crr.ch);
4262
4263 if (crr.exception != Qundef) {
4264 rb_exc_raise(crr.exception);
4265 }
4266 else {
4267 return crr.result;
4268 }
4269}
4270
4271#include "ractor.rbinc"
#define T_FILE
Old name of RUBY_T_FILE.
Definition value_type.h:62
#define FL_EXIVAR
Old name of RUBY_FL_EXIVAR.
Definition fl_type.h:65
#define REALLOC_N
Old name of RB_REALLOC_N.
Definition memory.h:403
#define ALLOC
Old name of RB_ALLOC.
Definition memory.h:400
#define T_STRING
Old name of RUBY_T_STRING.
Definition value_type.h:78
#define Qundef
Old name of RUBY_Qundef.
#define T_FLOAT
Old name of RUBY_T_FLOAT.
Definition value_type.h:64
#define T_IMEMO
Old name of RUBY_T_IMEMO.
Definition value_type.h:67
#define ID2SYM
Old name of RB_ID2SYM.
Definition symbol.h:44
#define T_BIGNUM
Old name of RUBY_T_BIGNUM.
Definition value_type.h:57
#define SPECIAL_CONST_P
Old name of RB_SPECIAL_CONST_P.
#define T_STRUCT
Old name of RUBY_T_STRUCT.
Definition value_type.h:79
#define SYM2ID
Old name of RB_SYM2ID.
Definition symbol.h:45
#define T_DATA
Old name of RUBY_T_DATA.
Definition value_type.h:60
#define T_MODULE
Old name of RUBY_T_MODULE.
Definition value_type.h:70
#define T_RATIONAL
Old name of RUBY_T_RATIONAL.
Definition value_type.h:76
#define T_ICLASS
Old name of RUBY_T_ICLASS.
Definition value_type.h:66
#define T_HASH
Old name of RUBY_T_HASH.
Definition value_type.h:65
#define FL_TEST_RAW
Old name of RB_FL_TEST_RAW.
Definition fl_type.h:131
#define Qtrue
Old name of RUBY_Qtrue.
#define Qnil
Old name of RUBY_Qnil.
#define Qfalse
Old name of RUBY_Qfalse.
#define T_ARRAY
Old name of RUBY_T_ARRAY.
Definition value_type.h:56
#define T_OBJECT
Old name of RUBY_T_OBJECT.
Definition value_type.h:75
#define NIL_P
Old name of RB_NIL_P.
#define FL_WB_PROTECTED
Old name of RUBY_FL_WB_PROTECTED.
Definition fl_type.h:59
#define T_SYMBOL
Old name of RUBY_T_SYMBOL.
Definition value_type.h:80
#define T_MATCH
Old name of RUBY_T_MATCH.
Definition value_type.h:69
#define T_CLASS
Old name of RUBY_T_CLASS.
Definition value_type.h:58
#define BUILTIN_TYPE
Old name of RB_BUILTIN_TYPE.
Definition value_type.h:85
#define FL_FREEZE
Old name of RUBY_FL_FREEZE.
Definition fl_type.h:66
#define CONST_ID
Old name of RUBY_CONST_ID.
Definition symbol.h:47
#define FL_SET_RAW
Old name of RB_FL_SET_RAW.
Definition fl_type.h:129
#define T_REGEXP
Old name of RUBY_T_REGEXP.
Definition value_type.h:77
void rb_exc_raise(VALUE mesg)
Raises an exception in the current thread.
Definition eval.c:676
int rb_typeddata_is_kind_of(VALUE obj, const rb_data_type_t *data_type)
Checks if the given object is of given kind.
Definition error.c:1380
VALUE rb_eRuntimeError
RuntimeError exception.
Definition error.c:1428
VALUE rb_eStopIteration
StopIteration exception.
Definition enumerator.c:181
VALUE rb_exc_new_str(VALUE etype, VALUE str)
Identical to rb_exc_new_cstr(), except it takes a Ruby's string instead of C's.
Definition error.c:1481
VALUE rb_eException
Mother of all exceptions.
Definition error.c:1422
VALUE rb_cRactor
Ractor class.
Definition ractor.c:23
VALUE rb_stdin
STDIN constant.
Definition io.c:201
VALUE rb_stderr
STDERR constant.
Definition io.c:201
static VALUE rb_class_of(VALUE obj)
Object to class mapping function.
Definition globals.h:174
VALUE rb_cBasicObject
BasicObject class.
Definition object.c:64
VALUE rb_obj_clone(VALUE obj)
Produces a shallow copy of the given object.
Definition object.c:534
VALUE rb_stdout
STDOUT constant.
Definition io.c:201
#define RB_OBJ_WRITTEN(old, oldv, young)
Identical to RB_OBJ_WRITE(), except it doesn't write any values, but only a WB declaration.
Definition gc.h:615
#define RB_OBJ_WRITE(old, slot, young)
Declaration of a "back" pointer.
Definition gc.h:603
VALUE rb_funcall(VALUE recv, ID mid, int n,...)
Calls a method.
Definition vm_eval.c:1116
#define RGENGC_WB_PROTECTED_STRUCT
This is a compile-time flag to enable/disable write barrier for struct RStruct.
Definition gc.h:468
#define RGENGC_WB_PROTECTED_STRING
This is a compile-time flag to enable/disable write barrier for struct RString.
Definition gc.h:479
#define RGENGC_WB_PROTECTED_HASH
This is a compile-time flag to enable/disable write barrier for struct RHash.
Definition gc.h:457
#define RGENGC_WB_PROTECTED_MATCH
This is a compile-time flag to enable/disable write barrier for struct RMatch.
Definition gc.h:512
#define RGENGC_WB_PROTECTED_ARRAY
This is a compile-time flag to enable/disable write barrier for struct RArray.
Definition gc.h:446
#define RGENGC_WB_PROTECTED_COMPLEX
This is a compile-time flag to enable/disable write barrier for struct RComplex.
Definition gc.h:545
#define RGENGC_WB_PROTECTED_FLOAT
This is a compile-time flag to enable/disable write barrier for struct RFloat.
Definition gc.h:534
#define RGENGC_WB_PROTECTED_RATIONAL
This is a compile-time flag to enable/disable write barrier for struct RRational.
Definition gc.h:556
#define RGENGC_WB_PROTECTED_REGEXP
This is a compile-time flag to enable/disable write barrier for struct RRegexp.
Definition gc.h:501
#define RGENGC_WB_PROTECTED_OBJECT
This is a compile-time flag to enable/disable write barrier for struct RObject.
Definition gc.h:490
VALUE rb_ary_new(void)
Allocates a new, empty array.
VALUE rb_ary_push(VALUE ary, VALUE elem)
Special case of rb_ary_cat() that it adds only one element.
VALUE rb_ary_entry(VALUE ary, long off)
Queries an element of an array.
VALUE rb_obj_is_proc(VALUE recv)
Queries if the given object is a proc.
Definition proc.c:119
#define rb_exc_new_cstr(exc, str)
Identical to rb_exc_new(), except it assumes the passed pointer is a pointer to a C string.
Definition string.h:1670
VALUE rb_str_new_frozen(VALUE str)
Creates a frozen copy of the string, if necessary.
Definition string.c:1841
VALUE rb_mutex_new(void)
Creates a mutex.
VALUE rb_mutex_synchronize(VALUE mutex, VALUE(*func)(VALUE arg), VALUE arg)
Obtains the lock, runs the passed function, and releases the lock when it completes.
void rb_unblock_function_t(void *)
This is the type of UBFs.
Definition thread.h:336
VALUE rb_mutex_unlock(VALUE mutex)
Releases the mutex.
VALUE rb_mutex_lock(VALUE mutex)
Attempts to lock the mutex.
void rb_thread_sleep(int sec)
Blocks for the given period of time.
Definition thread.c:1455
VALUE rb_const_get(VALUE space, ID name)
Identical to rb_const_defined(), except it returns the actual defined value.
Definition variable.c:3651
VALUE rb_ivar_set(VALUE obj, ID name, VALUE val)
Identical to rb_iv_set(), except it accepts the name as an ID instead of a C string.
Definition variable.c:2125
VALUE rb_autoload_load(VALUE space, ID name)
Kicks the autoload procedure as if it was "touched".
Definition variable.c:3486
void rb_undef_alloc_func(VALUE klass)
Deletes the allocator function of a class.
Definition vm_method.c:1387
ID rb_check_id(volatile VALUE *namep)
Detects if the given name is already interned or not.
Definition symbol.c:1134
VALUE rb_to_symbol(VALUE name)
Identical to rb_intern_str(), except it generates a dynamic symbol if necessary.
Definition string.c:12943
int len
Length of the buffer.
Definition io.h:8
const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free
A type of ractor-local storage that destructs itself using ruby_xfree.
Definition ractor.c:3812
VALUE rb_ractor_make_shareable_copy(VALUE obj)
Identical to rb_ractor_make_shareable(), except it returns a (deep) copy of the passed one instead of...
Definition ractor.c:3204
struct rb_ractor_local_key_struct * rb_ractor_local_key_t
(Opaque) struct that holds a ractor-local storage key.
Definition ractor.h:42
void * rb_ractor_local_storage_ptr(rb_ractor_local_key_t key)
Identical to rb_ractor_local_storage_value() except the return type.
Definition ractor.c:3921
void rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key, void *ptr)
Identical to rb_ractor_local_storage_value_set() except the parameter type.
Definition ractor.c:3933
rb_ractor_local_key_t rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type *type)
Extended version of rb_ractor_local_storage_value_newkey().
Definition ractor.c:3823
VALUE rb_ractor_stdin(void)
Queries the standard input of the current Ractor that is calling this function.
Definition ractor.c:2820
static bool rb_ractor_shareable_p(VALUE obj)
Queries if multiple Ractors can share the passed object or not.
Definition ractor.h:249
void rb_ractor_stderr_set(VALUE io)
Assigns an IO to the standard error of the Ractor that is calling this function.
Definition ractor.c:2880
void rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val)
Associates the passed value to the passed key.
Definition ractor.c:3915
bool rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val)
Queries the key.
Definition ractor.c:3904
#define RB_OBJ_SHAREABLE_P(obj)
Queries if the passed object has previously classified as shareable or not.
Definition ractor.h:235
VALUE rb_ractor_make_shareable(VALUE obj)
Destructively transforms the passed object so that multiple Ractors can share it.
Definition ractor.c:3195
rb_ractor_local_key_t rb_ractor_local_storage_value_newkey(void)
Issues a new key.
Definition ractor.c:3832
void rb_ractor_stdout_set(VALUE io)
Assigns an IO to the standard output of the Ractor that is calling this function.
Definition ractor.c:2868
void rb_ractor_stdin_set(VALUE io)
Assigns an IO to the standard input of the Ractor that is calling this function.
Definition ractor.c:2856
VALUE rb_ractor_local_storage_value(rb_ractor_local_key_t key)
Queries the key.
Definition ractor.c:3892
#define RB_NOGVL_INTR_FAIL
Passing this flag to rb_nogvl() prevents it from checking interrupts.
Definition thread.h:48
void * rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, int flags)
Identical to rb_thread_call_without_gvl(), except it additionally takes "flags" that change the behav...
Definition thread.c:1537
#define RB_BLOCK_CALL_FUNC_ARGLIST(yielded_arg, callback_arg)
Shim for block function parameters.
Definition iterator.h:58
VALUE rb_yield(VALUE val)
Yields the block.
Definition vm_eval.c:1371
rb_block_call_func * rb_block_call_func_t
Shorthand type that represents an iterator-written-in-C function pointer.
Definition iterator.h:88
#define MEMZERO(p, type, n)
Handy macro to erase a region of memory.
Definition memory.h:360
#define RB_GC_GUARD(v)
Prevents premature destruction of local objects.
Definition memory.h:167
VALUE rb_proc_new(type *q, VALUE w)
Creates a rb_cProc instance.
VALUE type(ANYARGS)
ANYARGS-ed function type.
void rb_hash_foreach(VALUE q, int_type *w, VALUE e)
Iteration over the given hash.
void rb_ivar_foreach(VALUE q, int_type *w, VALUE e)
Iteration over each instance variable of the object.
VALUE rb_rescue2(type *q, VALUE w, type *e, VALUE r,...)
An equivalent of rescue clause.
VALUE rb_ensure(type *q, VALUE w, type *e, VALUE r)
An equivalent of ensure clause.
static int RARRAY_LENINT(VALUE ary)
Identical to rb_array_len(), except it differs for the return type.
Definition rarray.h:281
static void RARRAY_ASET(VALUE ary, long i, VALUE v)
Assigns an object in an array.
Definition rarray.h:386
#define RARRAY_AREF(a, i)
Definition rarray.h:403
#define RARRAY_CONST_PTR
Just another name of rb_array_const_ptr.
Definition rarray.h:52
#define RBASIC(obj)
Convenient casting macro.
Definition rbasic.h:40
#define DATA_PTR(obj)
Convenient getter macro.
Definition rdata.h:67
#define RHASH_SET_IFNONE(h, ifnone)
Destructively updates the default value of the hash.
Definition rhash.h:92
#define RHASH_IFNONE(h)
Definition rhash.h:59
static VALUE * ROBJECT_FIELDS(VALUE obj)
Queries the instance variables.
Definition robject.h:126
#define StringValueCStr(v)
Identical to StringValuePtr, except it additionally checks for the contents for viability as a C stri...
Definition rstring.h:89
static bool RTYPEDDATA_P(VALUE obj)
Checks whether the passed object is RTypedData or RData.
Definition rtypeddata.h:579
#define TypedData_Wrap_Struct(klass, data_type, sval)
Converts sval, a pointer to your struct, into a Ruby object.
Definition rtypeddata.h:450
#define TypedData_Make_Struct(klass, type, data_type, sval)
Identical to TypedData_Wrap_Struct, except it allocates a new data region internally instead of takin...
Definition rtypeddata.h:498
static const struct rb_data_type_struct * RTYPEDDATA_TYPE(VALUE obj)
Queries for the type of given object.
Definition rtypeddata.h:602
static bool RB_SPECIAL_CONST_P(VALUE obj)
Checks if the given object is of enum ruby_special_consts.
#define RTEST
This is an old name of RB_TEST.
Ruby object's base components.
Definition rbasic.h:63
This is the struct that holds necessary info for a struct.
Definition rtypeddata.h:203
Type that defines a ractor-local storage.
Definition ractor.h:21
void(* free)(void *ptr)
A function to destruct a ractor-local storage.
Definition ractor.h:37
void(* mark)(void *ptr)
A function to mark a ractor-local storage.
Definition ractor.h:29
Definition st.h:79
Definition string.c:8738
void rb_native_mutex_lock(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_lock.
void rb_native_cond_initialize(rb_nativethread_cond_t *cond)
Fills the passed condition variable with an initial value.
void rb_native_mutex_initialize(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_initialize.
void rb_native_mutex_unlock(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_unlock.
void rb_native_mutex_destroy(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_destroy.
void rb_native_cond_signal(rb_nativethread_cond_t *cond)
Signals a condition variable.
void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex)
Waits for the passed condition variable to be signalled.
uintptr_t ID
Type that represents a Ruby identifier such as a variable name.
Definition value.h:52
uintptr_t VALUE
Type that represents a Ruby object.
Definition value.h:40
static enum ruby_value_type RB_BUILTIN_TYPE(VALUE obj)
Queries the type of the object.
Definition value_type.h:182
static bool RB_TYPE_P(VALUE obj, enum ruby_value_type t)
Queries if the given object is of given type.
Definition value_type.h:376
@ RUBY_T_MASK
Bitmask of ruby_value_type.
Definition value_type.h:145