Ruby 3.5.0dev (2025-09-17 revision e758198846b7811f20e1c21aa971124fbb2fe103)
thread_pthread_mn.c (e758198846b7811f20e1c21aa971124fbb2fe103)
1// included by "thread_pthread.c"
2
3#if USE_MN_THREADS
4
5static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
6
7static bool
8timer_thread_cancel_waiting(rb_thread_t *th)
9{
10 bool canceled = false;
11
12 if (th->sched.waiting_reason.flags) {
13 rb_native_mutex_lock(&timer_th.waiting_lock);
14 {
15 if (th->sched.waiting_reason.flags) {
16 canceled = true;
17 ccan_list_del_init(&th->sched.waiting_reason.node);
18 if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
19 timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
20 }
21 th->sched.waiting_reason.flags = thread_sched_waiting_none;
22 }
23 }
24 rb_native_mutex_unlock(&timer_th.waiting_lock);
25 }
26
27 return canceled;
28}
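
/* Editorial sketch (not part of the original source): timer_thread_cancel_waiting()
 * above uses a double-checked pattern -- an unlocked peek at waiting_reason.flags
 * to skip the lock in the common "not waiting" case, then a re-check under
 * timer_th.waiting_lock before touching the list.  A minimal, self-contained
 * model of that pattern with hypothetical names (pending_job, cancel_if_pending): */
#if 0
#include <pthread.h>
#include <stdbool.h>

struct pending_job {
    pthread_mutex_t lock;
    bool pending;
};

// Returns true iff this call transitioned the job from pending to canceled.
static bool
cancel_if_pending(struct pending_job *j)
{
    bool canceled = false;

    if (j->pending) {                     // cheap unlocked peek; may be stale
        pthread_mutex_lock(&j->lock);
        if (j->pending) {                 // re-check while holding the lock
            j->pending = false;           // the real code also unlinks the waiting node here
            canceled = true;
        }
        pthread_mutex_unlock(&j->lock);
    }
    return canceled;
}
#endif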
29
30static void
31ubf_event_waiting(void *ptr)
32{
33 rb_thread_t *th = (rb_thread_t *)ptr;
34 struct rb_thread_sched *sched = TH_SCHED(th);
35
36 RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
37
38 VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));
39
40 // this happens only once; it is safe because th->interrupt_lock is already acquired.
41 th->unblock.func = NULL;
42 th->unblock.arg = NULL;
43
44 bool canceled = timer_thread_cancel_waiting(th);
45
46 thread_sched_lock(sched, th);
47 {
48 if (sched->running == th) {
49 RUBY_DEBUG_LOG("not waiting yet");
50 }
51 else if (canceled) {
52 thread_sched_to_ready_common(sched, th, true, false);
53 }
54 else {
55 RUBY_DEBUG_LOG("already not waiting");
56 }
57 }
58 thread_sched_unlock(sched, th);
59}
60
61static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);
62
63// return true if timed out
64static bool
65thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
66{
67 VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT
68
69 volatile bool timedout = false, need_cancel = false;
70
71 if (timer_thread_register_waiting(th, fd, events, rel)) {
72 RUBY_DEBUG_LOG("wait fd:%d", fd);
73
74 RB_VM_SAVE_MACHINE_CONTEXT(th);
75 ubf_set(th, ubf_event_waiting, (void *)th);
76
77 RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);
78
79 thread_sched_lock(sched, th);
80 {
81 if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
82 // already awakened
83 }
84 else if (RUBY_VM_INTERRUPTED(th->ec)) {
85 need_cancel = true;
86 }
87 else {
88 RUBY_DEBUG_LOG("sleep");
89
90 th->status = THREAD_STOPPED_FOREVER;
91 thread_sched_wakeup_next_thread(sched, th, true);
92 thread_sched_wait_running_turn(sched, th, true);
93
94 RUBY_DEBUG_LOG("wakeup");
95 }
96
97 timedout = th->sched.waiting_reason.data.result == 0;
98 }
99 thread_sched_unlock(sched, th);
100
101 if (need_cancel) {
102 timer_thread_cancel_waiting(th);
103 }
104
105 ubf_clear(th); // TODO: maybe it is already NULL?
106
107 th->status = THREAD_RUNNABLE;
108 }
109 else {
110 RUBY_DEBUG_LOG("can not wait fd:%d", fd);
111 return false;
112 }
113
114 VM_ASSERT(sched->running == th);
115
116 return timedout;
117}
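
/* Editorial sketch (hypothetical caller, not in this file): how an M:N thread
 * running on a shared native thread might use thread_sched_wait_events() to
 * wait for readability with a relative timeout.  example_wait_readable() and
 * its parameters are illustrative assumptions, not part of Ruby's API. */
#if 0
// Returns true if the wait timed out, false if fd became ready (or no wait was needed).
static bool
example_wait_readable(rb_thread_t *th, int fd, rb_hrtime_t rel)
{
    struct rb_thread_sched *sched = TH_SCHED(th);
    // Passing NULL instead of &rel would mean "no timeout".
    return thread_sched_wait_events(sched, th, fd, thread_sched_waiting_io_read, &rel);
}
#endif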
118
120
121static int
122get_sysconf_page_size(void)
123{
124 static long page_size = 0;
125
126 if (UNLIKELY(page_size == 0)) {
127 page_size = sysconf(_SC_PAGESIZE);
128 VM_ASSERT(page_size < INT_MAX);
129 }
130 return (int)page_size;
131}
132
133#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB
134#define MSTACK_PAGE_SIZE get_sysconf_page_size()
135#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // the -1 accounts for the leading redzone page
136
137// 512MB chunk
138// 131,072 pages (> 65,536)
139// 0th page is Redzone. Start from 1st page.
140
141/*
142 * <--> machine stack + vm stack
143 * ----------------------------------
144 * |HD...|RZ| ... |RZ| ... ... |RZ|
145 * <------------- 512MB ------------->
146 */
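
/* Editorial sketch (not part of the original source): a worked example of the
 * chunk arithmetic above.  The page size and stack sizes are assumptions for
 * illustration (4 KiB pages, 1 MiB VM stack, 1 MiB machine stack); the real
 * values come from sysconf(_SC_PAGESIZE) and vm->default_params. */
#if 0
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    const size_t page   = 4096;                // MSTACK_PAGE_SIZE (assumed)
    const size_t chunk  = 512 * 1024 * 1024;   // MSTACK_CHUNK_SIZE
    const size_t vm_stk = 1024 * 1024;         // thread_vm_stack_size (assumed)
    const size_t m_stk  = 1024 * 1024;         // thread_machine_stack_size (assumed)

    size_t chunk_pages = chunk / page - 1;            // 131,071 usable pages (1 redzone)
    size_t slot = vm_stk + m_stk + page;              // VM stack + guard page + machine stack
    slot = (slot + page - 1) / page * page;           // round up to whole pages: 2,101,248

    size_t header_pages = 1;                          // grows if the free_stack[] table is large
    size_t stacks = (chunk_pages - header_pages) * page / slot;  // ~255 stacks per chunk

    printf("pages/chunk:%zu bytes/slot:%zu stacks/chunk:%zu\n", chunk_pages, slot, stacks);
    assert(stacks <= UINT16_MAX);                     // fits the uint16_t bookkeeping above
    return 0;
}
#endif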
147
148static struct nt_stack_chunk_header {
149 struct nt_stack_chunk_header *prev_chunk;
150 struct nt_stack_chunk_header *prev_free_chunk;
151
152 uint16_t start_page;
153 uint16_t stack_count;
154 uint16_t uninitialized_stack_count;
155
156 uint16_t free_stack_pos;
157 uint16_t free_stack[];
158} *nt_stack_chunks = NULL,
159 *nt_free_stack_chunks = NULL;
160
161struct nt_machine_stack_footer {
162 struct nt_stack_chunk_header *ch;
163 size_t index;
164};
165
166static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;
167
168#include <sys/mman.h>
169
170// vm_stack_size + machine_stack_size + 1 * (guard page size)
171static inline size_t
172nt_thread_stack_size(void)
173{
174 static size_t msz;
175 if (LIKELY(msz > 0)) return msz;
176
177 rb_vm_t *vm = GET_VM();
178 int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE);
179 int page_num = roomof(sz, MSTACK_PAGE_SIZE);
180 msz = (size_t)page_num * MSTACK_PAGE_SIZE;
181 return msz;
182}
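
/* Editorial note (not part of the original source): roomof() is Ruby's internal
 * ceil-division macro, so the function above rounds the combined stack size up
 * to a whole number of pages.  A standalone check of that rounding, assuming a
 * 4 KiB page size and redefining the macro locally for the example: */
#if 0
#include <assert.h>
#define roomof(x, y) (((x) + (y) - 1) / (y))   // local stand-in for the internal macro

int
main(void)
{
    const int page = 4096;
    assert(roomof(2 * 1024 * 1024 + 1, page) == 513);  // 2 MiB + 1 byte needs 513 pages
    assert(roomof(2 * 1024 * 1024,     page) == 512);  // an exact multiple stays at 512
    return 0;
}
#endif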
183
184static struct nt_stack_chunk_header *
185nt_alloc_thread_stack_chunk(void)
186{
187 int mmap_flags = MAP_ANONYMOUS | MAP_PRIVATE;
188#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__)
189 mmap_flags |= MAP_STACK;
190#endif
191
192 const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
193 if (m == MAP_FAILED) {
194 return NULL;
195 }
196
197 ruby_annotate_mmap(m, MSTACK_CHUNK_SIZE, "Ruby:nt_alloc_thread_stack_chunk");
198
199 size_t msz = nt_thread_stack_size();
200 int header_page_cnt = 1;
201 int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
202 int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;
203
204 if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) {
205 header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
206 stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
207 }
208
209 VM_ASSERT(stack_count <= UINT16_MAX);
210
211 struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m;
212
213 ch->start_page = header_page_cnt;
214 ch->prev_chunk = nt_stack_chunks;
215 ch->prev_free_chunk = nt_free_stack_chunks;
216 ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count;
217 ch->free_stack_pos = 0;
218
219 RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz);
220
221 return ch;
222}
223
224static void *
225nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx)
226{
227 const char *m = (char *)ch;
228 return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size());
229}
230
231static struct nt_machine_stack_footer *
232nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack)
233{
234 // TODO: stack direction
235 const size_t msz = vm->default_params.thread_machine_stack_size;
236 return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)];
237}
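
/* Editorial sketch (not part of the original source): the footer trick above
 * stores bookkeeping in the last bytes of each machine stack so that
 * nt_free_stack() can recover the owning chunk and slot index from the bare
 * stack pointer.  A generic, self-contained model with hypothetical names: */
#if 0
#include <assert.h>
#include <stdlib.h>

struct footer { void *owner; size_t index; };

// The footer occupies the top sizeof(struct footer) bytes of a `size`-byte area.
static struct footer *
footer_of(char *stack, size_t size)
{
    return (struct footer *)(stack + size - sizeof(struct footer));
}

int
main(void)
{
    size_t size = 64 * 1024;
    char *stack = malloc(size);
    void *owner = (void *)0x1234;                 // stands in for the chunk header

    *footer_of(stack, size) = (struct footer){ .owner = owner, .index = 7 };

    // Later, given only the stack pointer and its size, read the metadata back.
    struct footer *f = footer_of(stack, size);
    assert(f->owner == owner && f->index == 7);

    free(stack);
    return 0;
}
#endif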
238
239static void *
240nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack)
241{
242 // TODO: only support stack going down
243 // [VM ... <GUARD> machine stack ...]
244
245 const char *vstack, *mstack;
246 const char *guard_page;
247 vstack = nt_stack_chunk_get_stack_start(ch, idx);
248 guard_page = vstack + vm->default_params.thread_vm_stack_size;
249 mstack = guard_page + MSTACK_PAGE_SIZE;
250
251 struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
252 msf->ch = ch;
253 msf->index = idx;
254
255#if 0
256 RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
257 vstack, (void *)(guard_page-1),
258 guard_page, (void *)(mstack-1),
259 mstack, (void *)(msf));
260#endif
261
262 *vm_stack = (void *)vstack;
263 *machine_stack = (void *)mstack;
264
265 return (void *)guard_page;
266}
267
269static void
270nt_stack_chunk_dump(void)
271{
272 struct nt_stack_chunk_header *ch;
273 int i;
274
275 fprintf(stderr, "** nt_stack_chunks\n");
276 ch = nt_stack_chunks;
277 for (i=0; ch; i++, ch = ch->prev_chunk) {
278 fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
279 }
280
281 fprintf(stderr, "** nt_free_stack_chunks\n");
282 ch = nt_free_stack_chunks;
283 for (i=0; ch; i++, ch = ch->prev_free_chunk) {
284 fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
285 }
286}
287
288static int
289nt_guard_page(const char *p, size_t len)
290{
291 if (mprotect((void *)p, len, PROT_NONE) != -1) {
292 return 0;
293 }
294 else {
295 return errno;
296 }
297}
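
/* Editorial sketch (standalone, not part of the original source): what
 * nt_guard_page() achieves -- one page of an anonymous mapping is made
 * inaccessible so that a stack overflow faults immediately instead of silently
 * corrupting the neighboring stack slot. */
#if 0
#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>
#include <unistd.h>

int
main(void)
{
    size_t page = (size_t)sysconf(_SC_PAGESIZE);
    char *p = mmap(NULL, 4 * page, PROT_READ | PROT_WRITE,
                   MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    assert(p != MAP_FAILED);

    // Revoke all access to the first page; touching it now raises SIGSEGV.
    assert(mprotect(p, page, PROT_NONE) == 0);

    p[page] = 1;          // fine: the second page is still read/write
    // p[0] = 1;          // would fault: that is the guard page doing its job

    munmap(p, 4 * page);
    return 0;
}
#endif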
298
299static int
300nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
301{
302 int err = 0;
303
304 rb_native_mutex_lock(&nt_machine_stack_lock);
305 {
306 retry:
307 if (nt_free_stack_chunks) {
308 struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
309 if (ch->free_stack_pos > 0) {
310 RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos);
311 nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
312 }
313 else if (ch->uninitialized_stack_count > 0) {
314 RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count);
315
316 size_t idx = ch->stack_count - ch->uninitialized_stack_count--;
317 void *guard_page = nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
318 err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE);
319 }
320 else {
321 nt_free_stack_chunks = ch->prev_free_chunk;
322 ch->prev_free_chunk = NULL;
323 goto retry;
324 }
325 }
326 else {
327 struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
328 if (p == NULL) {
329 err = errno;
330 }
331 else {
332 nt_free_stack_chunks = nt_stack_chunks = p;
333 goto retry;
334 }
335 }
336 }
337 rb_native_mutex_unlock(&nt_machine_stack_lock);
338
339 return err;
340}
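
/* Editorial sketch (hypothetical, not in this file): the intended pairing of
 * nt_alloc_stack() with nt_free_stack().  Note that the machine-stack pointer
 * is the one to hand back, because the footer that identifies the slot lives
 * inside the machine stack area. */
#if 0
static int
example_stack_roundtrip(rb_vm_t *vm)
{
    void *vm_stack = NULL, *machine_stack = NULL;

    int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
    if (err) return err;                 // e.g. ENOMEM from mmap, or mprotect's errno

    /* ... run a coroutine on machine_stack and use vm_stack for the EC ... */

    nt_free_stack(machine_stack);        // returns the slot to its chunk's free list
    return 0;
}
#endif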
341
342static void
343nt_madvise_free_or_dontneed(void *addr, size_t len)
344{
345 /* There is no real way to perform error handling here. Both MADV_FREE
346 * and MADV_DONTNEED are documented to return pretty much only EINVAL
347 * for a huge variety of errors, so it is impossible to tell whether madvise
348 * failed because the parameters were bad or because the kernel we're running
349 * on does not support the given advice. This kind of free-but-don't-unmap
350 * is best-effort anyway, so don't sweat it.
351 *
352 * n.b. A very common case of "the kernel doesn't support MADV_FREE and
353 * returns EINVAL" is running under the `rr` debugger; it makes all
354 * MADV_FREE calls return EINVAL. */
355
356#if defined(MADV_FREE)
357 int r = madvise(addr, len, MADV_FREE);
358 // Return on success, or else try MADV_DONTNEED
359 if (r == 0) return;
360#endif
361#if defined(MADV_DONTNEED)
362 madvise(addr, len, MADV_DONTNEED);
363#endif
364}
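
/* Editorial sketch (standalone, not part of the original source): the same
 * best-effort "drop the pages but keep the mapping" idiom on an anonymous
 * mapping.  Unlike munmap(), the address range stays valid afterwards, which
 * is exactly what the stack recycling above relies on. */
#if 0
#include <assert.h>
#include <stddef.h>
#include <sys/mman.h>

int
main(void)
{
    const size_t len = 1 << 20;
    char *p = mmap(NULL, len, PROT_READ | PROT_WRITE,
                   MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    assert(p != MAP_FAILED);
    p[0] = 'x';

    int dropped = -1;
#if defined(MADV_FREE)
    dropped = madvise(p, len, MADV_FREE);               // preferred: lazy reclamation
#endif
#if defined(MADV_DONTNEED)
    if (dropped != 0) madvise(p, len, MADV_DONTNEED);   // fallback (e.g. under `rr`)
#endif

    p[0] = 'y';           // still mapped; the slot can simply be reused later
    munmap(p, len);
    return 0;
}
#endif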
365
366static void
367nt_free_stack(void *mstack)
368{
369 if (!mstack) return;
370
371 rb_native_mutex_lock(&nt_machine_stack_lock);
372 {
373 struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
374 struct nt_stack_chunk_header *ch = msf->ch;
375 int idx = (int)msf->index;
376 void *stack = nt_stack_chunk_get_stack_start(ch, idx);
377
378 RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);
379
380 if (ch->prev_free_chunk == NULL) {
381 ch->prev_free_chunk = nt_free_stack_chunks;
382 nt_free_stack_chunks = ch;
383 }
384 ch->free_stack[ch->free_stack_pos++] = idx;
385
386 // clear the stack pages
387 nt_madvise_free_or_dontneed(stack, nt_thread_stack_size());
388 }
389 rb_native_mutex_unlock(&nt_machine_stack_lock);
390}
391
392
393static int
394native_thread_check_and_create_shared(rb_vm_t *vm)
395{
396 bool need_to_make = false;
397
398 rb_native_mutex_lock(&vm->ractor.sched.lock);
399 {
400 unsigned int schedulable_ractor_cnt = vm->ractor.cnt;
401 RUBY_ASSERT(schedulable_ractor_cnt >= 1);
402
403 if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads)
404 schedulable_ractor_cnt--; // do not need snt for main ractor
405
406 unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
407 if (((int)snt_cnt < MINIMUM_SNT) ||
408 (snt_cnt < schedulable_ractor_cnt &&
409 snt_cnt < vm->ractor.sched.max_cpu)) {
410
411 RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
412 vm->ractor.sched.snt_cnt,
413 vm->ractor.sched.dnt_cnt,
414 vm->ractor.cnt,
415 vm->ractor.sched.grq_cnt);
416
417 vm->ractor.sched.snt_cnt++;
418 need_to_make = true;
419 }
420 else {
421 RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt);
422 }
423 }
424 rb_native_mutex_unlock(&vm->ractor.sched.lock);
425
426 if (need_to_make) {
427 struct rb_native_thread *nt = native_thread_alloc();
428 nt->vm = vm;
429 return native_thread_create0(nt);
430 }
431 else {
432 return 0;
433 }
434}
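
/* Editorial sketch (hypothetical helper, not in this file): the decision above,
 * condensed.  A new shared native thread (SNT) is created while the pool is
 * below MINIMUM_SNT, or while there are more schedulable ractors than SNTs and
 * the pool is still under the CPU cap. */
#if 0
static bool
example_need_more_snt(unsigned int snt_cnt, unsigned int schedulable_ractor_cnt,
                      unsigned int max_cpu)
{
    return (int)snt_cnt < MINIMUM_SNT ||
           (snt_cnt < schedulable_ractor_cnt && snt_cnt < max_cpu);
}
#endif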
435
436#ifdef __APPLE__
437# define co_start ruby_coroutine_start
438#else
439static
440#endif
441COROUTINE
442co_start(struct coroutine_context *from, struct coroutine_context *self)
443{
444#ifdef RUBY_ASAN_ENABLED
445 __sanitizer_finish_switch_fiber(self->fake_stack,
446 (const void**)&from->stack_base, &from->stack_size);
447#endif
448
449 rb_thread_t *th = (rb_thread_t *)self->argument;
450 struct rb_thread_sched *sched = TH_SCHED(th);
451 VM_ASSERT(th->nt != NULL);
452 VM_ASSERT(th == sched->running);
453 VM_ASSERT(sched->lock_owner == NULL);
454
455 // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
456
457 thread_sched_set_locked(sched, th);
458 thread_sched_add_running_thread(TH_SCHED(th), th);
459 thread_sched_unlock(sched, th);
460 {
461 RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th);
462 call_thread_start_func_2(th);
463 }
464 thread_sched_lock(sched, NULL);
465
466 RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial);
467
468 // Thread is terminated
469
470 struct rb_native_thread *nt = th->nt;
471 bool is_dnt = th_has_dedicated_nt(th);
472 native_thread_assign(NULL, th);
473 rb_ractor_set_current_ec(th->ractor, NULL);
474
475 if (is_dnt) {
476 // SNT became DNT while running. Just return to the nt_context
477
478 th->sched.finished = true;
479 coroutine_transfer0(self, nt->nt_context, true);
480 }
481 else {
482 rb_thread_t *next_th = sched->running;
483
484 if (next_th && !next_th->nt) {
485 // switch to the next thread
486 thread_sched_set_unlocked(sched, NULL);
487 th->sched.finished = true;
488 thread_sched_switch0(th->sched.context, next_th, nt, true);
489 }
490 else {
491 // switch to the next Ractor
492 th->sched.finished = true;
493 coroutine_transfer0(self, nt->nt_context, true);
494 }
495 }
496
497 rb_bug("unreachable");
498}
499
500static int
501native_thread_create_shared(rb_thread_t *th)
502{
503 // setup coroutine
504 rb_vm_t *vm = th->vm;
505 void *vm_stack = NULL, *machine_stack = NULL;
506 int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
507 if (err) return err;
508
509 VM_ASSERT(vm_stack < machine_stack);
510
511 // setup vm stack
512 size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE);
513 rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);
514
515 // setup machine stack
516 size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer);
517 th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size);
518 th->ec->machine.stack_maxsize = machine_stack_size; // TODO
519 th->sched.context_stack = machine_stack;
520
521 th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context));
522 coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size);
523 th->sched.context->argument = th;
524
525 RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
526 thread_sched_to_ready(TH_SCHED(th), th);
527
528 // setup nt
529 return native_thread_check_and_create_shared(th->vm);
530}
531
532#else // USE_MN_THREADS
533
534static int
535native_thread_create_shared(rb_thread_t *th)
536{
537 rb_bug("unreachable");
538}
539
540static bool
541thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
542{
543 rb_bug("unreachable");
544}
545
546#endif // USE_MN_THREADS
547
549#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
550
551static bool
552fd_readable_nonblock(int fd)
553{
554 struct pollfd pfd = {
555 .fd = fd,
556 .events = POLLIN,
557 };
558 return poll(&pfd, 1, 0) != 0;
559}
560
561static bool
562fd_writable_nonblock(int fd)
563{
564 struct pollfd pfd = {
565 .fd = fd,
566 .events = POLLOUT,
567 };
568 return poll(&pfd, 1, 0) != 0;
569}
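
/* Editorial note (not part of the original source): both helpers above are
 * zero-timeout poll() probes.  Returning `poll(...) != 0` deliberately counts
 * poll errors and POLLERR/POLLHUP-style revents as "ready", so the caller skips
 * registration and lets the actual read/write report the failure.  A standalone
 * probe of stdin, for illustration: */
#if 0
#include <poll.h>
#include <stdio.h>

int
main(void)
{
    struct pollfd pfd = { .fd = 0, .events = POLLIN };  // fd 0 = stdin
    int r = poll(&pfd, 1, 0);                           // 0 ms timeout: never blocks
    printf("r:%d revents:%#x -> %s\n", r, (unsigned)pfd.revents,
           r != 0 ? "treat as ready" : "would block");
    return 0;
}
#endif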
570
571static void
572verify_waiting_list(void)
573{
574#if VM_CHECK_MODE > 0
575 struct rb_thread_sched_waiting *w, *prev_w = NULL;
576
577 // waiting list's timeout order should be [1, 2, 3, ..., 0, 0, 0]
578
579 ccan_list_for_each(&timer_th.waiting, w, node) {
580 // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
581 if (prev_w) {
582 rb_hrtime_t timeout = w->data.timeout;
583 rb_hrtime_t prev_timeout = prev_w->data.timeout;
584 VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
585 }
586 prev_w = w;
587 }
588#endif
589}
590
591#if HAVE_SYS_EVENT_H // kqueue helpers
592
593static enum thread_sched_waiting_flag
594kqueue_translate_filter_to_flags(int16_t filter)
595{
596 switch (filter) {
597 case EVFILT_READ:
598 return thread_sched_waiting_io_read;
599 case EVFILT_WRITE:
600 return thread_sched_waiting_io_write;
601 case EVFILT_TIMER:
602 return thread_sched_waiting_timeout;
603 default:
604 rb_bug("kevent filter:%d not supported", filter);
605 }
606}
607
608static int
609kqueue_wait(rb_vm_t *vm)
610{
611 struct timespec calculated_timeout;
612 struct timespec *timeout = NULL;
613 int timeout_ms = timer_thread_set_timeout(vm);
614
615 if (timeout_ms >= 0) {
616 calculated_timeout.tv_sec = timeout_ms / 1000;
617 calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
618 timeout = &calculated_timeout;
619 }
620
621 return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
622}
623
624static void
625kqueue_create(void)
626{
627 if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
628 int flags = fcntl(timer_th.event_fd, F_GETFD);
629 if (flags == -1) {
630 rb_bug("kqueue GETFD failed (errno:%d)", errno);
631 }
632
633 flags |= FD_CLOEXEC;
634 if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
635 rb_bug("kqueue SETFD failed (errno:%d)", errno);
636 }
637}
638
639static void
640kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
641{
642 if (flags) {
643 struct kevent ke[2];
644 int num_events = 0;
645
646 if (flags & thread_sched_waiting_io_read) {
647 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
648 num_events++;
649 }
650 if (flags & thread_sched_waiting_io_write) {
651 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
652 num_events++;
653 }
654 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
655 perror("kevent");
656 rb_bug("unregister/kevent fails. errno:%d", errno);
657 }
658 }
659}
660
661static bool
662kqueue_already_registered(int fd)
663{
664 struct rb_thread_sched_waiting *w, *found_w = NULL;
665
666 ccan_list_for_each(&timer_th.waiting, w, node) {
667 // Similar to the EEXIST case in epoll_ctl, but stricter: for simplicity it
668 // matches on the fd rather than on the flags.
669 if (w->flags && w->data.fd == fd) {
670 found_w = w;
671 break;
672 }
673 }
674 return found_w != NULL;
675}
676
677#endif // HAVE_SYS_EVENT_H
678
679// return false if the fd is not waitable or does not need to wait.
680static bool
681timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
682{
683 RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
684
685 VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
686 VM_ASSERT(flags != 0);
687
688 rb_hrtime_t abs = 0; // 0 means no timeout
689
690 if (rel) {
691 if (*rel > 0) {
692 flags |= thread_sched_waiting_timeout;
693 }
694 else {
695 return false;
696 }
697 }
698
699 if (rel && *rel > 0) {
700 flags |= thread_sched_waiting_timeout;
701 }
702
703#if HAVE_SYS_EVENT_H
704 struct kevent ke[2];
705 int num_events = 0;
706#else
707 uint32_t epoll_events = 0;
708#endif
709 if (flags & thread_sched_waiting_timeout) {
710 VM_ASSERT(rel != NULL);
711 abs = rb_hrtime_add(rb_hrtime_now(), *rel);
712 }
713
714 if (flags & thread_sched_waiting_io_read) {
715 if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) {
716 RUBY_DEBUG_LOG("fd_readable_nonblock");
717 return false;
718 }
719 else {
720 VM_ASSERT(fd >= 0);
721#if HAVE_SYS_EVENT_H
722 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)th);
723 num_events++;
724#else
725 epoll_events |= EPOLLIN;
726#endif
727 }
728 }
729
730 if (flags & thread_sched_waiting_io_write) {
731 if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) {
732 RUBY_DEBUG_LOG("fd_writable_nonblock");
733 return false;
734 }
735 else {
736 VM_ASSERT(fd >= 0);
737#if HAVE_SYS_EVENT_H
738 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)th);
739 num_events++;
740#else
741 epoll_events |= EPOLLOUT;
742#endif
743 }
744 }
745
746 rb_native_mutex_lock(&timer_th.waiting_lock);
747 {
748#if HAVE_SYS_EVENT_H
749 if (num_events > 0) {
750 if (kqueue_already_registered(fd)) {
751 rb_native_mutex_unlock(&timer_th.waiting_lock);
752 return false;
753 }
754
755 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
756 RUBY_DEBUG_LOG("failed (%d)", errno);
757
758 switch (errno) {
759 case EBADF:
760 // the fd is closed?
761 case EINTR:
762 // signal received? is there a sensible way to handle this?
763 default:
764 perror("kevent");
765 rb_bug("register/kevent failed(fd:%d, errno:%d)", fd, errno);
766 }
767 }
768 RUBY_DEBUG_LOG("kevent(add, fd:%d) success", fd);
769 }
770#else
771 if (epoll_events) {
772 struct epoll_event event = {
773 .events = epoll_events,
774 .data = {
775 .ptr = (void *)th,
776 },
777 };
778 if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
779 RUBY_DEBUG_LOG("failed (%d)", errno);
780
781 switch (errno) {
782 case EBADF:
783 // the fd is closed?
784 case EPERM:
785 // the fd doesn't support epoll
786 case EEXIST:
787 // the fd is already registered by another thread
788 rb_native_mutex_unlock(&timer_th.waiting_lock);
789 return false;
790 default:
791 perror("epoll_ctl");
792 rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno);
793 }
794 }
795 RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
796 }
797#endif
798
799 if (th) {
800 VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);
801
802 // setup waiting information
803 {
804 th->sched.waiting_reason.flags = flags;
805 th->sched.waiting_reason.data.timeout = abs;
806 th->sched.waiting_reason.data.fd = fd;
807 th->sched.waiting_reason.data.result = 0;
808 }
809
810 if (abs == 0) { // no timeout
811 VM_ASSERT(!(flags & thread_sched_waiting_timeout));
812 ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
813 }
814 else {
815 RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs);
816 VM_ASSERT(flags & thread_sched_waiting_timeout);
817
818 // insert th to sorted list (TODO: O(n))
819 struct rb_thread_sched_waiting *w, *prev_w = NULL;
820
821 ccan_list_for_each(&timer_th.waiting, w, node) {
822 if ((w->flags & thread_sched_waiting_timeout) &&
823 w->data.timeout < abs) {
824 prev_w = w;
825 }
826 else {
827 break;
828 }
829 }
830
831 if (prev_w) {
832 ccan_list_add_after(&timer_th.waiting, &prev_w->node, &th->sched.waiting_reason.node);
833 }
834 else {
835 ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
836 }
837
838 verify_waiting_list();
839
840 // update timeout seconds
841 timer_thread_wakeup();
842 }
843 }
844 else {
845 VM_ASSERT(abs == 0);
846 }
847 }
848 rb_native_mutex_unlock(&timer_th.waiting_lock);
849
850 return true;
851}
852
853static void
854timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
855{
856 RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
857#if HAVE_SYS_EVENT_H
858 kqueue_unregister_waiting(fd, flags);
859#else
860 // Linux 2.6.9 or later is needed to pass NULL as data.
861 if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
862 switch (errno) {
863 case EBADF:
864 // just ignore. maybe fd is closed.
865 break;
866 default:
867 perror("epoll_ctl");
868 rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
869 }
870 }
871#endif
872}
873
874static void
875timer_thread_setup_mn(void)
876{
877#if HAVE_SYS_EVENT_H
878 kqueue_create();
879 RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd);
880#else
881 if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
882 RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.event_fd);
883#endif
884 RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
885
886 timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
887}
888
889static int
890event_wait(rb_vm_t *vm)
891{
892#if HAVE_SYS_EVENT_H
893 int r = kqueue_wait(vm);
894#else
895 int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
896#endif
897 return r;
898}
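
/* Editorial sketch (standalone, Linux/epoll only; not part of the original
 * source): a minimal model of what timer_thread_setup_mn() and event_wait() do
 * together on the epoll side -- register one fd (here a pipe's read end,
 * playing the role of comm_fds[0]) and block for events with a timeout. */
#if 0
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

int
main(void)
{
    int fds[2];
    assert(pipe(fds) == 0);

    int ep = epoll_create1(EPOLL_CLOEXEC);
    assert(ep != -1);

    struct epoll_event ev = { .events = EPOLLIN, .data = { .ptr = NULL } };
    assert(epoll_ctl(ep, EPOLL_CTL_ADD, fds[0], &ev) == 0);

    (void)write(fds[1], "!", 1);                    // simulate timer_thread_wakeup()

    struct epoll_event out[8];
    int n = epoll_wait(ep, out, 8, 100 /* ms */);   // 0 => timeout, >0 => events
    assert(n == 1 && (out[0].events & EPOLLIN));

    close(ep); close(fds[0]); close(fds[1]);
    return 0;
}
#endif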
899
900/*
901 * The purpose of the timer thread:
902 *
903 * (1) Periodic checking
904 * (1-1) Provide time slice for active NTs
905 * (1-2) Check NT shortage
906 * (1-3) Periodic UBF (global)
907 * (1-4) Lazy GRQ deq start
908 * (2) Receive notification
909 * (2-1) async I/O termination
910 * (2-2) timeout
911 * (2-2-1) sleep(n)
912 * (2-2-2) timeout(n), I/O, ...
913 */
914static void
915timer_thread_polling(rb_vm_t *vm)
916{
917 int r = event_wait(vm);
918
919 RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);
920
921 switch (r) {
922 case 0: // timeout
923 RUBY_DEBUG_LOG("timeout%s", "");
924
925 ractor_sched_lock(vm, NULL);
926 {
927 // (1-1) timeslice
928 timer_thread_check_timeslice(vm);
929
930 // (1-4) lazy grq deq
931 if (vm->ractor.sched.grq_cnt > 0) {
932 RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt);
933 rb_native_cond_signal(&vm->ractor.sched.cond);
934 }
935 }
936 ractor_sched_unlock(vm, NULL);
937
938 // (1-2)
939 native_thread_check_and_create_shared(vm);
940
941 break;
942
943 case -1:
944 switch (errno) {
945 case EINTR:
946 // simply retry
947 break;
948 default:
949 perror("event_wait");
950 rb_bug("event_wait errno:%d", errno);
951 }
952 break;
953
954 default:
955 RUBY_DEBUG_LOG("%d event(s)", r);
956
957#if HAVE_SYS_EVENT_H
958 for (int i=0; i<r; i++) {
959 rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].udata;
960 int fd = (int)timer_th.finished_events[i].ident;
961 int16_t filter = timer_th.finished_events[i].filter;
962
963 if (th == NULL) {
964 // wakeup timerthread
965 RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
966 consume_communication_pipe(timer_th.comm_fds[0]);
967 }
968 else {
969 // wakeup specific thread by IO
970 RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s",
971 rb_th_serial(th),
972 (filter == EVFILT_READ) ? "read/" : "",
973 (filter == EVFILT_WRITE) ? "write/" : "");
974
975 rb_native_mutex_lock(&timer_th.waiting_lock);
976 {
977 if (th->sched.waiting_reason.flags) {
978 // delete from chain
979 ccan_list_del_init(&th->sched.waiting_reason.node);
980 timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter));
981
982 th->sched.waiting_reason.flags = thread_sched_waiting_none;
983 th->sched.waiting_reason.data.fd = -1;
984 th->sched.waiting_reason.data.result = filter;
985
986 timer_thread_wakeup_thread(th);
987 }
988 else {
989 // already released
990 }
991 }
992 rb_native_mutex_unlock(&timer_th.waiting_lock);
993 }
994 }
995#else
996 for (int i=0; i<r; i++) {
997 rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr;
998
999 if (th == NULL) {
1000 // wakeup timerthread
1001 RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
1002 consume_communication_pipe(timer_th.comm_fds[0]);
1003 }
1004 else {
1005 // wakeup specific thread by IO
1006 uint32_t events = timer_th.finished_events[i].events;
1007
1008 RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s",
1009 rb_th_serial(th),
1010 (events & EPOLLIN) ? "in/" : "",
1011 (events & EPOLLOUT) ? "out/" : "",
1012 (events & EPOLLRDHUP) ? "RDHUP/" : "",
1013 (events & EPOLLPRI) ? "pri/" : "",
1014 (events & EPOLLERR) ? "err/" : "",
1015 (events & EPOLLHUP) ? "hup/" : "");
1016
1017 rb_native_mutex_lock(&timer_th.waiting_lock);
1018 {
1019 if (th->sched.waiting_reason.flags) {
1020 // delete from chain
1021 ccan_list_del_init(&th->sched.waiting_reason.node);
1022 timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
1023
1024 th->sched.waiting_reason.flags = thread_sched_waiting_none;
1025 th->sched.waiting_reason.data.fd = -1;
1026 th->sched.waiting_reason.data.result = (int)events;
1027
1028 timer_thread_wakeup_thread(th);
1029 }
1030 else {
1031 // already released
1032 }
1033 }
1034 rb_native_mutex_unlock(&timer_th.waiting_lock);
1035 }
1036 }
1037#endif
1038 }
1039}
1040
1041#else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H
1042
1043static void
1044timer_thread_setup_mn(void)
1045{
1046 // do nothing
1047}
1048
1049static void
1050timer_thread_polling(rb_vm_t *vm)
1051{
1052 int timeout = timer_thread_set_timeout(vm);
1053
1054 struct pollfd pfd = {
1055 .fd = timer_th.comm_fds[0],
1056 .events = POLLIN,
1057 };
1058
1059 int r = poll(&pfd, 1, timeout);
1060
1061 switch (r) {
1062 case 0: // timeout
1063 rb_native_mutex_lock(&vm->ractor.sched.lock);
1064 {
1065 // (1-1) timeslice
1066 timer_thread_check_timeslice(vm);
1067 }
1068 rb_native_mutex_unlock(&vm->ractor.sched.lock);
1069 break;
1070
1071 case -1: // error
1072 switch (errno) {
1073 case EINTR:
1074 // simply retry
1075 break;
1076 default:
1077 perror("poll");
1078 rb_bug("poll errno:%d", errno);
1079 break;
1080 }
1081
1082 case 1:
1083 consume_communication_pipe(timer_th.comm_fds[0]);
1084 break;
1085
1086 default:
1087 rb_bug("unreachbale");
1088 }
1089}
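
/* Editorial sketch (standalone; not part of the original source): the self-pipe
 * wakeup idiom behind comm_fds in both polling variants.  Another thread writes
 * a byte to the write end (timer_thread_wakeup()); the poller sees POLLIN,
 * drains the pipe (consume_communication_pipe()), and recomputes its timeout. */
#if 0
#include <assert.h>
#include <poll.h>
#include <unistd.h>

int
main(void)
{
    int fds[2];
    assert(pipe(fds) == 0);

    (void)write(fds[1], "!", 1);            // the "wake up" sent from another thread

    struct pollfd pfd = { .fd = fds[0], .events = POLLIN };
    int r = poll(&pfd, 1, 1000 /* ms */);   // returns immediately, not after 1s
    assert(r == 1);

    char buf[256];
    ssize_t n = read(fds[0], buf, sizeof(buf));   // drain; the real code loops on a
    assert(n == 1);                               // non-blocking fd until EAGAIN

    close(fds[0]); close(fds[1]);
    return 0;
}
#endif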
1090
1091#endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H