Ruby 3.5.0dev (2025-06-06 revision 347e581a4cbe2bbf7c13532038f2a68b0b37099a)
scheduler.c (347e581a4cbe2bbf7c13532038f2a68b0b37099a)
/**********************************************************************

  scheduler.c

  $Author$

  Copyright (C) 2020 Samuel Grant Dawson Williams

**********************************************************************/
10
#include "vm_core.h"
#include "ruby/fiber/scheduler.h"
#include "ruby/io.h"
#include "ruby/io/buffer.h"

#include "ruby/thread.h"

// For `ruby_thread_has_gvl_p`:
#include "internal/thread.h"

// For atomic operations:
#include "ruby_atomic.h"
23
24static ID id_close;
25static ID id_scheduler_close;
26
27static ID id_block;
28static ID id_unblock;
29
30static ID id_timeout_after;
31static ID id_kernel_sleep;
32static ID id_process_wait;
33
34static ID id_io_read, id_io_pread;
35static ID id_io_write, id_io_pwrite;
36static ID id_io_wait;
37static ID id_io_select;
38static ID id_io_close;
39
40static ID id_address_resolve;
41
42static ID id_blocking_operation_wait;
43static ID id_fiber_interrupt;
44
45static ID id_fiber_schedule;
46
47// Our custom blocking operation class
48static VALUE rb_cFiberSchedulerBlockingOperation;
49
50/*
51 * Custom blocking operation structure for blocking operations
52 * This replaces the use of Ruby procs to avoid use-after-free issues
53 * and provides a cleaner C API for native work pools.
54 */
55
56typedef enum {
57 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED, // Submitted but not started
58 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
59 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
60 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED // Cancelled
61} rb_fiber_blocking_operation_status_t;
62
64 void *(*function)(void *);
65 void *data;
66 rb_unblock_function_t *unblock_function;
67 void *data2;
68 int flags;
70
71 // Execution status
72 volatile rb_atomic_t status;
73};
74
// GC mark callback: the struct holds no Ruby objects, so nothing to mark.
static void
blocking_operation_mark(void *ptr)
{
    // No Ruby objects to mark in our struct
}
80
81static void
82blocking_operation_free(void *ptr)
83{
85 ruby_xfree(blocking_operation);
86}
87
88static size_t
89blocking_operation_memsize(const void *ptr)
90{
92}
93
94static const rb_data_type_t blocking_operation_data_type = {
95 "Fiber::Scheduler::BlockingOperation",
96 {
97 blocking_operation_mark,
98 blocking_operation_free,
99 blocking_operation_memsize,
100 },
101 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
102};
103
104/*
105 * Allocate a new blocking operation
106 */
107static VALUE
108blocking_operation_alloc(VALUE klass)
109{
110 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
111 VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
112
113 blocking_operation->function = NULL;
114 blocking_operation->data = NULL;
115 blocking_operation->unblock_function = NULL;
116 blocking_operation->data2 = NULL;
117 blocking_operation->flags = 0;
118 blocking_operation->state = NULL;
119 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
120
121 return obj;
122}
123
124/*
125 * Get the blocking operation struct from a Ruby object
126 */
128get_blocking_operation(VALUE obj)
129{
130 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
131 TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
132 return blocking_operation;
133}
134
135/*
136 * Document-method: Fiber::Scheduler::BlockingOperation#call
137 *
138 * Execute the blocking operation. This method releases the GVL and calls
139 * the blocking function, then restores the errno value.
140 *
141 * Returns nil. The actual result is stored in the associated state object.
142 */
143static VALUE
144blocking_operation_call(VALUE self)
145{
146 rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
147
148 if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
149 rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
150 }
151
152 if (blocking_operation->function == NULL) {
153 rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
154 }
155
156 if (blocking_operation->state == NULL) {
157 rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
158 }
159
160 // Mark as executing
161 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
162
163 // Execute the blocking operation without GVL
164 blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
165 blocking_operation->unblock_function, blocking_operation->data2,
166 blocking_operation->flags);
167 blocking_operation->state->saved_errno = rb_errno();
168
169 // Mark as completed
170 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
171
172 return Qnil;
173}
174
175/*
176 * C API: Extract blocking operation struct from Ruby object (GVL required)
177 *
178 * This function safely extracts the opaque struct from a BlockingOperation VALUE
179 * while holding the GVL. The returned pointer can be passed to worker threads
180 * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl.
181 *
182 * Returns the opaque struct pointer on success, NULL on error.
183 * Must be called while holding the GVL.
184 */
187{
188 return get_blocking_operation(self);
189}
190
191/*
192 * C API: Execute blocking operation from opaque struct (GVL not required)
193 *
194 * This function executes a blocking operation using the opaque struct pointer
195 * obtained from rb_fiber_scheduler_blocking_operation_extract.
196 * It can be called from native threads without holding the GVL.
197 *
198 * Returns 0 on success, -1 on error.
199 */
200int
202{
203 if (blocking_operation == NULL) {
204 return -1;
205 }
206
207 if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
208 return -1; // Invalid blocking operation
209 }
210
211 // Atomically check if we can transition from QUEUED to EXECUTING
212 rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
213 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
214 // Already cancelled or in wrong state
215 return -1;
216 }
217
218 // Now we're executing - call the function
219 blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
220 blocking_operation->state->saved_errno = errno;
221
222 // Atomically transition to completed (unless cancelled during execution)
223 expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
224 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
225 // Successfully completed
226 return 0;
227 } else {
228 // Was cancelled during execution
229 blocking_operation->state->saved_errno = EINTR;
230 return -1;
231 }
232}
233
234/*
235 * C API: Create a new blocking operation
236 *
237 * This creates a blocking operation that can be executed by native work pools.
238 * The blocking operation holds references to the function and data safely.
239 */
240VALUE
241rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
242 rb_unblock_function_t *unblock_function, void *data2,
243 int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
244{
245 VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
246 rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
247
248 blocking_operation->function = function;
249 blocking_operation->data = data;
250 blocking_operation->unblock_function = unblock_function;
251 blocking_operation->data2 = data2;
252 blocking_operation->flags = flags;
253 blocking_operation->state = state;
254
255 return self;
256}
257
258/*
259 *
260 * Document-class: Fiber::Scheduler
261 *
262 * This is not an existing class, but documentation of the interface that Scheduler
263 * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking
264 * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations
265 * of some concepts.
266 *
267 * Scheduler's behavior and usage are expected to be as follows:
268 *
269 * * When the execution in the non-blocking Fiber reaches some blocking operation (like
270 * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
271 * hook methods, listed below.
272 * * Scheduler somehow registers what the current fiber is waiting on, and yields control
273 * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its
274 * wait to end, and other fibers in the same thread can perform)
275 * * At the end of the current thread execution, the scheduler's method #scheduler_close is called
276 * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
277 * registered on hook calls) and resuming them when the awaited resource is ready
278 * (e.g. I/O ready or sleep time elapsed).
279 *
280 * This way concurrent execution will be achieved transparently for every
281 * individual Fiber's code.
282 *
283 * Scheduler implementations are provided by gems, like
284 * Async[https://github.com/socketry/async].
285 *
286 * Hook methods are:
287 *
288 * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, and #io_select, #io_close
289 * * #process_wait
290 * * #kernel_sleep
291 * * #timeout_after
292 * * #address_resolve
293 * * #block and #unblock
294 * * #blocking_operation_wait
295 * * (the list is expanded as Ruby developers make more methods having non-blocking calls)
296 *
297 * When not specified otherwise, the hook implementations are mandatory: if they are not
298 * implemented, the methods trying to call hook will fail. To provide backward compatibility,
299 * in the future hooks will be optional (if they are not implemented, due to the scheduler
300 * being created for the older Ruby version, the code which needs this hook will not fail,
301 * and will just behave in a blocking fashion).
302 *
303 * It is also strongly recommended that the scheduler implements the #fiber method, which is
304 * delegated to by Fiber.schedule.
305 *
306 * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
307 * <tt>test/fiber/scheduler.rb</tt>
308 *
309 */
310void
311Init_Fiber_Scheduler(void)
312{
313 id_close = rb_intern_const("close");
314 id_scheduler_close = rb_intern_const("scheduler_close");
315
316 id_block = rb_intern_const("block");
317 id_unblock = rb_intern_const("unblock");
318
319 id_timeout_after = rb_intern_const("timeout_after");
320 id_kernel_sleep = rb_intern_const("kernel_sleep");
321 id_process_wait = rb_intern_const("process_wait");
322
323 id_io_read = rb_intern_const("io_read");
324 id_io_pread = rb_intern_const("io_pread");
325 id_io_write = rb_intern_const("io_write");
326 id_io_pwrite = rb_intern_const("io_pwrite");
327
328 id_io_wait = rb_intern_const("io_wait");
329 id_io_select = rb_intern_const("io_select");
330 id_io_close = rb_intern_const("io_close");
331
332 id_address_resolve = rb_intern_const("address_resolve");
333
334 id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
335 id_fiber_interrupt = rb_intern_const("fiber_interrupt");
336
337 id_fiber_schedule = rb_intern_const("fiber");
338
339 // Define an anonymous BlockingOperation class for internal use only
340 // This is completely hidden from Ruby code and cannot be instantiated directly
341 rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
342 rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
343 rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);
344
345 // Register the anonymous class as a GC root so it doesn't get collected
346 rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
347
348#if 0 /* for RDoc */
349 rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
350 rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
351 rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
352 rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
353 rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
354 rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
355 rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
356 rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
357 rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
358 rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
359 rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
360 rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
361 rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
362 rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
363 rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
364 rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
365#endif
366}
367
368VALUE
370{
371 RUBY_ASSERT(ruby_thread_has_gvl_p());
372
373 rb_thread_t *thread = GET_THREAD();
374 RUBY_ASSERT(thread);
375
376 return thread->scheduler;
377}
378
379static void
380verify_interface(VALUE scheduler)
381{
382 if (!rb_respond_to(scheduler, id_block)) {
383 rb_raise(rb_eArgError, "Scheduler must implement #block");
384 }
385
386 if (!rb_respond_to(scheduler, id_unblock)) {
387 rb_raise(rb_eArgError, "Scheduler must implement #unblock");
388 }
389
390 if (!rb_respond_to(scheduler, id_kernel_sleep)) {
391 rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
392 }
393
394 if (!rb_respond_to(scheduler, id_io_wait)) {
395 rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
396 }
397
398 if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
399 rb_warn("Scheduler should implement #fiber_interrupt");
400 }
401}
402
403static VALUE
404fiber_scheduler_close(VALUE scheduler)
405{
406 return rb_fiber_scheduler_close(scheduler);
407}
408
409static VALUE
410fiber_scheduler_close_ensure(VALUE _thread)
411{
412 rb_thread_t *thread = (rb_thread_t*)_thread;
413 thread->scheduler = Qnil;
414
415 return Qnil;
416}
417
418VALUE
420{
421 RUBY_ASSERT(ruby_thread_has_gvl_p());
422
423 rb_thread_t *thread = GET_THREAD();
424 RUBY_ASSERT(thread);
425
426 if (scheduler != Qnil) {
427 verify_interface(scheduler);
428 }
429
430 // We invoke Scheduler#close when setting it to something else, to ensure
431 // the previous scheduler runs to completion before changing the scheduler.
432 // That way, we do not need to consider interactions, e.g., of a Fiber from
433 // the previous scheduler with the new scheduler.
434 if (thread->scheduler != Qnil) {
435 // rb_fiber_scheduler_close(thread->scheduler);
436 rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
437 }
438
439 thread->scheduler = scheduler;
440
441 return thread->scheduler;
442}
443
444static VALUE
445rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
446{
447 RUBY_ASSERT(thread);
448
449 if (thread->blocking == 0) {
450 return thread->scheduler;
451 }
452 else {
453 return Qnil;
454 }
455}
456
457VALUE
459{
460 return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
461}
462
464{
465 return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
466}
467
468/*
469 *
470 * Document-method: Fiber::Scheduler#close
471 *
472 * Called when the current thread exits. The scheduler is expected to implement this
473 * method in order to allow all waiting fibers to finalize their execution.
474 *
475 * The suggested pattern is to implement the main event loop in the #close method.
476 *
477 */
478VALUE
480{
481 RUBY_ASSERT(ruby_thread_has_gvl_p());
482
483 VALUE result;
484
485 // The reason for calling `scheduler_close` before calling `close` is for
486 // legacy schedulers which implement `close` and expect the user to call
487 // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
488 // which should call `scheduler_close`. If it were to call `close`, it
489 // would create an infinite loop.
490
491 result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
492 if (!UNDEF_P(result)) return result;
493
494 result = rb_check_funcall(scheduler, id_close, 0, NULL);
495 if (!UNDEF_P(result)) return result;
496
497 return Qnil;
498}
499
500VALUE
502{
503 if (timeout) {
504 return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
505 }
506
507 return Qnil;
508}
509
510/*
511 * Document-method: Fiber::Scheduler#kernel_sleep
512 * call-seq: kernel_sleep(duration = nil)
513 *
514 * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
515 * an implementation of sleeping in a non-blocking way. Implementation might
516 * register the current fiber in some list of "which fiber wait until what
517 * moment", call Fiber.yield to pass control, and then in #close resume
518 * the fibers whose wait period has elapsed.
519 *
520 */
521VALUE
523{
524 return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
525}
526
527VALUE
528rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
529{
530 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
531}
532
#if 0
/*
 *  Document-method: Fiber::Scheduler#timeout_after
 *  call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
 *
 *  Invoked by Timeout.timeout to execute the given +block+ within the given
 *  +duration+. It can also be invoked directly by the scheduler or user code.
 *
 *  Attempt to limit the execution time of a given +block+ to the given
 *  +duration+ if possible. When a non-blocking operation causes the +block+'s
 *  execution time to exceed the specified +duration+, that non-blocking
 *  operation should be interrupted by raising the specified +exception_class+
 *  constructed with the given +exception_arguments+.
 *
 *  General execution timeouts are often considered risky. This implementation
 *  will only interrupt non-blocking operations. This is by design because it's
 *  expected that non-blocking operations can fail for a variety of
 *  unpredictable reasons, so applications should already be robust in handling
 *  these conditions and by implication timeouts.
 *
 *  However, as a result of this design, if the +block+ does not invoke any
 *  non-blocking operations, it will be impossible to interrupt it. If you
 *  desire to provide predictable points for timeouts, consider adding
 *  +sleep(0)+.
 *
 *  If the block is executed successfully, its result will be returned.
 *
 *  The exception will typically be raised using Fiber#raise.
 */
VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
    VALUE arguments[] = {
        timeout, exception, message
    };

    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}

VALUE
rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
{
    return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
}
#endif
578
579/*
580 * Document-method: Fiber::Scheduler#process_wait
581 * call-seq: process_wait(pid, flags)
582 *
583 * Invoked by Process::Status.wait in order to wait for a specified process.
584 * See that method description for arguments description.
585 *
586 * Suggested minimal implementation:
587 *
588 * Thread.new do
589 * Process::Status.wait(pid, flags)
590 * end.value
591 *
592 * This hook is optional: if it is not present in the current scheduler,
593 * Process::Status.wait will behave as a blocking method.
594 *
595 * Expected to return a Process::Status instance.
596 */
597VALUE
598rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
599{
600 VALUE arguments[] = {
601 PIDT2NUM(pid), RB_INT2NUM(flags)
602 };
603
604 return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
605}
606
607/*
608 * Document-method: Fiber::Scheduler#block
609 * call-seq: block(blocker, timeout = nil)
610 *
611 * Invoked by methods like Thread.join, and by Mutex, to signify that current
612 * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
613 * elapsed.
614 *
615 * +blocker+ is what we are waiting on, informational only (for debugging and
616 * logging). There are no guarantee about its value.
617 *
618 * Expected to return boolean, specifying whether the blocking operation was
619 * successful or not.
620 */
621VALUE
622rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
623{
624 return rb_funcall(scheduler, id_block, 2, blocker, timeout);
625}
626
627/*
628 * Document-method: Fiber::Scheduler#unblock
629 * call-seq: unblock(blocker, fiber)
630 *
631 * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock
632 * calls #block and Mutex#unlock calls #unblock). The scheduler should use
633 * the +fiber+ parameter to understand which fiber is unblocked.
634 *
635 * +blocker+ is what was awaited for, but it is informational only (for debugging
636 * and logging), and it is not guaranteed to be the same value as the +blocker+ for
637 * #block.
638 *
639 */
640VALUE
641rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
642{
643 RUBY_ASSERT(rb_obj_is_fiber(fiber));
644
645 // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
646 // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
647 int saved_errno = errno;
648
649#ifdef RUBY_DEBUG
650 rb_execution_context_t *ec = GET_EC();
651 if (RUBY_VM_INTERRUPTED(ec)) {
652 rb_bug("rb_fiber_scheduler_unblock called with pending interrupt");
653 }
654#endif
655
656 VALUE result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
657
658 errno = saved_errno;
659
660 return result;
661}
662
663/*
664 * Document-method: Fiber::Scheduler#io_wait
665 * call-seq: io_wait(io, events, timeout)
666 *
667 * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
668 * specified descriptor is ready for specified events within
669 * the specified +timeout+.
670 *
671 * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
672 * <tt>IO::PRIORITY</tt>.
673 *
674 * Suggested implementation should register which Fiber is waiting for which
675 * resources and immediately calling Fiber.yield to pass control to other
676 * fibers. Then, in the #close method, the scheduler might dispatch all the
677 * I/O resources to fibers waiting for it.
678 *
679 * Expected to return the subset of events that are ready immediately.
680 *
681 */
682static VALUE
683fiber_scheduler_io_wait(VALUE _argument) {
684 VALUE *arguments = (VALUE*)_argument;
685
686 return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
687}
688
689VALUE
690rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
691{
692 VALUE arguments[] = {
693 scheduler, io, events, timeout
694 };
695
696 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
697 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
698 } else {
699 return fiber_scheduler_io_wait((VALUE)&arguments);
700 }
701}
702
703VALUE
708
709VALUE
714
715/*
716 * Document-method: Fiber::Scheduler#io_select
717 * call-seq: io_select(readables, writables, exceptables, timeout)
718 *
719 * Invoked by IO.select to ask whether the specified descriptors are ready for
720 * specified events within the specified +timeout+.
721 *
722 * Expected to return the 3-tuple of Array of IOs that are ready.
723 *
724 */
725VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
726{
727 VALUE arguments[] = {
728 readables, writables, exceptables, timeout
729 };
730
731 return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
732}
733
735{
736 // I wondered about extracting argv, and checking if there is only a single
737 // IO instance, and instead calling `io_wait`. However, it would require a
738 // decent amount of work and it would be hard to preserve the exact
739 // semantics of IO.select.
740
741 return rb_check_funcall(scheduler, id_io_select, argc, argv);
742}
743
744/*
745 * Document-method: Fiber::Scheduler#io_read
746 * call-seq: io_read(io, buffer, length, offset) -> read length or -errno
747 *
748 * Invoked by IO#read or IO#Buffer.read to read +length+ bytes from +io+ into a
749 * specified +buffer+ (see IO::Buffer) at the given +offset+.
750 *
751 * The +length+ argument is the "minimum length to be read". If the IO buffer
752 * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
753 * but at least 1KiB will be. Generally, the only case where less data than
754 * +length+ will be read is if there is an error reading the data.
755 *
756 * Specifying a +length+ of 0 is valid and means try reading at least once and
757 * return any available data.
758 *
759 * Suggested implementation should try to read from +io+ in a non-blocking
760 * manner and call #io_wait if the +io+ is not ready (which will yield control
761 * to other fibers).
762 *
763 * See IO::Buffer for an interface available to return data.
764 *
765 * Expected to return number of bytes read, or, in case of an error,
766 * <tt>-errno</tt> (negated number corresponding to system's error code).
767 *
768 * The method should be considered _experimental_.
769 */
770static VALUE
771fiber_scheduler_io_read(VALUE _argument) {
772 VALUE *arguments = (VALUE*)_argument;
773
774 return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
775}
776
777VALUE
778rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
779{
780 if (!rb_respond_to(scheduler, id_io_read)) {
781 return RUBY_Qundef;
782 }
783
784 VALUE arguments[] = {
785 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
786 };
787
788 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
789 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
790 } else {
791 return fiber_scheduler_io_read((VALUE)&arguments);
792 }
793}
794
795/*
796 * Document-method: Fiber::Scheduler#io_read
797 * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
798 *
799 * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
800 * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
801 * +offset+.
802 *
803 * This method is semantically the same as #io_read, but it allows to specify
804 * the offset to read from and is often better for asynchronous IO on the same
805 * file.
806 *
807 * The method should be considered _experimental_.
808 */
809static VALUE
810fiber_scheduler_io_pread(VALUE _argument) {
811 VALUE *arguments = (VALUE*)_argument;
812
813 return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
814}
815
816VALUE
817rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
818{
819 if (!rb_respond_to(scheduler, id_io_pread)) {
820 return RUBY_Qundef;
821 }
822
823 VALUE arguments[] = {
824 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
825 };
826
827 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
828 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
829 } else {
830 return fiber_scheduler_io_pread((VALUE)&arguments);
831 }
832}
833
834/*
835 * Document-method: Scheduler#io_write
836 * call-seq: io_write(io, buffer, length, offset) -> written length or -errno
837 *
838 * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
839 * from a specified +buffer+ (see IO::Buffer) at the given +offset+.
840 *
841 * The +length+ argument is the "minimum length to be written". If the IO
842 * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
843 * will be written, but at least 1KiB will be. Generally, the only case where
844 * less data than +length+ will be written is if there is an error writing the
845 * data.
846 *
847 * Specifying a +length+ of 0 is valid and means try writing at least once, as
848 * much data as possible.
849 *
850 * Suggested implementation should try to write to +io+ in a non-blocking
851 * manner and call #io_wait if the +io+ is not ready (which will yield control
852 * to other fibers).
853 *
854 * See IO::Buffer for an interface available to get data from buffer
855 * efficiently.
856 *
857 * Expected to return number of bytes written, or, in case of an error,
858 * <tt>-errno</tt> (negated number corresponding to system's error code).
859 *
860 * The method should be considered _experimental_.
861 */
862static VALUE
863fiber_scheduler_io_write(VALUE _argument) {
864 VALUE *arguments = (VALUE*)_argument;
865
866 return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
867}
868
869VALUE
870rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
871{
872 if (!rb_respond_to(scheduler, id_io_write)) {
873 return RUBY_Qundef;
874 }
875
876 VALUE arguments[] = {
877 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
878 };
879
880 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
881 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
882 } else {
883 return fiber_scheduler_io_write((VALUE)&arguments);
884 }
885}
886
887/*
888 * Document-method: Fiber::Scheduler#io_pwrite
889 * call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
890 *
891 * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
892 * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
893 * +offset+.
894 *
895 * This method is semantically the same as #io_write, but it allows to specify
896 * the offset to write to and is often better for asynchronous IO on the same
897 * file.
898 *
899 * The method should be considered _experimental_.
900 *
901 */
902static VALUE
903fiber_scheduler_io_pwrite(VALUE _argument) {
904 VALUE *arguments = (VALUE*)_argument;
905
906 return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
907}
908
909VALUE
910rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
911{
912 if (!rb_respond_to(scheduler, id_io_pwrite)) {
913 return RUBY_Qundef;
914 }
915
916 VALUE arguments[] = {
917 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
918 };
919
920 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
921 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
922 } else {
923 return fiber_scheduler_io_pwrite((VALUE)&arguments);
924 }
925}
926
927VALUE
928rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
929{
930 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
931
932 VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
933
934 rb_io_buffer_free_locked(buffer);
935
936 return result;
937}
938
939VALUE
940rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
941{
942 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
943
944 VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
945
946 rb_io_buffer_free_locked(buffer);
947
948 return result;
949}
950
951VALUE
952rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
953{
954 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
955
956 VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
957
958 rb_io_buffer_free_locked(buffer);
959
960 return result;
961}
962
963VALUE
964rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
965{
966 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
967
968 VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
969
970 rb_io_buffer_free_locked(buffer);
971
972 return result;
973}
974
975VALUE
977{
978 VALUE arguments[] = {io};
979
980 return rb_check_funcall(scheduler, id_io_close, 1, arguments);
981}
982
983/*
984 * Document-method: Fiber::Scheduler#address_resolve
985 * call-seq: address_resolve(hostname) -> array_of_strings or nil
986 *
987 * Invoked by any method that performs a non-reverse DNS lookup. The most
988 * notable method is Addrinfo.getaddrinfo, but there are many other.
989 *
990 * The method is expected to return an array of strings corresponding to ip
991 * addresses the +hostname+ is resolved to, or +nil+ if it can not be resolved.
992 *
993 * Fairly exhaustive list of all possible call-sites:
994 *
995 * - Addrinfo.getaddrinfo
996 * - Addrinfo.tcp
997 * - Addrinfo.udp
998 * - Addrinfo.ip
999 * - Addrinfo.new
1000 * - Addrinfo.marshal_load
1001 * - SOCKSSocket.new
1002 * - TCPServer.new
1003 * - TCPSocket.new
1004 * - IPSocket.getaddress
1005 * - TCPSocket.gethostbyname
1006 * - UDPSocket#connect
1007 * - UDPSocket#bind
1008 * - UDPSocket#send
1009 * - Socket.getaddrinfo
1010 * - Socket.gethostbyname
1011 * - Socket.pack_sockaddr_in
1012 * - Socket.sockaddr_in
1013 * - Socket.unpack_sockaddr_in
1014 */
1015VALUE
1017{
1018 VALUE arguments[] = {
1019 hostname
1020 };
1021
1022 return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
1023}
1024
1025/*
1026 * Document-method: Fiber::Scheduler#blocking_operation_wait
1027 * call-seq: blocking_operation_wait(blocking_operation)
1028 *
1029 * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
1030 * The blocking_operation is a Fiber::Scheduler::BlockingOperation that encapsulates the blocking operation.
1031 *
1032 * If the scheduler doesn't implement this method, or if the scheduler doesn't execute
1033 * the blocking operation, Ruby will fall back to the non-scheduler implementation.
1034 *
1035 * Minimal suggested implementation is:
1036 *
1037 * def blocking_operation_wait(blocking_operation)
1038 * Thread.new { blocking_operation.call }.join
1039 * end
1040 */
1041VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
1042{
1043 // Check if scheduler supports blocking_operation_wait before creating the object
1044 if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
1045 return Qundef;
1046 }
1047
1048 // Create a new BlockingOperation with the blocking operation
1049 VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
1050
1051 VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
1052
1053 // Get the operation data to check if it was executed
1054 rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
1055 rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);
1056
1057 // Invalidate the operation now that we're done with it
1058 operation->function = NULL;
1059 operation->state = NULL;
1060 operation->data = NULL;
1061 operation->data2 = NULL;
1062 operation->unblock_function = NULL;
1063
1064 // If the blocking operation was never executed, return Qundef to signal
1065 // the caller to use rb_nogvl instead
1066 if (current_status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) {
1067 return Qundef;
1068 }
1069
1070 return result;
1071}
1072
1074{
1075 VALUE arguments[] = {
1076 fiber, exception
1077 };
1078
1079#ifdef RUBY_DEBUG
1080 rb_execution_context_t *ec = GET_EC();
1081 if (RUBY_VM_INTERRUPTED(ec)) {
1082 rb_bug("rb_fiber_scheduler_fiber_interrupt called with pending interrupt");
1083 }
1084#endif
1085
1086 return rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
1087}
1088
1089/*
1090 * Document-method: Fiber::Scheduler#fiber
1091 * call-seq: fiber(&block)
1092 *
1093 * Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately
1094 * run the given block of code in a separate non-blocking fiber, and to return that Fiber.
1095 *
1096 * Minimal suggested implementation is:
1097 *
1098 * def fiber(&block)
1099 * fiber = Fiber.new(blocking: false, &block)
1100 * fiber.resume
1101 * fiber
1102 * end
1103 */
1104VALUE
1105rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
1106{
1107 return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
1108}
1109
1110/*
1111 * C API: Cancel a blocking operation
1112 *
1113 * This function cancels a blocking operation. If the operation is queued,
1114 * it just marks it as cancelled. If it's executing, it marks it as cancelled
1115 * and calls the unblock function to interrupt the operation.
1116 *
1117 * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
1118 */
1119int
1121{
1122 if (blocking_operation == NULL) {
1123 return -1;
1124 }
1125
1126 rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
1127
1128 switch (current_state) {
1129 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
1130 // Work hasn't started - just mark as cancelled
1131 if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
1132 return 0; // Successfully cancelled before execution
1133 }
1134 // Fall through if state changed between load and CAS
1135
1136 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
1137 // Work is running - mark cancelled AND call unblock function
1138 RUBY_ATOMIC_SET(blocking_operation->status, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED);
1139 if (blocking_operation->unblock_function) {
1140 blocking_operation->unblock_function(blocking_operation->data2);
1141 }
1142 return 1; // Cancelled during execution (unblock function called)
1143
1144 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
1145 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
1146 // Already finished or cancelled
1147 return 0;
1148 }
1149
1150 return 0;
1151}
#define RUBY_ASSERT(...)
Asserts that the given expression is truthy if and only if RUBY_DEBUG is truthy.
Definition assert.h:219
#define RUBY_ATOMIC_CAS(var, oldval, newval)
Atomic compare-and-swap.
Definition atomic.h:140
std::atomic< unsigned > rb_atomic_t
Type that is eligible for atomic operations.
Definition atomic.h:69
#define RUBY_ATOMIC_LOAD(var)
Atomic load.
Definition atomic.h:150
#define RUBY_ATOMIC_SET(var, val)
Identical to RUBY_ATOMIC_EXCHANGE, except for the return type.
Definition atomic.h:160
#define rb_define_method(klass, mid, func, arity)
Defines klass#mid.
VALUE rb_class_new(VALUE super)
Creates a new, anonymous class.
Definition class.c:835
VALUE rb_define_class_under(VALUE outer, const char *name, VALUE super)
Defines a class under the namespace of outer.
Definition class.c:1501
#define Qundef
Old name of RUBY_Qundef.
#define SIZET2NUM
Old name of RB_SIZE2NUM.
Definition size_t.h:62
#define Qnil
Old name of RUBY_Qnil.
VALUE rb_eRuntimeError
RuntimeError exception.
Definition error.c:1428
void rb_warn(const char *fmt,...)
Identical to rb_warning(), except it reports unless $VERBOSE is nil.
Definition error.c:466
VALUE rb_funcall(VALUE recv, ID mid, int n,...)
Calls a method.
Definition vm_eval.c:1117
VALUE rb_funcall_passing_block_kw(VALUE recv, ID mid, int argc, const VALUE *argv, int kw_splat)
Identical to rb_funcallv_passing_block(), except you can specify how to handle the last element of th...
Definition vm_eval.c:1187
void rb_unblock_function_t(void *)
This is the type of UBFs.
Definition thread.h:336
int rb_respond_to(VALUE obj, ID mid)
Queries if the object responds to the method.
Definition vm_method.c:3058
VALUE rb_check_funcall(VALUE recv, ID mid, int argc, const VALUE *argv)
Identical to rb_funcallv(), except it returns RUBY_Qundef instead of raising rb_eNoMethodError.
Definition vm_eval.c:686
void rb_define_alloc_func(VALUE klass, rb_alloc_func_t func)
Sets the allocator function of a class.
static ID rb_intern_const(const char *str)
This is a "tiny optimisation" over rb_intern().
Definition symbol.h:284
VALUE rb_io_timeout(VALUE io)
Get the timeout associated with the specified io object.
Definition io.c:857
@ RUBY_IO_READABLE
IO::READABLE
Definition io.h:97
@ RUBY_IO_WRITABLE
IO::WRITABLE
Definition io.h:98
void * rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, int flags)
Identical to rb_thread_call_without_gvl(), except it additionally takes "flags" that change the behav...
Definition thread.c:1547
#define RB_UINT2NUM
Just another name of rb_uint2num_inline.
Definition int.h:39
#define RB_INT2NUM
Just another name of rb_int2num_inline.
Definition int.h:37
VALUE rb_ensure(type *q, VALUE w, type *e, VALUE r)
An equivalent of ensure clause.
#define OFFT2NUM
Converts a C's off_t into an instance of rb_cInteger.
Definition off_t.h:33
#define PIDT2NUM
Converts a C's pid_t into an instance of rb_cInteger.
Definition pid_t.h:28
#define TypedData_Get_Struct(obj, type, data_type, sval)
Obtains a C struct from inside of a wrapper Ruby object.
Definition rtypeddata.h:515
#define TypedData_Make_Struct(klass, type, data_type, sval)
Identical to TypedData_Wrap_Struct, except it allocates a new data region internally instead of takin...
Definition rtypeddata.h:497
#define errno
Ractor-aware version of errno.
Definition ruby.h:388
Scheduler APIs.
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void *(*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
Defer the execution of the passed function to the scheduler.
Definition scheduler.c:1041
VALUE rb_fiber_scheduler_current(void)
Identical to rb_fiber_scheduler_get(), except it also returns RUBY_Qnil in case of a blocking fiber.
Definition scheduler.c:458
VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
Non-blocking pread from the passed IO using a native buffer.
Definition scheduler.c:952
VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout)
Converts the passed timeout to an expression that rb_fiber_scheduler_block() etc.
Definition scheduler.c:501
VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for reading.
Definition scheduler.c:704
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
Non-blocking read from the passed IO using a native buffer.
Definition scheduler.c:928
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO at the specified offset.
Definition scheduler.c:910
VALUE rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
Identical to rb_fiber_scheduler_kernel_sleep(), except it can pass multiple arguments.
Definition scheduler.c:528
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
Interrupt a fiber by raising an exception.
Definition scheduler.c:1073
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
Non-blocking version of rb_io_wait().
Definition scheduler.c:690
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
Non-blocking version of IO.select.
Definition scheduler.c:725
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO.
Definition scheduler.c:778
int rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Cancel a blocking operation.
Definition scheduler.c:1120
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
Non-blocking version of IO.select, argv variant.
Definition scheduler.c:734
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
Non-blocking waitpid.
Definition scheduler.c:598
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
Non-blocking wait for the passed "blocker", which is for instance Thread.join or Mutex....
Definition scheduler.c:622
int rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Execute blocking operation from handle (GVL not required).
Definition scheduler.c:201
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO at the specified offset.
Definition scheduler.c:817
VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
Non-blocking pwrite to the passed IO using a native buffer.
Definition scheduler.c:964
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO.
Definition scheduler.c:870
VALUE rb_fiber_scheduler_close(VALUE scheduler)
Closes the passed scheduler object.
Definition scheduler.c:479
rb_fiber_scheduler_blocking_operation_t * rb_fiber_scheduler_blocking_operation_extract(VALUE self)
Extract the blocking operation handle from a BlockingOperationRuby object.
Definition scheduler.c:186
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
Identical to rb_fiber_scheduler_current(), except it queries for that of the passed thread instead of...
Definition scheduler.c:463
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE duration)
Non-blocking sleep.
Definition scheduler.c:522
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
Non-blocking DNS lookup.
Definition scheduler.c:1016
VALUE rb_fiber_scheduler_set(VALUE scheduler)
Destructively assigns the passed scheduler to that of the current thread that is calling this functio...
Definition scheduler.c:419
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
Non-blocking write to the passed IO using a native buffer.
Definition scheduler.c:940
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for writing.
Definition scheduler.c:710
VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
Non-blocking close the given IO.
Definition scheduler.c:976
VALUE rb_fiber_scheduler_get(void)
Queries the current scheduler of the current thread that is calling this function.
Definition scheduler.c:369
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
Definition scheduler.c:641
VALUE rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
Create and schedule a non-blocking fiber.
Definition scheduler.c:1105
@ RUBY_Qundef
Represents so-called undef.
This is the struct that holds necessary info for a struct.
Definition rtypeddata.h:203
uintptr_t ID
Type that represents a Ruby identifier such as a variable name.
Definition value.h:52
uintptr_t VALUE
Type that represents a Ruby object.
Definition value.h:40