/* NOTE(review): this file appears to be a code-browser extraction of Ruby's
 * thread_pthread_mn.c (M:N thread scheduler).  Original source line numbers
 * are fused into the text and many intermediate lines are missing, so the
 * code below is kept byte-identical; only review comments are added. */

/* Forward declaration: remove `th`'s registered fd/timeout from the timer
 * thread's event backend (kqueue/epoll); definition appears later in file. */
5static void timer_thread_unregister_waiting(
rb_thread_t *th,
int fd,
enum thread_sched_waiting_flag flags);

/* Fragment of timer_thread_cancel_waiting(th): if the thread still has a
 * pending waiting_reason, unlink it from the timer thread's waiting list,
 * unregister the fd, and clear the flags so the wait cannot fire again.
 * (Function header, else-branch, and `return canceled;` are missing from
 * this view — presumably `canceled` is set true inside the if; confirm
 * against upstream.) */
 11 bool canceled =
false;
 15 if (th->sched.waiting_reason.flags) {
 17 ccan_list_del_init(&th->sched.waiting_reason.node);
 18 timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
 19 th->sched.waiting_reason.flags = thread_sched_waiting_none;
/* Fragment of ubf_event_waiting(ptr): unblocking function installed while a
 * thread waits for an fd/timeout event on the MN timer thread.  Under the
 * scheduler lock it cancels any pending timer-thread registration and, if
 * the thread is still nominally `sched->running` (i.e. it has not yet been
 * switched away), re-queues it as ready.
 * (Missing from this view: the `rb_thread_t *th = ptr;` / `sched` locals,
 * and the else/closing braces — line numbers jump.) */
28ubf_event_waiting(
void *ptr)
 33 RUBY_DEBUG_LOG(
"th:%u", rb_th_serial(th));
 35 VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));
/* One-shot UBF: clear it so it is not invoked twice. */
 38 th->unblock.func = NULL;
 39 th->unblock.arg = NULL;
 41 thread_sched_lock(sched, th);
 43 bool canceled = timer_thread_cancel_waiting(th);
 45 if (sched->running == th) {
 46 RUBY_DEBUG_LOG(
"not waiting yet");
 49 thread_sched_to_ready_common(sched, th,
true,
false);
 52 RUBY_DEBUG_LOG(
"already not waiting");
 55 thread_sched_unlock(sched, th);
/* Forward declaration: register `th` with the timer thread to wait for fd
 * readiness and/or a relative timeout; `event_serial` tags the wait so a
 * stale wakeup can be detected. */
58static bool timer_thread_register_waiting(
rb_thread_t *th,
int fd,
enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial);

/* Fragment of thread_sched_wait_events(): block the current (non-dedicated)
 * thread until `fd` becomes ready for `events` or `rel` elapses, yielding
 * its running turn to the scheduler in the meantime.  Returns whether the
 * wait timed out (return statement not visible here).
 * (Missing from this view: return type line, closing braces, and the
 * `need_cancel` usage path — hedge accordingly.) */
62thread_sched_wait_events(
struct rb_thread_sched *sched,
rb_thread_t *th,
int fd,
enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
 64 VM_ASSERT(!th_has_dedicated_nt(th));
/* volatile: read after the coroutine-style context switch below. */
 66 volatile bool timedout =
false, need_cancel =
false;
/* New serial per wait attempt; lets the timer thread ignore wakeups that
 * belong to an earlier, already-abandoned wait. */
 68 uint32_t event_serial = ++th->sched.event_serial;
 71 thread_sched_lock(sched, th);
 74 if (ubf_set(th, ubf_event_waiting, (
void *)th, NULL)) {
/* UBF could not be installed (presumably already interrupted) — bail out. */
 75 thread_sched_unlock(sched, th);
 79 if (timer_thread_register_waiting(th, fd, events, rel, event_serial)) {
 80 RUBY_DEBUG_LOG(
"wait fd:%d", fd);
 82 RB_VM_SAVE_MACHINE_CONTEXT(th);
/* Wait already completed before we could sleep: bump the serial so the
 * in-flight wakeup (if any) is recognized as stale. */
 86 if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
 87 th->sched.event_serial++;
 90 else if (RUBY_VM_INTERRUPTED(th->ec)) {
 91 th->sched.event_serial++;
 95 RUBY_DEBUG_LOG(
"sleep");
/* Give up the running turn until the timer thread wakes us. */
 97 th->status = THREAD_STOPPED_FOREVER;
 98 thread_sched_wakeup_next_thread(sched, th,
true);
 99 thread_sched_wait_running_turn(sched, th,
true);
 101 RUBY_DEBUG_LOG(
"wakeup");
/* result == 0 means no fd event was delivered, i.e. the timeout fired. */
 104 timedout = th->sched.waiting_reason.data.result == 0;
 107 timer_thread_cancel_waiting(th);
 110 th->status = THREAD_RUNNABLE;
 113 RUBY_DEBUG_LOG(
"can not wait fd:%d", fd);
 117 thread_sched_unlock(sched, th);
 120 ubf_clear(th,
false);
 122 VM_ASSERT(sched->running == th);
/* Fragment of get_sysconf_page_size(): returns the system page size as int,
 * caching the sysconf(3) result in a function-local static.
 * NOTE(review): the cached read/write is unsynchronized — presumably first
 * called before additional native threads exist; confirm against callers. */
130get_sysconf_page_size(
void)
 132 static long page_size = 0;
 134 if (UNLIKELY(page_size == 0)) {
 135 page_size = sysconf(_SC_PAGESIZE);
 136 VM_ASSERT(page_size < INT_MAX);
 138 return (
int)page_size;

/* Machine-stack chunk geometry: each mmap'ed chunk is 512 MiB carved into
 * fixed-size per-thread stacks. */
141#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024)
142#define MSTACK_PAGE_SIZE get_sysconf_page_size()
/* "- 1": one page of the chunk is withheld — presumably a leading redzone
 * page; TODO confirm against upstream thread_pthread_mn.c. */
143#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1)
/* Header placed at the start of each machine-stack chunk.  Chunks form two
 * intrusive singly linked lists: all chunks (prev_chunk) and chunks with
 * reusable stacks (prev_free_chunk).  free_stack[] is a flexible-array
 * LIFO of freed stack indices; free_stack_pos is its length.
 * (A `start_page` field is referenced elsewhere in this file but its
 * declaration line is missing from this extraction.) */
156static struct nt_stack_chunk_header {
 157 struct nt_stack_chunk_header *prev_chunk;
 158 struct nt_stack_chunk_header *prev_free_chunk;
 161 uint16_t stack_count;
 162 uint16_t uninitialized_stack_count;
 164 uint16_t free_stack_pos;
 165 uint16_t free_stack[];
 166} *nt_stack_chunks = NULL,
 167 *nt_free_stack_chunks = NULL;

/* Footer written at the top of each machine stack so nt_free_stack() can
 * recover the owning chunk (and, per later code, a stack `index` whose
 * declaration line is missing here). */
169struct nt_machine_stack_footer {
 170 struct nt_stack_chunk_header *ch;

/* Guards the chunk lists above. */
174static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;
/* Fragment of nt_thread_stack_size(): total per-thread stack slot size =
 * VM stack + machine stack + one guard page, rounded up to whole pages.
 * Result is cached (the `msz` static and `vm` local declarations are
 * missing from this extraction; `return msz;` presumably follows). */
180nt_thread_stack_size(
void)
 183 if (LIKELY(msz > 0))
return msz;
 186 int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE);
 187 int page_num = roomof(sz, MSTACK_PAGE_SIZE);
 188 msz = (size_t)page_num * MSTACK_PAGE_SIZE;
/* Fragment of nt_alloc_thread_stack_chunk(): mmap a PROT_NONE 512 MiB
 * chunk, size its header (one page, grown if the flexible free_stack[]
 * array needs more), mprotect the header pages read/write, and link the
 * chunk into both chunk lists.  Individual stacks are made accessible
 * lazily in nt_alloc_stack().  Returns the chunk header, or (per the
 * missing lines) NULL on mmap/mprotect failure. */
192static struct nt_stack_chunk_header *
193nt_alloc_thread_stack_chunk(
void)
/* PROT_NONE reservation: address space only; pages are enabled on demand. */
 195 const char *m = (
void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_NONE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0);
 196 if (m == MAP_FAILED) {
 200 ruby_annotate_mmap(m, MSTACK_CHUNK_SIZE,
"Ruby:nt_alloc_thread_stack_chunk");
 202 size_t msz = nt_thread_stack_size();
 203 int header_page_cnt = 1;
 204 int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
 205 int ch_size =
sizeof(
struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;
/* Header (incl. free_stack[stack_count]) may exceed one page: grow the
 * header area and recompute how many stacks still fit. */
 207 if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) {
 208 header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
 209 stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
/* stack indices are stored in uint16_t fields. */
 212 VM_ASSERT(stack_count <= UINT16_MAX);
 215 if (mprotect((
void *)m, (
size_t)header_page_cnt * MSTACK_PAGE_SIZE, PROT_READ | PROT_WRITE) != 0) {
 216 munmap((
void *)m, MSTACK_CHUNK_SIZE);
 220 struct nt_stack_chunk_header *ch = (
struct nt_stack_chunk_header *)m;
 222 ch->start_page = header_page_cnt;
 223 ch->prev_chunk = nt_stack_chunks;
 224 ch->prev_free_chunk = nt_free_stack_chunks;
 225 ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count;
 226 ch->free_stack_pos = 0;
 228 RUBY_DEBUG_LOG(
"ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (
int)ch->start_page, (
int)ch->stack_count, (
int)msz);
/* Fragment of nt_stack_chunk_get_stack_start(): address of stack slot `idx`
 * within chunk `ch` (header pages first, then fixed-size slots). */
234nt_stack_chunk_get_stack_start(
struct nt_stack_chunk_header *ch,
size_t idx)
 236 const char *m = (
char *)ch;
 237 return (
void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size());

/* Fragment of nt_stack_chunk_get_msf(): footer stored at the very top of a
 * machine stack (last sizeof(footer) bytes of the machine-stack region). */
240static struct nt_machine_stack_footer *
241nt_stack_chunk_get_msf(
const rb_vm_t *vm,
const char *mstack)
 244 const size_t msz = vm->default_params.thread_machine_stack_size;
 245 return (
struct nt_machine_stack_footer *)&mstack[msz -
sizeof(
struct nt_machine_stack_footer)];

/* Fragment of nt_stack_chunk_get_stack(): slice slot `idx` into
 *   [ VM stack | guard page | machine stack (with footer at top) ]
 * and return the VM-stack and machine-stack base pointers via out-params.
 * (Footer field assignments are missing from this extraction.) */
249nt_stack_chunk_get_stack(
const rb_vm_t *vm,
struct nt_stack_chunk_header *ch,
size_t idx,
void **vm_stack,
void **machine_stack)
 254 const char *vstack, *mstack;
 255 const char *guard_page;
 256 vstack = nt_stack_chunk_get_stack_start(ch, idx);
 257 guard_page = vstack + vm->default_params.thread_vm_stack_size;
 258 mstack = guard_page + MSTACK_PAGE_SIZE;
 260 struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
 265 RUBY_DEBUG_LOG(
"msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
 266 vstack, (
void *)(guard_page-1),
 267 guard_page, (
void *)(mstack-1),
 268 mstack, (
void *)(msf));
 271 *vm_stack = (
void *)vstack;
 272 *machine_stack = (
void *)mstack;
/* Fragment of nt_stack_chunk_dump(): debug helper — print both chunk lists
 * (all chunks, then free chunks) to stderr.  The `int i;` declaration is
 * missing from this extraction. */
277nt_stack_chunk_dump(
void)
 279 struct nt_stack_chunk_header *ch;
 282 fprintf(stderr,
"** nt_stack_chunks\n");
 283 ch = nt_stack_chunks;
 284 for (i=0; ch; i++, ch = ch->prev_chunk) {
 285 fprintf(stderr,
"%d %p free_pos:%d\n", i, (
void *)ch, (
int)ch->free_stack_pos);
 288 fprintf(stderr,
"** nt_free_stack_chunks\n");
 289 ch = nt_free_stack_chunks;
 290 for (i=0; ch; i++, ch = ch->prev_free_chunk) {
 291 fprintf(stderr,
"%d %p free_pos:%d\n", i, (
void *)ch, (
int)ch->free_stack_pos);
/* Fragment of nt_alloc_stack(): hand out one VM-stack/machine-stack pair.
 * Order of preference within the head free chunk:
 *   1. a previously freed slot (LIFO pop from free_stack[]),
 *   2. a never-used slot (uninitialized_stack_count), whose pages are
 *      mprotect'ed / mmap'ed accessible here on first use,
 *   3. otherwise unlink the exhausted chunk and (per the missing tail)
 *      allocate a fresh chunk and retry.
 * Presumably runs under nt_machine_stack_lock — the lock/unlock lines are
 * missing from this extraction; confirm against upstream. */
296nt_alloc_stack(
rb_vm_t *vm,
void **vm_stack,
void **machine_stack)
 303 if (nt_free_stack_chunks) {
 304 struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
 305 if (ch->free_stack_pos > 0) {
 306 RUBY_DEBUG_LOG(
"free_stack_pos:%d", ch->free_stack_pos);
 307 nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
 309 else if (ch->uninitialized_stack_count > 0) {
 310 RUBY_DEBUG_LOG(
"uninitialized_stack_count:%d", ch->uninitialized_stack_count);
/* Slots are consumed from index 0 upward. */
 312 size_t idx = ch->stack_count - ch->uninitialized_stack_count--;
 316 char *stack_start = nt_stack_chunk_get_stack_start(ch, idx);
 317 size_t vm_stack_size = vm->default_params.thread_vm_stack_size;
/* Machine-stack size = slot size minus VM stack minus the guard page. */
 318 size_t mstack_size = nt_thread_stack_size() - vm_stack_size - MSTACK_PAGE_SIZE;
 319 char *mstack_start = stack_start + vm_stack_size + MSTACK_PAGE_SIZE;
 321 int mstack_flags = MAP_FIXED | MAP_ANONYMOUS | MAP_PRIVATE;
/* MAP_STACK where supported (FreeBSD's MAP_STACK has different semantics). */
 322#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__)
 323 mstack_flags |= MAP_STACK;
 326 if (mprotect(stack_start, vm_stack_size, PROT_READ | PROT_WRITE) != 0 ||
 327 mmap(mstack_start, mstack_size, PROT_READ | PROT_WRITE, mstack_flags, -1, 0) == MAP_FAILED) {
 331 nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
/* Chunk exhausted: drop it from the free list. */
 335 nt_free_stack_chunks = ch->prev_free_chunk;
 336 ch->prev_free_chunk = NULL;
 341 struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
 346 nt_free_stack_chunks = nt_stack_chunks = p;
/* Fragment of nt_madvise_free_or_dontneed(): return freed stack pages to
 * the kernel — MADV_FREE preferred (lazy reclaim), falling back to
 * MADV_DONTNEED.  The #else/#endif structure is missing from this
 * extraction; presumably DONTNEED runs only if FREE is unavailable or
 * failed. */
357nt_madvise_free_or_dontneed(
void *addr,
size_t len)
370#if defined(MADV_FREE)
 371 int r = madvise(addr,
len, MADV_FREE);
375#if defined(MADV_DONTNEED)
 376 madvise(addr,
len, MADV_DONTNEED);

/* Fragment of nt_free_stack(): return a stack slot to its chunk.  The
 * footer at the top of the machine stack yields the owning chunk and slot
 * index; the chunk is put (back) on the free list if absent, the index is
 * pushed onto free_stack[], and the pages are madvise'd away.
 * Presumably guarded by nt_machine_stack_lock (lock lines missing). */
381nt_free_stack(
void *mstack)
 387 struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
 388 struct nt_stack_chunk_header *ch = msf->ch;
 389 int idx = (int)msf->index;
 390 void *stack = nt_stack_chunk_get_stack_start(ch, idx);
 392 RUBY_DEBUG_LOG(
"stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);
/* prev_free_chunk == NULL means the chunk is not currently on the free list. */
 394 if (ch->prev_free_chunk == NULL) {
 395 ch->prev_free_chunk = nt_free_stack_chunks;
 396 nt_free_stack_chunks = ch;
 398 ch->free_stack[ch->free_stack_pos++] = idx;
 401 nt_madvise_free_or_dontneed(stack, nt_thread_stack_size());
/* Fragment of native_thread_check_and_create_shared(): decide whether
 * another shared native thread (SNT) is needed to serve schedulable
 * ractors, and create one if so.  An SNT is added while below MINIMUM_SNT,
 * or while there are fewer SNTs than both schedulable ractors and the CPU
 * cap.  (The locking around this check, the `need_to_make` consumption,
 * `nt` setup, and early-return paths are missing from this extraction.) */
408native_thread_check_and_create_shared(
rb_vm_t *vm)
 410 bool need_to_make =
false;
 414 unsigned int schedulable_ractor_cnt = vm->ractor.cnt;
/* The main ractor does not need an SNT unless MN threads are enabled on it. */
 417 if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads)
 418 schedulable_ractor_cnt--;
 420 unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
 421 if (((
int)snt_cnt < MINIMUM_SNT) ||
 422 (snt_cnt < schedulable_ractor_cnt &&
 423 snt_cnt < vm->ractor.sched.max_cpu)) {
 425 RUBY_DEBUG_LOG(
"added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
 426 vm->ractor.sched.snt_cnt,
 427 vm->ractor.sched.dnt_cnt,
 429 vm->ractor.sched.grq_cnt);
/* Count the new SNT before it actually starts, to avoid over-creation. */
 431 vm->ractor.sched.snt_cnt++;
 435 RUBY_DEBUG_LOG(
"snt:%d ractor_cnt:%d", (
int)vm->ractor.sched.snt_cnt, (
int)vm->ractor.cnt);
 443 return native_thread_create0(nt);
/* co_start is aliased to ruby_coroutine_start (presumably only under some
 * build configuration — the surrounding #ifdef is not visible here). */
451# define co_start ruby_coroutine_start

/* Fragment of co_start(): entry point of an MN thread running on a
 * coroutine.  Runs the Ruby thread body (call_thread_start_func_2), then
 * tears down: detaches from the native thread, clears the ractor's EC,
 * marks the scheduler slot finished, and transfers either back to the
 * native-thread context or directly to the next thread's coroutine.
 * (The function signature, `self`/`from`/`th`/`sched`/`nt`/`next_th`
 * declarations, and several braces are missing from this extraction.) */
458#ifdef RUBY_ASAN_ENABLED
/* Tell ASAN the fiber switch into this coroutine has completed. */
 459 __sanitizer_finish_switch_fiber(self->fake_stack,
 460 (
const void**)&from->stack_base, &from->stack_size);
 465 VM_ASSERT(th->nt != NULL);
 466 VM_ASSERT(th == sched->running);
 467 VM_ASSERT(sched->lock_owner == NULL);
/* We arrive here already owning the schedule lock conceptually; record it,
 * register as running, then release before executing Ruby code. */
 471 thread_sched_set_locked(sched, th);
 472 thread_sched_add_running_thread(TH_SCHED(th), th);
 473 thread_sched_unlock(sched, th);
 476 call_thread_start_func_2(th);
 478 thread_sched_lock(sched, NULL);
 480 RUBY_DEBUG_LOG(
"terminated th:%d", (
int)th->serial);
 485 bool is_dnt = th_has_dedicated_nt(th);
 486 native_thread_assign(NULL, th);
 487 rb_ractor_set_current_ec(th->ractor, NULL);
/* Dedicated-NT path (presumably — branch header missing): return to the
 * native thread's own context. */
 492 th->sched.finished =
true;
 493 coroutine_transfer0(self, nt->nt_context,
true);
/* Next thread exists but has no native thread yet: switch this NT to it. */
 498 if (next_th && !next_th->nt) {
 500 thread_sched_set_unlocked(sched, NULL);
 501 th->sched.finished =
true;
 502 thread_sched_switch0(th->sched.context, next_th, nt,
true);
/* Otherwise hand control back to the shared NT's scheduler context. */
 506 th->sched.finished =
true;
 507 coroutine_transfer0(self, nt->nt_context,
true);
/* A finished coroutine must never resume past its final transfer. */
 511 rb_bug(
"unreachable");
/* Fragment (enclosing function signature not visible — presumably the MN
 * thread-creation path): allocate a VM+machine stack pair from the chunk
 * allocator, wire it into the thread's execution context, initialize the
 * thread's coroutine to start at co_start, mark the thread ready, and
 * ensure enough shared native threads exist to run it. */
 519 void *vm_stack = NULL, *machine_stack = NULL;
 520 int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
/* Layout from nt_stack_chunk_get_stack: VM stack sits below the machine stack. */
 523 VM_ASSERT(vm_stack < machine_stack);
 526 size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/
sizeof(
VALUE);
 527 rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);
/* Reserve space for the footer at the top of the machine stack. */
 530 size_t machine_stack_size = vm->default_params.thread_machine_stack_size -
sizeof(
struct nt_machine_stack_footer);
/* Stack grows downward: stack_start is the high end of the usable region. */
 531 th->ec->machine.stack_start = (
void *)((uintptr_t)machine_stack + machine_stack_size);
 532 th->ec->machine.stack_maxsize = machine_stack_size;
 533 th->sched.context_stack = machine_stack;
 534 th->sched.context_stack_size = machine_stack_size;
 537 coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size);
 538 th->sched.context->argument = th;
 540 RUBY_DEBUG_LOG(
"th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
 541 thread_sched_to_ready(TH_SCHED(th), th);
 544 return native_thread_check_and_create_shared(th->vm);
556thread_sched_wait_events(
struct rb_thread_sched *sched,
rb_thread_t *th,
int fd,
enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
558 rb_bug(
"unreachable");
/* Event-backend section: compiled only when kqueue or epoll is available
 * and MN threads are enabled. */
564#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS

/* Fragment of fd_readable_nonblock(fd): zero-timeout poll(2) probe — true
 * if the fd reports any event right now (the pfd initializer's .events
 * line, presumably POLLIN, is missing from this extraction). */
567fd_readable_nonblock(
int fd)
 569 struct pollfd pfd = {
 573 return poll(&pfd, 1, 0) != 0;

/* Fragment of fd_writable_nonblock(fd): same probe for writability
 * (presumably POLLOUT). */
577fd_writable_nonblock(
int fd)
 579 struct pollfd pfd = {
 583 return poll(&pfd, 1, 0) != 0;

/* Fragment of verify_waiting_list(): debug-only assertion that
 * timer_th.waiting stays sorted by absolute timeout (entries with
 * timeout 0, i.e. no timeout, excepted).  The `prev_timeout` update across
 * iterations is missing from this extraction. */
587verify_waiting_list(
void)
 594 ccan_list_for_each(&timer_th.waiting, w, node) {
 597 rb_hrtime_t timeout = w->data.timeout;
 598 rb_hrtime_t prev_timeout = w->data.timeout;
 599 VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
/* Fragment of kqueue_translate_filter_to_flags(): map a kevent filter
 * (EVFILT_READ / EVFILT_WRITE / timer — the case labels are missing from
 * this extraction) to the scheduler's waiting-flag enum; unknown filters
 * are fatal. */
608static enum thread_sched_waiting_flag
609kqueue_translate_filter_to_flags(int16_t filter)
 613 return thread_sched_waiting_io_read;
 615 return thread_sched_waiting_io_write;
 617 return thread_sched_waiting_timeout;
 619 rb_bug(
"kevent filter:%d not supported", filter);

/* Fragment of kqueue_wait(vm): block in kevent(2) for up to the timer
 * thread's computed timeout.  timeout_ms < 0 means wait indefinitely
 * (timeout left NULL); 0 means poll without blocking. */
 628 int timeout_ms = timer_thread_set_timeout(vm);
 630 if (timeout_ms > 0) {
 631 calculated_timeout.tv_sec = timeout_ms / 1000;
 632 calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
 633 timeout = &calculated_timeout;
 635 else if (timeout_ms == 0) {
 638 memset(&calculated_timeout, 0,
sizeof(
struct timespec));
 639 timeout = &calculated_timeout;
 642 return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);

/* Fragment of kqueue creation: make the kqueue fd and set FD_CLOEXEC on it
 * (kqueue fds are not inherited across exec anyway, but this mirrors the
 * epoll path — TODO confirm intent upstream).  All failures are fatal. */
 648 if ((timer_th.event_fd = kqueue()) == -1) rb_bug(
"kqueue creation failed (errno:%d)",
errno);
 649 int flags = fcntl(timer_th.event_fd, F_GETFD);
 651 rb_bug(
"kqueue GETFD failed (errno:%d)",
errno);
 655 if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
 656 rb_bug(
"kqueue SETFD failed (errno:%d)",
errno);
/* Fragment of kqueue_unregister_waiting(fd, flags): queue EV_DELETE events
 * for whichever filters were registered and submit them in one kevent(2)
 * call.  (The `ke` array / `num_events` declarations and increments are
 * missing from this extraction.)  Failure is fatal — presumably except for
 * a benign errno checked on the missing line before rb_bug. */
661kqueue_unregister_waiting(
int fd,
enum thread_sched_waiting_flag flags)
 667 if (flags & thread_sched_waiting_io_read) {
 668 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
 671 if (flags & thread_sched_waiting_io_write) {
 672 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
 675 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
 677 rb_bug(
"unregister/kevent fails. errno:%d",
errno);

/* Fragment of kqueue_already_registered(fd): scan the waiting list for an
 * active entry on the same fd (kqueue cannot hold two registrations for
 * one fd from this design).  `found_w` assignment/declaration lines are
 * missing from this extraction. */
683kqueue_already_registered(
int fd)
 687 ccan_list_for_each(&timer_th.waiting, w, node) {
 690 if (w->flags && w->data.fd == fd) {
 695 return found_w != NULL;
/* Fragment of timer_thread_register_waiting(): register `th` (or NULL for
 * the timer thread's own communication pipe) with the event backend to be
 * woken on fd readiness and/or after relative timeout `rel`.  Fast-paths:
 * if the fd is already readable/writable (and _io_force is not set) the
 * wait is skipped entirely.  On success the thread's waiting_reason is
 * filled in and the node is inserted into timer_th.waiting — kept sorted
 * by absolute timeout so the timer thread can compute its next wakeup.
 * Returns true when registered (return statements are missing from this
 * extraction).  Presumably runs under the timer thread's waiting-list
 * lock — the locking lines are not visible; confirm upstream. */
702timer_thread_register_waiting(
rb_thread_t *th,
int fd,
enum thread_sched_waiting_flag flags, rb_hrtime_t *rel, uint32_t event_serial)
 704 RUBY_DEBUG_LOG(
"th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (
unsigned long)*rel : 0);
 706 VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
 707 VM_ASSERT(flags != 0);
 713 flags |= thread_sched_waiting_timeout;
 720 if (rel && *rel > 0) {
 721 flags |= thread_sched_waiting_timeout;
 728 uint32_t epoll_events = 0;
/* Convert the relative timeout to an absolute deadline once, up front. */
 730 if (flags & thread_sched_waiting_timeout) {
 731 VM_ASSERT(rel != NULL);
 732 abs = rb_hrtime_add(rb_hrtime_now(), *rel);
 735 if (flags & thread_sched_waiting_io_read) {
/* Already readable: no need to sleep at all (unless forced). */
 736 if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) {
 737 RUBY_DEBUG_LOG(
"fd_readable_nonblock");
/* kqueue backend: queue an EV_ADD with the waiting thread as udata. */
 743 EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (
void *)th);
/* epoll backend: accumulate interest bits. */
 746 epoll_events |= EPOLLIN;
 751 if (flags & thread_sched_waiting_io_write) {
 752 if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) {
 753 RUBY_DEBUG_LOG(
"fd_writable_nonblock");
 759 EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (
void *)th);
 762 epoll_events |= EPOLLOUT;
 770 if (num_events > 0) {
/* kqueue holds at most one registration per fd here: refuse duplicates. */
 771 if (kqueue_already_registered(fd)) {
 776 if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
 777 RUBY_DEBUG_LOG(
"failed (%d)",
errno);
 786 rb_bug(
"register/kevent failed(fd:%d, errno:%d)", fd,
errno);
 789 RUBY_DEBUG_LOG(
"kevent(add, fd:%d) success", fd);
 793 struct epoll_event event = {
 794 .events = epoll_events,
 799 if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
 800 RUBY_DEBUG_LOG(
"failed (%d)",
errno);
 813 rb_bug(
"register/epoll_ctl failed(fd:%d, errno:%d)", fd,
errno);
 816 RUBY_DEBUG_LOG(
"epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
/* Record why this thread is waiting; read back on wakeup/cancel. */
 821 VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);
 825 th->sched.waiting_reason.flags = flags;
 826 th->sched.waiting_reason.data.timeout = abs;
 827 th->sched.waiting_reason.data.fd = fd;
 828 th->sched.waiting_reason.data.result = 0;
 829 th->sched.waiting_reason.data.event_serial = event_serial;
/* No-timeout entries go to the tail; timed entries are inserted in
 * deadline order so the list head has the nearest deadline. */
 833 VM_ASSERT(!(flags & thread_sched_waiting_timeout));
 834 ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
 837 RUBY_DEBUG_LOG(
"abs:%lu", (
unsigned long)abs);
 838 VM_ASSERT(flags & thread_sched_waiting_timeout);
 843 ccan_list_for_each(&timer_th.waiting, w, node) {
 844 if ((w->flags & thread_sched_waiting_timeout) &&
 845 w->data.timeout < abs) {
 854 ccan_list_add_after(&timer_th.waiting, &prev_w->node, &th->sched.waiting_reason.node);
 857 ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
 860 verify_waiting_list();
/* Kick the timer thread so it recomputes its next wakeup deadline. */
 863 timer_thread_wakeup_force();
/* Fragment of timer_thread_unregister_waiting(th, fd, flags): drop the fd
 * from the event backend.  Timeout-only waits have nothing registered with
 * kqueue/epoll, so they return early (the early `return` line is missing
 * from this extraction).  On epoll, EPOLL_CTL_DEL failure is fatal —
 * presumably after excluding benign errnos on the missing lines. */
876timer_thread_unregister_waiting(
rb_thread_t *th,
int fd,
enum thread_sched_waiting_flag flags)
 878 if (!(th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write))) {
 882 RUBY_DEBUG_LOG(
"th:%u fd:%d", rb_th_serial(th), fd);
 884 kqueue_unregister_waiting(fd, flags);
 887 if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
 894 rb_bug(
"unregister/epoll_ctl fails. errno:%d",
errno);
/* Fragment of timer_thread_setup_mn(): create the backend event fd
 * (kqueue on BSD/macOS, epoll elsewhere — the #if lines are missing from
 * this extraction) and register the read side of the communication pipe
 * so other threads can wake the timer thread.  th == NULL marks this
 * registration as the timer thread's own. */
901timer_thread_setup_mn(
void)
 905 RUBY_DEBUG_LOG(
"kqueue_fd:%d", timer_th.event_fd);
 907 if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug(
"epoll_create (errno:%d)",
errno);
 908 RUBY_DEBUG_LOG(
"epoll_fd:%d", timer_th.event_fd);
 910 RUBY_DEBUG_LOG(
"comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);
/* _io_force: skip the "already readable" fast path — we always want the
 * pipe registered with the backend.  serial 0: not tied to any wait. */
 912 timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL, 0);

/* Fragment of event_wait(vm): one blocking backend call; returns the number
 * of ready events (kqueue and epoll variants selected by missing #if lines). */
 919 int r = kqueue_wait(vm);
 921 int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
/* Fragment of timer_thread_polling(vm): one iteration of the MN timer
 * thread's main loop.  Waits on the event backend; on timeout it runs
 * timeslice accounting and tops up shared native threads if the global run
 * queue is non-empty; on events it dispatches each one — pipe traffic is
 * drained, fd events wake the waiting thread recorded in waiting_reason
 * (guarded by the per-scheduler lock, with the event_serial check done in
 * timer_thread_wakeup_thread_locked).  Both kqueue and epoll dispatch
 * loops appear below; the #if lines selecting between them, the switch on
 * `r`, and several locals (`th`, `sched`) are missing from this
 * extraction. */
941timer_thread_polling(
rb_vm_t *vm)
 943 int r = event_wait(vm);
 945 RUBY_DEBUG_LOG(
"r:%d errno:%d", r,
errno);
/* r == 0: backend timed out — periodic housekeeping. */
 949 RUBY_DEBUG_LOG(
"timeout%s",
"");
 951 ractor_sched_lock(vm, NULL);
 954 timer_thread_check_timeslice(vm);
 957 if (vm->ractor.sched.grq_cnt > 0) {
 958 RUBY_DEBUG_LOG(
"GRQ cnt: %u", vm->ractor.sched.grq_cnt);
 962 ractor_sched_unlock(vm, NULL);
/* Runnable work queued globally: make sure an SNT exists to take it. */
 965 native_thread_check_and_create_shared(vm);
/* r < 0: backend error (presumably EINTR is filtered on a missing line). */
 975 perror(
"event_wait");
 976 rb_bug(
"event_wait errno:%d",
errno);
/* r > 0: dispatch each completed event. */
 981 RUBY_DEBUG_LOG(
"%d event(s)", r);
/* kqueue dispatch loop. */
 984 for (
int i=0; i<r; i++) {
 986 int fd = (int)timer_th.finished_events[i].ident;
 987 int16_t filter = timer_th.finished_events[i].filter;
/* Event on the communication pipe: just drain it. */
 991 RUBY_DEBUG_LOG(
"comm from fd:%d", timer_th.comm_fds[1]);
 992 consume_communication_pipe(timer_th.comm_fds[0]);
 996 RUBY_DEBUG_LOG(
"io event. wakeup_th:%u event:%s%s",
 998 (filter == EVFILT_READ) ?
"read/" :
"",
 999 (filter == EVFILT_WRITE) ?
"write/" :
"");
 1002 thread_sched_lock(sched, th);
/* Still waiting: unlink, unregister, record the result, and wake it. */
 1005 if (th->sched.waiting_reason.flags) {
 1007 ccan_list_del_init(&th->sched.waiting_reason.node);
 1008 timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter));
 1010 th->sched.waiting_reason.flags = thread_sched_waiting_none;
 1011 th->sched.waiting_reason.data.fd = -1;
 1012 th->sched.waiting_reason.data.result = filter;
 1013 uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
 1015 timer_thread_wakeup_thread_locked(sched, th, event_serial);
 1022 thread_sched_unlock(sched, th);
/* epoll dispatch loop (mirrors the kqueue loop above). */
 1026 for (
int i=0; i<r; i++) {
 1031 RUBY_DEBUG_LOG(
"comm from fd:%d", timer_th.comm_fds[1]);
 1032 consume_communication_pipe(timer_th.comm_fds[0]);
 1036 uint32_t events = timer_th.finished_events[i].events;
 1038 RUBY_DEBUG_LOG(
"io event. wakeup_th:%u event:%s%s%s%s%s%s",
 1040 (events & EPOLLIN) ?
"in/" :
"",
 1041 (events & EPOLLOUT) ?
"out/" :
"",
 1042 (events & EPOLLRDHUP) ?
"RDHUP/" :
"",
 1043 (events & EPOLLPRI) ?
"pri/" :
"",
 1044 (events & EPOLLERR) ?
"err/" :
"",
 1045 (events & EPOLLHUP) ?
"hup/" :
"");
 1048 thread_sched_lock(sched, th);
 1051 if (th->sched.waiting_reason.flags) {
 1053 ccan_list_del_init(&th->sched.waiting_reason.node);
 1054 timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
 1056 th->sched.waiting_reason.flags = thread_sched_waiting_none;
 1057 th->sched.waiting_reason.data.fd = -1;
 1058 th->sched.waiting_reason.data.result = (int)events;
 1059 uint32_t event_serial = th->sched.waiting_reason.data.event_serial;
 1061 timer_thread_wakeup_thread_locked(sched, th, event_serial);
 1068 thread_sched_unlock(sched, th);
/* Fallback implementations for builds without kqueue/epoll MN support
 * (presumably the #else branch of the backend conditional — the #if lines
 * are missing from this extraction).  timer_thread_setup_mn() has nothing
 * to set up; timer_thread_polling() degrades to a plain poll(2) on the
 * communication pipe plus periodic timeslice checks. */
1078timer_thread_setup_mn(
void)

/* poll(2)-based timer thread loop: wait for pipe traffic or timeout. */
1084timer_thread_polling(
rb_vm_t *vm)
 1086 int timeout = timer_thread_set_timeout(vm);
 1088 struct pollfd pfd = {
 1089 .fd = timer_th.comm_fds[0],
 1093 int r = poll(&pfd, 1, timeout);
/* r == 0: timeout — run periodic timeslice accounting under the lock. */
 1097 ractor_sched_lock(vm, NULL);
 1100 timer_thread_check_timeslice(vm);
 1102 ractor_sched_unlock(vm, NULL);
/* r < 0: poll failure (presumably EINTR filtered on a missing line). */
 1112 rb_bug(
"poll errno:%d",
errno);
/* r > 0: only the pipe is registered, so just drain it. */
 1117 consume_communication_pipe(timer_th.comm_fds[0]);
/* Fix: message previously misspelled "unreachbale"; now matches the other
 * rb_bug("unreachable") calls in this file. */
 1121 rb_bug(
"unreachable");
#define RUBY_ASSERT(...)
Asserts that the given expression is truthy if and only if RUBY_DEBUG is truthy.
int len
Length of the buffer.
#define RUBY_INTERNAL_THREAD_EVENT_RESUMED
Triggered when a thread successfully acquired the GVL.
#define RUBY_INTERNAL_THREAD_EVENT_SUSPENDED
Triggered when a thread released the GVL.
#define RBIMPL_ATTR_MAYBE_UNUSED()
Wraps (or simulates) [[maybe_unused]]
#define errno
Ractor-aware version of errno.
void rb_native_mutex_lock(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_lock.
void rb_native_mutex_unlock(rb_nativethread_lock_t *lock)
Just another name of rb_nativethread_lock_unlock.
void rb_native_cond_signal(rb_nativethread_cond_t *cond)
Signals a condition variable.
uintptr_t VALUE
Type that represents a Ruby object.