Ruby 3.5.0dev (2025-06-27 revision aca692cd41d4cf74f5b5d35a703b55262ff4bcf8)
scheduler.c (aca692cd41d4cf74f5b5d35a703b55262ff4bcf8)
1/**********************************************************************
2
3 scheduler.c
4
5 $Author$
6
7 Copyright (C) 2020 Samuel Grant Dawson Williams
8
9**********************************************************************/
10
11#include "vm_core.h"
12#include "ruby/fiber/scheduler.h"
13#include "ruby/io.h"
14#include "ruby/io/buffer.h"
15
16#include "ruby/thread.h"
17
18// For `ruby_thread_has_gvl_p`:
19#include "internal/thread.h"
20
21// For atomic operations:
22#include "ruby_atomic.h"
23
24static ID id_close;
25static ID id_scheduler_close;
26
27static ID id_block;
28static ID id_unblock;
29
30static ID id_timeout_after;
31static ID id_kernel_sleep;
32static ID id_process_wait;
33
34static ID id_io_read, id_io_pread;
35static ID id_io_write, id_io_pwrite;
36static ID id_io_wait;
37static ID id_io_select;
38static ID id_io_close;
39
40static ID id_address_resolve;
41
42static ID id_blocking_operation_wait;
43static ID id_fiber_interrupt;
44
45static ID id_fiber_schedule;
46
47// Our custom blocking operation class
48static VALUE rb_cFiberSchedulerBlockingOperation;
49
50/*
51 * Custom structure representing a blocking operation.
52 * This replaces the use of Ruby procs to avoid use-after-free issues
53 * and provides a cleaner C API for native work pools.
54 */
55
56typedef enum {
57 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED, // Submitted but not started
58 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
59 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
60 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED // Cancelled
61} rb_fiber_blocking_operation_status_t;
62
63struct rb_fiber_scheduler_blocking_operation {
64 void *(*function)(void *);
65 void *data;
66
67 rb_unblock_function_t *unblock_function;
68 void *data2;
69
70 int flags;
71 struct rb_fiber_scheduler_blocking_operation_state *state;
72
73 // Execution status
74 volatile rb_atomic_t status;
75};
76
77static void
78blocking_operation_mark(void *ptr)
79{
80 // No Ruby objects to mark in our struct
81}
82
83static void
84blocking_operation_free(void *ptr)
85{
86 rb_fiber_scheduler_blocking_operation_t *blocking_operation = (rb_fiber_scheduler_blocking_operation_t*)ptr;
87 ruby_xfree(blocking_operation);
88}
89
90static size_t
91blocking_operation_memsize(const void *ptr)
92{
93 return sizeof(rb_fiber_scheduler_blocking_operation_t);
94}
95
96static const rb_data_type_t blocking_operation_data_type = {
97 "Fiber::Scheduler::BlockingOperation",
98 {
99 blocking_operation_mark,
100 blocking_operation_free,
101 blocking_operation_memsize,
102 },
103 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
104};
105
106/*
107 * Allocate a new blocking operation
108 */
109static VALUE
110blocking_operation_alloc(VALUE klass)
111{
112 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
113 VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
114
115 blocking_operation->function = NULL;
116 blocking_operation->data = NULL;
117 blocking_operation->unblock_function = NULL;
118 blocking_operation->data2 = NULL;
119 blocking_operation->flags = 0;
120 blocking_operation->state = NULL;
121 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
122
123 return obj;
124}
125
126/*
127 * Get the blocking operation struct from a Ruby object
128 */
129static rb_fiber_scheduler_blocking_operation_t *
130get_blocking_operation(VALUE obj)
131{
132 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
133 TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
134 return blocking_operation;
135}
136
137/*
138 * Document-method: Fiber::Scheduler::BlockingOperation#call
139 *
140 * Execute the blocking operation. This method releases the GVL and calls
141 * the blocking function, then restores the errno value.
142 *
143 * Returns nil. The actual result is stored in the associated state object.
144 */
145static VALUE
146blocking_operation_call(VALUE self)
147{
148 rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
149
150 if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
151 rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
152 }
153
154 if (blocking_operation->function == NULL) {
155 rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
156 }
157
158 if (blocking_operation->state == NULL) {
159 rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
160 }
161
162 // Mark as executing
163 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
164
165 // Execute the blocking operation without GVL
166 blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
167 blocking_operation->unblock_function, blocking_operation->data2,
168 blocking_operation->flags);
169 blocking_operation->state->saved_errno = rb_errno();
170
171 // Mark as completed
172 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
173
174 return Qnil;
175}
176
177/*
178 * C API: Extract blocking operation struct from Ruby object (GVL required)
179 *
180 * This function safely extracts the opaque struct from a BlockingOperation VALUE
181 * while holding the GVL. The returned pointer can be passed to worker threads
182 * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl.
183 *
184 * Returns the opaque struct pointer on success, NULL on error.
185 * Must be called while holding the GVL.
186 */
187rb_fiber_scheduler_blocking_operation_t *
188rb_fiber_scheduler_blocking_operation_extract(VALUE self)
189{
190 return get_blocking_operation(self);
191}
192
193/*
194 * C API: Execute blocking operation from opaque struct (GVL not required)
195 *
196 * This function executes a blocking operation using the opaque struct pointer
197 * obtained from rb_fiber_scheduler_blocking_operation_extract.
198 * It can be called from native threads without holding the GVL.
199 *
200 * Returns 0 on success, -1 on error.
201 */
202int
203rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
204{
205 if (blocking_operation == NULL) {
206 return -1;
207 }
208
209 if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
210 return -1; // Invalid blocking operation
211 }
212
213 // Resolve sentinel values for unblock_function and data2:
214 rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
215
216 // Atomically check if we can transition from QUEUED to EXECUTING
217 rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
218 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
219 // Already cancelled or in wrong state
220 return -1;
221 }
222
223 // Now we're executing - call the function
224 blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
225 blocking_operation->state->saved_errno = errno;
226
227 // Atomically transition to completed (unless cancelled during execution)
228 expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
229 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
230 // Successfully completed
231 return 0;
232 } else {
233 // Was cancelled during execution
234 blocking_operation->state->saved_errno = EINTR;
235 return -1;
236 }
237}
238
239/*
240 * C API: Create a new blocking operation
241 *
242 * This creates a blocking operation that can be executed by native work pools.
243 * The blocking operation holds references to the function and data safely.
244 */
245VALUE
246rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
247 rb_unblock_function_t *unblock_function, void *data2,
248 int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
249{
250 VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
251 rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
252
253 blocking_operation->function = function;
254 blocking_operation->data = data;
255 blocking_operation->unblock_function = unblock_function;
256 blocking_operation->data2 = data2;
257 blocking_operation->flags = flags;
258 blocking_operation->state = state;
259
260 return self;
261}
262
263/*
264 *
265 * Document-class: Fiber::Scheduler
266 *
267 * This is not an existing class, but documentation of the interface that a Scheduler
268 * object should conform to in order to be used as an argument to Fiber.set_scheduler and handle
269 * non-blocking fibers. See also the "Non-blocking fibers" section in the Fiber class docs for
270 * explanations of some concepts.
271 *
272 * Scheduler's behavior and usage are expected to be as follows:
273 *
274 * * When execution in a non-blocking Fiber reaches some blocking operation (like
275 * sleep, waiting for a process, or non-ready I/O), the fiber calls one of the scheduler's
276 * hook methods, listed below.
277 * * The scheduler registers what the current fiber is waiting on, and yields control
278 * to other fibers with Fiber.yield (so the fiber is suspended until its wait ends,
279 * and other fibers in the same thread can run).
280 * * At the end of the current thread's execution, the scheduler's method #scheduler_close is called.
281 * * The scheduler runs a wait loop, checking all the blocked fibers (which it has
282 * registered on hook calls) and resuming them when the awaited resource is ready
283 * (e.g. I/O ready or sleep time elapsed).
284 *
285 * This way concurrent execution will be achieved transparently for every
286 * individual Fiber's code.
287 *
288 * Scheduler implementations are provided by gems, like
289 * Async[https://github.com/socketry/async].
290 *
291 * Hook methods are:
292 *
293 * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, #io_select, and #io_close
294 * * #process_wait
295 * * #kernel_sleep
296 * * #timeout_after
297 * * #address_resolve
298 * * #block and #unblock
299 * * #blocking_operation_wait
300 * * (the list is expected to expand as Ruby developers make more methods non-blocking)
301 *
302 * Unless specified otherwise, the hook implementations are mandatory: if they are not
303 * implemented, the methods trying to call the hook will fail. To provide backward compatibility,
304 * hooks added in the future will be optional (if a hook is not implemented because the scheduler
305 * was written for an older Ruby version, the code which needs that hook will not fail,
306 * and will simply behave in a blocking fashion).
307 *
308 * It is also strongly recommended that the scheduler implements the #fiber method, which is
309 * delegated to by Fiber.schedule.
310 *
311 * A sample _toy_ implementation of a scheduler can be found in Ruby's code, in
312 * <tt>test/fiber/scheduler.rb</tt>
313 *
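 * For example, assuming +MyScheduler+ is such an implementation (a hypothetical class
 * standing in for a real scheduler provided by a gem), the flow described above looks
 * like this from the calling code's point of view:
 *
 *     Thread.new do
 *       Fiber.set_scheduler(MyScheduler.new)
 *
 *       reader, writer = IO.pipe
 *
 *       Fiber.schedule do       # delegated to the scheduler's #fiber hook
 *         puts reader.gets      # waits through the scheduler's #io_wait hook
 *       end
 *
 *       Fiber.schedule do
 *         sleep 0.1             # waits through the scheduler's #kernel_sleep hook
 *         writer.puts "hello"
 *       end
 *     end.join                  # the scheduler's #close runs at thread exit
 *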
314 */
315void
316Init_Fiber_Scheduler(void)
317{
318 id_close = rb_intern_const("close");
319 id_scheduler_close = rb_intern_const("scheduler_close");
320
321 id_block = rb_intern_const("block");
322 id_unblock = rb_intern_const("unblock");
323
324 id_timeout_after = rb_intern_const("timeout_after");
325 id_kernel_sleep = rb_intern_const("kernel_sleep");
326 id_process_wait = rb_intern_const("process_wait");
327
328 id_io_read = rb_intern_const("io_read");
329 id_io_pread = rb_intern_const("io_pread");
330 id_io_write = rb_intern_const("io_write");
331 id_io_pwrite = rb_intern_const("io_pwrite");
332
333 id_io_wait = rb_intern_const("io_wait");
334 id_io_select = rb_intern_const("io_select");
335 id_io_close = rb_intern_const("io_close");
336
337 id_address_resolve = rb_intern_const("address_resolve");
338
339 id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
340 id_fiber_interrupt = rb_intern_const("fiber_interrupt");
341
342 id_fiber_schedule = rb_intern_const("fiber");
343
344 // Define an anonymous BlockingOperation class for internal use only
345 // This is completely hidden from Ruby code and cannot be instantiated directly
346 rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
347 rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
348 rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);
349
350 // Register the anonymous class as a GC root so it doesn't get collected
351 rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
352
353#if 0 /* for RDoc */
354 rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
355 rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
356 rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
357 rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
358 rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
359 rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
360 rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
361 rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
362 rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
363 rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
364 rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
365 rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
366 rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
367 rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
368 rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
369 rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
370#endif
371}
372
373VALUE
374rb_fiber_scheduler_get(void)
375{
376 RUBY_ASSERT(ruby_thread_has_gvl_p());
377
378 rb_thread_t *thread = GET_THREAD();
379 RUBY_ASSERT(thread);
380
381 return thread->scheduler;
382}
383
384static void
385verify_interface(VALUE scheduler)
386{
387 if (!rb_respond_to(scheduler, id_block)) {
388 rb_raise(rb_eArgError, "Scheduler must implement #block");
389 }
390
391 if (!rb_respond_to(scheduler, id_unblock)) {
392 rb_raise(rb_eArgError, "Scheduler must implement #unblock");
393 }
394
395 if (!rb_respond_to(scheduler, id_kernel_sleep)) {
396 rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
397 }
398
399 if (!rb_respond_to(scheduler, id_io_wait)) {
400 rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
401 }
402
403 if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
404 rb_warn("Scheduler should implement #fiber_interrupt");
405 }
406}
407
408static VALUE
409fiber_scheduler_close(VALUE scheduler)
410{
411 return rb_fiber_scheduler_close(scheduler);
412}
413
414static VALUE
415fiber_scheduler_close_ensure(VALUE _thread)
416{
417 rb_thread_t *thread = (rb_thread_t*)_thread;
418 thread->scheduler = Qnil;
419
420 return Qnil;
421}
422
423VALUE
424rb_fiber_scheduler_set(VALUE scheduler)
425{
426 RUBY_ASSERT(ruby_thread_has_gvl_p());
427
428 rb_thread_t *thread = GET_THREAD();
429 RUBY_ASSERT(thread);
430
431 if (scheduler != Qnil) {
432 verify_interface(scheduler);
433 }
434
435 // We invoke Scheduler#close when setting it to something else, to ensure
436 // the previous scheduler runs to completion before changing the scheduler.
437 // That way, we do not need to consider interactions, e.g., of a Fiber from
438 // the previous scheduler with the new scheduler.
439 if (thread->scheduler != Qnil) {
440 // rb_fiber_scheduler_close(thread->scheduler);
441 rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
442 }
443
444 thread->scheduler = scheduler;
445
446 return thread->scheduler;
447}
448
449static VALUE
450rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
451{
452 RUBY_ASSERT(thread);
453
454 if (thread->blocking == 0) {
455 return thread->scheduler;
456 }
457 else {
458 return Qnil;
459 }
460}
461
462VALUE
463rb_fiber_scheduler_current(void)
464{
465 return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
466}
467
467
468VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
469{
470 return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
471}
472
473/*
474 *
475 * Document-method: Fiber::Scheduler#close
476 *
477 * Called when the current thread exits. The scheduler is expected to implement this
478 * method in order to allow all waiting fibers to finalize their execution.
479 *
480 * The suggested pattern is to implement the main event loop in the #close method.
481 *
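 * A minimal sketch of such an event loop, assuming the registries used by the hook
 * sketches below (@readable and @writable map IO => Fiber, @sleeping maps Fiber =>
 * wake-up time) and a +current_time+ helper reading the monotonic clock, might be
 * (error handling and corner cases omitted):
 *
 *     def close
 *       until @readable.empty? && @writable.empty? && @sleeping.empty?
 *         timeout = @sleeping.empty? ? nil : [@sleeping.values.min - current_time, 0].max
 *         ready_readable, ready_writable = IO.select(@readable.keys, @writable.keys, [], timeout)
 *
 *         (ready_readable || []).each { |io| @readable.delete(io).resume }
 *         (ready_writable || []).each { |io| @writable.delete(io).resume }
 *
 *         @sleeping.select { |_fiber, at| at <= current_time }.each_key do |fiber|
 *           @sleeping.delete(fiber).resume
 *         end
 *       end
 *     end
 *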
482 */
483VALUE
484rb_fiber_scheduler_close(VALUE scheduler)
485{
486 RUBY_ASSERT(ruby_thread_has_gvl_p());
487
488 VALUE result;
489
490 // The reason for calling `scheduler_close` before calling `close` is for
491 // legacy schedulers which implement `close` and expect the user to call
492 // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
493 // which should call `scheduler_close`. If it were to call `close`, it
494 // would create an infinite loop.
495
496 result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
497 if (!UNDEF_P(result)) return result;
498
499 result = rb_check_funcall(scheduler, id_close, 0, NULL);
500 if (!UNDEF_P(result)) return result;
501
502 return Qnil;
503}
504
505VALUE
506rb_fiber_scheduler_make_timeout(struct timeval *timeout)
507{
508 if (timeout) {
509 return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
510 }
511
512 return Qnil;
513}
514
515/*
516 * Document-method: Fiber::Scheduler#kernel_sleep
517 * call-seq: kernel_sleep(duration = nil)
518 *
519 * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
520 * an implementation of sleeping in a non-blocking way. An implementation might
521 * register the current fiber in some list of "which fiber waits until what
522 * moment", call Fiber.yield to pass control, and then in #close resume
523 * the fibers whose wait period has elapsed.
524 *
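 * A minimal sketch, assuming a @sleeping registry (Fiber => wake-up time) that the
 * event loop in #close drains, and a +current_time+ helper reading the monotonic
 * clock, might be:
 *
 *     def kernel_sleep(duration = nil)
 *       @sleeping[Fiber.current] = current_time + duration if duration
 *       Fiber.yield
 *     end
 *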
525 */
526VALUE
527rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
528{
529 return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
530}
531
532VALUE
533rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
534{
535 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
536}
537
538#if 0
539/*
540 * Document-method: Fiber::Scheduler#timeout_after
541 * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
542 *
543 * Invoked by Timeout.timeout to execute the given +block+ within the given
544 * +duration+. It can also be invoked directly by the scheduler or user code.
545 *
546 * Attempt to limit the execution time of a given +block+ to the given
547 * +duration+ if possible. When a non-blocking operation causes the +block+'s
548 * execution time to exceed the specified +duration+, that non-blocking
549 * operation should be interrupted by raising the specified +exception_class+
550 * constructed with the given +exception_arguments+.
551 *
552 * General execution timeouts are often considered risky. This implementation
553 * will only interrupt non-blocking operations. This is by design because it's
554 * expected that non-blocking operations can fail for a variety of
555 * unpredictable reasons, so applications should already be robust in handling
556 * these conditions and by implication timeouts.
557 *
558 * However, as a result of this design, if the +block+ does not invoke any
559 * non-blocking operations, it will be impossible to interrupt it. If you
560 * desire to provide predictable points for timeouts, consider adding
561 * +sleep(0)+.
562 *
563 * If the block is executed successfully, its result will be returned.
564 *
565 * The exception will typically be raised using Fiber#raise.
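 *
 * For example, a scheduler with a timer facility might implement it along these
 * lines (+schedule_timer+ and +cancel_timer+ are hypothetical helpers, not part of
 * this interface):
 *
 *     def timeout_after(duration, exception_class, *exception_arguments)
 *       fiber = Fiber.current
 *       timer = schedule_timer(duration) do
 *         fiber.raise(exception_class, *exception_arguments) if fiber.alive?
 *       end
 *
 *       yield
 *     ensure
 *       cancel_timer(timer)
 *     end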
566 */
567VALUE
568rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
569{
570 VALUE arguments[] = {
571 timeout, exception, message
572 };
573
574 return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
575}
576
577VALUE
578rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
579{
580 return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
581}
582#endif
583
584/*
585 * Document-method: Fiber::Scheduler#process_wait
586 * call-seq: process_wait(pid, flags)
587 *
588 * Invoked by Process::Status.wait in order to wait for a specified process.
589 * See that method's description for the meaning of the arguments.
590 *
591 * Suggested minimal implementation:
592 *
593 * Thread.new do
594 * Process::Status.wait(pid, flags)
595 * end.value
596 *
597 * This hook is optional: if it is not present in the current scheduler,
598 * Process::Status.wait will behave as a blocking method.
599 *
600 * Expected to return a Process::Status instance.
601 */
602VALUE
603rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
604{
605 VALUE arguments[] = {
606 PIDT2NUM(pid), RB_INT2NUM(flags)
607 };
608
609 return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
610}
611
612/*
613 * Document-method: Fiber::Scheduler#block
614 * call-seq: block(blocker, timeout = nil)
615 *
616 * Invoked by methods like Thread.join, and by Mutex, to signify that the current
617 * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
618 * elapsed.
619 *
620 * +blocker+ is what we are waiting on; it is informational only (for debugging and
621 * logging). There is no guarantee about its value.
622 *
623 * Expected to return a boolean, specifying whether the blocking operation was
624 * successful or not.
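 *
 * A minimal sketch, reusing the @sleeping registry from #kernel_sleep for the
 * timeout case and adding a @blocked registry (Fiber => blocker) for #unblock to
 * consult, might be:
 *
 *     def block(blocker, timeout = nil)
 *       @blocked[Fiber.current] = blocker
 *       @sleeping[Fiber.current] = current_time + timeout if timeout
 *       Fiber.yield
 *       true
 *     end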
625 */
626VALUE
627rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
628{
629 return rb_funcall(scheduler, id_block, 2, blocker, timeout);
630}
631
632/*
633 * Document-method: Fiber::Scheduler#unblock
634 * call-seq: unblock(blocker, fiber)
635 *
636 * Invoked to wake up a Fiber previously blocked with #block (for example, Mutex#lock
637 * calls #block and Mutex#unlock calls #unblock). The scheduler should use
638 * the +fiber+ parameter to understand which fiber is unblocked.
639 *
640 * +blocker+ is what was being awaited, but it is informational only (for debugging
641 * and logging), and it is not guaranteed to be the same value as the +blocker+ for
642 * #block.
643 *
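 * A minimal sketch, matching the #block sketch above, might be:
 *
 *     def unblock(blocker, fiber)
 *       @blocked.delete(fiber)
 *       @sleeping.delete(fiber)
 *       # A production scheduler would instead push +fiber+ onto a ready queue
 *       # drained by its event loop, since #unblock may be called from another thread:
 *       fiber.resume
 *     end
 *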
644 */
645VALUE
646rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
647{
648 RUBY_ASSERT(rb_obj_is_fiber(fiber));
649
650 // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
651 // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
652 int saved_errno = errno;
653
654#ifdef RUBY_DEBUG
655 rb_execution_context_t *ec = GET_EC();
656 if (RUBY_VM_INTERRUPTED(ec)) {
657 rb_bug("rb_fiber_scheduler_unblock called with pending interrupt");
658 }
659#endif
660
661 VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
662
663 errno = saved_errno;
664
665 return result;
666}
667
668/*
669 * Document-method: Fiber::Scheduler#io_wait
670 * call-seq: io_wait(io, events, timeout)
671 *
672 * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
673 * specified descriptor is ready for the specified events within
674 * the specified +timeout+.
675 *
676 * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
677 * <tt>IO::PRIORITY</tt>.
678 *
679 * A suggested implementation should register which Fiber is waiting for which
680 * resources and immediately call Fiber.yield to pass control to other
681 * fibers. Then, in the #close method, the scheduler might dispatch all the
682 * I/O resources to the fibers waiting for them.
683 *
684 * Expected to return the subset of events that are ready immediately.
685 *
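 * A minimal sketch, assuming @readable and @writable registries (IO => Fiber)
 * drained by the event loop in #close, and ignoring +timeout+, might be:
 *
 *     def io_wait(io, events, timeout)
 *       @readable[io] = Fiber.current if events.anybits?(IO::READABLE)
 *       @writable[io] = Fiber.current if events.anybits?(IO::WRITABLE)
 *       Fiber.yield
 *       events
 *     end
 *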
686 */
687static VALUE
688fiber_scheduler_io_wait(VALUE _argument) {
689 VALUE *arguments = (VALUE*)_argument;
690
691 return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
692}
693
694VALUE
695rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
696{
697 VALUE arguments[] = {
698 scheduler, io, events, timeout
699 };
700
701 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
702 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
703 } else {
704 return fiber_scheduler_io_wait((VALUE)&arguments);
705 }
706}
707
708VALUE
709rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
710{
711 return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io));
712}
713
714VALUE
715rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
716{
717 return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
718}
719
720/*
721 * Document-method: Fiber::Scheduler#io_select
722 * call-seq: io_select(readables, writables, exceptables, timeout)
723 *
724 * Invoked by IO.select to ask whether the specified descriptors are ready for
725 * the specified events within the specified +timeout+.
726 *
727 * Expected to return a 3-tuple of Arrays of IOs that are ready.
728 *
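 * One possible minimal implementation offloads the call to a throwaway thread, so
 * that only the calling fiber waits (Thread#value blocks through #block/#unblock):
 *
 *     def io_select(readables, writables, exceptables, timeout)
 *       Thread.new { IO.select(readables, writables, exceptables, timeout) }.value
 *     end
 *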
729 */
730VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
731{
732 VALUE arguments[] = {
733 readables, writables, exceptables, timeout
734 };
735
736 return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
737}
738
739VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
740{
741 // I wondered about extracting argv, and checking if there is only a single
742 // IO instance, and instead calling `io_wait`. However, it would require a
743 // decent amount of work and it would be hard to preserve the exact
744 // semantics of IO.select.
745
746 return rb_check_funcall(scheduler, id_io_select, argc, argv);
747}
748
749/*
750 * Document-method: Fiber::Scheduler#io_read
751 * call-seq: io_read(io, buffer, length, offset) -> read length or -errno
752 *
753 * Invoked by IO#read or IO::Buffer#read to read +length+ bytes from +io+ into a
754 * specified +buffer+ (see IO::Buffer) at the given +offset+.
755 *
756 * The +length+ argument is the "minimum length to be read". If the IO buffer
757 * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
758 * but at least 1KiB will be. Generally, the only case where less data than
759 * +length+ will be read is if there is an error reading the data.
760 *
761 * Specifying a +length+ of 0 is valid and means try reading at least once and
762 * return any available data.
763 *
764 * A suggested implementation should try to read from +io+ in a non-blocking
765 * manner and call #io_wait if the +io+ is not ready (which will yield control
766 * to other fibers).
767 *
768 * See IO::Buffer for an interface available to return data.
769 *
770 * Expected to return number of bytes read, or, in case of an error,
771 * <tt>-errno</tt> (negated number corresponding to system's error code).
772 *
773 * The method should be considered _experimental_.
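 *
 * A simplified sketch (error handling and the <tt>-errno</tt> convention are
 * omitted), based on IO#read_nonblock and IO::Buffer#set_string, might be:
 *
 *     def io_read(io, buffer, length, offset)
 *       total = 0
 *       loop do
 *         string = io.read_nonblock(buffer.size - offset, exception: false)
 *         if string == :wait_readable
 *           io_wait(io, IO::READABLE, nil)
 *         elsif string.nil?
 *           break # EOF
 *         else
 *           buffer.set_string(string, offset)
 *           offset += string.bytesize
 *           total += string.bytesize
 *           break if total >= length
 *         end
 *       end
 *       total
 *     end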
774 */
775static VALUE
776fiber_scheduler_io_read(VALUE _argument) {
777 VALUE *arguments = (VALUE*)_argument;
778
779 return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
780}
781
782VALUE
783rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
784{
785 if (!rb_respond_to(scheduler, id_io_read)) {
786 return RUBY_Qundef;
787 }
788
789 VALUE arguments[] = {
790 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
791 };
792
793 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
794 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
795 } else {
796 return fiber_scheduler_io_read((VALUE)&arguments);
797 }
798}
799
800/*
801 * Document-method: Fiber::Scheduler#io_pread
802 * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
803 *
804 * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
805 * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
806 * +offset+.
807 *
808 * This method is semantically the same as #io_read, but it allows specifying
809 * the offset to read from and is often better for asynchronous IO on the same
810 * file.
811 *
812 * The method should be considered _experimental_.
813 */
814static VALUE
815fiber_scheduler_io_pread(VALUE _argument) {
816 VALUE *arguments = (VALUE*)_argument;
817
818 return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
819}
820
821VALUE
822rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
823{
824 if (!rb_respond_to(scheduler, id_io_pread)) {
825 return RUBY_Qundef;
826 }
827
828 VALUE arguments[] = {
829 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
830 };
831
832 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
833 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
834 } else {
835 return fiber_scheduler_io_pread((VALUE)&arguments);
836 }
837}
838
839/*
840 * Document-method: Fiber::Scheduler#io_write
841 * call-seq: io_write(io, buffer, length, offset) -> written length or -errno
842 *
843 * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
844 * a specified +buffer+ (see IO::Buffer) at the given +offset+.
845 *
846 * The +length+ argument is the "minimum length to be written". If the IO
847 * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
848 * will be written, but at least 1KiB will be. Generally, the only case where
849 * less data than +length+ will be written is if there is an error writing the
850 * data.
851 *
852 * Specifying a +length+ of 0 is valid and means try writing at least once, as
853 * much data as possible.
854 *
855 * A suggested implementation should try to write to +io+ in a non-blocking
856 * manner and call #io_wait if the +io+ is not ready (which will yield control
857 * to other fibers).
858 *
859 * See IO::Buffer for an interface available to get data from buffer
860 * efficiently.
861 *
862 * Expected to return number of bytes written, or, in case of an error,
863 * <tt>-errno</tt> (negated number corresponding to system's error code).
864 *
865 * The method should be considered _experimental_.
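 *
 * A simplified sketch (error handling and the <tt>-errno</tt> convention are
 * omitted), based on IO#write_nonblock and IO::Buffer#get_string, might be:
 *
 *     def io_write(io, buffer, length, offset)
 *       total = 0
 *       loop do
 *         result = io.write_nonblock(buffer.get_string(offset), exception: false)
 *         if result == :wait_writable
 *           io_wait(io, IO::WRITABLE, nil)
 *         else
 *           offset += result
 *           total += result
 *           break if total >= length
 *         end
 *       end
 *       total
 *     end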
866 */
867static VALUE
868fiber_scheduler_io_write(VALUE _argument) {
869 VALUE *arguments = (VALUE*)_argument;
870
871 return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
872}
873
874VALUE
875rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
876{
877 if (!rb_respond_to(scheduler, id_io_write)) {
878 return RUBY_Qundef;
879 }
880
881 VALUE arguments[] = {
882 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
883 };
884
885 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
886 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
887 } else {
888 return fiber_scheduler_io_write((VALUE)&arguments);
889 }
890}
891
892/*
893 * Document-method: Fiber::Scheduler#io_pwrite
894 * call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
895 *
896 * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
897 * at offset +from+ from a specified +buffer+ (see IO::Buffer) at the given
898 * +offset+.
899 *
900 * This method is semantically the same as #io_write, but it allows specifying
901 * the offset to write to and is often better for asynchronous IO on the same
902 * file.
903 *
904 * The method should be considered _experimental_.
905 *
906 */
907static VALUE
908fiber_scheduler_io_pwrite(VALUE _argument) {
909 VALUE *arguments = (VALUE*)_argument;
910
911 return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
912}
913
914VALUE
915rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
916{
917 if (!rb_respond_to(scheduler, id_io_pwrite)) {
918 return RUBY_Qundef;
919 }
920
921 VALUE arguments[] = {
922 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
923 };
924
925 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
926 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
927 } else {
928 return fiber_scheduler_io_pwrite((VALUE)&arguments);
929 }
930}
931
932VALUE
933rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
934{
935 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
936
937 VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
938
939 rb_io_buffer_free_locked(buffer);
940
941 return result;
942}
943
944VALUE
945rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
946{
947 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
948
949 VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
950
951 rb_io_buffer_free_locked(buffer);
952
953 return result;
954}
955
956VALUE
957rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
958{
959 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
960
961 VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
962
963 rb_io_buffer_free_locked(buffer);
964
965 return result;
966}
967
968VALUE
969rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
970{
971 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
972
973 VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
974
975 rb_io_buffer_free_locked(buffer);
976
977 return result;
978}
979
980VALUE
981rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
982{
983 VALUE arguments[] = {io};
984
985 return rb_check_funcall(scheduler, id_io_close, 1, arguments);
986}
987
988/*
989 * Document-method: Fiber::Scheduler#address_resolve
990 * call-seq: address_resolve(hostname) -> array_of_strings or nil
991 *
992 * Invoked by any method that performs a non-reverse DNS lookup. The most
993 * notable method is Addrinfo.getaddrinfo, but there are many others.
994 *
995 * The method is expected to return an array of strings corresponding to the IP
996 * addresses the +hostname+ resolves to, or +nil+ if it cannot be resolved.
997 *
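 * One possible minimal implementation resolves the name on a throwaway thread so
 * that only the calling fiber waits (this assumes the +socket+ library has been
 * required):
 *
 *     def address_resolve(hostname)
 *       Thread.new { Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq }.value
 *     end
 *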
998 * A fairly exhaustive list of all possible call-sites:
999 *
1000 * - Addrinfo.getaddrinfo
1001 * - Addrinfo.tcp
1002 * - Addrinfo.udp
1003 * - Addrinfo.ip
1004 * - Addrinfo.new
1005 * - Addrinfo.marshal_load
1006 * - SOCKSSocket.new
1007 * - TCPServer.new
1008 * - TCPSocket.new
1009 * - IPSocket.getaddress
1010 * - TCPSocket.gethostbyname
1011 * - UDPSocket#connect
1012 * - UDPSocket#bind
1013 * - UDPSocket#send
1014 * - Socket.getaddrinfo
1015 * - Socket.gethostbyname
1016 * - Socket.pack_sockaddr_in
1017 * - Socket.sockaddr_in
1018 * - Socket.unpack_sockaddr_in
1019 */
1020VALUE
1021rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
1022{
1023 VALUE arguments[] = {
1024 hostname
1025 };
1026
1027 return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
1028}
1029
1030/*
1031 * Document-method: Fiber::Scheduler#blocking_operation_wait
1032 * call-seq: blocking_operation_wait(blocking_operation)
1033 *
1034 * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
1035 * The blocking_operation is a Fiber::Scheduler::BlockingOperation that encapsulates the blocking operation.
1036 *
1037 * If the scheduler doesn't implement this method, or if the scheduler doesn't execute
1038 * the blocking operation, Ruby will fall back to the non-scheduler implementation.
1039 *
1040 * Minimal suggested implementation is:
1041 *
1042 * def blocking_operation_wait(blocking_operation)
1043 * Thread.new { blocking_operation.call }.join
1044 * end
1045 */
1046VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
1047{
1048 // Check if scheduler supports blocking_operation_wait before creating the object
1049 if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
1050 return Qundef;
1051 }
1052
1053 // Create a new BlockingOperation with the blocking operation
1054 VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
1055
1056 VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
1057
1058 // Get the operation data to check if it was executed
1059 rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
1060 rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
1061
1062 // Invalidate the operation now that we're done with it
1063 operation->function = NULL;
1064 operation->state = NULL;
1065 operation->data = NULL;
1066 operation->data2 = NULL;
1067 operation->unblock_function = NULL;
1068
1069 // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead
1070 if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
1071 return Qundef;
1072 }
1073
1074 return result;
1075}
1076
1077VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
1078{
1079 VALUE arguments[] = {
1080 fiber, exception
1081 };
1082
1083#ifdef RUBY_DEBUG
1084 rb_execution_context_t *ec = GET_EC();
1085 if (RUBY_VM_INTERRUPTED(ec)) {
1086 rb_bug("rb_fiber_scheduler_fiber_interrupt called with pending interrupt");
1087 }
1088#endif
1089
1090 return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
1091}
1092
1093/*
1094 * Document-method: Fiber::Scheduler#fiber
1095 * call-seq: fiber(&block)
1096 *
1097 * Implementation of Fiber.schedule. The method is <em>expected</em> to immediately
1098 * run the given block of code in a separate non-blocking fiber, and to return that Fiber.
1099 *
1100 * Minimal suggested implementation is:
1101 *
1102 * def fiber(&block)
1103 * fiber = Fiber.new(blocking: false, &block)
1104 * fiber.resume
1105 * fiber
1106 * end
1107 */
1108VALUE
1109rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
1110{
1111 return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
1112}
1113
1114/*
1115 * C API: Cancel a blocking operation
1116 *
1117 * This function cancels a blocking operation. If the operation is queued,
1118 * it just marks it as cancelled. If it's executing, it marks it as cancelled
1119 * and calls the unblock function to interrupt the operation.
1120 *
1121 * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
1122 */
1123int
1124rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
1125{
1126 if (blocking_operation == NULL) {
1127 return -1;
1128 }
1129
1130 rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
1131
1132 switch (current_state) {
1133 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
1134 // Work hasn't started - just mark as cancelled:
1135 if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
1136 // Successfully cancelled before execution:
1137 return 0;
1138 }
1139 // Fall through if state changed between load and CAS
1140
1141 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
1142 // Work is running - mark cancelled AND call unblock function
1143 if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
1144 // State changed between load and CAS - operation may have completed:
1145 return 0;
1146 }
1147 // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
1148 rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
1149 if (unblock_function) {
1150 RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
1151 blocking_operation->unblock_function(blocking_operation->data2);
1152 }
1153 // Cancelled during execution (unblock function called):
1154 return 1;
1155
1156 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
1157 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
1158 // Already finished or cancelled:
1159 return 0;
1160 }
1161
1162 return 0;
1163}