19#include "internal/thread.h"
22static ID id_scheduler_close;
27static ID id_timeout_after;
28static ID id_kernel_sleep;
29static ID id_process_wait;
31static ID id_io_read, id_io_pread;
32static ID id_io_write, id_io_pwrite;
34static ID id_io_select;
37static ID id_address_resolve;
39static ID id_blocking_operation_wait;
41static ID id_fiber_schedule;
95Init_Fiber_Scheduler(
void)
118 id_blocking_operation_wait =
rb_intern_const(
"blocking_operation_wait");
125 rb_define_method(rb_cFiberScheduler,
"process_wait", rb_fiber_scheduler_process_wait, 2);
126 rb_define_method(rb_cFiberScheduler,
"io_wait", rb_fiber_scheduler_io_wait, 3);
127 rb_define_method(rb_cFiberScheduler,
"io_read", rb_fiber_scheduler_io_read, 4);
128 rb_define_method(rb_cFiberScheduler,
"io_write", rb_fiber_scheduler_io_write, 4);
129 rb_define_method(rb_cFiberScheduler,
"io_pread", rb_fiber_scheduler_io_pread, 5);
130 rb_define_method(rb_cFiberScheduler,
"io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
131 rb_define_method(rb_cFiberScheduler,
"io_select", rb_fiber_scheduler_io_select, 4);
132 rb_define_method(rb_cFiberScheduler,
"kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
133 rb_define_method(rb_cFiberScheduler,
"address_resolve", rb_fiber_scheduler_address_resolve, 1);
134 rb_define_method(rb_cFiberScheduler,
"timeout_after", rb_fiber_scheduler_timeout_after, 3);
136 rb_define_method(rb_cFiberScheduler,
"unblock", rb_fiber_scheduler_unblock, 2);
138 rb_define_method(rb_cFiberScheduler,
"blocking_operation_wait", rb_fiber_scheduler_blocking_operation_wait, -2);
150 return thread->scheduler;
154verify_interface(
VALUE scheduler)
157 rb_raise(rb_eArgError,
"Scheduler must implement #block");
161 rb_raise(rb_eArgError,
"Scheduler must implement #unblock");
165 rb_raise(rb_eArgError,
"Scheduler must implement #kernel_sleep");
169 rb_raise(rb_eArgError,
"Scheduler must implement #io_wait");
174fiber_scheduler_close(
VALUE scheduler)
180fiber_scheduler_close_ensure(
VALUE _thread)
183 thread->scheduler =
Qnil;
196 if (scheduler !=
Qnil) {
197 verify_interface(scheduler);
204 if (thread->scheduler !=
Qnil) {
206 rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (
VALUE)thread);
209 thread->scheduler = scheduler;
211 return thread->scheduler;
215rb_fiber_scheduler_current_for_threadptr(
rb_thread_t *thread)
219 if (thread->blocking == 0) {
220 return thread->scheduler;
230 return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
235 return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
262 if (!UNDEF_P(result))
return result;
265 if (!UNDEF_P(result))
return result;
274 return rb_float_new((
double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
294 return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
300 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
335 VALUE arguments[] = {
336 timeout, exception, message
343rb_fiber_scheduler_timeout_afterv(
VALUE scheduler,
int argc,
VALUE * argv)
370 VALUE arguments[] = {
394 return rb_funcall(scheduler, id_block, 2, blocker, timeout);
417 int saved_errno =
errno;
448 return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
475 VALUE arguments[] = {
476 readables, writables, exceptables, timeout
521 VALUE arguments[] = {
545 VALUE arguments[] = {
583 VALUE arguments[] = {
608 VALUE arguments[] = {
618 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
622 rb_io_buffer_free_locked(buffer);
630 VALUE buffer = rb_io_buffer_new((
void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
634 rb_io_buffer_free_locked(buffer);
642 VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
646 rb_io_buffer_free_locked(buffer);
654 VALUE buffer = rb_io_buffer_new((
void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
658 rb_io_buffer_free_locked(buffer);
666 VALUE arguments[] = {io};
706 VALUE arguments[] = {
714 void *(*function)(
void *);
728 if (arguments->state == NULL) {
732 arguments->state->result =
rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
733 arguments->state->saved_errno = rb_errno();
736 arguments->state = NULL;
756 .function = function,
758 .unblock_function = unblock_function,
#define RUBY_ASSERT(...)
Asserts that the given expression is truthy if and only if RUBY_DEBUG is truthy.
#define rb_define_method(klass, mid, func, arity)
Defines klass#mid.
VALUE rb_define_class_under(VALUE outer, const char *name, VALUE super)
Defines a class under the namespace of outer.
#define SIZET2NUM
Old name of RB_SIZE2NUM.
#define Qnil
Old name of RUBY_Qnil.
VALUE rb_eRuntimeError
RuntimeError exception.
VALUE rb_funcall(VALUE recv, ID mid, int n,...)
Calls a method.
VALUE rb_funcall_passing_block_kw(VALUE recv, ID mid, int argc, const VALUE *argv, int kw_splat)
Identical to rb_funcallv_passing_block(), except you can specify how to handle the last element of th...
void rb_unblock_function_t(void *)
This is the type of UBFs.
int rb_respond_to(VALUE obj, ID mid)
Queries if the object responds to the method.
VALUE rb_check_funcall(VALUE recv, ID mid, int argc, const VALUE *argv)
Identical to rb_funcallv(), except it returns RUBY_Qundef instead of raising rb_eNoMethodError.
static ID rb_intern_const(const char *str)
This is a "tiny optimisation" over rb_intern().
VALUE rb_io_timeout(VALUE io)
Get the timeout associated with the specified io object.
@ RUBY_IO_READABLE
IO::READABLE
@ RUBY_IO_WRITABLE
IO::WRITABLE
void * rb_nogvl(void *(*func)(void *), void *data1, rb_unblock_function_t *ubf, void *data2, int flags)
Identical to rb_thread_call_without_gvl(), except it additionally takes "flags" that change the behav...
#define RB_UINT2NUM
Just another name of rb_uint2num_inline.
#define RB_INT2NUM
Just another name of rb_int2num_inline.
#define RB_BLOCK_CALL_FUNC_ARGLIST(yielded_arg, callback_arg)
Shim for block function parameters.
VALUE rb_proc_new(type *q, VALUE w)
Creates a rb_cProc instance.
VALUE rb_ensure(type *q, VALUE w, type *e, VALUE r)
An equivalent of ensure clause.
#define OFFT2NUM
Converts a C's off_t into an instance of rb_cInteger.
#define PIDT2NUM
Converts a C's pid_t into an instance of rb_cInteger.
#define errno
Ractor-aware version of errno.
VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void *(*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
Defer the execution of the passed function to the scheduler.
VALUE rb_fiber_scheduler_current(void)
Identical to rb_fiber_scheduler_get(), except it also returns RUBY_Qnil in case of a blocking fiber.
VALUE rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
Non-blocking pread from the passed IO using a native buffer.
VALUE rb_fiber_scheduler_make_timeout(struct timeval *timeout)
Converts the passed timeout to an expression that rb_fiber_scheduler_block() etc.
VALUE rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for reading.
VALUE rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
Non-blocking read from the passed IO using a native buffer.
VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO at the specified offset.
VALUE rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE *argv)
Identical to rb_fiber_scheduler_kernel_sleep(), except it can pass multiple arguments.
VALUE rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
Non-blocking version of rb_io_wait().
VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
Non-blocking version of IO.select.
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO.
VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
Non-blocking version of IO.select, argv variant.
VALUE rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
Non-blocking waitpid.
VALUE rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
Non-blocking wait for the passed "blocker", which is for instance Thread.join or Mutex....
VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
Non-blocking read from the passed IO at the specified offset.
VALUE rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
Non-blocking pwrite to the passed IO using a native buffer.
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
Non-blocking write to the passed IO.
VALUE rb_fiber_scheduler_close(VALUE scheduler)
Closes the passed scheduler object.
VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
Identical to rb_fiber_scheduler_current(), except it queries for that of the passed thread instead of...
VALUE rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE duration)
Non-blocking sleep.
VALUE rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
Non-blocking DNS lookup.
VALUE rb_fiber_scheduler_set(VALUE scheduler)
Destructively assigns the passed scheduler to that of the current thread that is calling this functio...
VALUE rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
Non-blocking write to the passed IO using a native buffer.
VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
Non-blocking wait until the passed IO is ready for writing.
VALUE rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
Non-blocking close the given IO.
VALUE rb_fiber_scheduler_get(void)
Queries the current scheduler of the current thread that is calling this function.
VALUE rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
Wakes up a fiber previously blocked using rb_fiber_scheduler_block().
VALUE rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
Create and schedule a non-blocking fiber.
uintptr_t ID
Type that represents a Ruby identifier such as a variable name.
uintptr_t VALUE
Type that represents a Ruby object.