Ruby 4.0.0dev (2025-12-07 revision 4080abecd6f7b2ce6f7e6a6d1801d3d9fcfa9a58)
thread_pthread_mn.c (4080abecd6f7b2ce6f7e6a6d1801d3d9fcfa9a58)
1// included by "thread_pthread.c"
2
3#if USE_MN_THREADS
4
5static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
6static void timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial);
7
8static bool
9timer_thread_cancel_waiting(rb_thread_t *th)
10{
11 bool canceled = false;
12
13 rb_native_mutex_lock(&timer_th.waiting_lock);
14 {
15 if (th->sched.waiting_reason.flags) {
16 canceled = true;
17 ccan_list_del_init(&th->sched.waiting_reason.node);
18 timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
19 th->sched.waiting_reason.flags = thread_sched_waiting_none;
20 }
21 }
22 rb_native_mutex_unlock(&timer_th.waiting_lock);
23
24 return canceled;
25}
26
27static void
28ubf_event_waiting(void *ptr)
29{
30 rb_thread_t *th = (rb_thread_t *)ptr;
31 struct rb_thread_sched *sched = TH_SCHED(th);
32
33 RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
34
35 VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));
36
37 // clear unblock.func/arg so this UBF runs only once; it is safe because th->interrupt_lock is already acquired.
38 th->unblock.func = NULL;
39 th->unblock.arg = NULL;
40
41 thread_sched_lock(sched, th);
42 {
43 bool canceled = timer_thread_cancel_waiting(th);
44
45 if (sched->running == th) {
46 RUBY_DEBUG_LOG("not waiting yet");
47 }
48 else if (canceled) {
49 thread_sched_to_ready_common(sched, th, true, false);
50 }
51 else {
52 RUBY_DEBUG_LOG("already not waiting");
53 }
54 }
55 thread_sched_unlock(sched, th);
56}
57
58static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial);
59
60// return true if timed out
61static bool
62thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
63{
64 VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT
65
66 volatile bool timedout = false, need_cancel = false;
67
68 uint32_t event_serial = ++th->event_serial; // overflow is okay
69
70 if (ubf_set(th, ubf_event_waiting, (void *)th)) {
71 return false;
72 }
73
74 thread_sched_lock(sched, th);
75 {
76 if (timer_thread_register_waiting(th, fd, events, rel, event_serial)) {
77 RUBY_DEBUG_LOG("wait fd:%d", fd);
78
79 RB_VM_SAVE_MACHINE_CONTEXT(th);
80
81 RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
82
83 if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
84 th->event_serial++;
85 // timer thread has dequeued us already, but it won't try to wake us because we bumped our serial
86 }
87 else if (RUBY_VM_INTERRUPTED(th->ec)) {
88 th->event_serial++; // make sure timer thread doesn't try to wake us
89 need_cancel = true;
90 }
91 else {
92 RUBY_DEBUG_LOG("sleep");
93
94 th->status = THREAD_STOPPED_FOREVER;
95 thread_sched_wakeup_next_thread(sched, th, true);
96 thread_sched_wait_running_turn(sched, th, true);
97
98 RUBY_DEBUG_LOG("wakeup");
99 }
100
101 timedout = th->sched.waiting_reason.data.result == 0;
102
103 if (need_cancel) {
104 timer_thread_cancel_waiting(th);
105 }
106
107 th->status = THREAD_RUNNABLE;
108 }
109 else {
110 RUBY_DEBUG_LOG("can not wait fd:%d", fd);
111 timedout = false;
112 }
113 }
114 thread_sched_unlock(sched, th);
115
116 // if ubf triggered between sched unlock and ubf clear, sched->running == th here
117 ubf_clear(th);
118
119 VM_ASSERT(sched->running == th);
120
121 return timedout;
122}
123
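
// Editor's note: a minimal usage sketch, not part of this file. It assumes a
// hypothetical caller that already runs on a shared native thread (SNT, as the
// VM_ASSERT above requires) and wants to block until `fd` becomes readable or
// one second passes. Names prefixed with example_ are illustrative only.
#if 0
static void
example_wait_readable(rb_thread_t *th, int fd)
{
    struct rb_thread_sched *sched = TH_SCHED(th);
    rb_hrtime_t rel = RB_HRTIME_PER_SEC; // relative timeout of 1 second, in nanoseconds

    // Returns true when the wait timed out; false means the fd became ready
    // (or the wait could not be registered at all).
    bool timedout = thread_sched_wait_events(sched, th, fd,
                                             thread_sched_waiting_io_read, &rel);
    if (timedout) {
        // handle the timeout case
    }
}
#endif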
125
126static int
127get_sysconf_page_size(void)
128{
129 static long page_size = 0;
130
131 if (UNLIKELY(page_size == 0)) {
132 page_size = sysconf(_SC_PAGESIZE);
133 VM_ASSERT(page_size < INT_MAX);
134 }
135 return (int)page_size;
136}
137
138#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB
139#define MSTACK_PAGE_SIZE get_sysconf_page_size()
140#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone
141
142// 512MB chunk
143// 131,072 pages (> 65,536)
144// 0th page is Redzone. Start from 1st page.
145
146/*
147 * <--> machine stack + vm stack
148 * ----------------------------------
149 * |HD...|RZ| ... |RZ| ... ... |RZ|
150 * <------------- 512MB ------------->
151 */
152
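
// Editor's note: a worked example of the sizing above, assuming 4 KiB pages
// (the real value comes from sysconf(_SC_PAGESIZE)):
//   MSTACK_CHUNK_PAGE_NUM = 512 MiB / 4 KiB - 1 = 131,071 usable pages
// If, for illustration, one stack slot needs 2 MiB plus one guard page (see
// nt_thread_stack_size() below), a chunk with a single header page holds about
//   (131,071 - 1) * 4 KiB / (2 MiB + 4 KiB) ~= 255 stack slots,
// so every slot index comfortably fits in the uint16_t free_stack[] entries.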
153static struct nt_stack_chunk_header {
154 struct nt_stack_chunk_header *prev_chunk;
155 struct nt_stack_chunk_header *prev_free_chunk;
156
157 uint16_t start_page;
158 uint16_t stack_count;
159 uint16_t uninitialized_stack_count;
160
161 uint16_t free_stack_pos;
162 uint16_t free_stack[];
163} *nt_stack_chunks = NULL,
164 *nt_free_stack_chunks = NULL;
165
166struct nt_machine_stack_footer {
167 struct nt_stack_chunk_header *ch;
168 size_t index;
169};
170
171static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;
172
173#include <sys/mman.h>
174
175// vm_stack_size + machine_stack_size + 1 * (guard page size)
176static inline size_t
177nt_thread_stack_size(void)
178{
179 static size_t msz;
180 if (LIKELY(msz > 0)) return msz;
181
182 rb_vm_t *vm = GET_VM();
183 int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE);
184 int page_num = roomof(sz, MSTACK_PAGE_SIZE);
185 msz = (size_t)page_num * MSTACK_PAGE_SIZE;
186 return msz;
187}
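
// Editor's note: a worked example of the rounding above, under the assumed
// configuration of 4 KiB pages, a 1 MiB VM stack and a 1 MiB machine stack:
//   sz       = 1 MiB + 1 MiB + 4 KiB = 2,101,248 bytes
//   page_num = roomof(2,101,248, 4,096) = 513 pages
//   msz      = 513 * 4 KiB = 2,101,248 bytes
// Here the sum is already page aligned; roomof() only changes the result when
// the configured stack sizes are not multiples of the page size.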
188
189static struct nt_stack_chunk_header *
190nt_alloc_thread_stack_chunk(void)
191{
192 int mmap_flags = MAP_ANONYMOUS | MAP_PRIVATE;
193#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__)
194 mmap_flags |= MAP_STACK;
195#endif
196
197 const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
198 if (m == MAP_FAILED) {
199 return NULL;
200 }
201
202 ruby_annotate_mmap(m, MSTACK_CHUNK_SIZE, "Ruby:nt_alloc_thread_stack_chunk");
203
204 size_t msz = nt_thread_stack_size();
205 int header_page_cnt = 1;
206 int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
207 int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;
208
209 if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) {
210 header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
211 stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
212 }
213
214 VM_ASSERT(stack_count <= UINT16_MAX);
215
216 struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m;
217
218 ch->start_page = header_page_cnt;
219 ch->prev_chunk = nt_stack_chunks;
220 ch->prev_free_chunk = nt_free_stack_chunks;
221 ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count;
222 ch->free_stack_pos = 0;
223
224 RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz);
225
226 return ch;
227}
228
229static void *
230nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx)
231{
232 const char *m = (char *)ch;
233 return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size());
234}
235
236static struct nt_machine_stack_footer *
237nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack)
238{
239 // TODO: stack direction
240 const size_t msz = vm->default_params.thread_machine_stack_size;
241 return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)];
242}
243
244static void *
245nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack)
246{
247 // TODO: only support stack going down
248 // [VM ... <GUARD> machine stack ...]
249
250 const char *vstack, *mstack;
251 const char *guard_page;
252 vstack = nt_stack_chunk_get_stack_start(ch, idx);
253 guard_page = vstack + vm->default_params.thread_vm_stack_size;
254 mstack = guard_page + MSTACK_PAGE_SIZE;
255
256 struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
257 msf->ch = ch;
258 msf->index = idx;
259
260#if 0
261 RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
262 vstack, (void *)(guard_page-1),
263 guard_page, (void *)(mstack-1),
264 mstack, (void *)(msf));
265#endif
266
267 *vm_stack = (void *)vstack;
268 *machine_stack = (void *)mstack;
269
270 return (void *)guard_page;
271}
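
// Editor's note: resulting layout of one slot, from low to high addresses:
//
//   [ VM stack (thread_vm_stack_size) | guard page (1 page, PROT_NONE) |
//     machine stack (thread_machine_stack_size, footer at its top) ]
//
// The machine stack grows down toward the guard page (which nt_alloc_stack()
// below protects with mprotect), and the nt_machine_stack_footer at its top is
// what nt_free_stack() later uses to find the owning chunk and slot index.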
272
274static void
275nt_stack_chunk_dump(void)
276{
277 struct nt_stack_chunk_header *ch;
278 int i;
279
280 fprintf(stderr, "** nt_stack_chunks\n");
281 ch = nt_stack_chunks;
282 for (i=0; ch; i++, ch = ch->prev_chunk) {
283 fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
284 }
285
286 fprintf(stderr, "** nt_free_stack_chunks\n");
287 ch = nt_free_stack_chunks;
288 for (i=0; ch; i++, ch = ch->prev_free_chunk) {
289 fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
290 }
291}
292
293static int
294nt_guard_page(const char *p, size_t len)
295{
296 if (mprotect((void *)p, len, PROT_NONE) != -1) {
297 return 0;
298 }
299 else {
300 return errno;
301 }
302}
303
304static int
305nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
306{
307 int err = 0;
308
309 rb_native_mutex_lock(&nt_machine_stack_lock);
310 {
311 retry:
312 if (nt_free_stack_chunks) {
313 struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
314 if (ch->free_stack_pos > 0) {
315 RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos);
316 nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
317 }
318 else if (ch->uninitialized_stack_count > 0) {
319 RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count);
320
321 size_t idx = ch->stack_count - ch->uninitialized_stack_count--;
322 void *guard_page = nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
323 err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE);
324 }
325 else {
326 nt_free_stack_chunks = ch->prev_free_chunk;
327 ch->prev_free_chunk = NULL;
328 goto retry;
329 }
330 }
331 else {
332 struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
333 if (p == NULL) {
334 err = errno;
335 }
336 else {
337 nt_free_stack_chunks = nt_stack_chunks = p;
338 goto retry;
339 }
340 }
341 }
342 rb_native_mutex_unlock(&nt_machine_stack_lock);
343
344 return err;
345}
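
// Editor's note: nt_alloc_stack() above tries three sources in order:
//   1. a previously freed slot popped from ch->free_stack[],
//   2. a never-used slot taken from the uninitialized tail of the chunk
//      (the only path that has to mprotect() a fresh guard page),
//   3. otherwise it unlinks the exhausted chunk from the free list and
//      retries, mmap()ing a brand-new 512MB chunk if none is left.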
346
347static void
348nt_madvise_free_or_dontneed(void *addr, size_t len)
349{
350 /* There is no real way to perform error handling here. Both MADV_FREE
351 * and MADV_DONTNEED are documented to return little more than EINVAL for
352 * a huge variety of errors, so we cannot tell whether madvise failed
353 * because the parameters were bad or because the kernel we're running on
354 * does not support the given advice. This kind of free-but-don't-unmap
355 * is best-effort anyway, so don't sweat it.
356 *
357 * n.b. A very common case of "the kernel doesn't support MADV_FREE and
358 * returns EINVAL" is running under the `rr` debugger; it makes all
359 * MADV_FREE calls return EINVAL. */
360
361#if defined(MADV_FREE)
362 int r = madvise(addr, len, MADV_FREE);
363 // Return on success, or else try MADV_DONTNEED
364 if (r == 0) return;
365#endif
366#if defined(MADV_DONTNEED)
367 madvise(addr, len, MADV_DONTNEED);
368#endif
369}
370
371static void
372nt_free_stack(void *mstack)
373{
374 if (!mstack) return;
375
376 rb_native_mutex_lock(&nt_machine_stack_lock);
377 {
378 struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
379 struct nt_stack_chunk_header *ch = msf->ch;
380 int idx = (int)msf->index;
381 void *stack = nt_stack_chunk_get_stack_start(ch, idx);
382
383 RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);
384
385 if (ch->prev_free_chunk == NULL) {
386 ch->prev_free_chunk = nt_free_stack_chunks;
387 nt_free_stack_chunks = ch;
388 }
389 ch->free_stack[ch->free_stack_pos++] = idx;
390
391 // clear the stack pages
392 nt_madvise_free_or_dontneed(stack, nt_thread_stack_size());
393 }
394 rb_native_mutex_unlock(&nt_machine_stack_lock);
395}
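
// Editor's note: an illustrative alloc/free round trip, not part of this file.
// It shows why the footer matters: nt_free_stack() recovers the owning chunk
// and slot index from nothing but the machine stack pointer. The example_
// function name is hypothetical.
#if 0
static void
example_stack_round_trip(rb_vm_t *vm)
{
    void *vm_stack = NULL, *machine_stack = NULL;

    if (nt_alloc_stack(vm, &vm_stack, &machine_stack) == 0) {
        struct nt_machine_stack_footer *msf =
            nt_stack_chunk_get_msf(vm, machine_stack);
        // msf->ch points back to the chunk, msf->index is the slot index
        (void)msf;

        nt_free_stack(machine_stack); // returns the slot to ch->free_stack[]
    }
}
#endif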
396
397
398static int
399native_thread_check_and_create_shared(rb_vm_t *vm)
400{
401 bool need_to_make = false;
402
403 rb_native_mutex_lock(&vm->ractor.sched.lock);
404 {
405 unsigned int schedulable_ractor_cnt = vm->ractor.cnt;
406 RUBY_ASSERT(schedulable_ractor_cnt >= 1);
407
408 if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads)
409 schedulable_ractor_cnt--; // do not need snt for main ractor
410
411 unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
412 if (((int)snt_cnt < MINIMUM_SNT) ||
413 (snt_cnt < schedulable_ractor_cnt &&
414 snt_cnt < vm->ractor.sched.max_cpu)) {
415
416 RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
417 vm->ractor.sched.snt_cnt,
418 vm->ractor.sched.dnt_cnt,
419 vm->ractor.cnt,
420 vm->ractor.sched.grq_cnt);
421
422 vm->ractor.sched.snt_cnt++;
423 need_to_make = true;
424 }
425 else {
426 RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt);
427 }
428 }
429 rb_native_mutex_unlock(&vm->ractor.sched.lock);
430
431 if (need_to_make) {
432 struct rb_native_thread *nt = native_thread_alloc();
433 nt->vm = vm;
434 return native_thread_create0(nt);
435 }
436 else {
437 return 0;
438 }
439}
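
// Editor's note: a worked example of the growth condition above, assuming the
// MINIMUM_SNT floor (defined elsewhere) is already satisfied. With
// max_cpu = 8 and three ractors whose schedulers all enable MN threads
// (schedulable_ractor_cnt = 3), new SNTs are created until snt_cnt reaches 3;
// with twenty such ractors the pool stops growing at snt_cnt = 8, capped by
// max_cpu.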
440
441#ifdef __APPLE__
442# define co_start ruby_coroutine_start
443#else
444static
445#endif
446COROUTINE
447co_start(struct coroutine_context *from, struct coroutine_context *self)
448{
449#ifdef RUBY_ASAN_ENABLED
450 __sanitizer_finish_switch_fiber(self->fake_stack,
451 (const void**)&from->stack_base, &from->stack_size);
452#endif
453
454 rb_thread_t *th = (rb_thread_t *)self->argument;
455 struct rb_thread_sched *sched = TH_SCHED(th);
456 VM_ASSERT(th->nt != NULL);
457 VM_ASSERT(th == sched->running);
458 VM_ASSERT(sched->lock_owner == NULL);
459
460 // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
461
462 thread_sched_set_locked(sched, th);
463 thread_sched_add_running_thread(TH_SCHED(th), th);
464 thread_sched_unlock(sched, th);
465 {
466 RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th);
467 call_thread_start_func_2(th);
468 }
469 thread_sched_lock(sched, NULL);
470
471 RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial);
472
473 // Thread is terminated
474
475 struct rb_native_thread *nt = th->nt;
476 bool is_dnt = th_has_dedicated_nt(th);
477 native_thread_assign(NULL, th);
478 rb_ractor_set_current_ec(th->ractor, NULL);
479
480 if (is_dnt) {
481 // SNT became DNT while running. Just return to the nt_context
482
483 th->sched.finished = true;
484 coroutine_transfer0(self, nt->nt_context, true);
485 }
486 else {
487 rb_thread_t *next_th = sched->running;
488
489 if (next_th && !next_th->nt) {
490 // switch to the next thread
491 thread_sched_set_unlocked(sched, NULL);
492 th->sched.finished = true;
493 thread_sched_switch0(th->sched.context, next_th, nt, true);
494 }
495 else {
496 // switch to the next Ractor
497 th->sched.finished = true;
498 coroutine_transfer0(self, nt->nt_context, true);
499 }
500 }
501
502 rb_bug("unreachable");
503}
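
// Editor's note: once call_thread_start_func_2() returns, co_start() never
// returns to its caller; it leaves in one of three ways:
//   - back to the dedicated native thread's nt_context when the thread gained
//     a dedicated NT while running,
//   - directly into the next scheduled thread's coroutine via
//     thread_sched_switch0() when that thread has no native thread assigned, or
//   - back to the SNT's nt_context so the SNT can pick up another ractor.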
504
505static int
506native_thread_create_shared(rb_thread_t *th)
507{
508 // setup coroutine
509 rb_vm_t *vm = th->vm;
510 void *vm_stack = NULL, *machine_stack = NULL;
511 int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
512 if (err) return err;
513
514 VM_ASSERT(vm_stack < machine_stack);
515
516 // setup vm stack
517 size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE);
518 rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);
519
520 // setup machine stack
521 size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer);
522 th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size);
523 th->ec->machine.stack_maxsize = machine_stack_size; // TODO
524 th->sched.context_stack = machine_stack;
525
526 th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context));
527 coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size);
528 th->sched.context->argument = th;
529
530 RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
531 thread_sched_to_ready(TH_SCHED(th), th);
532
533 // setup nt
534 return native_thread_check_and_create_shared(th->vm);
535}
536
537#else // USE_MN_THREADS
538
539static int
540native_thread_create_shared(rb_thread_t *th)
541{
542 rb_bug("unreachable");
543}
544
545static bool
546thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
547{
548 rb_bug("unreachable");
549}
550
551#endif // USE_MN_THREADS
552
554#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
555
556static bool
557fd_readable_nonblock(int fd)
558{
559 struct pollfd pfd = {
560 .fd = fd,
561 .events = POLLIN,
562 };
563 return poll(&pfd, 1, 0) != 0;
564}
565
566static bool
567fd_writable_nonblock(int fd)
568{
569 struct pollfd pfd = {
570 .fd = fd,
571 .events = POLLOUT,
572 };
573 return poll(&pfd, 1, 0) != 0;
574}
575
576static void
577verify_waiting_list(void)
578{
579#if VM_CHECK_MODE > 0
580 struct rb_thread_sched_waiting *w, *prev_w = NULL;
581
582 // waiting list's timeout order should be [1, 2, 3, ..., 0, 0, 0]
583
584 ccan_list_for_each(&timer_th.waiting, w, node) {
585 // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
586 if (prev_w) {
587 rb_hrtime_t timeout = w->data.timeout;
588 rb_hrtime_t prev_timeout = prev_w->data.timeout;
589 VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
590 }
591 prev_w = w;
592 }
593#endif
594}
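
// Editor's note: example orderings for the invariant checked above, where 0
// means "no timeout" and finite deadlines must come first in ascending order:
//   timeouts: [100, 250, 250, 900, 0, 0]   // OK
//   timeouts: [100, 900, 250, 0]           // would trip the VM_ASSERT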
595
596#if HAVE_SYS_EVENT_H // kqueue helpers
597
598static enum thread_sched_waiting_flag
599kqueue_translate_filter_to_flags(int16_t filter)
600{
601 switch (filter) {
602 case EVFILT_READ:
603 return thread_sched_waiting_io_read;
604 case EVFILT_WRITE:
605 return thread_sched_waiting_io_write;
606 case EVFILT_TIMER:
607 return thread_sched_waiting_timeout;
608 default:
609 rb_bug("kevent filter:%d not supported", filter);
610 }
611}
612
613static int
614kqueue_wait(rb_vm_t *vm)
615{
616 struct timespec calculated_timeout;
617 struct timespec *timeout = NULL;
618 int timeout_ms = timer_thread_set_timeout(vm);
619
620 if (timeout_ms >= 0) {
621 calculated_timeout.tv_sec = timeout_ms / 1000;
622 calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
623 timeout = &calculated_timeout;
624 }
625
626 return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
627}
628
629static void
630kqueue_create(void)
631{
632 if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
633 int flags = fcntl(timer_th.event_fd, F_GETFD);
634 if (flags == -1) {
635 rb_bug("kqueue GETFD failed (errno:%d)", errno);
636 }
637
638 flags |= FD_CLOEXEC;
639 if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
640 rb_bug("kqueue SETFD failed (errno:%d)", errno);
641 }
642}
643
644static void
645kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
646{
647 if (flags) {
648 struct kevent ke[2];
649 int num_events = 0;
650
651 if (flags & thread_sched_waiting_io_read) {
652 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
653 num_events++;
654 }
655 if (flags & thread_sched_waiting_io_write) {
656 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
657 num_events++;
658 }
659 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
660 perror("kevent");
661 rb_bug("unregister/kevent fails. errno:%d", errno);
662 }
663 }
664}
665
666static bool
667kqueue_already_registered(int fd)
668{
669 struct rb_thread_sched_waiting *w, *found_w = NULL;
670
671 ccan_list_for_each(&timer_th.waiting, w, node) {
672 // Similar to the EEXIST check done by epoll_ctl, but stricter: for
673 // simplicity it matches on the fd alone rather than on the flags.
674 if (w->flags && w->data.fd == fd) {
675 found_w = w;
676 break;
677 }
678 }
679 return found_w != NULL;
680}
681
682#endif // HAVE_SYS_EVENT_H
683
684// return false if the fd is not waitable or there is no need to wait.
685static bool
686timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial)
687{
688 RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
689
690 VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
691 VM_ASSERT(flags != 0);
692
693 rb_hrtime_t abs = 0; // 0 means no timeout
694
695 if (rel) {
696 if (*rel > 0) {
697 flags |= thread_sched_waiting_timeout;
698 }
699 else {
700 return false;
701 }
702 }
703
704 if (rel && *rel > 0) {
705 flags |= thread_sched_waiting_timeout;
706 }
707
708#if HAVE_SYS_EVENT_H
709 struct kevent ke[2];
710 int num_events = 0;
711#else
712 uint32_t epoll_events = 0;
713#endif
714 if (flags & thread_sched_waiting_timeout) {
715 VM_ASSERT(rel != NULL);
716 abs = rb_hrtime_add(rb_hrtime_now(), *rel);
717 }
718
719 if (flags & thread_sched_waiting_io_read) {
720 if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) {
721 RUBY_DEBUG_LOG("fd_readable_nonblock");
722 return false;
723 }
724 else {
725 VM_ASSERT(fd >= 0);
726#if HAVE_SYS_EVENT_H
727 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)th);
728 num_events++;
729#else
730 epoll_events |= EPOLLIN;
731#endif
732 }
733 }
734
735 if (flags & thread_sched_waiting_io_write) {
736 if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) {
737 RUBY_DEBUG_LOG("fd_writable_nonblock");
738 return false;
739 }
740 else {
741 VM_ASSERT(fd >= 0);
742#if HAVE_SYS_EVENT_H
743 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)th);
744 num_events++;
745#else
746 epoll_events |= EPOLLOUT;
747#endif
748 }
749 }
750
751 rb_native_mutex_lock(&timer_th.waiting_lock);
752 {
753#if HAVE_SYS_EVENT_H
754 if (num_events > 0) {
755 if (kqueue_already_registered(fd)) {
756 rb_native_mutex_unlock(&timer_th.waiting_lock);
757 return false;
758 }
759
760 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
761 RUBY_DEBUG_LOG("failed (%d)", errno);
762
763 switch (errno) {
764 case EBADF:
765 // the fd is closed?
766 case EINTR:
767 // signal received? is there a sensible way to handle this?
768 default:
769 perror("kevent");
770 rb_bug("register/kevent failed(fd:%d, errno:%d)", fd, errno);
771 }
772 }
773 RUBY_DEBUG_LOG("kevent(add, fd:%d) success", fd);
774 }
775#else
776 if (epoll_events) {
777 struct epoll_event event = {
778 .events = epoll_events,
779 .data = {
780 .ptr = (void *)th,
781 },
782 };
783 if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
784 RUBY_DEBUG_LOG("failed (%d)", errno);
785
786 switch (errno) {
787 case EBADF:
788 // the fd is closed?
789 case EPERM:
790 // the fd doesn't support epoll
791 case EEXIST:
792 // the fd is already registered by another thread
793 rb_native_mutex_unlock(&timer_th.waiting_lock);
794 return false;
795 default:
796 perror("epoll_ctl");
797 rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno);
798 }
799 }
800 RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
801 }
802#endif
803
804 if (th) {
805 VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);
806
807 // setup waiting information
808 {
809 th->sched.waiting_reason.flags = flags;
810 th->sched.waiting_reason.data.timeout = abs;
811 th->sched.waiting_reason.data.fd = fd;
812 th->sched.waiting_reason.data.result = 0;
813 th->sched.waiting_reason.data.event_serial = event_serial;
814 }
815
816 if (abs == 0) { // no timeout
817 VM_ASSERT(!(flags & thread_sched_waiting_timeout));
818 ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
819 }
820 else {
821 RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs);
822 VM_ASSERT(flags & thread_sched_waiting_timeout);
823
824 // insert th to sorted list (TODO: O(n))
825 struct rb_thread_sched_waiting *w, *prev_w = NULL;
826
827 ccan_list_for_each(&timer_th.waiting, w, node) {
828 if ((w->flags & thread_sched_waiting_timeout) &&
829 w->data.timeout < abs) {
830 prev_w = w;
831 }
832 else {
833 break;
834 }
835 }
836
837 if (prev_w) {
838 ccan_list_add_after(&timer_th.waiting, &prev_w->node, &th->sched.waiting_reason.node);
839 }
840 else {
841 ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
842 }
843
844 verify_waiting_list();
845
846 // force a timer-thread wakeup so it recomputes its poll timeout and notices this (possibly short) deadline
847 timer_thread_wakeup_force();
848 }
849 }
850 else {
851 VM_ASSERT(abs == 0);
852 }
853 }
854 rb_native_mutex_unlock(&timer_th.waiting_lock);
855
856 return true;
857}
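
// Editor's note: summary of the return value as consumed by
// thread_sched_wait_events() above:
//   false - nothing was registered: the relative timeout was <= 0, the fd was
//           already readable/writable (and thread_sched_waiting_io_force was
//           not set), the fd was already present in the waiting list (kqueue
//           path), or epoll_ctl() refused it with EBADF/EPERM/EEXIST;
//   true  - the waiting reason is recorded and the thread is queued on
//           timer_th.waiting (at the tail when it has no timeout, otherwise in
//           deadline order), so the caller may suspend until the timer thread
//           wakes it up.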
858
859static void
860timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
861{
862 if (!(th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write))) {
863 return;
864 }
865
866 RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
867#if HAVE_SYS_EVENT_H
868 kqueue_unregister_waiting(fd, flags);
869#else
870 // Linux 2.6.9 or later is needed to pass NULL as data.
871 if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
872 switch (errno) {
873 case EBADF:
874 // just ignore. maybe fd is closed.
875 break;
876 default:
877 perror("epoll_ctl");
878 rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
879 }
880 }
881#endif
882}
883
884static void
885timer_thread_setup_mn(void)
886{
887#if HAVE_SYS_EVENT_H
888 kqueue_create();
889 RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd);
890#else
891 if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
892 RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.event_fd);
893#endif
894 RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
895
896 timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL, 0);
897}
898
899static int
900event_wait(rb_vm_t *vm)
901{
902#if HAVE_SYS_EVENT_H
903 int r = kqueue_wait(vm);
904#else
905 int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
906#endif
907 return r;
908}
909
910/*
911 * The purpose of the timer thread:
912 *
913 * (1) Periodic checking
914 * (1-1) Provide time slice for active NTs
915 * (1-2) Check NT shortage
916 * (1-3) Periodic UBF (global)
917 * (1-4) Lazy GRQ deq start
918 * (2) Receive notification
919 * (2-1) async I/O termination
920 * (2-2) timeout
921 * (2-2-1) sleep(n)
922 * (2-2-2) timeout(n), I/O, ...
923 */
924static void
925timer_thread_polling(rb_vm_t *vm)
926{
927 int r = event_wait(vm);
928
929 RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);
930
931 switch (r) {
932 case 0: // timeout
933 RUBY_DEBUG_LOG("timeout%s", "");
934
935 ractor_sched_lock(vm, NULL);
936 {
937 // (1-1) timeslice
938 timer_thread_check_timeslice(vm);
939
940 // (1-4) lazy grq deq
941 if (vm->ractor.sched.grq_cnt > 0) {
942 RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt);
943 rb_native_cond_signal(&vm->ractor.sched.cond);
944 }
945 }
946 ractor_sched_unlock(vm, NULL);
947
948 // (1-2)
949 native_thread_check_and_create_shared(vm);
950
951 break;
952
953 case -1:
954 switch (errno) {
955 case EINTR:
956 // simply retry
957 break;
958 default:
959 perror("event_wait");
960 rb_bug("event_wait errno:%d", errno);
961 }
962 break;
963
964 default:
965 RUBY_DEBUG_LOG("%d event(s)", r);
966
967#if HAVE_SYS_EVENT_H
968 for (int i=0; i<r; i++) {
969 rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].udata;
970 int fd = (int)timer_th.finished_events[i].ident;
971 int16_t filter = timer_th.finished_events[i].filter;
972
973 if (th == NULL) {
974 // wakeup timerthread
975 RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
976 consume_communication_pipe(timer_th.comm_fds[0]);
977 }
978 else {
979 // wakeup specific thread by IO
980 RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s",
981 rb_th_serial(th),
982 (filter == EVFILT_READ) ? "read/" : "",
983 (filter == EVFILT_WRITE) ? "write/" : "");
984
985 struct rb_thread_sched *sched = TH_SCHED(th);
986 thread_sched_lock(sched, th);
987 rb_native_mutex_lock(&timer_th.waiting_lock);
988 {
989 if (th->sched.waiting_reason.flags) {
990 // delete from chain
991 ccan_list_del_init(&th->sched.waiting_reason.node);
992 timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter));
993
994 th->sched.waiting_reason.flags = thread_sched_waiting_none;
995 th->sched.waiting_reason.data.fd = -1;
996 th->sched.waiting_reason.data.result = filter;
997 uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
998
999 timer_thread_wakeup_thread_locked(sched, th, event_serial);
1000 }
1001 else {
1002 // already released
1003 }
1004 }
1005 rb_native_mutex_unlock(&timer_th.waiting_lock);
1006 thread_sched_unlock(sched, th);
1007 }
1008 }
1009#else
1010 for (int i=0; i<r; i++) {
1011 rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr;
1012
1013 if (th == NULL) {
1014 // wakeup timerthread
1015 RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
1016 consume_communication_pipe(timer_th.comm_fds[0]);
1017 }
1018 else {
1019 // wakeup specific thread by IO
1020 uint32_t events = timer_th.finished_events[i].events;
1021
1022 RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s",
1023 rb_th_serial(th),
1024 (events & EPOLLIN) ? "in/" : "",
1025 (events & EPOLLOUT) ? "out/" : "",
1026 (events & EPOLLRDHUP) ? "RDHUP/" : "",
1027 (events & EPOLLPRI) ? "pri/" : "",
1028 (events & EPOLLERR) ? "err/" : "",
1029 (events & EPOLLHUP) ? "hup/" : "");
1030
1031 struct rb_thread_sched *sched = TH_SCHED(th);
1032 thread_sched_lock(sched, th);
1033 rb_native_mutex_lock(&timer_th.waiting_lock);
1034 {
1035 if (th->sched.waiting_reason.flags) {
1036 // delete from chain
1037 ccan_list_del_init(&th->sched.waiting_reason.node);
1038 timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
1039
1040 th->sched.waiting_reason.flags = thread_sched_waiting_none;
1041 th->sched.waiting_reason.data.fd = -1;
1042 th->sched.waiting_reason.data.result = (int)events;
1043 uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
1044
1045 timer_thread_wakeup_thread_locked(sched, th, event_serial);
1046 }
1047 else {
1048 // already released
1049 }
1050 }
1051 rb_native_mutex_unlock(&timer_th.waiting_lock);
1052 thread_sched_unlock(sched, th);
1053 }
1054 }
1055#endif
1056 }
1057}
1058
1059#else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H
1060
1061static void
1062timer_thread_setup_mn(void)
1063{
1064 // do nothing
1065}
1066
1067static void
1068timer_thread_polling(rb_vm_t *vm)
1069{
1070 int timeout = timer_thread_set_timeout(vm);
1071
1072 struct pollfd pfd = {
1073 .fd = timer_th.comm_fds[0],
1074 .events = POLLIN,
1075 };
1076
1077 int r = poll(&pfd, 1, timeout);
1078
1079 switch (r) {
1080 case 0: // timeout
1081 rb_native_mutex_lock(&vm->ractor.sched.lock);
1082 {
1083 // (1-1) timeslice
1084 timer_thread_check_timeslice(vm);
1085 }
1086 rb_native_mutex_unlock(&vm->ractor.sched.lock);
1087 break;
1088
1089 case -1: // error
1090 switch (errno) {
1091 case EINTR:
1092 // simply retry
1093 break;
1094 default:
1095 perror("poll");
1096 rb_bug("poll errno:%d", errno);
1097 break;
1098 }
1099
1100 case 1:
1101 consume_communication_pipe(timer_th.comm_fds[0]);
1102 break;
1103
1104 default:
1105 rb_bug("unreachbale");
1106 }
1107}
1108
1109#endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H