12#include "eval_intern.h"
20#include "internal/thread.h"
23#include "ruby_atomic.h"
26static ID id_scheduler_close;
33static ID id_timeout_after;
34static ID id_kernel_sleep;
35static ID id_process_wait;
37static ID id_io_read, id_io_pread;
38static ID id_io_write, id_io_pwrite;
40static ID id_io_select;
43static ID id_address_resolve;
45static ID id_blocking_operation_wait;
46static ID id_fiber_interrupt;
48static ID id_fiber_schedule;
51static VALUE rb_cFiberSchedulerBlockingOperation;
60 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED,
61 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING,
62 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED,
63 RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED
64} rb_fiber_blocking_operation_status_t;
67 void *(*function)(
void *);
81blocking_operation_mark(
void *ptr)
87blocking_operation_free(
void *ptr)
90 ruby_xfree(blocking_operation);
94blocking_operation_memsize(
const void *ptr)
100 "Fiber::Scheduler::BlockingOperation",
102 blocking_operation_mark,
103 blocking_operation_free,
104 blocking_operation_memsize,
106 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED
113blocking_operation_alloc(
VALUE klass)
118 blocking_operation->function = NULL;
119 blocking_operation->data = NULL;
120 blocking_operation->unblock_function = NULL;
121 blocking_operation->data2 = NULL;
122 blocking_operation->flags = 0;
123 blocking_operation->state = NULL;
124 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
133get_blocking_operation(
VALUE obj)
137 return blocking_operation;
149blocking_operation_call(
VALUE self)
153 if (blocking_operation->status != RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
154 rb_raise(
rb_eRuntimeError,
"Blocking operation has already been executed!");
157 if (blocking_operation->function == NULL) {
158 rb_raise(
rb_eRuntimeError,
"Blocking operation has no function to execute!");
161 if (blocking_operation->state == NULL) {
166 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
169 blocking_operation->state->result =
rb_nogvl(blocking_operation->function, blocking_operation->data,
170 blocking_operation->unblock_function, blocking_operation->data2,
171 blocking_operation->flags);
172 blocking_operation->state->saved_errno = rb_errno();
175 blocking_operation->status = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED;
193 return get_blocking_operation(self);
208 if (blocking_operation == NULL) {
212 if (blocking_operation->function == NULL || blocking_operation->state == NULL) {
217 rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
220 rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
221 if (
RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
227 blocking_operation->state->result = blocking_operation->function(blocking_operation->data);
228 blocking_operation->state->saved_errno =
errno;
231 expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING;
232 if (
RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED) == expected) {
237 blocking_operation->state->saved_errno = EINTR;
249rb_fiber_scheduler_blocking_operation_new(
void *(*function)(
void *),
void *data,
253 VALUE self = blocking_operation_alloc(rb_cFiberSchedulerBlockingOperation);
256 blocking_operation->function = function;
257 blocking_operation->data = data;
258 blocking_operation->unblock_function = unblock_function;
259 blocking_operation->data2 = data2;
260 blocking_operation->flags = flags;
261 blocking_operation->state = state;
319Init_Fiber_Scheduler(
void)
343 id_blocking_operation_wait =
rb_intern_const(
"blocking_operation_wait");
350 rb_cFiberSchedulerBlockingOperation =
rb_class_new(rb_cObject);
352 rb_define_method(rb_cFiberSchedulerBlockingOperation,
"call", blocking_operation_call, 0);
355 rb_gc_register_mark_object(rb_cFiberSchedulerBlockingOperation);
369 rb_define_method(rb_cFiberScheduler,
"timeout_after", rb_fiber_scheduler_timeout_after, 3);
385 return thread->scheduler;
389verify_interface(
VALUE scheduler)
392 rb_raise(rb_eArgError,
"Scheduler must implement #block");
396 rb_raise(rb_eArgError,
"Scheduler must implement #unblock");
400 rb_raise(rb_eArgError,
"Scheduler must implement #kernel_sleep");
404 rb_raise(rb_eArgError,
"Scheduler must implement #io_wait");
408 rb_warn(
"Scheduler should implement #fiber_interrupt");
413fiber_scheduler_close(
VALUE scheduler)
419fiber_scheduler_close_ensure(
VALUE _thread)
422 thread->scheduler =
Qnil;
435 if (scheduler !=
Qnil) {
436 verify_interface(scheduler);
443 if (thread->scheduler !=
Qnil) {
445 rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (
VALUE)thread);
448 thread->scheduler = scheduler;
450 return thread->scheduler;
454fiber_scheduler_current_for_threadptr(
rb_thread_t *thread)
458 if (thread->blocking == 0) {
459 return thread->scheduler;
470 return fiber_scheduler_current_for_threadptr(GET_THREAD());
476 return fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
481 return fiber_scheduler_current_for_threadptr(thread);
508 if (!UNDEF_P(result))
return result;
511 if (!UNDEF_P(result))
return result;
520 return rb_float_new((
double)timeout->tv_sec + (0.000001 * timeout->tv_usec));
540 return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
546 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
560 if (!UNDEF_P(result))
return result;
598 VALUE arguments[] = {
599 timeout, exception, message
606rb_fiber_scheduler_timeout_afterv(
VALUE scheduler,
int argc,
VALUE * argv)
633 VALUE arguments[] = {
657 return rb_funcall(scheduler, id_block, 2, blocker, timeout);
679 enum ruby_tag_type state;
684 int saved_errno =
errno;
688 int saved_interrupt_mask = ec->interrupt_mask;
689 ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
692 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
693 result =
rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
697 ec->interrupt_mask = saved_interrupt_mask;
700 EC_JUMP_TAG(ec, state);
703 RUBY_VM_CHECK_INTS(ec);
730fiber_scheduler_io_wait(
VALUE _argument) {
733 return rb_funcallv(arguments[0], id_io_wait, 3, arguments + 1);
739 VALUE arguments[] = {
740 scheduler, io, events, timeout
744 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (
VALUE)&arguments);
746 return fiber_scheduler_io_wait((
VALUE)&arguments);
774 VALUE arguments[] = {
775 readables, writables, exceptables, timeout
818fiber_scheduler_io_read(
VALUE _argument) {
821 return rb_funcallv(arguments[0], id_io_read, 4, arguments + 1);
831 VALUE arguments[] = {
836 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (
VALUE)&arguments);
838 return fiber_scheduler_io_read((
VALUE)&arguments);
857fiber_scheduler_io_pread(
VALUE _argument) {
860 return rb_funcallv(arguments[0], id_io_pread, 5, arguments + 1);
870 VALUE arguments[] = {
875 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (
VALUE)&arguments);
877 return fiber_scheduler_io_pread((
VALUE)&arguments);
910fiber_scheduler_io_write(
VALUE _argument) {
913 return rb_funcallv(arguments[0], id_io_write, 4, arguments + 1);
923 VALUE arguments[] = {
928 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (
VALUE)&arguments);
930 return fiber_scheduler_io_write((
VALUE)&arguments);
950fiber_scheduler_io_pwrite(
VALUE _argument) {
953 return rb_funcallv(arguments[0], id_io_pwrite, 5, arguments + 1);
965 VALUE arguments[] = {
970 return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (
VALUE)&arguments);
972 return fiber_scheduler_io_pwrite((
VALUE)&arguments);
979 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
983 rb_io_buffer_free_locked(buffer);
991 VALUE buffer = rb_io_buffer_new((
void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
995 rb_io_buffer_free_locked(buffer);
1003 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
1007 rb_io_buffer_free_locked(buffer);
1015 VALUE buffer = rb_io_buffer_new((
void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
1019 rb_io_buffer_free_locked(buffer);
1027 VALUE arguments[] = {io};
1067 VALUE arguments[] = {
1093 if (!
rb_respond_to(scheduler, id_blocking_operation_wait)) {
1098 VALUE blocking_operation = rb_fiber_scheduler_blocking_operation_new(function, data, unblock_function, data2, flags, state);
1100 VALUE result =
rb_funcall(scheduler, id_blocking_operation_wait, 1, blocking_operation);
1107 operation->function = NULL;
1108 operation->state = NULL;
1109 operation->data = NULL;
1110 operation->data2 = NULL;
1111 operation->unblock_function = NULL;
1114 if (current_status == RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED) {
1123 VALUE arguments[] = {
1128 enum ruby_tag_type state;
1132 int saved_interrupt_mask = ec->interrupt_mask;
1133 ec->interrupt_mask |= PENDING_INTERRUPT_MASK;
1136 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1141 ec->interrupt_mask = saved_interrupt_mask;
1144 EC_JUMP_TAG(ec, state);
1147 RUBY_VM_CHECK_INTS(ec);
1185 if (blocking_operation == NULL) {
1191 switch (current_state) {
1192 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
1194 if (
RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
1200 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
1202 if (
RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
1208 if (unblock_function) {
1210 blocking_operation->unblock_function(blocking_operation->data2);
1215 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
1216 case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
#define RUBY_ASSERT(...)
Asserts that the given expression is truthy if and only if RUBY_DEBUG is truthy.
#define RUBY_ATOMIC_CAS(var, oldval, newval)
Atomic compare-and-swap.
std::atomic< unsigned > rb_atomic_t
Type that is eligible for atomic operations.
#define RUBY_ATOMIC_LOAD(var)
Atomic load.
#define rb_define_method(klass, mid, func, arity)
Defines klass#mid.
VALUE rb_class_new(VALUE super)
Creates a new, anonymous class.
VALUE rb_define_class_under(VALUE outer, const char *name, VALUE super)
Defines a class under the namespace of outer.
#define Qundef
Old name of RUBY_Qundef.
#define SIZET2NUM
Old name of RB_SIZE2NUM.
#define Qnil
Old name of RUBY_Qnil.
VALUE rb_eRuntimeError
RuntimeError exception.
void rb_warn(const char *fmt,...)
Identical to rb_warning(), except it reports unless $VERBOSE is nil.
VALUE rb_funcall(VALUE recv, ID mid, int n,...)
Calls a method.
VALUE rb_funcall_passing_block_kw(VALUE recv, ID mid, int argc, const VALUE *argv, int kw_splat)
Identical to rb_funcallv_passing_block(), except you can specify how to handle the last element of th...
void rb_unblock_function_t(void *)
This is the type of UBFs.
int rb_respond_to(VALUE obj, ID mid)
Queries if the object responds to the method.
VALUE rb_check_funcall(VALUE recv, ID mid, int argc, const VALUE *argv)
Identical to rb_funcallv(), except it returns RUBY_Qundef instead of raising rb_eNoMethodError.
void rb_define_alloc_func(VALUE klass, rb_alloc_func_t func)
Sets the allocator function of a class.
static ID rb_intern_const(const char *str)
This is a "tiny optimisation" over rb_intern().
VALUE rb_io_timeout(VALUE io)
Get the timeout associated with the specified io object.
@ RUBY_IO_READABLE
IO::READABLE
@ RUBY_IO_WRITABLE
IO::WRITABLE
void * rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, int flags)
Identical to rb_thread_call_without_gvl(), except it additionally takes "flags" that change the behav...
#define RB_UINT2NUM
Just another name of rb_uint2num_inline.
#define RB_INT2NUM
Just another name of rb_int2num_inline.
VALUE rb_ensure(type *q, VALUE w, type *e, VALUE r)
An equivalent of ensure clause.
#define OFFT2NUM
Converts a C's off_t into an instance of rb_cInteger.
#define PIDT2NUM
Converts a C's pid_t into an instance of rb_cInteger.
#define TypedData_Get_Struct(obj, type, data_type, sval)
Obtains a C struct from inside of a wrapper Ruby object.
#define TypedData_Make_Struct(klass, type, data_type, sval)
Identical to TypedData_Wrap_Struct, except it allocates a new data region internally instead of takin...
#define errno
Ractor-aware version of errno.
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void *(*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
Defer the execution of the passed function to the scheduler.
VALUE rb_fiber_scheduler_current(void)
Identical to rb_fiber_scheduler_get(), except it also returns RUBY_Qnil in case of a blocking fiber.
VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
Non-blocking pread from the passed IO using a native buffer.
VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout)
Converts the passed timeout to an expression that rb_fiber_scheduler_block() etc.
VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for reading.
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
Non-blocking read from the passed IO using a native buffer.
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO at the specified offset.
VALUE rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
Identical to rb_fiber_scheduler_kernel_sleep(), except it can pass multiple arguments.
VALUE rb_fiber_scheduler_fiber_interrupt(VALUE scheduler, VALUE fiber, VALUE exception)
Interrupt a fiber by raising an exception.
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
Non-blocking version of rb_io_wait().
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
Non-blocking version of IO.select.
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO.
int rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Cancel a blocking operation.
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
Non-blocking version of IO.select, argv variant.
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
Non-blocking waitpid.
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
Non-blocking wait for the passed "blocker", which is for instance Thread.join or Mutex....
int rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operation_t *blocking_operation)
Execute blocking operation from handle (GVL not required).
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO at the specified offset.
VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
Non-blocking pwrite to the passed IO using a native buffer.
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO.
VALUE rb_fiber_scheduler_close(VALUE scheduler)
Closes the passed scheduler object.
rb_fiber_scheduler_blocking_operation_t * rb_fiber_scheduler_blocking_operation_extract(VALUE self)
Extract the blocking operation handle from a BlockingOperationRuby object.
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
Identical to rb_fiber_scheduler_current(), except it queries for that of the passed thread value inst...
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE duration)
Non-blocking sleep.
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
Non-blocking DNS lookup.
VALUE rb_fiber_scheduler_yield(VALUE scheduler)
Yield to the scheduler, to be resumed on the next scheduling cycle.
VALUE rb_fiber_scheduler_set(VALUE scheduler)
Destructively assigns the passed scheduler to that of the current thread that is calling this functio...
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
Non-blocking write to the passed IO using a native buffer.
VALUE rb_fiber_scheduler_current_for_threadptr(struct rb_thread_struct *thread)
Identical to rb_fiber_scheduler_current_for_thread(), except it expects a threadptr instead of a thre...
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for writing.
VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
Non-blocking close the given IO.
VALUE rb_fiber_scheduler_get(void)
Queries the current scheduler of the current thread that is calling this function.
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
VALUE rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
Create and schedule a non-blocking fiber.
@ RUBY_Qundef
Represents so-called undef.
This is the struct that holds necessary info for a struct.
uintptr_t ID
Type that represents a Ruby identifier such as a variable name.
uintptr_t VALUE
Type that represents a Ruby object.