Ruby 3.5.0dev (2025-10-10 revision 83d0b064c88df718e13bb8d6b4182ec635f7b03b)
scheduler.c (83d0b064c88df718e13bb8d6b4182ec635f7b03b)
1/**********************************************************************
2
3 scheduler.c
4
5 $Author$
6
7 Copyright (C) 2020 Samuel Grant Dawson Williams
8
9**********************************************************************/
10
#include "vm_core.h"
#include "eval_intern.h"
#include "ruby/fiber/scheduler.h"
#include "ruby/io.h"
#include "ruby/io/buffer.h"

#include "ruby/thread.h"
18
19// For `ruby_thread_has_gvl_p`:
20#include "internal/thread.h"
21
22// For atomic operations:
23#include "ruby_atomic.h"
24
25static ID id_close;
26static ID id_scheduler_close;
27
28static ID id_block;
29static ID id_unblock;
30
31static ID id_timeout_after;
32static ID id_kernel_sleep;
33static ID id_process_wait;
34
35static ID id_io_read, id_io_pread;
36static ID id_io_write, id_io_pwrite;
37static ID id_io_wait;
38static ID id_io_select;
39static ID id_io_close;
40
41static ID id_address_resolve;
42
43static ID id_blocking_operation_wait;
44static ID id_fiber_interrupt;
45
46static ID id_fiber_schedule;
47
48// Our custom blocking operation class
49static VALUE rb_cFiberSchedulerBlockingOperation;
50
51/*
52 * Custom blocking operation structure for blocking operations
53 * This replaces the use of Ruby procs to avoid use-after-free issues
54 * and provides a cleaner C API for native work pools.
55 */
56
// Lifecycle states for a blocking operation. The normal path is
// QUEUED -> EXECUTING -> COMPLETED; transitions in this file are performed
// with atomic CAS. CANCELLED is checked for here but set elsewhere
// (the cancellation path is not in this chunk — TODO confirm against the
// full file).
typedef enum {
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED,    // Submitted but not started
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING, // Currently running
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED, // Finished (success/error)
    RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED  // Cancelled
} rb_fiber_blocking_operation_status_t;
63
65 void *(*function)(void *);
66 void *data;
67
68 rb_unblock_function_t *unblock_function;
69 void *data2;
70
71 int flags;
73
74 // Execution status
75 volatile rb_atomic_t status;
76};
77
// GC mark callback for BlockingOperation: the struct stores only raw C
// pointers and never holds a VALUE, so there is nothing to mark.
static void
blocking_operation_mark(void *ptr)
{
    (void)ptr;
}
83
// GC free callback: release the blocking-operation struct itself.
// The visible body referenced an undeclared `blocking_operation` variable
// (a declaration line was lost); freeing `ptr` directly is equivalent since
// the struct owns no other allocations.
static void
blocking_operation_free(void *ptr)
{
    ruby_xfree(ptr);
}
90
91static size_t
92blocking_operation_memsize(const void *ptr)
93{
95}
96
// TypedData descriptor for the internal BlockingOperation class.
// NOTE(review): the mark callback is a no-op and the struct stores no
// VALUEs, so RUBY_TYPED_WB_PROTECTED is safe here.
static const rb_data_type_t blocking_operation_data_type = {
    "Fiber::Scheduler::BlockingOperation",
    {
        blocking_operation_mark,
        blocking_operation_free,
        blocking_operation_memsize,
    },
    0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
};
106
107/*
108 * Allocate a new blocking operation
109 */
110static VALUE
111blocking_operation_alloc(VALUE klass)
112{
113 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
114 VALUE obj = TypedData_Make_Struct(klass, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
115
116 blocking_operation->function = NULL;
117 blocking_operation->data = NULL;
118 blocking_operation->unblock_function = NULL;
119 blocking_operation->data2 = NULL;
120 blocking_operation->flags = 0;
121 blocking_operation->state = NULL;
122 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
123
124 return obj;
125}
126
127/*
128 * Get the blocking operation struct from a Ruby object
129 */
131get_blocking_operation(VALUE obj)
132{
133 rb_fiber_scheduler_blocking_operation_t *blocking_operation;
134 TypedData_Get_Struct(obj, rb_fiber_scheduler_blocking_operation_t, &blocking_operation_data_type, blocking_operation);
135 return blocking_operation;
136}
137
138/*
139 * Document-method: Fiber::Scheduler::BlockingOperation#call
140 *
141 * Execute the blocking operation. This method releases the GVL and calls
142 * the blocking function, then restores the errno value.
143 *
144 * Returns nil. The actual result is stored in the associated state object.
145 */
static VALUE
blocking_operation_call(VALUE self)
{
    rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);

    // Each operation may be executed at most once; any status other than
    // QUEUED (executing, completed, or cancelled) is rejected.
    if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
        rb_raise(rb_eRuntimeError, "Blocking operation has already been executed!");
    }

    if (blocking_operation->function == NULL) {
        rb_raise(rb_eRuntimeError, "Blocking operation has no function to execute!");
    }

    if (blocking_operation->state == NULL) {
        rb_raise(rb_eRuntimeError, "Blocking operation has no result object!");
    }

    // Mark as executing
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;

    // Execute the blocking operation without GVL. The unblock function and
    // flags are forwarded verbatim to rb_nogvl.
    blocking_operation->state->result = rb_nogvl(blocking_operation->function, blocking_operation->data,
                                                 blocking_operation->unblock_function, blocking_operation->data2,
                                                 blocking_operation->flags);
    // Capture errno immediately, before any other call can clobber it.
    blocking_operation->state->saved_errno = rb_errno();

    // Mark as completed
    blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;

    // The caller reads the outcome from `state`, not from the return value.
    return Qnil;
}
177
178/*
179 * C API: Extract blocking operation struct from Ruby object (GVL required)
180 *
181 * This function safely extracts the opaque struct from a BlockingOperation VALUE
182 * while holding the GVL. The returned pointer can be passed to worker threads
183 * and used with rb_fiber_scheduler_blocking_operation_execute_opaque_nogvl.
184 *
185 * Returns the opaque struct pointer on success, NULL on error.
186 * Must be called while holding the GVL.
187 */
190{
191 return get_blocking_operation(self);
192}
193
194/*
195 * C API: Execute blocking operation from opaque struct (GVL not required)
196 *
197 * This function executes a blocking operation using the opaque struct pointer
198 * obtained from rb_fiber_scheduler_blocking_operation_extract.
199 * It can be called from native threads without holding the GVL.
200 *
201 * Returns 0 on success, -1 on error.
202 */
203int
205{
206 if (blocking_operation == NULL) {
207 return -1;
208 }
209
210 if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
211 return -1; // Invalid blocking operation
212 }
213
214 // Resolve sentinel values for unblock_function and data2:
215 rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
216
217 // Atomically check if we can transition from QUEUED to EXECUTING
218 rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
219 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
220 // Already cancelled or in wrong state
221 return -1;
222 }
223
224 // Now we're executing - call the function
225 blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
226 blocking_operation->state->saved_errno = errno;
227
228 // Atomically transition to completed (unless cancelled during execution)
229 expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
230 if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
231 // Successfully completed
232 return 0;
233 } else {
234 // Was cancelled during execution
235 blocking_operation->state->saved_errno = EINTR;
236 return -1;
237 }
238}
239
240/*
241 * C API: Create a new blocking operation
242 *
243 * This creates a blocking operation that can be executed by native work pools.
244 * The blocking operation holds references to the function and data safely.
245 */
246VALUE
247rb_fiber_scheduler_blocking_operation_new(void *(*function)(void *), void *data,
248 rb_unblock_function_t *unblock_function, void *data2,
249 int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
250{
251 VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
252 rb_fiber_scheduler_blocking_operation_t *blocking_operation = get_blocking_operation(self);
253
254 blocking_operation->function = function;
255 blocking_operation->data = data;
256 blocking_operation->unblock_function = unblock_function;
257 blocking_operation->data2 = data2;
258 blocking_operation->flags = flags;
259 blocking_operation->state = state;
260
261 return self;
262}
263
264/*
265 *
266 * Document-class: Fiber::Scheduler
267 *
268 * This is not an existing class, but documentation of the interface that Scheduler
269 * object should comply to in order to be used as argument to Fiber.scheduler and handle non-blocking
270 * fibers. See also the "Non-blocking fibers" section in Fiber class docs for explanations
271 * of some concepts.
272 *
273 * Scheduler's behavior and usage are expected to be as follows:
274 *
275 * * When the execution in the non-blocking Fiber reaches some blocking operation (like
276 * sleep, wait for a process, or a non-ready I/O), it calls some of the scheduler's
277 * hook methods, listed below.
278 * * Scheduler somehow registers what the current fiber is waiting on, and yields control
279 * to other fibers with Fiber.yield (so the fiber would be suspended while expecting its
280 * wait to end, and other fibers in the same thread can perform)
281 * * At the end of the current thread execution, the scheduler's method #scheduler_close is called
282 * * The scheduler runs into a wait loop, checking all the blocked fibers (which it has
283 * registered on hook calls) and resuming them when the awaited resource is ready
284 * (e.g. I/O ready or sleep time elapsed).
285 *
286 * This way concurrent execution will be achieved transparently for every
287 * individual Fiber's code.
288 *
289 * Scheduler implementations are provided by gems, like
290 * Async[https://github.com/socketry/async].
291 *
292 * Hook methods are:
293 *
294 * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, and #io_select, #io_close
295 * * #process_wait
296 * * #kernel_sleep
297 * * #timeout_after
298 * * #address_resolve
299 * * #block and #unblock
300 * * #blocking_operation_wait
301 * * (the list is expanded as Ruby developers make more methods having non-blocking calls)
302 *
303 * When not specified otherwise, the hook implementations are mandatory: if they are not
304 * implemented, the methods trying to call hook will fail. To provide backward compatibility,
305 * in the future hooks will be optional (if they are not implemented, due to the scheduler
306 * being created for the older Ruby version, the code which needs this hook will not fail,
307 * and will just behave in a blocking fashion).
308 *
309 * It is also strongly recommended that the scheduler implements the #fiber method, which is
310 * delegated to by Fiber.schedule.
311 *
312 * Sample _toy_ implementation of the scheduler can be found in Ruby's code, in
313 * <tt>test/fiber/scheduler.rb</tt>
314 *
315 */
void
Init_Fiber_Scheduler(void)
{
    // Intern all hook method names once at boot so later lookups are cheap.
    id_close = rb_intern_const("close");
    id_scheduler_close = rb_intern_const("scheduler_close");

    id_block = rb_intern_const("block");
    id_unblock = rb_intern_const("unblock");

    id_timeout_after = rb_intern_const("timeout_after");
    id_kernel_sleep = rb_intern_const("kernel_sleep");
    id_process_wait = rb_intern_const("process_wait");

    id_io_read = rb_intern_const("io_read");
    id_io_pread = rb_intern_const("io_pread");
    id_io_write = rb_intern_const("io_write");
    id_io_pwrite = rb_intern_const("io_pwrite");

    id_io_wait = rb_intern_const("io_wait");
    id_io_select = rb_intern_const("io_select");
    id_io_close = rb_intern_const("io_close");

    id_address_resolve = rb_intern_const("address_resolve");

    id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
    id_fiber_interrupt = rb_intern_const("fiber_interrupt");

    // Fiber.schedule delegates to the scheduler's #fiber method.
    id_fiber_schedule = rb_intern_const("fiber");

    // Define an anonymous BlockingOperation class for internal use only.
    // This is completely hidden from Ruby code and cannot be instantiated
    // directly; only #call is exposed.
    rb_cFiberSchedulerBlockingOperation = rb_class_new(rb_cObject);
    rb_define_alloc_func(rb_cFiberSchedulerBlockingOperation, blocking_operation_alloc);
    rb_define_method(rb_cFiberSchedulerBlockingOperation, "call", blocking_operation_call, 0);

    // Register the anonymous class as a GC root so it doesn't get collected.
    rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);

#if 0 /* for RDoc */
    rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
    rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
    rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
    rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
    rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
    rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
    rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
    rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
    rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
    rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
    rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
    rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
    rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
    rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
    rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler_fiber, -2);
    rb_define_method(rb_cFiberScheduler, "blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
#endif
}
373
374VALUE
376{
377 RUBY_ASSERT(ruby_thread_has_gvl_p());
378
379 rb_thread_t *thread = GET_THREAD();
380 RUBY_ASSERT(thread);
381
382 return thread->scheduler;
383}
384
/*
 * Validate that a candidate scheduler implements the mandatory hooks
 * (#block, #unblock, #kernel_sleep, #io_wait) before it is installed.
 * Raises ArgumentError on the first missing mandatory hook; only warns
 * when the recommended #fiber_interrupt hook is absent.
 */
static void
verify_interface(VALUE scheduler)
{
    if (!rb_respond_to(scheduler, id_block)) {
        rb_raise(rb_eArgError, "Scheduler must implement #block");
    }

    if (!rb_respond_to(scheduler, id_unblock)) {
        rb_raise(rb_eArgError, "Scheduler must implement #unblock");
    }

    if (!rb_respond_to(scheduler, id_kernel_sleep)) {
        rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
    }

    if (!rb_respond_to(scheduler, id_io_wait)) {
        rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
    }

    // Recommended (not mandatory): without it, io hooks fall back to the
    // legacy non-interruptible path (see rb_fiber_scheduler_io_wait).
    if (!rb_respond_to(scheduler, id_fiber_interrupt)) {
        rb_warn("Scheduler should implement #fiber_interrupt");
    }
}
408
409static VALUE
410fiber_scheduler_close(VALUE scheduler)
411{
412 return rb_fiber_scheduler_close(scheduler);
413}
414
415static VALUE
416fiber_scheduler_close_ensure(VALUE _thread)
417{
418 rb_thread_t *thread = (rb_thread_t*)_thread;
419 thread->scheduler = Qnil;
420
421 return Qnil;
422}
423
424VALUE
426{
427 RUBY_ASSERT(ruby_thread_has_gvl_p());
428
429 rb_thread_t *thread = GET_THREAD();
430 RUBY_ASSERT(thread);
431
432 if (scheduler != Qnil) {
433 verify_interface(scheduler);
434 }
435
436 // We invoke Scheduler#close when setting it to something else, to ensure
437 // the previous scheduler runs to completion before changing the scheduler.
438 // That way, we do not need to consider interactions, e.g., of a Fiber from
439 // the previous scheduler with the new scheduler.
440 if (thread->scheduler != Qnil) {
441 // rb_fiber_scheduler_close(thread->scheduler);
442 rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
443 }
444
445 thread->scheduler = scheduler;
446
447 return thread->scheduler;
448}
449
450static VALUE
451rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
452{
453 RUBY_ASSERT(thread);
454
455 if (thread->blocking == 0) {
456 return thread->scheduler;
457 }
458 else {
459 return Qnil;
460 }
461}
462
463VALUE
465{
466 return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
467}
468
470{
471 return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
472}
473
474/*
475 *
476 * Document-method: Fiber::Scheduler#close
477 *
478 * Called when the current thread exits. The scheduler is expected to implement this
479 * method in order to allow all waiting fibers to finalize their execution.
480 *
481 * The suggested pattern is to implement the main event loop in the #close method.
482 *
483 */
484VALUE
486{
487 RUBY_ASSERT(ruby_thread_has_gvl_p());
488
489 VALUE result;
490
491 // The reason for calling `scheduler_close` before calling `close` is for
492 // legacy schedulers which implement `close` and expect the user to call
493 // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
494 // which should call `scheduler_close`. If it were to call `close`, it
495 // would create an infinite loop.
496
497 result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
498 if (!UNDEF_P(result)) return result;
499
500 result = rb_check_funcall(scheduler, id_close, 0, NULL);
501 if (!UNDEF_P(result)) return result;
502
503 return Qnil;
504}
505
506VALUE
508{
509 if (timeout) {
510 return rb_float_new((double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
511 }
512
513 return Qnil;
514}
515
516/*
517 * Document-method: Fiber::Scheduler#kernel_sleep
518 * call-seq: kernel_sleep(duration = nil)
519 *
520 * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
521 * an implementation of sleeping in a non-blocking way. Implementation might
522 * register the current fiber in some list of "which fiber wait until what
523 * moment", call Fiber.yield to pass control, and then in #close resume
524 * the fibers whose wait period has elapsed.
525 *
526 */
527VALUE
529{
530 return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
531}
532
533VALUE
534rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
535{
536 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
537}
538
539#if 0
540/*
541 * Document-method: Fiber::Scheduler#timeout_after
542 * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
543 *
544 * Invoked by Timeout.timeout to execute the given +block+ within the given
545 * +duration+. It can also be invoked directly by the scheduler or user code.
546 *
547 * Attempt to limit the execution time of a given +block+ to the given
548 * +duration+ if possible. When a non-blocking operation causes the +block+'s
549 * execution time to exceed the specified +duration+, that non-blocking
550 * operation should be interrupted by raising the specified +exception_class+
551 * constructed with the given +exception_arguments+.
552 *
553 * General execution timeouts are often considered risky. This implementation
554 * will only interrupt non-blocking operations. This is by design because it's
555 * expected that non-blocking operations can fail for a variety of
556 * unpredictable reasons, so applications should already be robust in handling
557 * these conditions and by implication timeouts.
558 *
559 * However, as a result of this design, if the +block+ does not invoke any
560 * non-blocking operations, it will be impossible to interrupt it. If you
561 * desire to provide predictable points for timeouts, consider adding
562 * +sleep(0)+.
563 *
564 * If the block is executed successfully, its result will be returned.
565 *
566 * The exception will typically be raised using Fiber#raise.
567 */
VALUE
rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
{
    // Pack the positional arguments for the optional #timeout_after hook.
    VALUE arguments[] = {
        timeout, exception, message
    };

    // rb_check_funcall returns Qundef when the hook is not implemented.
    return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
}
577
// Variadic form of rb_fiber_scheduler_timeout_after; Qundef when the
// optional hook is not implemented.
VALUE
rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
{
    return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
}
583#endif
584
585/*
586 * Document-method: Fiber::Scheduler#process_wait
587 * call-seq: process_wait(pid, flags)
588 *
589 * Invoked by Process::Status.wait in order to wait for a specified process.
590 * See that method description for arguments description.
591 *
592 * Suggested minimal implementation:
593 *
594 * Thread.new do
595 * Process::Status.wait(pid, flags)
596 * end.value
597 *
598 * This hook is optional: if it is not present in the current scheduler,
599 * Process::Status.wait will behave as a blocking method.
600 *
601 * Expected to return a Process::Status instance.
602 */
603VALUE
604rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
605{
606 VALUE arguments[] = {
607 PIDT2NUM(pid), RB_INT2NUM(flags)
608 };
609
610 return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
611}
612
613/*
614 * Document-method: Fiber::Scheduler#block
615 * call-seq: block(blocker, timeout = nil)
616 *
617 * Invoked by methods like Thread.join, and by Mutex, to signify that current
618 * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
619 * elapsed.
620 *
621 * +blocker+ is what we are waiting on, informational only (for debugging and
622 * logging). There are no guarantee about its value.
623 *
624 * Expected to return boolean, specifying whether the blocking operation was
625 * successful or not.
626 */
627VALUE
628rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
629{
630 return rb_funcall(scheduler, id_block, 2, blocker, timeout);
631}
632
633/*
634 * Document-method: Fiber::Scheduler#unblock
635 * call-seq: unblock(blocker, fiber)
636 *
637 * Invoked to wake up Fiber previously blocked with #block (for example, Mutex#lock
638 * calls #block and Mutex#unlock calls #unblock). The scheduler should use
639 * the +fiber+ parameter to understand which fiber is unblocked.
640 *
641 * +blocker+ is what was awaited for, but it is informational only (for debugging
642 * and logging), and it is not guaranteed to be the same value as the +blocker+ for
643 * #block.
644 *
645 */
VALUE
rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
{
    RUBY_ASSERT(rb_obj_is_fiber(fiber));

    // `result` is only read when the funcall completed without raising
    // (state == TAG_NONE), so it cannot be used uninitialized.
    VALUE result;
    enum ruby_tag_type state;

    // `rb_fiber_scheduler_unblock` can be called from points where `errno` is expected to be preserved. Therefore, we should save and restore it. For example `io_binwrite` calls `rb_fiber_scheduler_unblock` and if `errno` is reset to 0 by user code, it will break the error handling in `io_write`.
    //
    // If we explicitly preserve `errno` in `io_binwrite` and other similar functions (e.g. by returning it), this code is no longer needed. I hope in the future we will be able to remove it.
    int saved_errno = errno;

    // We must prevent interrupts while invoking the unblock method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_fiber_interrupt`.
    rb_execution_context_t *ec = GET_EC();
    int saved_interrupt_mask = ec->interrupt_mask;
    ec->interrupt_mask |= PENDING_INTERRUPT_MASK;

    EC_PUSH_TAG(ec);
    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
        result = rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
    }
    EC_POP_TAG();

    // Restore the previous mask before re-raising or checking interrupts.
    ec->interrupt_mask = saved_interrupt_mask;

    if (state) {
        // Propagate whatever non-local exit the hook raised.
        EC_JUMP_TAG(ec, state);
    }

    // Deliver any interrupts that arrived while they were masked.
    RUBY_VM_CHECK_INTS(ec);

    errno = saved_errno;

    return result;
}
682
683/*
684 * Document-method: Fiber::Scheduler#io_wait
685 * call-seq: io_wait(io, events, timeout)
686 *
687 * Invoked by IO#wait, IO#wait_readable, IO#wait_writable to ask whether the
688 * specified descriptor is ready for specified events within
689 * the specified +timeout+.
690 *
691 * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
692 * <tt>IO::PRIORITY</tt>.
693 *
694 * Suggested implementation should register which Fiber is waiting for which
695 * resources and immediately calling Fiber.yield to pass control to other
696 * fibers. Then, in the #close method, the scheduler might dispatch all the
697 * I/O resources to fibers waiting for it.
698 *
699 * Expected to return the subset of events that are ready immediately.
700 *
701 */
702static VALUE
703fiber_scheduler_io_wait(VALUE _argument) {
704 VALUE *arguments = (VALUE*)_argument;
705
706 return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
707}
708
709VALUE
710rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
711{
712 VALUE arguments[] = {
713 scheduler, io, events, timeout
714 };
715
716 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
717 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments);
718 } else {
719 return fiber_scheduler_io_wait((VALUE)&arguments);
720 }
721}
722
723VALUE
728
729VALUE
734
735/*
736 * Document-method: Fiber::Scheduler#io_select
737 * call-seq: io_select(readables, writables, exceptables, timeout)
738 *
739 * Invoked by IO.select to ask whether the specified descriptors are ready for
740 * specified events within the specified +timeout+.
741 *
742 * Expected to return the 3-tuple of Array of IOs that are ready.
743 *
744 */
745VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
746{
747 VALUE arguments[] = {
748 readables, writables, exceptables, timeout
749 };
750
751 return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
752}
753
755{
756 // I wondered about extracting argv, and checking if there is only a single
757 // IO instance, and instead calling `io_wait`. However, it would require a
758 // decent amount of work and it would be hard to preserve the exact
759 // semantics of IO.select.
760
761 return rb_check_funcall(scheduler, id_io_select, argc, argv);
762}
763
764/*
765 * Document-method: Fiber::Scheduler#io_read
766 * call-seq: io_read(io, buffer, length, offset) -> read length or -errno
767 *
768 * Invoked by IO#read or IO#Buffer.read to read +length+ bytes from +io+ into a
769 * specified +buffer+ (see IO::Buffer) at the given +offset+.
770 *
771 * The +length+ argument is the "minimum length to be read". If the IO buffer
772 * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
773 * but at least 1KiB will be. Generally, the only case where less data than
774 * +length+ will be read is if there is an error reading the data.
775 *
776 * Specifying a +length+ of 0 is valid and means try reading at least once and
777 * return any available data.
778 *
779 * Suggested implementation should try to read from +io+ in a non-blocking
780 * manner and call #io_wait if the +io+ is not ready (which will yield control
781 * to other fibers).
782 *
783 * See IO::Buffer for an interface available to return data.
784 *
785 * Expected to return number of bytes read, or, in case of an error,
786 * <tt>-errno</tt> (negated number corresponding to system's error code).
787 *
788 * The method should be considered _experimental_.
789 */
790static VALUE
791fiber_scheduler_io_read(VALUE _argument) {
792 VALUE *arguments = (VALUE*)_argument;
793
794 return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
795}
796
797VALUE
798rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
799{
800 if (!rb_respond_to(scheduler, id_io_read)) {
801 return RUBY_Qundef;
802 }
803
804 VALUE arguments[] = {
805 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
806 };
807
808 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
809 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments);
810 } else {
811 return fiber_scheduler_io_read((VALUE)&arguments);
812 }
813}
814
815/*
816 * Document-method: Fiber::Scheduler#io_pread
817 * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
818 *
819 * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
820 * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
821 * +offset+.
822 *
823 * This method is semantically the same as #io_read, but it allows to specify
824 * the offset to read from and is often better for asynchronous IO on the same
825 * file.
826 *
827 * The method should be considered _experimental_.
828 */
829static VALUE
830fiber_scheduler_io_pread(VALUE _argument) {
831 VALUE *arguments = (VALUE*)_argument;
832
833 return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
834}
835
836VALUE
837rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
838{
839 if (!rb_respond_to(scheduler, id_io_pread)) {
840 return RUBY_Qundef;
841 }
842
843 VALUE arguments[] = {
844 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
845 };
846
847 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
848 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments);
849 } else {
850 return fiber_scheduler_io_pread((VALUE)&arguments);
851 }
852}
853
854/*
855 * Document-method: Fiber::Scheduler#io_write
856 * call-seq: io_write(io, buffer, length, offset) -> written length or -errno
857 *
858 * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+ from
859 * from a specified +buffer+ (see IO::Buffer) at the given +offset+.
860 *
861 * The +length+ argument is the "minimum length to be written". If the IO
862 * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
863 * will be written, but at least 1KiB will be. Generally, the only case where
864 * less data than +length+ will be written is if there is an error writing the
865 * data.
866 *
867 * Specifying a +length+ of 0 is valid and means try writing at least once, as
868 * much data as possible.
869 *
870 * Suggested implementation should try to write to +io+ in a non-blocking
871 * manner and call #io_wait if the +io+ is not ready (which will yield control
872 * to other fibers).
873 *
874 * See IO::Buffer for an interface available to get data from buffer
875 * efficiently.
876 *
877 * Expected to return number of bytes written, or, in case of an error,
878 * <tt>-errno</tt> (negated number corresponding to system's error code).
879 *
880 * The method should be considered _experimental_.
881 */
882static VALUE
883fiber_scheduler_io_write(VALUE _argument) {
884 VALUE *arguments = (VALUE*)_argument;
885
886 return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
887}
888
889VALUE
890rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
891{
892 if (!rb_respond_to(scheduler, id_io_write)) {
893 return RUBY_Qundef;
894 }
895
896 VALUE arguments[] = {
897 scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
898 };
899
900 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
901 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments);
902 } else {
903 return fiber_scheduler_io_write((VALUE)&arguments);
904 }
905}
906
907/*
908 * Document-method: Fiber::Scheduler#io_pwrite
909 * call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
910 *
911 * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
912 * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
913 * +offset+.
914 *
915 * This method is semantically the same as #io_write, but it allows to specify
916 * the offset to write to and is often better for asynchronous IO on the same
917 * file.
918 *
919 * The method should be considered _experimental_.
920 *
921 */
922static VALUE
923fiber_scheduler_io_pwrite(VALUE _argument) {
924 VALUE *arguments = (VALUE*)_argument;
925
926 return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
927}
928
929VALUE
930rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
931{
932 if (!rb_respond_to(scheduler, id_io_pwrite)) {
933 return RUBY_Qundef;
934 }
935
936 VALUE arguments[] = {
937 scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
938 };
939
940 if (rb_respond_to(scheduler, id_fiber_interrupt)) {
941 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments);
942 } else {
943 return fiber_scheduler_io_pwrite((VALUE)&arguments);
944 }
945}
946
947VALUE
948rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
949{
950 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
951
952 VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
953
954 rb_io_buffer_free_locked(buffer);
955
956 return result;
957}
958
959VALUE
960rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
961{
962 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
963
964 VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
965
966 rb_io_buffer_free_locked(buffer);
967
968 return result;
969}
970
971VALUE
972rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
973{
974 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
975
976 VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
977
978 rb_io_buffer_free_locked(buffer);
979
980 return result;
981}
982
983VALUE
984rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
985{
986 VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
987
988 VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
989
990 rb_io_buffer_free_locked(buffer);
991
992 return result;
993}
994
995VALUE
997{
998 VALUE arguments[] = {io};
999
1000 return rb_check_funcall(scheduler, id_io_close, 1, arguments);
1001}
1002
1003/*
1004 * Document-method: Fiber::Scheduler#address_resolve
1005 * call-seq: address_resolve(hostname) -> array_of_strings or nil
1006 *
 * Invoked by any method that performs a non-reverse DNS lookup. The most
 * notable method is Addrinfo.getaddrinfo, but there are many others.
 *
 * The method is expected to return an array of strings corresponding to IP
 * addresses the +hostname+ is resolved to, or +nil+ if it cannot be resolved.
1012 *
1013 * Fairly exhaustive list of all possible call-sites:
1014 *
1015 * - Addrinfo.getaddrinfo
1016 * - Addrinfo.tcp
1017 * - Addrinfo.udp
1018 * - Addrinfo.ip
1019 * - Addrinfo.new
1020 * - Addrinfo.marshal_load
1021 * - SOCKSSocket.new
1022 * - TCPServer.new
1023 * - TCPSocket.new
1024 * - IPSocket.getaddress
1025 * - TCPSocket.gethostbyname
1026 * - UDPSocket#connect
1027 * - UDPSocket#bind
1028 * - UDPSocket#send
1029 * - Socket.getaddrinfo
1030 * - Socket.gethostbyname
1031 * - Socket.pack_sockaddr_in
1032 * - Socket.sockaddr_in
1033 * - Socket.unpack_sockaddr_in
1034 */
1035VALUE
1037{
1038 VALUE arguments[] = {
1039 hostname
1040 };
1041
1042 return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
1043}
1044
1045/*
1046 * Document-method: Fiber::Scheduler#blocking_operation_wait
1047 * call-seq: blocking_operation_wait(blocking_operation)
1048 *
1049 * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
1050 * The blocking_operation is a Fiber::Scheduler::BlockingOperation that encapsulates the blocking operation.
1051 *
1052 * If the scheduler doesn't implement this method, or if the scheduler doesn't execute
1053 * the blocking operation, Ruby will fall back to the non-scheduler implementation.
1054 *
1055 * Minimal suggested implementation is:
1056 *
1057 * def blocking_operation_wait(blocking_operation)
1058 * Thread.new { blocking_operation.call }.join
1059 * end
1060 */
/*
 * Offer the blocking +function+ to scheduler#blocking_operation_wait, wrapped
 * in a Fiber::Scheduler::BlockingOperation object.
 *
 * Returns Qundef when the scheduler does not implement
 * #blocking_operation_wait, or when the operation's status is still QUEUED
 * afterwards (i.e. the scheduler never executed it) — in both cases the
 * caller is expected to fall back to rb_nogvl. Otherwise returns whatever
 * the scheduler method returned.
 */
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
{
    // Check if scheduler supports blocking_operation_wait before creating the object:
    if (!rb_respond_to(scheduler, id_blocking_operation_wait)) {
        return Qundef;
    }

    // Create a new BlockingOperation with the blocking operation:
    VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);

    VALUE result = rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);

    // Get the operation data to check if it was executed. The status must be
    // read before the pointers are cleared below:
    rb_fiber_scheduler_blocking_operation_t *operation = get_blocking_operation(blocking_operation);
    rb_atomic_t current_status = RUBY_ATOMIC_LOAD(operation->status);

    // Invalidate the operation now that we're done with it, so any reference
    // the scheduler retained cannot execute stale pointers later:
    operation->function = NULL;
    operation->state = NULL;
    operation->data = NULL;
    operation->data2 = NULL;
    operation->unblock_function = NULL;

    // If the blocking operation was never executed, return Qundef to signal the caller to use rb_nogvl instead:
    if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
        return Qundef;
    }

    return result;
}
1091
1093{
1094 VALUE arguments[] = {
1095 fiber, exception
1096 };
1097
1098 VALUE result;
1099 enum ruby_tag_type state;
1100
1101 // We must prevent interrupts while invoking the fiber_interrupt method, because otherwise fibers can be left permanently blocked if an interrupt occurs during the execution of user code. See also `rb_fiber_scheduler_unblock`.
1102 rb_execution_context_t *ec = GET_EC();
1103 int saved_interrupt_mask = ec->interrupt_mask;
1104 ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
1105
1106 EC_PUSH_TAG(ec);
1107 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1108 result = rb_check_funcall(scheduler, id_fiber_interrupt, 2, arguments);
1109 }
1110 EC_POP_TAG();
1111
1112 ec->interrupt_mask = saved_interrupt_mask;
1113
1114 if (state) {
1115 EC_JUMP_TAG(ec, state);
1116 }
1117
1118 RUBY_VM_CHECK_INTS(ec);
1119
1120 return result;
1121}
1122
1123/*
1124 * Document-method: Fiber::Scheduler#fiber
1125 * call-seq: fiber(&block)
1126 *
1127 * Implementation of the Fiber.schedule. The method is <em>expected</em> to immediately
1128 * run the given block of code in a separate non-blocking fiber, and to return that Fiber.
1129 *
1130 * Minimal suggested implementation is:
1131 *
1132 * def fiber(&block)
1133 * fiber = Fiber.new(blocking: false, &block)
1134 * fiber.resume
1135 * fiber
1136 * end
1137 */
1138VALUE
1139rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
1140{
1141 return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
1142}
1143
1144/*
1145 * C API: Cancel a blocking operation
1146 *
1147 * This function cancels a blocking operation. If the operation is queued,
1148 * it just marks it as cancelled. If it's executing, it marks it as cancelled
1149 * and calls the unblock function to interrupt the operation.
1150 *
1151 * Returns 1 if unblock function was called, 0 if just marked cancelled, -1 on error.
1152 */
1153int
1155{
1156 if (blocking_operation == NULL) {
1157 return -1;
1158 }
1159
1160 rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
1161
1162 switch (current_state) {
1163 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
1164 // Work hasn't started - just mark as cancelled:
1165 if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
1166 // Successfully cancelled before execution:
1167 return 0;
1168 }
1169 // Fall through if state changed between load and CAS
1170
1171 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
1172 // Work is running - mark cancelled AND call unblock function
1173 if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
1174 // State changed between load and CAS - operation may have completed:
1175 return 0;
1176 }
1177 // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
1178 rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
1179 if (unblock_function) {
1180 RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
1181 blocking_operation->unblock_function(blocking_operation->data2);
1182 }
1183 // Cancelled during execution (unblock function called):
1184 return 1;
1185
1186 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
1187 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
1188 // Already finished or cancelled:
1189 return 0;
1190 }
1191
1192 return 0;
1193}
#define RUBY_ASSERT(...)
Asserts that the given expression is truthy if and only if RUBY_DEBUG is truthy.
Definition assert.h:219
#define RUBY_ATOMIC_CAS(var, oldval, newval)
Atomic compare-and-swap.
Definition atomic.h:165
std::atomic< unsigned > rb_atomic_t
Type that is eligible for atomic operations.
Definition atomic.h:69
#define RUBY_ATOMIC_LOAD(var)
Atomic load.
Definition atomic.h:175
#define rb_define_method(klass, mid, func, arity)
Defines klass#mid.
VALUE rb_class_new(VALUE super)
Creates a new, anonymous class.
Definition class.c:864
VALUE rb_define_class_under(VALUE outer, const char *name, VALUE super)
Defines a class under the namespace of outer.
Definition class.c:1520
#define Qundef
Old name of RUBY_Qundef.
#define SIZET2NUM
Old name of RB_SIZE2NUM.
Definition size_t.h:62
#define Qnil
Old name of RUBY_Qnil.
VALUE rb_eRuntimeError
RuntimeError exception.
Definition error.c:1428
void rb_warn(const char *fmt,...)
Identical to rb_warning(), except it reports unless $VERBOSE is nil.
Definition error.c:466
VALUE rb_funcall(VALUE recv, ID mid, int n,...)
Calls a method.
Definition vm_eval.c:1117
VALUE rb_funcall_passing_block_kw(VALUE recv, ID mid, int argc, const VALUE *argv, int kw_splat)
Identical to rb_funcallv_passing_block(), except you can specify how to handle the last element of th...
Definition vm_eval.c:1187
void rb_unblock_function_t(void *)
This is the type of UBFs.
Definition thread.h:336
int rb_respond_to(VALUE obj, ID mid)
Queries if the object responds to the method.
Definition vm_method.c:3359
VALUE rb_check_funcall(VALUE recv, ID mid, int argc, const VALUE *argv)
Identical to rb_funcallv(), except it returns RUBY_Qundef instead of raising rb_eNoMethodError.
Definition vm_eval.c:686
void rb_define_alloc_func(VALUE klass, rb_alloc_func_t func)
Sets the allocator function of a class.
static ID rb_intern_const(const char *str)
This is a "tiny optimisation" over rb_intern().
Definition symbol.h:284
VALUE rb_io_timeout(VALUE io)
Get the timeout associated with the specified io object.
Definition io.c:857
@ RUBY_IO_READABLE
IO::READABLE
Definition io.h:97
@ RUBY_IO_WRITABLE
IO::WRITABLE
Definition io.h:98
void * rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, int flags)
Identical to rb_thread_call_without_gvl(), except it additionally takes "flags" that change the behav...
Definition thread.c:1580
#define RB_UINT2NUM
Just another name of rb_uint2num_inline.
Definition int.h:39
#define RB_INT2NUM
Just another name of rb_int2num_inline.
Definition int.h:37
VALUE rb_ensure(type *q, VALUE w, type *e, VALUE r)
An equivalent of ensure clause.
#define OFFT2NUM
Converts a C's off_t into an instance of rb_cInteger.
Definition off_t.h:33
#define PIDT2NUM
Converts a C's pid_t into an instance of rb_cInteger.
Definition pid_t.h:28
#define TypedData_Get_Struct(obj, type, data_type, sval)
Obtains a C struct from inside of a wrapper Ruby object.
Definition rtypeddata.h:521
#define TypedData_Make_Struct(klass, type, data_type, sval)
Identical to TypedData_Wrap_Struct, except it allocates a new data region internally instead of takin...
Definition rtypeddata.h:503
#define errno
Ractor-aware version of errno.
Definition ruby.h:388
Scheduler APIs.
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void *(*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
Defer the execution of the passed function to the scheduler.
Definition scheduler.c:1061
VALUE rb_fiber_scheduler_current(void)
Identical to rb_fiber_scheduler_get(), except it also returns RUBY_Qnil in case of a blocking fiber.
Definition scheduler.c:464
VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
Non-blocking pread from the passed IO using a native buffer.
Definition scheduler.c:972
VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout)
Converts the passed timeout to an expression that rb_fiber_scheduler_block() etc.
Definition scheduler.c:507
VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for reading.
Definition scheduler.c:724
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
Non-blocking read from the passed IO using a native buffer.
Definition scheduler.c:948
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO at the specified offset.
Definition scheduler.c:930
VALUE rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
Identical to rb_fiber_scheduler_kernel_sleep(), except it can pass multiple arguments.
Definition scheduler.c:534
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
Interrupt a fiber by raising an exception.
Definition scheduler.c:1092
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
Non-blocking version of rb_io_wait().
Definition scheduler.c:710
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
Non-blocking version of IO.select.
Definition scheduler.c:745
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO.
Definition scheduler.c:798
int rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Cancel a blocking operation.
Definition scheduler.c:1154
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
Non-blocking version of IO.select, argv variant.
Definition scheduler.c:754
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
Non-blocking waitpid.
Definition scheduler.c:604
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
Non-blocking wait for the passed "blocker", which is for instance Thread.join or Mutex....
Definition scheduler.c:628
int rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Execute blocking operation from handle (GVL not required).
Definition scheduler.c:204
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO at the specified offset.
Definition scheduler.c:837
VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
Non-blocking pwrite to the passed IO using a native buffer.
Definition scheduler.c:984
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO.
Definition scheduler.c:890
VALUE rb_fiber_scheduler_close(VALUE scheduler)
Closes the passed scheduler object.
Definition scheduler.c:485
rb_fiber_scheduler_blocking_operation_t * rb_fiber_scheduler_blocking_operation_extract(VALUE self)
Extract the blocking operation handle from a BlockingOperationRuby object.
Definition scheduler.c:189
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
Identical to rb_fiber_scheduler_current(), except it queries for that of the passed thread instead of...
Definition scheduler.c:469
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE duration)
Non-blocking sleep.
Definition scheduler.c:528
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
Non-blocking DNS lookup.
Definition scheduler.c:1036
VALUE rb_fiber_scheduler_set(VALUE scheduler)
Destructively assigns the passed scheduler to that of the current thread that is calling this functio...
Definition scheduler.c:425
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
Non-blocking write to the passed IO using a native buffer.
Definition scheduler.c:960
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for writing.
Definition scheduler.c:730
VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
Non-blocking close the given IO.
Definition scheduler.c:996
VALUE rb_fiber_scheduler_get(void)
Queries the current scheduler of the current thread that is calling this function.
Definition scheduler.c:375
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
Definition scheduler.c:647
VALUE rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
Create and schedule a non-blocking fiber.
Definition scheduler.c:1139
@ RUBY_Qundef
Represents so-called undef.
This is the struct that holds necessary info for a struct.
Definition rtypeddata.h:202
uintptr_t ID
Type that represents a Ruby identifier such as a variable name.
Definition value.h:52
uintptr_t VALUE
Type that represents a Ruby object.
Definition value.h:40