Ruby 4.1.0dev (2026-04-17 revision 11e3c78b61da705c783dd12fb7f158c0d256ede0)
thread_pthread_mn.c (11e3c78b61da705c783dd12fb7f158c0d256ede0)
1// included by "thread_pthread.c"
2
3#if USE_MN_THREADS
4
5static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
6static void timer_thread_wakeup_thread_locked(struct rb_thread_sched *sched, rb_thread_t *th, uint32_t event_serial);
7
8static bool
9timer_thread_cancel_waiting(rb_thread_t *th)
10{
11 bool canceled = false;
12
13 rb_native_mutex_lock(&timer_th.waiting_lock);
14 {
15 if (th->sched.waiting_reason.flags) {
16 canceled = true;
17 ccan_list_del_init(&th->sched.waiting_reason.node);
18 timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
19 th->sched.waiting_reason.flags = thread_sched_waiting_none;
20 }
21 }
22 rb_native_mutex_unlock(&timer_th.waiting_lock);
23
24 return canceled;
25}
26
// Unblocking function (UBF) for an MN thread blocked (or about to block)
// on an I/O/timeout event. Cancels the timer-thread registration and, if
// the thread had already been descheduled, puts it back on the ready queue.
static void
ubf_event_waiting(void *ptr)
{
    rb_thread_t *th = (rb_thread_t *)ptr;
    struct rb_thread_sched *sched = TH_SCHED(th);

    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));

    // only once. it is safe because th->interrupt_lock is already acquired.
    th->unblock.func = NULL;
    th->unblock.arg = NULL;

    thread_sched_lock(sched, th);
    {
        bool canceled = timer_thread_cancel_waiting(th);

        if (sched->running == th) {
            // still on its running turn; it will notice the interrupt itself
            RUBY_DEBUG_LOG("not waiting yet");
        }
        else if (canceled) {
            // it was parked waiting for the event; make it runnable so it
            // can observe the interrupt.
            thread_sched_to_ready_common(sched, th, true, false);
        }
        else {
            RUBY_DEBUG_LOG("already not waiting");
        }
    }
    thread_sched_unlock(sched, th);
}
57
58static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial);
59
// Wait for fd readiness (`events`) and/or a relative timeout (`rel`),
// yielding this native thread to other Ruby threads while waiting
// (MN scheduling). Returns true iff the wait ended due to timeout.
static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
    VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT

    volatile bool timedout = false, need_cancel = false;

    // new serial for this wait; stale wakeups carrying an older serial are ignored
    uint32_t event_serial = ++th->sched.event_serial; // overflow is okay


    thread_sched_lock(sched, th);
    {
        // NOTE: there's a lock ordering inversion here with the ubf call, but it's benign.
        if (ubf_set(th, ubf_event_waiting, (void *)th, NULL)) {
            // an interrupt is already pending; don't start waiting at all
            thread_sched_unlock(sched, th);
            return false;
        }

        if (timer_thread_register_waiting(th, fd, events, rel, event_serial)) {
            RUBY_DEBUG_LOG("wait fd:%d", fd);

            RB_VM_SAVE_MACHINE_CONTEXT(th);

            RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);

            if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
                th->sched.event_serial++;
                // timer thread has dequeued us already, but it won't try to wake us because we bumped our serial
            }
            else if (RUBY_VM_INTERRUPTED(th->ec)) {
                th->sched.event_serial++; // make sure timer thread doesn't try to wake us
                need_cancel = true;
            }
            else {
                RUBY_DEBUG_LOG("sleep");

                th->status = THREAD_STOPPED_FOREVER;
                thread_sched_wakeup_next_thread(sched, th, true);
                thread_sched_wait_running_turn(sched, th, true);

                RUBY_DEBUG_LOG("wakeup");
            }

            // result stays 0 unless the timer thread stored an fd event in it,
            // so 0 here means "woke for timeout, not for I/O"
            timedout = th->sched.waiting_reason.data.result == 0;

            if (need_cancel) {
                timer_thread_cancel_waiting(th);
            }

            th->status = THREAD_RUNNABLE;
        }
        else {
            // fd already ready, zero timeout, or registration refused
            RUBY_DEBUG_LOG("can not wait fd:%d", fd);
            timedout = false;
        }
    }
    thread_sched_unlock(sched, th);

    // if ubf triggered between sched unlock and ubf clear, sched->running == th here
    ubf_clear(th, false);

    VM_ASSERT(sched->running == th);

    return timedout;
}
126
128
// System page size, queried once via sysconf(_SC_PAGESIZE) and cached.
static int
get_sysconf_page_size(void)
{
    static long cached_page_size = 0;

    if (UNLIKELY(cached_page_size == 0)) {
        cached_page_size = sysconf(_SC_PAGESIZE);
        VM_ASSERT(cached_page_size < INT_MAX);
    }
    return (int)cached_page_size;
}
140
141#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB
142#define MSTACK_PAGE_SIZE get_sysconf_page_size()
143#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone
144
145// 512MB chunk
146// 131,072 pages (> 65,536)
147// 0th page is Redzone. Start from 1st page.
148
149/*
150 * <--> machine stack + vm stack
151 * ----------------------------------
152 * |HD...|RZ| ... |RZ| ... ... |RZ|
153 * <------------- 512MB ------------->
154 */
155
// Header stored at the start of each 512MB stack chunk. All chunks are
// linked through prev_chunk; chunks that (may) still have free slots are
// additionally linked through prev_free_chunk.
static struct nt_stack_chunk_header {
    struct nt_stack_chunk_header *prev_chunk;      // previous chunk in the all-chunks list
    struct nt_stack_chunk_header *prev_free_chunk; // previous chunk in the free-chunks list

    uint16_t start_page;                // first page usable for stacks (after the header pages)
    uint16_t stack_count;               // total stack slots in this chunk
    uint16_t uninitialized_stack_count; // slots never handed out yet (pages still PROT_NONE)

    uint16_t free_stack_pos;            // number of valid entries in free_stack[]
    uint16_t free_stack[];              // LIFO of slot indices returned by nt_free_stack()
} *nt_stack_chunks = NULL,
  *nt_free_stack_chunks = NULL;
168
// Written at the top of each machine stack so that the owning chunk and
// slot index can be recovered from the machine stack pointer when freeing.
struct nt_machine_stack_footer {
    struct nt_stack_chunk_header *ch; // chunk this stack slot belongs to
    size_t index;                     // slot index within the chunk
};
173
174static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;
175
176#include <sys/mman.h>
177
178// vm_stack_size + machine_stack_size + 1 * (guard page size)
179static inline size_t
180nt_thread_stack_size(void)
181{
182 static size_t msz;
183 if (LIKELY(msz > 0)) return msz;
184
185 rb_vm_t *vm = GET_VM();
186 int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE);
187 int page_num = roomof(sz, MSTACK_PAGE_SIZE);
188 msz = (size_t)page_num * MSTACK_PAGE_SIZE;
189 return msz;
190}
191
// Reserve a new 512MB PROT_NONE chunk and set up its header. Stack slot
// pages are made accessible lazily by nt_alloc_stack(). Returns NULL on
// mmap/mprotect failure (errno is left set by the failing call).
static struct nt_stack_chunk_header *
nt_alloc_thread_stack_chunk(void)
{
    // PROT_NONE reservation: unused slots consume address space only
    const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
    if (m == MAP_FAILED) {
        return NULL;
    }

    ruby_annotate_mmap(m, MSTACK_CHUNK_SIZE, "Ruby:nt_alloc_thread_stack_chunk");

    size_t msz = nt_thread_stack_size();
    int header_page_cnt = 1;
    int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
    int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;

    // grow the header area when the free_stack[] table doesn't fit in one page
    if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) {
        header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
        stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
    }

    VM_ASSERT(stack_count <= UINT16_MAX);

    // Enable read/write for the header pages
    if (mprotect((void *)m, (size_t)header_page_cnt * MSTACK_PAGE_SIZE, PROT_READ | PROT_WRITE) != 0) {
        munmap((void *)m, MSTACK_CHUNK_SIZE);
        return NULL;
    }

    struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m;

    ch->start_page = header_page_cnt;
    ch->prev_chunk = nt_stack_chunks;
    ch->prev_free_chunk = nt_free_stack_chunks;
    ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count;
    ch->free_stack_pos = 0;

    RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz);

    return ch;
}
232
233static void *
234nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx)
235{
236 const char *m = (char *)ch;
237 return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size());
238}
239
240static struct nt_machine_stack_footer *
241nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack)
242{
243 // TODO: stack direction
244 const size_t msz = vm->default_params.thread_machine_stack_size;
245 return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)];
246}
247
// Compute the VM stack and machine stack addresses of slot `idx` in `ch`,
// and record a footer at the top of the machine stack so the chunk/slot
// can be recovered later by nt_free_stack().
static void
nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack)
{
    // TODO: only support stack going down
    // [VM ... <GUARD> machine stack ...]

    const char *vstack, *mstack;
    const char *guard_page;
    vstack = nt_stack_chunk_get_stack_start(ch, idx);
    guard_page = vstack + vm->default_params.thread_vm_stack_size;
    mstack = guard_page + MSTACK_PAGE_SIZE;

    struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
    msf->ch = ch;
    msf->index = idx;

#if 0
    RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
                   vstack, (void *)(guard_page-1),
                   guard_page, (void *)(mstack-1),
                   mstack, (void *)(msf));
#endif

    *vm_stack = (void *)vstack;
    *machine_stack = (void *)mstack;
}
274
276static void
277nt_stack_chunk_dump(void)
278{
279 struct nt_stack_chunk_header *ch;
280 int i;
281
282 fprintf(stderr, "** nt_stack_chunks\n");
283 ch = nt_stack_chunks;
284 for (i=0; ch; i++, ch = ch->prev_chunk) {
285 fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
286 }
287
288 fprintf(stderr, "** nt_free_stack_chunks\n");
289 ch = nt_free_stack_chunks;
290 for (i=0; ch; i++, ch = ch->prev_free_chunk) {
291 fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
292 }
293}
294
// Hand out one VM stack + machine stack pair. Prefers a previously freed
// slot, then an uninitialized slot (enabling its pages), and maps a fresh
// chunk when everything is exhausted. Returns 0 on success, errno on failure.
static int
nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
{
    int err = 0;

    rb_native_mutex_lock(&nt_machine_stack_lock);
    {
      retry:
        if (nt_free_stack_chunks) {
            struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
            if (ch->free_stack_pos > 0) {
                // reuse a previously freed slot (LIFO)
                RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos);
                nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
            }
            else if (ch->uninitialized_stack_count > 0) {
                RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count);

                size_t idx = ch->stack_count - ch->uninitialized_stack_count--;

                // The chunk was mapped PROT_NONE; enable the VM stack and
                // machine stack pages, leaving the guard page as PROT_NONE.
                char *stack_start = nt_stack_chunk_get_stack_start(ch, idx);
                size_t vm_stack_size = vm->default_params.thread_vm_stack_size;
                size_t mstack_size = nt_thread_stack_size() - vm_stack_size - MSTACK_PAGE_SIZE;
                char *mstack_start = stack_start + vm_stack_size + MSTACK_PAGE_SIZE;

                int mstack_flags = MAP_FIXED | MAP_ANONYMOUS | MAP_PRIVATE;
#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__)
                mstack_flags |= MAP_STACK;
#endif

                if (mprotect(stack_start, vm_stack_size, PROT_READ | PROT_WRITE) != 0 ||
                    mmap(mstack_start, mstack_size, PROT_READ | PROT_WRITE, mstack_flags, -1, 0) == MAP_FAILED) {
                    err = errno;
                }
                else {
                    nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
                }
            }
            else {
                // chunk exhausted; drop it from the free list and retry
                nt_free_stack_chunks = ch->prev_free_chunk;
                ch->prev_free_chunk = NULL;
                goto retry;
            }
        }
        else {
            // no chunk has free slots; map a new 512MB chunk and retry
            struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
            if (p == NULL) {
                err = errno;
            }
            else {
                nt_free_stack_chunks = nt_stack_chunks = p;
                goto retry;
            }
        }
    }
    rb_native_mutex_unlock(&nt_machine_stack_lock);

    return err;
}
355
356static void
357nt_madvise_free_or_dontneed(void *addr, size_t len)
358{
359 /* There is no real way to perform error handling here. Both MADV_FREE
360 * and MADV_DONTNEED are both documented to pretty much only return EINVAL
361 * for a huge variety of errors. It's indistinguishable if madvise fails
362 * because the parameters were bad, or because the kernel we're running on
363 * does not support the given advice. This kind of free-but-don't-unmap
364 * is best-effort anyway, so don't sweat it.
365 *
366 * n.b. A very common case of "the kernel doesn't support MADV_FREE and
367 * returns EINVAL" is running under the `rr` debugger; it makes all
368 * MADV_FREE calls return EINVAL. */
369
370#if defined(MADV_FREE)
371 int r = madvise(addr, len, MADV_FREE);
372 // Return on success, or else try MADV_DONTNEED
373 if (r == 0) return;
374#endif
375#if defined(MADV_DONTNEED)
376 madvise(addr, len, MADV_DONTNEED);
377#endif
378}
379
// Return a stack slot (identified by its machine stack pointer) to its
// chunk's free list and release the backing physical pages.
static void
nt_free_stack(void *mstack)
{
    if (!mstack) return;

    rb_native_mutex_lock(&nt_machine_stack_lock);
    {
        // the footer at the stack top identifies chunk and slot index
        struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
        struct nt_stack_chunk_header *ch = msf->ch;
        int idx = (int)msf->index;
        void *stack = nt_stack_chunk_get_stack_start(ch, idx);

        RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);

        // NOTE(review): prev_free_chunk == NULL is used as "not on the free
        // list", but the oldest chunk on the list also has prev NULL —
        // presumably freeing into it re-links it to itself; verify invariant.
        if (ch->prev_free_chunk == NULL) {
            ch->prev_free_chunk = nt_free_stack_chunks;
            nt_free_stack_chunks = ch;
        }
        ch->free_stack[ch->free_stack_pos++] = idx;

        // clear the stack pages
        nt_madvise_free_or_dontneed(stack, nt_thread_stack_size());
    }
    rb_native_mutex_unlock(&nt_machine_stack_lock);
}
405
406
// Decide (under the ractor scheduler lock) whether another shared native
// thread (SNT) is needed — keep at least MINIMUM_SNT, and otherwise no
// more than one per schedulable ractor and no more than max_cpu — and
// create one outside the lock if so. Returns 0 or the creation error.
static int
native_thread_check_and_create_shared(rb_vm_t *vm)
{
    bool need_to_make = false;

    rb_native_mutex_lock(&vm->ractor.sched.lock);
    {
        unsigned int schedulable_ractor_cnt = vm->ractor.cnt;
        RUBY_ASSERT(schedulable_ractor_cnt >= 1);

        if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads)
            schedulable_ractor_cnt--; // do not need snt for main ractor

        unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
        if (((int)snt_cnt < MINIMUM_SNT) ||
            (snt_cnt < schedulable_ractor_cnt &&
             snt_cnt < vm->ractor.sched.max_cpu)) {

            RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
                           vm->ractor.sched.snt_cnt,
                           vm->ractor.sched.dnt_cnt,
                           vm->ractor.cnt,
                           vm->ractor.sched.grq_cnt);

            // claim the slot while holding the lock; the thread itself is
            // created after unlocking
            vm->ractor.sched.snt_cnt++;
            need_to_make = true;
        }
        else {
            RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt);
        }
    }
    rb_native_mutex_unlock(&vm->ractor.sched.lock);

    if (need_to_make) {
        struct rb_native_thread *nt = native_thread_alloc();
        nt->vm = vm;
        return native_thread_create0(nt);
    }
    else {
        return 0;
    }
}
449
#ifdef __APPLE__
# define co_start ruby_coroutine_start
#else
static
#endif
// Coroutine entry point for an MN Ruby thread: runs the thread body and,
// on termination, transfers control either back to the hosting native
// thread's context or directly to the next runnable thread's coroutine.
// Never returns.
COROUTINE
co_start(struct coroutine_context *from, struct coroutine_context *self)
{
#ifdef RUBY_ASAN_ENABLED
    __sanitizer_finish_switch_fiber(self->fake_stack,
                                    (const void**)&from->stack_base, &from->stack_size);
#endif

    rb_thread_t *th = (rb_thread_t *)self->argument;
    struct rb_thread_sched *sched = TH_SCHED(th);
    VM_ASSERT(th->nt != NULL);
    VM_ASSERT(th == sched->running);
    VM_ASSERT(sched->lock_owner == NULL);

    // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    thread_sched_set_locked(sched, th);
    thread_sched_add_running_thread(TH_SCHED(th), th);
    thread_sched_unlock(sched, th);
    {
        RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th);
        call_thread_start_func_2(th);
    }
    thread_sched_lock(sched, NULL);

    RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial);

    // Thread is terminated

    struct rb_native_thread *nt = th->nt;
    bool is_dnt = th_has_dedicated_nt(th);
    native_thread_assign(NULL, th);
    rb_ractor_set_current_ec(th->ractor, NULL);

    if (is_dnt) {
        // SNT became DNT while running. Just return to the nt_context

        th->sched.finished = true;
        coroutine_transfer0(self, nt->nt_context, true);
    }
    else {
        rb_thread_t *next_th = sched->running;

        if (next_th && !next_th->nt) {
            // switch to the next thread
            thread_sched_set_unlocked(sched, NULL);
            th->sched.finished = true;
            thread_sched_switch0(th->sched.context, next_th, nt, true);
        }
        else {
            // switch to the next Ractor
            th->sched.finished = true;
            coroutine_transfer0(self, nt->nt_context, true);
        }
    }

    rb_bug("unreachable");
}
513
// Create an MN (shared native thread) Ruby thread: allocate VM/machine
// stacks from a chunk, initialize the coroutine context, enqueue the
// thread as ready, and ensure enough SNTs exist to run it.
// Returns 0 on success or an errno value from stack allocation.
static int
native_thread_create_shared(rb_thread_t *th)
{
    // setup coroutine
    rb_vm_t *vm = th->vm;
    void *vm_stack = NULL, *machine_stack = NULL;
    int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
    if (err) return err;

    VM_ASSERT(vm_stack < machine_stack);

    // setup vm stack
    size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE);
    rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);

    // setup machine stack
    // reserve room for the footer that nt_free_stack() reads back
    size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer);
    th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size);
    th->ec->machine.stack_maxsize = machine_stack_size; // TODO
    th->sched.context_stack = machine_stack;
    th->sched.context_stack_size = machine_stack_size;

    th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context));
    coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size);
    th->sched.context->argument = th;

    RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
    thread_sched_to_ready(TH_SCHED(th), th);

    // setup nt
    return native_thread_check_and_create_shared(th->vm);
}
546
547#else // USE_MN_THREADS
548
// M:N threads compiled out: creating a shared-NT thread must never happen.
static int
native_thread_create_shared(rb_thread_t *th)
{
    rb_bug("unreachable");
}
554
// M:N threads compiled out: MN event waiting must never be reached.
static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
    rb_bug("unreachable");
}
560
561#endif // USE_MN_THREADS
562
564#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
565
// Non-blocking probe: is `fd` readable right now? (poll with zero timeout;
// a poll error also yields true, so the caller retries and sees the error)
static bool
fd_readable_nonblock(int fd)
{
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;
    return poll(&pfd, 1, 0) != 0;
}
575
// Non-blocking probe: is `fd` writable right now? (poll with zero timeout;
// a poll error also yields true, so the caller retries and sees the error)
static bool
fd_writable_nonblock(int fd)
{
    struct pollfd pfd;
    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;
    return poll(&pfd, 1, 0) != 0;
}
585
// Debug-only check that the timer thread's waiting list is ordered:
// timed waiters first in ascending deadline order, then untimed waiters
// (timeout == 0) at the tail.
static void
verify_waiting_list(void)
{
#if VM_CHECK_MODE > 0
    struct rb_thread_sched_waiting *w, *prev_w = NULL;

    // waiting list's timeout order should be [1, 2, 3, ..., 0, 0, 0]

    ccan_list_for_each(&timer_th.waiting, w, node) {
        if (prev_w) {
            rb_hrtime_t timeout = w->data.timeout;
            // BUG FIX: previously read w->data.timeout here too, which made
            // the assertion below compare a timeout against itself (vacuous).
            rb_hrtime_t prev_timeout = prev_w->data.timeout;
            VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
        }
        prev_w = w;
    }
#endif
}
605
606#if HAVE_SYS_EVENT_H // kqueue helpers
607
// Map a kevent filter back to the scheduler's waiting-flag representation.
// Unsupported filters are a fatal bug.
static enum thread_sched_waiting_flag
kqueue_translate_filter_to_flags(int16_t filter)
{
    switch (filter) {
      case EVFILT_READ:
        return thread_sched_waiting_io_read;
      case EVFILT_WRITE:
        return thread_sched_waiting_io_write;
      case EVFILT_TIMER:
        return thread_sched_waiting_timeout;
      default:
        rb_bug("kevent filter:%d not supported", filter);
    }
}
622
// Wait on the kqueue, converting the timer thread's millisecond timeout
// into a timespec: NULL means block indefinitely, an all-zero timespec
// means return immediately. Returns kevent()'s result.
static int
kqueue_wait(rb_vm_t *vm)
{
    struct timespec calculated_timeout;
    struct timespec *timeout = NULL;
    int timeout_ms = timer_thread_set_timeout(vm);

    if (timeout_ms > 0) {
        calculated_timeout.tv_sec = timeout_ms / 1000;
        calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
        timeout = &calculated_timeout;
    }
    else if (timeout_ms == 0) {
        // Relying on the absence of other members of struct timespec is not strictly portable,
        // and kevent needs a 0-valued timespec to mean immediate timeout.
        memset(&calculated_timeout, 0, sizeof(struct timespec));
        timeout = &calculated_timeout;
    }

    return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
}
644
// Create the timer thread's kqueue and mark its fd close-on-exec.
// All failures here are fatal (rb_bug).
static void
kqueue_create(void)
{
    if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
    int flags = fcntl(timer_th.event_fd, F_GETFD);
    if (flags == -1) {
        rb_bug("kqueue GETFD failed (errno:%d)", errno);
    }

    flags |= FD_CLOEXEC;
    if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
        rb_bug("kqueue SETFD failed (errno:%d)", errno);
    }
}
659
660static void
661kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
662{
663 if (flags) {
664 struct kevent ke[2];
665 int num_events = 0;
666
667 if (flags & thread_sched_waiting_io_read) {
668 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
669 num_events++;
670 }
671 if (flags & thread_sched_waiting_io_write) {
672 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
673 num_events++;
674 }
675 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
676 perror("kevent");
677 rb_bug("unregister/kevent fails. errno:%d", errno);
678 }
679 }
680}
681
682static bool
683kqueue_already_registered(int fd)
684{
685 struct rb_thread_sched_waiting *w, *found_w = NULL;
686
687 ccan_list_for_each(&timer_th.waiting, w, node) {
688 // Similar to EEXIST in epoll_ctl, but more strict because it checks fd rather than flags
689 // for simplicity
690 if (w->flags && w->data.fd == fd) {
691 found_w = w;
692 break;
693 }
694 }
695 return found_w != NULL;
696}
697
698#endif // HAVE_SYS_EVENT_H
699
700// return false if the fd is not waitable or not need to wait.
701static bool
702timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial)
703{
704 RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);
705
706 VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
707 VM_ASSERT(flags != 0);
708
709 rb_hrtime_t abs = 0; // 0 means no timeout
710
711 if (rel) {
712 if (*rel > 0) {
713 flags |= thread_sched_waiting_timeout;
714 }
715 else {
716 return false;
717 }
718 }
719
720 if (rel && *rel > 0) {
721 flags |= thread_sched_waiting_timeout;
722 }
723
724#if HAVE_SYS_EVENT_H
725 struct kevent ke[2];
726 int num_events = 0;
727#else
728 uint32_t epoll_events = 0;
729#endif
730 if (flags & thread_sched_waiting_timeout) {
731 VM_ASSERT(rel != NULL);
732 abs = rb_hrtime_add(rb_hrtime_now(), *rel);
733 }
734
735 if (flags & thread_sched_waiting_io_read) {
736 if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) {
737 RUBY_DEBUG_LOG("fd_readable_nonblock");
738 return false;
739 }
740 else {
741 VM_ASSERT(fd >= 0);
742#if HAVE_SYS_EVENT_H
743 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)th);
744 num_events++;
745#else
746 epoll_events |= EPOLLIN;
747#endif
748 }
749 }
750
751 if (flags & thread_sched_waiting_io_write) {
752 if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) {
753 RUBY_DEBUG_LOG("fd_writable_nonblock");
754 return false;
755 }
756 else {
757 VM_ASSERT(fd >= 0);
758#if HAVE_SYS_EVENT_H
759 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)th);
760 num_events++;
761#else
762 epoll_events |= EPOLLOUT;
763#endif
764 }
765 }
766
767 rb_native_mutex_lock(&timer_th.waiting_lock);
768 {
769#if HAVE_SYS_EVENT_H
770 if (num_events > 0) {
771 if (kqueue_already_registered(fd)) {
772 rb_native_mutex_unlock(&timer_th.waiting_lock);
773 return false;
774 }
775
776 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
777 RUBY_DEBUG_LOG("failed (%d)", errno);
778
779 switch (errno) {
780 case EBADF:
781 // the fd is closed?
782 case EINTR:
783 // signal received? is there a sensible way to handle this?
784 default:
785 perror("kevent");
786 rb_bug("register/kevent failed(fd:%d, errno:%d)", fd, errno);
787 }
788 }
789 RUBY_DEBUG_LOG("kevent(add, fd:%d) success", fd);
790 }
791#else
792 if (epoll_events) {
793 struct epoll_event event = {
794 .events = epoll_events,
795 .data = {
796 .ptr = (void *)th,
797 },
798 };
799 if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
800 RUBY_DEBUG_LOG("failed (%d)", errno);
801
802 switch (errno) {
803 case EBADF:
804 // the fd is closed?
805 case EPERM:
806 // the fd doesn't support epoll
807 case EEXIST:
808 // the fd is already registered by another thread
809 rb_native_mutex_unlock(&timer_th.waiting_lock);
810 return false;
811 default:
812 perror("epoll_ctl");
813 rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno);
814 }
815 }
816 RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
817 }
818#endif
819
820 if (th) {
821 VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);
822
823 // setup waiting information
824 {
825 th->sched.waiting_reason.flags = flags;
826 th->sched.waiting_reason.data.timeout = abs;
827 th->sched.waiting_reason.data.fd = fd;
828 th->sched.waiting_reason.data.result = 0;
829 th->sched.waiting_reason.data.event_serial = event_serial;
830 }
831
832 if (abs == 0) { // no timeout
833 VM_ASSERT(!(flags & thread_sched_waiting_timeout));
834 ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
835 }
836 else {
837 RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs);
838 VM_ASSERT(flags & thread_sched_waiting_timeout);
839
840 // insert th to sorted list (TODO: O(n))
841 struct rb_thread_sched_waiting *w, *prev_w = NULL;
842
843 ccan_list_for_each(&timer_th.waiting, w, node) {
844 if ((w->flags & thread_sched_waiting_timeout) &&
845 w->data.timeout < abs) {
846 prev_w = w;
847 }
848 else {
849 break;
850 }
851 }
852
853 if (prev_w) {
854 ccan_list_add_after(&timer_th.waiting, &prev_w->node, &th->sched.waiting_reason.node);
855 }
856 else {
857 ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
858 }
859
860 verify_waiting_list();
861
862 // update timeout seconds; force wake so timer thread notices short deadlines
863 timer_thread_wakeup_force();
864 }
865 }
866 else {
867 VM_ASSERT(abs == 0);
868 }
869 }
870 rb_native_mutex_unlock(&timer_th.waiting_lock);
871
872 return true;
873}
874
// Drop th's fd registration from the event backend (kqueue/epoll).
// No-op when the thread isn't waiting on I/O — timeout-only waiters have
// nothing registered with the backend.
static void
timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
{
    if (!(th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write))) {
        return;
    }

    RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
#if HAVE_SYS_EVENT_H
    kqueue_unregister_waiting(fd, flags);
#else
    // Linux 2.6.9 or later is needed to pass NULL as data.
    if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
        switch (errno) {
          case EBADF:
            // just ignore. maybe fd is closed.
            break;
          default:
            perror("epoll_ctl");
            rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
        }
    }
#endif
}
899
// One-time setup of the timer thread's event backend (kqueue or epoll)
// and registration of the internal communication pipe's read end.
static void
timer_thread_setup_mn(void)
{
#if HAVE_SYS_EVENT_H
    kqueue_create();
    RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd);
#else
    if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
    RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.event_fd);
#endif
    RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);

    // th == NULL marks this as the timer thread's own registration;
    // io_force skips the "already readable" shortcut so it always registers.
    timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL, 0);
}
914
// Block in the event backend until an event, a timeout, or a signal.
// Returns the backend's result: number of ready events, 0 on timeout,
// -1 on error (e.g. EINTR).
static int
event_wait(rb_vm_t *vm)
{
#if HAVE_SYS_EVENT_H
    int r = kqueue_wait(vm);
#else
    int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
#endif
    return r;
}
925
/*
 * The purpose of the timer thread:
 *
 * (1) Periodic checking
 *   (1-1) Provide time slice for active NTs
 *   (1-2) Check NT shortage
 *   (1-3) Periodic UBF (global)
 *   (1-4) Lazy GRQ deq start
 * (2) Receive notification
 *   (2-1) async I/O termination
 *   (2-2) timeout
 *     (2-2-1) sleep(n)
 *     (2-2-2) timeout(n), I/O, ...
 */
// One iteration of the timer thread's main loop: wait on the event
// backend, then dispatch on timeout / error / ready fd events.
static void
timer_thread_polling(rb_vm_t *vm)
{
    int r = event_wait(vm);

    RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);

    switch (r) {
      case 0: // timeout
        RUBY_DEBUG_LOG("timeout%s", "");

        ractor_sched_lock(vm, NULL);
        {
            // (1-1) timeslice
            timer_thread_check_timeslice(vm);

            // (1-4) lazy grq deq
            if (vm->ractor.sched.grq_cnt > 0) {
                RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt);
                rb_native_cond_signal(&vm->ractor.sched.cond);
            }
        }
        ractor_sched_unlock(vm, NULL);

        // (1-2)
        native_thread_check_and_create_shared(vm);

        break;

      case -1:
        switch (errno) {
          case EINTR:
            // simply retry
            break;
          default:
            perror("event_wait");
            rb_bug("event_wait errno:%d", errno);
        }
        break;

      default:
        RUBY_DEBUG_LOG("%d event(s)", r);

#if HAVE_SYS_EVENT_H
        for (int i=0; i<r; i++) {
            // kqueue stores the waiting thread in udata
            // (NULL = the timer thread's own communication pipe)
            rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].udata;
            int fd = (int)timer_th.finished_events[i].ident;
            int16_t filter = timer_th.finished_events[i].filter;

            if (th == NULL) {
                // wakeup timerthread
                RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
                consume_communication_pipe(timer_th.comm_fds[0]);
            }
            else {
                // wakeup specific thread by IO
                RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s",
                               rb_th_serial(th),
                               (filter == EVFILT_READ) ? "read/" : "",
                               (filter == EVFILT_WRITE) ? "write/" : "");

                struct rb_thread_sched *sched = TH_SCHED(th);
                thread_sched_lock(sched, th);
                rb_native_mutex_lock(&timer_th.waiting_lock);
                {
                    if (th->sched.waiting_reason.flags) {
                        // delete from chain
                        ccan_list_del_init(&th->sched.waiting_reason.node);
                        timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter));

                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
                        th->sched.waiting_reason.data.fd = -1;
                        th->sched.waiting_reason.data.result = filter;
                        uint32_t event_serial = th->sched.waiting_reason.data.event_serial;

                        timer_thread_wakeup_thread_locked(sched, th, event_serial);
                    }
                    else {
                        // already released
                    }
                }
                rb_native_mutex_unlock(&timer_th.waiting_lock);
                thread_sched_unlock(sched, th);
            }
        }
#else
        for (int i=0; i<r; i++) {
            // epoll stores the waiting thread in data.ptr
            // (NULL = the timer thread's own communication pipe)
            rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr;

            if (th == NULL) {
                // wakeup timerthread
                RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
                consume_communication_pipe(timer_th.comm_fds[0]);
            }
            else {
                // wakeup specific thread by IO
                uint32_t events = timer_th.finished_events[i].events;

                RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s",
                               rb_th_serial(th),
                               (events & EPOLLIN) ? "in/" : "",
                               (events & EPOLLOUT) ? "out/" : "",
                               (events & EPOLLRDHUP) ? "RDHUP/" : "",
                               (events & EPOLLPRI) ? "pri/" : "",
                               (events & EPOLLERR) ? "err/" : "",
                               (events & EPOLLHUP) ? "hup/" : "");

                struct rb_thread_sched *sched = TH_SCHED(th);
                thread_sched_lock(sched, th);
                rb_native_mutex_lock(&timer_th.waiting_lock);
                {
                    if (th->sched.waiting_reason.flags) {
                        // delete from chain
                        ccan_list_del_init(&th->sched.waiting_reason.node);
                        timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);

                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
                        th->sched.waiting_reason.data.fd = -1;
                        th->sched.waiting_reason.data.result = (int)events;
                        uint32_t event_serial = th->sched.waiting_reason.data.event_serial;

                        timer_thread_wakeup_thread_locked(sched, th, event_serial);
                    }
                    else {
                        // already released
                    }
                }
                rb_native_mutex_unlock(&timer_th.waiting_lock);
                thread_sched_unlock(sched, th);
            }
        }
#endif
    }
}
1074
1075#else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H
1076
// No epoll/kqueue backend available: nothing to set up; the poll(2)
// fallback in timer_thread_polling needs no preparation.
static void
timer_thread_setup_mn(void)
{
    // do nothing
}
1082
1083static void
1084timer_thread_polling(rb_vm_t *vm)
1085{
1086 int timeout = timer_thread_set_timeout(vm);
1087
1088 struct pollfd pfd = {
1089 .fd = timer_th.comm_fds[0],
1090 .events = POLLIN,
1091 };
1092
1093 int r = poll(&pfd, 1, timeout);
1094
1095 switch (r) {
1096 case 0: // timeout
1097 ractor_sched_lock(vm, NULL);
1098 {
1099 // (1-1) timeslice
1100 timer_thread_check_timeslice(vm);
1101 }
1102 ractor_sched_unlock(vm, NULL);
1103 break;
1104
1105 case -1: // error
1106 switch (errno) {
1107 case EINTR:
1108 // simply retry
1109 break;
1110 default:
1111 perror("poll");
1112 rb_bug("poll errno:%d", errno);
1113 break;
1114 }
1115
1116 case 1:
1117 consume_communication_pipe(timer_th.comm_fds[0]);
1118 break;
1119
1120 default:
1121 rb_bug("unreachbale");
1122 }
1123}
1124
1125#endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H
#define RUBY_ASSERT(...)
Asserts that the given expression is truthy if and only if RUBY_DEBUG is truthy.
Definition assert.h:219
int len
Length of the buffer.
Definition io.h:8
#define RUBY_INTERNAL_THREAD_EVENT_RESUMED
Triggered when a thread successfully acquired the GVL.
Definition thread.h:238
#define RUBY_INTERNAL_THREAD_EVENT_SUSPENDED
Triggered when a thread released the GVL.
Definition thread.h:245
#define RBIMPL_ATTR_MAYBE_UNUSED()
Wraps (or simulates) [[maybe_unused]]
#define errno
Ractor-aware version of errno.
Definition ruby.h:388
void rb_native_mutex_lock(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_lock.
void rb_native_mutex_unlock(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_unlock.
void rb_native_cond_signal(rb_nativethread_cond_t *cond)
Signals a condition variable.
uintptr_t VALUE
Type that represents a Ruby object.
Definition value.h:40