Ruby  3.4.0dev (2024-12-06 revision 892c46283a5ea4179500d951c9d4866c0051f27b)
scheduler.c (892c46283a5ea4179500d951c9d4866c0051f27b)
1 /**********************************************************************
2 
3  scheduler.c
4 
5  $Author$
6 
7  Copyright (C) 2020 Samuel Grant Dawson Williams
8 
9 **********************************************************************/
10 
11 #include "vm_core.h"
12 #include "ruby/fiber/scheduler.h"
13 #include "ruby/io.h"
14 #include "ruby/io/buffer.h"
15 
16 #include "ruby/thread.h"
17 
18 // For `ruby_thread_has_gvl_p`.
19 #include "internal/thread.h"
20 
21 static ID id_close;
22 static ID id_scheduler_close;
23 
24 static ID id_block;
25 static ID id_unblock;
26 
27 static ID id_timeout_after;
28 static ID id_kernel_sleep;
29 static ID id_process_wait;
30 
31 static ID id_io_read, id_io_pread;
32 static ID id_io_write, id_io_pwrite;
33 static ID id_io_wait;
34 static ID id_io_select;
35 static ID id_io_close;
36 
37 static ID id_address_resolve;
38 
39 static ID id_blocking_operation_wait;
40 
41 static ID id_fiber_schedule;
42 
43 /*
44  * Document-class: Fiber::Scheduler
45  *
46  * This is not an existing class, but documentation of the interface that a Scheduler
47  * object should comply with in order to be passed to Fiber.set_scheduler and handle non-blocking
48  * fibers. See also the "Non-blocking fibers" section in the Fiber class docs for explanations
49  * of some concepts.
50  *
51  * Scheduler's behavior and usage are expected to be as follows:
52  *
53  * * When execution in a non-blocking Fiber reaches some blocking operation (like
54  * sleep, waiting for a process, or I/O that is not ready), it calls one of the
55  * scheduler's hook methods, listed below.
56  * * The scheduler registers what the current fiber is waiting on, and yields control
57  * to other fibers with Fiber.yield (so the fiber is suspended while it waits for its
58  * operation to complete, and other fibers in the same thread can run).
59  * * At the end of the current thread's execution, the scheduler's #scheduler_close method is called.
60  * * The scheduler runs a wait loop, checking all the blocked fibers (which it has
61  * registered on hook calls) and resuming them when the awaited resource is ready
62  * (e.g. I/O is ready or the sleep time has elapsed).
63  *
64  * This way, concurrent execution is achieved transparently for every
65  * individual fiber's code.
66  *
67  * Scheduler implementations are provided by gems, like
68  * Async[https://github.com/socketry/async].
69  *
70  * Hook methods are:
71  *
72  * * #io_wait, #io_read, #io_write, #io_pread, #io_pwrite, #io_select, and #io_close
73  * * #process_wait
74  * * #kernel_sleep
75  * * #timeout_after
76  * * #address_resolve
77  * * #block and #unblock
78  * * (the list is expanded as Ruby developers add non-blocking support to more methods)
79  *
80  * Unless specified otherwise, the hook implementations are mandatory: if they are not
81  * implemented, the methods trying to call the hook will fail. To provide backward compatibility,
82  * hooks added in the future will be optional (if a hook is not implemented because the
83  * scheduler was written for an older Ruby version, the code that needs the hook will not fail,
84  * and will just behave in a blocking fashion).
85  *
86  * It is also strongly recommended that the scheduler implement the #fiber method, which
87  * Fiber.schedule delegates to.
88  *
89  * A sample _toy_ implementation of the scheduler can be found in Ruby's source tree, in
90  * <tt>test/fiber/scheduler.rb</tt>.
91  *
92  */
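To make the contract above concrete, the outline below sketches what such an object can look like. It is only illustrative: the instance variables and the selector loop are invented for this sketch and are not part of the interface; a complete toy implementation lives in <tt>test/fiber/scheduler.rb</tt>.

    class ToyScheduler
      def initialize
        @readable = {}   # io => fiber waiting for readability
        @writable = {}   # io => fiber waiting for writability
        @waiting  = {}   # fiber => monotonic wake-up time (kernel_sleep, timeouts)
        @ready    = []   # fibers made runnable from other threads (#unblock)
      end

      # Hook methods (#io_wait, #kernel_sleep, #block, ...) record the current
      # fiber in the structures above and call Fiber.yield to suspend it.

      def close
        # Main event loop: select on the registered IOs, wake up sleepers and
        # resume unblocked fibers until nothing is waiting any more.
      end

      def fiber(&block)
        fiber = Fiber.new(blocking: false, &block)
        fiber.resume
        fiber
      end
    end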
93 void
94 Init_Fiber_Scheduler(void)
95 {
96  id_close = rb_intern_const("close");
97  id_scheduler_close = rb_intern_const("scheduler_close");
98 
99  id_block = rb_intern_const("block");
100  id_unblock = rb_intern_const("unblock");
101 
102  id_timeout_after = rb_intern_const("timeout_after");
103  id_kernel_sleep = rb_intern_const("kernel_sleep");
104  id_process_wait = rb_intern_const("process_wait");
105 
106  id_io_read = rb_intern_const("io_read");
107  id_io_pread = rb_intern_const("io_pread");
108  id_io_write = rb_intern_const("io_write");
109  id_io_pwrite = rb_intern_const("io_pwrite");
110 
111  id_io_wait = rb_intern_const("io_wait");
112  id_io_select = rb_intern_const("io_select");
113  id_io_close = rb_intern_const("io_close");
114 
115  id_address_resolve = rb_intern_const("address_resolve");
116 
117  id_blocking_operation_wait = rb_intern_const("blocking_operation_wait");
118 
119  id_fiber_schedule = rb_intern_const("fiber");
120 
121 #if 0 /* for RDoc */
122  rb_cFiberScheduler = rb_define_class_under(rb_cFiber, "Scheduler", rb_cObject);
123  rb_define_method(rb_cFiberScheduler, "close", rb_fiber_scheduler_close, 0);
124  rb_define_method(rb_cFiberScheduler, "process_wait", rb_fiber_scheduler_process_wait, 2);
125  rb_define_method(rb_cFiberScheduler, "io_wait", rb_fiber_scheduler_io_wait, 3);
126  rb_define_method(rb_cFiberScheduler, "io_read", rb_fiber_scheduler_io_read, 4);
127  rb_define_method(rb_cFiberScheduler, "io_write", rb_fiber_scheduler_io_write, 4);
128  rb_define_method(rb_cFiberScheduler, "io_pread", rb_fiber_scheduler_io_pread, 5);
129  rb_define_method(rb_cFiberScheduler, "io_pwrite", rb_fiber_scheduler_io_pwrite, 5);
130  rb_define_method(rb_cFiberScheduler, "io_select", rb_fiber_scheduler_io_select, 4);
131  rb_define_method(rb_cFiberScheduler, "kernel_sleep", rb_fiber_scheduler_kernel_sleep, 1);
132  rb_define_method(rb_cFiberScheduler, "address_resolve", rb_fiber_scheduler_address_resolve, 1);
133  rb_define_method(rb_cFiberScheduler, "timeout_after", rb_fiber_scheduler_timeout_after, 3);
134  rb_define_method(rb_cFiberScheduler, "block", rb_fiber_scheduler_block, 2);
135  rb_define_method(rb_cFiberScheduler, "unblock", rb_fiber_scheduler_unblock, 2);
136  rb_define_method(rb_cFiberScheduler, "fiber", rb_fiber_scheduler, -2);
137 #endif
138 }
139 
140 VALUE
141 rb_fiber_scheduler_get(void)
142 {
143  RUBY_ASSERT(ruby_thread_has_gvl_p());
144 
145  rb_thread_t *thread = GET_THREAD();
146  RUBY_ASSERT(thread);
147 
148  return thread->scheduler;
149 }
150 
151 static void
152 verify_interface(VALUE scheduler)
153 {
154  if (!rb_respond_to(scheduler, id_block)) {
155  rb_raise(rb_eArgError, "Scheduler must implement #block");
156  }
157 
158  if (!rb_respond_to(scheduler, id_unblock)) {
159  rb_raise(rb_eArgError, "Scheduler must implement #unblock");
160  }
161 
162  if (!rb_respond_to(scheduler, id_kernel_sleep)) {
163  rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep");
164  }
165 
166  if (!rb_respond_to(scheduler, id_io_wait)) {
167  rb_raise(rb_eArgError, "Scheduler must implement #io_wait");
168  }
169 }
170 
171 static VALUE
172 fiber_scheduler_close(VALUE scheduler)
173 {
174  return rb_fiber_scheduler_close(scheduler);
175 }
176 
177 static VALUE
178 fiber_scheduler_close_ensure(VALUE _thread)
179 {
180  rb_thread_t *thread = (rb_thread_t*)_thread;
181  thread->scheduler = Qnil;
182 
183  return Qnil;
184 }
185 
186 VALUE
187 rb_fiber_scheduler_set(VALUE scheduler)
188 {
189  RUBY_ASSERT(ruby_thread_has_gvl_p());
190 
191  rb_thread_t *thread = GET_THREAD();
192  RUBY_ASSERT(thread);
193 
194  if (scheduler != Qnil) {
195  verify_interface(scheduler);
196  }
197 
198  // We invoke Scheduler#close when setting it to something else, to ensure
199  // the previous scheduler runs to completion before changing the scheduler.
200  // That way, we do not need to consider interactions, e.g., of a Fiber from
201  // the previous scheduler with the new scheduler.
202  if (thread->scheduler != Qnil) {
203  // rb_fiber_scheduler_close(thread->scheduler);
204  rb_ensure(fiber_scheduler_close, thread->scheduler, fiber_scheduler_close_ensure, (VALUE)thread);
205  }
206 
207  thread->scheduler = scheduler;
208 
209  return thread->scheduler;
210 }
211 
212 static VALUE
213 rb_fiber_scheduler_current_for_threadptr(rb_thread_t *thread)
214 {
215  RUBY_ASSERT(thread);
216 
217  if (thread->blocking == 0) {
218  return thread->scheduler;
219  }
220  else {
221  return Qnil;
222  }
223 }
224 
225 VALUE
226 rb_fiber_scheduler_current(void)
227 {
228  return rb_fiber_scheduler_current_for_threadptr(GET_THREAD());
229 }
230 
231 VALUE rb_fiber_scheduler_current_for_thread(VALUE thread)
232 {
233  return rb_fiber_scheduler_current_for_threadptr(rb_thread_ptr(thread));
234 }
235 
236 /*
237  *
238  * Document-method: Fiber::Scheduler#close
239  *
240  * Called when the current thread exits. The scheduler is expected to implement this
241  * method in order to allow all waiting fibers to finalize their execution.
242  *
243  * The suggested pattern is to implement the main event loop in the #close method.
244  *
245  */
246 VALUE
247 rb_fiber_scheduler_close(VALUE scheduler)
248 {
249  RUBY_ASSERT(ruby_thread_has_gvl_p());
250 
251  VALUE result;
252 
253  // The reason for calling `scheduler_close` before calling `close` is for
254  // legacy schedulers which implement `close` and expect the user to call
255  // it. Subsequently, that method would call `Fiber.set_scheduler(nil)`
256  // which should call `scheduler_close`. If it were to call `close`, it
257  // would create an infinite loop.
258 
259  result = rb_check_funcall(scheduler, id_scheduler_close, 0, NULL);
260  if (!UNDEF_P(result)) return result;
261 
262  result = rb_check_funcall(scheduler, id_close, 0, NULL);
263  if (!UNDEF_P(result)) return result;
264 
265  return Qnil;
266 }
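As a sketch of the "event loop lives in #close" pattern, a scheduler might drain its bookkeeping like this; the @readable/@writable/@waiting structures and the run_once helper are illustrative names for this example only:

    def close
      # Keep driving the event loop until no fiber is waiting on I/O or timers.
      until @readable.empty? && @writable.empty? && @waiting.empty?
        run_once   # hypothetical helper: one round of select plus resuming ready fibers
      end
    end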
267 
268 VALUE
269 rb_fiber_scheduler_make_timeout(struct timeval *timeout)
270 {
271  if (timeout) {
272  return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
273  }
274 
275  return Qnil;
276 }
277 
278 /*
279  * Document-method: Fiber::Scheduler#kernel_sleep
280  * call-seq: kernel_sleep(duration = nil)
281  *
282  * Invoked by Kernel#sleep and Mutex#sleep and is expected to provide
283  * an implementation of sleeping in a non-blocking way. The implementation might
284  * register the current fiber in some list of "which fiber waits until what
285  * moment", call Fiber.yield to pass control, and then in #close resume
286  * the fibers whose wait period has elapsed.
287  *
288  */
289 VALUE
290 rb_fiber_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
291 {
292  return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
293 }
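A sketch of such an implementation, assuming the scheduler keeps a @waiting map (fiber => wake-up time) that its run loop consults; the map is illustrative and not part of the interface:

    def kernel_sleep(duration = nil)
      if duration
        # Record when the current fiber should be woken up, then let other fibers run.
        @waiting[Fiber.current] = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration
        Fiber.yield
      else
        # sleep with no duration: park the fiber until something unblocks it.
        block(:sleep, nil)
      end
    end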
294 
295 VALUE
296 rb_fiber_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
297 {
298  return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
299 }
300 
301 #if 0
302 /*
303  * Document-method: Fiber::Scheduler#timeout_after
304  * call-seq: timeout_after(duration, exception_class, *exception_arguments, &block) -> result of block
305  *
306  * Invoked by Timeout.timeout to execute the given +block+ within the given
307  * +duration+. It can also be invoked directly by the scheduler or user code.
308  *
309  * Attempt to limit the execution time of a given +block+ to the given
310  * +duration+ if possible. When a non-blocking operation causes the +block+'s
311  * execution time to exceed the specified +duration+, that non-blocking
312  * operation should be interrupted by raising the specified +exception_class+
313  * constructed with the given +exception_arguments+.
314  *
315  * General execution timeouts are often considered risky. This implementation
316  * will only interrupt non-blocking operations. This is by design because it's
317  * expected that non-blocking operations can fail for a variety of
318  * unpredictable reasons, so applications should already be robust in handling
319  * these conditions and by implication timeouts.
320  *
321  * However, as a result of this design, if the +block+ does not invoke any
322  * non-blocking operations, it will be impossible to interrupt it. If you
323  * desire to provide predictable points for timeouts, consider adding
324  * +sleep(0)+.
325  *
326  * If the block is executed successfully, its result will be returned.
327  *
328  * The exception will typically be raised using Fiber#raise.
329  */
330 VALUE
331 rb_fiber_scheduler_timeout_after(VALUE scheduler, VALUE timeout, VALUE exception, VALUE message)
332 {
333  VALUE arguments[] = {
334  timeout, exception, message
335  };
336 
337  return rb_check_funcall(scheduler, id_timeout_after, 3, arguments);
338 }
339 
340 VALUE
341 rb_fiber_scheduler_timeout_afterv(VALUE scheduler, int argc, VALUE * argv)
342 {
343  return rb_check_funcall(scheduler, id_timeout_after, argc, argv);
344 }
345 #endif
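A hedged sketch of this hook; schedule_timer and cancel_timer stand in for whatever timer mechanism the scheduler maintains and are not part of any Ruby API:

    def timeout_after(duration, exception_class, *exception_arguments)
      fiber = Fiber.current

      # Arrange for the blocked fiber to be interrupted once the duration elapses.
      timer = schedule_timer(duration) do
        fiber.raise(exception_class, *exception_arguments) if fiber.alive?
      end

      begin
        yield duration
      ensure
        cancel_timer(timer)
      end
    end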
346 
347 /*
348  * Document-method: Fiber::Scheduler#process_wait
349  * call-seq: process_wait(pid, flags)
350  *
351  * Invoked by Process::Status.wait in order to wait for a specified process.
352  * See that method's documentation for a description of the arguments.
353  *
354  * Suggested minimal implementation:
355  *
356  *     Thread.new do
357  *       Process::Status.wait(pid, flags)
358  *     end.value
359  *
360  * This hook is optional: if it is not present in the current scheduler,
361  * Process::Status.wait will behave as a blocking method.
362  *
363  * Expected to return a Process::Status instance.
364  */
365 VALUE
366 rb_fiber_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
367 {
368  VALUE arguments[] = {
369  PIDT2NUM(pid), RB_INT2NUM(flags)
370  };
371 
372  return rb_check_funcall(scheduler, id_process_wait, 2, arguments);
373 }
374 
375 /*
376  * Document-method: Fiber::Scheduler#block
377  * call-seq: block(blocker, timeout = nil)
378  *
379  * Invoked by methods like Thread.join, and by Mutex, to signify that the current
380  * Fiber is blocked until further notice (e.g. #unblock) or until +timeout+ has
381  * elapsed.
382  *
383  * +blocker+ is what we are waiting on, and is informational only (for debugging and
384  * logging). There are no guarantees about its value.
385  *
386  * Expected to return a boolean, specifying whether the blocking operation was
387  * successful or not.
388  */
389 VALUE
390 rb_fiber_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
391 {
392  return rb_funcall(scheduler, id_block, 2, blocker, timeout);
393 }
394 
395 /*
396  * Document-method: Fiber::Scheduler#unblock
397  * call-seq: unblock(blocker, fiber)
398  *
399  * Invoked to wake up a Fiber previously blocked with #block (for example, Mutex#lock
400  * calls #block and Mutex#unlock calls #unblock). The scheduler should use
401  * the +fiber+ parameter to know which fiber to unblock.
402  *
403  * +blocker+ is what was being awaited, but it is informational only (for debugging
404  * and logging), and it is not guaranteed to be the same value as the +blocker+ passed
405  * to #block.
406  *
407  */
408 VALUE
409 rb_fiber_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
410 {
411  RUBY_ASSERT(rb_obj_is_fiber(fiber));
412 
413  return rb_funcall(scheduler, id_unblock, 2, blocker, fiber);
414 }
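A sketch of a matching #block / #unblock pair; @blocked, @waiting and @ready are illustrative bookkeeping, and a real implementation must make #unblock safe to call from other threads:

    def block(blocker, timeout = nil)
      # Park the current fiber; #unblock (or the timeout) makes it runnable again.
      @blocked[Fiber.current] = blocker
      @waiting[Fiber.current] = current_time + timeout if timeout   # current_time: stand-in helper
      Fiber.yield
      true
    ensure
      @blocked.delete(Fiber.current)
      @waiting.delete(Fiber.current)
    end

    def unblock(blocker, fiber)
      # Often called from another thread (e.g. Mutex#unlock), so only mark the
      # fiber as ready here; the run loop resumes it on the scheduler's thread.
      @ready << fiber
    end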
415 
416 /*
417  * Document-method: Fiber::Scheduler#io_wait
418  * call-seq: io_wait(io, events, timeout)
419  *
420  * Invoked by IO#wait, IO#wait_readable, and IO#wait_writable to ask whether the
421  * specified descriptor is ready for the specified events within
422  * the specified +timeout+.
423  *
424  * +events+ is a bit mask of <tt>IO::READABLE</tt>, <tt>IO::WRITABLE</tt>, and
425  * <tt>IO::PRIORITY</tt>.
426  *
427  * A suggested implementation registers which Fiber is waiting for which
428  * resources and immediately calls Fiber.yield to pass control to other
429  * fibers. Then, in the #close method, the scheduler might dispatch all the
430  * ready I/O resources to the fibers waiting for them.
431  *
432  * Expected to return the subset of events that are ready immediately.
433  *
434  */
435 VALUE
436 rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
437 {
438  return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
439 }
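A sketch in the spirit of the toy scheduler: register the fiber for the requested events and suspend it; the @readable/@writable maps are illustrative and are consumed by the scheduler's own select loop:

    def io_wait(io, events, timeout)
      fiber = Fiber.current
      @readable[io] = fiber if (events & IO::READABLE).nonzero?
      @writable[io] = fiber if (events & IO::WRITABLE).nonzero?

      # Suspend until the run loop sees the io become ready and resumes this
      # fiber, passing back the subset of events that is actually ready.
      Fiber.yield
    ensure
      @readable.delete(io)
      @writable.delete(io)
    end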
440 
441 VALUE
442 rb_fiber_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
443 {
444  return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), rb_io_timeout(io));
445 }
446 
447 VALUE
448 rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
449 {
450  return rb_fiber_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), rb_io_timeout(io));
451 }
452 
453 /*
454  * Document-method: Fiber::Scheduler#io_select
455  * call-seq: io_select(readables, writables, exceptables, timeout)
456  *
457  * Invoked by IO.select to ask whether the specified descriptors are ready for
458  * specified events within the specified +timeout+.
459  *
460  * Expected to return a 3-tuple of arrays of IOs that are ready.
461  *
462  */
463 VALUE rb_fiber_scheduler_io_select(VALUE scheduler, VALUE readables, VALUE writables, VALUE exceptables, VALUE timeout)
464 {
465  VALUE arguments[] = {
466  readables, writables, exceptables, timeout
467  };
468 
469  return rb_fiber_scheduler_io_selectv(scheduler, 4, arguments);
470 }
471 
472 VALUE rb_fiber_scheduler_io_selectv(VALUE scheduler, int argc, VALUE *argv)
473 {
474  // I wondered about extracting argv, and checking if there is only a single
475  // IO instance, and instead calling `io_wait`. However, it would require a
476  // decent amount of work and it would be hard to preserve the exact
477  // semantics of IO.select.
478 
479  return rb_check_funcall(scheduler, id_io_select, argc, argv);
480 }
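Since this hook is optional, a simple fallback can offload the blocking call to a background thread so that other fibers keep running in the meantime; this is only a sketch, not how any particular scheduler gem implements it:

    def io_select(readables, writables, exceptables, timeout)
      Thread.new do
        IO.select(readables, writables, exceptables, timeout)
      end.value
    end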
481 
482 /*
483  * Document-method: Fiber::Scheduler#io_read
484  * call-seq: io_read(io, buffer, length, offset) -> read length or -errno
485  *
486  * Invoked by IO#read or IO::Buffer#read to read +length+ bytes from +io+ into a
487  * specified +buffer+ (see IO::Buffer) at the given +offset+.
488  *
489  * The +length+ argument is the "minimum length to be read". If the IO buffer
490  * size is 8KiB, but the +length+ is +1024+ (1KiB), up to 8KiB might be read,
491  * but at least 1KiB will be. Generally, the only case where less data than
492  * +length+ will be read is if there is an error reading the data.
493  *
494  * Specifying a +length+ of 0 is valid and means try reading at least once and
495  * return any available data.
496  *
497  * Suggested implementation should try to read from +io+ in a non-blocking
498  * manner and call #io_wait if the +io+ is not ready (which will yield control
499  * to other fibers).
500  *
501  * See IO::Buffer for an interface available to return data.
502  *
503  * Expected to return number of bytes read, or, in case of an error,
504  * <tt>-errno</tt> (negated number corresponding to system's error code).
505  *
506  * The method should be considered _experimental_.
507  */
508 VALUE
509 rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
510 {
511  VALUE arguments[] = {
512  io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
513  };
514 
515  return rb_check_funcall(scheduler, id_io_read, 4, arguments);
516 }
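A simplified sketch of such a hook built on IO#read_nonblock and the #io_wait hook above; error handling is reduced to returning the negated errno, as described:

    def io_read(io, buffer, length, offset)
      total = 0

      while true
        result = io.read_nonblock(buffer.size - offset, exception: false)

        case result
        when :wait_readable
          io_wait(io, IO::READABLE, nil)   # suspend until the io becomes readable
        when nil
          return total                     # EOF
        else
          buffer.set_string(result, offset)
          offset += result.bytesize
          total += result.bytesize
          return total if total >= length || offset == buffer.size
        end
      end
    rescue SystemCallError => error
      -error.errno
    end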
517 
518 /*
519  * Document-method: Fiber::Scheduler#io_pread
520  * call-seq: io_pread(io, buffer, from, length, offset) -> read length or -errno
521  *
522  * Invoked by IO#pread or IO::Buffer#pread to read +length+ bytes from +io+
523  * at offset +from+ into a specified +buffer+ (see IO::Buffer) at the given
524  * +offset+.
525  *
526  * This method is semantically the same as #io_read, but it allows specifying
527  * the offset to read from and is often better for asynchronous IO on the same
528  * file.
529  *
530  * The method should be considered _experimental_.
531  */
532 VALUE
533 rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
534 {
535  VALUE arguments[] = {
536  io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
537  };
538 
539  return rb_check_funcall(scheduler, id_io_pread, 5, arguments);
540 }
541 
542 /*
543  * Document-method: Fiber::Scheduler#io_write
544  * call-seq: io_write(io, buffer, length, offset) -> written length or -errno
545  *
546  * Invoked by IO#write or IO::Buffer#write to write +length+ bytes to +io+
547  * from a specified +buffer+ (see IO::Buffer) at the given +offset+.
548  *
549  * The +length+ argument is the "minimum length to be written". If the IO
550  * buffer size is 8KiB, but the +length+ specified is 1024 (1KiB), at most 8KiB
551  * will be written, but at least 1KiB will be. Generally, the only case where
552  * less data than +length+ will be written is if there is an error writing the
553  * data.
554  *
555  * Specifying a +length+ of 0 is valid and means try writing at least once, as
556  * much data as possible.
557  *
558  * Suggested implementation should try to write to +io+ in a non-blocking
559  * manner and call #io_wait if the +io+ is not ready (which will yield control
560  * to other fibers).
561  *
562  * See IO::Buffer for an interface available to get data from buffer
563  * efficiently.
564  *
565  * Expected to return number of bytes written, or, in case of an error,
566  * <tt>-errno</tt> (negated number corresponding to system's error code).
567  *
568  * The method should be considered _experimental_.
569  */
570 VALUE
571 rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length, size_t offset)
572 {
573  VALUE arguments[] = {
574  io, buffer, SIZET2NUM(length), SIZET2NUM(offset)
575  };
576 
577  return rb_check_funcall(scheduler, id_io_write, 4, arguments);
578 }
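A matching sketch for writing, again built on the #io_wait hook and returning the negated errno on failure:

    def io_write(io, buffer, length, offset)
      total = 0

      while true
        chunk = buffer.get_string(offset)
        result = io.write_nonblock(chunk, exception: false)

        if result == :wait_writable
          io_wait(io, IO::WRITABLE, nil)   # suspend until the io becomes writable
        else
          offset += result
          total += result
          return total if total >= length || offset == buffer.size
        end
      end
    rescue SystemCallError => error
      -error.errno
    end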
579 
580 /*
581  * Document-method: Fiber::Scheduler#io_pwrite
582  * call-seq: io_pwrite(io, buffer, from, length, offset) -> written length or -errno
583  *
584  * Invoked by IO#pwrite or IO::Buffer#pwrite to write +length+ bytes to +io+
585  * at offset +from+, from a specified +buffer+ (see IO::Buffer) at the given
586  * +offset+.
587  *
588  * This method is semantically the same as #io_write, but it allows specifying
589  * the offset to write to and is often better for asynchronous IO on the same
590  * file.
591  *
592  * The method should be considered _experimental_.
593  *
594  */
595 VALUE
596 rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buffer, size_t length, size_t offset)
597 {
598  VALUE arguments[] = {
599  io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset)
600  };
601 
602  return rb_check_funcall(scheduler, id_io_pwrite, 5, arguments);
603 }
604 
605 VALUE
606 rb_fiber_scheduler_io_read_memory(VALUE scheduler, VALUE io, void *base, size_t size, size_t length)
607 {
608  VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
609 
610  VALUE result = rb_fiber_scheduler_io_read(scheduler, io, buffer, length, 0);
611 
612  rb_io_buffer_free_locked(buffer);
613 
614  return result;
615 }
616 
617 VALUE
618 rb_fiber_scheduler_io_write_memory(VALUE scheduler, VALUE io, const void *base, size_t size, size_t length)
619 {
620  VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
621 
622  VALUE result = rb_fiber_scheduler_io_write(scheduler, io, buffer, length, 0);
623 
624  rb_io_buffer_free_locked(buffer);
625 
626  return result;
627 }
628 
629 VALUE
630 rb_fiber_scheduler_io_pread_memory(VALUE scheduler, VALUE io, rb_off_t from, void *base, size_t size, size_t length)
631 {
632  VALUE buffer = rb_io_buffer_new(base, size, RB_IO_BUFFER_LOCKED);
633 
634  VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, from, buffer, length, 0);
635 
636  rb_io_buffer_free_locked(buffer);
637 
638  return result;
639 }
640 
641 VALUE
642 rb_fiber_scheduler_io_pwrite_memory(VALUE scheduler, VALUE io, rb_off_t from, const void *base, size_t size, size_t length)
643 {
644  VALUE buffer = rb_io_buffer_new((void*)base, size, RB_IO_BUFFER_LOCKED|RB_IO_BUFFER_READONLY);
645 
646  VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, from, buffer, length, 0);
647 
648  rb_io_buffer_free_locked(buffer);
649 
650  return result;
651 }
652 
653 VALUE
654 rb_fiber_scheduler_io_close(VALUE scheduler, VALUE io)
655 {
656  VALUE arguments[] = {io};
657 
658  return rb_check_funcall(scheduler, id_io_close, 1, arguments);
659 }
660 
661 /*
662  * Document-method: Fiber::Scheduler#address_resolve
663  * call-seq: address_resolve(hostname) -> array_of_strings or nil
664  *
665  * Invoked by any method that performs a non-reverse DNS lookup. The most
666  * notable method is Addrinfo.getaddrinfo, but there are many others.
667  *
668  * The method is expected to return an array of strings corresponding to the IP
669  * addresses the +hostname+ resolves to, or +nil+ if it cannot be resolved.
670  *
671  * A fairly exhaustive list of all possible call sites:
672  *
673  * - Addrinfo.getaddrinfo
674  * - Addrinfo.tcp
675  * - Addrinfo.udp
676  * - Addrinfo.ip
677  * - Addrinfo.new
678  * - Addrinfo.marshal_load
679  * - SOCKSSocket.new
680  * - TCPServer.new
681  * - TCPSocket.new
682  * - IPSocket.getaddress
683  * - TCPSocket.gethostbyname
684  * - UDPSocket#connect
685  * - UDPSocket#bind
686  * - UDPSocket#send
687  * - Socket.getaddrinfo
688  * - Socket.gethostbyname
689  * - Socket.pack_sockaddr_in
690  * - Socket.sockaddr_in
691  * - Socket.unpack_sockaddr_in
692  */
693 VALUE
694 rb_fiber_scheduler_address_resolve(VALUE scheduler, VALUE hostname)
695 {
696  VALUE arguments[] = {
697  hostname
698  };
699 
700  return rb_check_funcall(scheduler, id_address_resolve, 1, arguments);
701 }
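This hook is also optional; a simple fallback moves the blocking lookup to another thread (Resolv is used here purely for illustration):

    require "resolv"

    def address_resolve(hostname)
      Thread.new { Resolv.getaddresses(hostname) }.value
    end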
702 
703 struct rb_blocking_operation_wait_arguments {
704  void *(*function)(void *);
705  void *data;
706  rb_unblock_function_t *unblock_function;
707  void *data2;
708  int flags;
709 
710  struct rb_fiber_scheduler_blocking_operation_state *state;
711 };
712 
713 static VALUE
714 rb_fiber_scheduler_blocking_operation_wait_proc(RB_BLOCK_CALL_FUNC_ARGLIST(value, _arguments))
715 {
716  struct rb_blocking_operation_wait_arguments *arguments = (struct rb_blocking_operation_wait_arguments*)_arguments;
717 
718  if (arguments->state == NULL) {
719  rb_raise(rb_eRuntimeError, "Blocking function was already invoked!");
720  }
721 
722  arguments->state->result = rb_nogvl(arguments->function, arguments->data, arguments->unblock_function, arguments->data2, arguments->flags);
723  arguments->state->saved_errno = rb_errno();
724 
725  // Make sure it's only invoked once.
726  arguments->state = NULL;
727 
728  return Qnil;
729 }
730 
731 /*
732  * Document-method: Fiber::Scheduler#blocking_operation_wait
733  * call-seq: blocking_operation_wait(work)
734  *
735  * Invoked by Ruby's core methods to run a blocking operation in a non-blocking way.
736  *
737  * Minimal suggested implementation is:
738  *
739  *     def blocking_operation_wait(work)
740  *       Thread.new(&work).join
741  *     end
742  */
743 VALUE rb_fiber_scheduler_blocking_operation_wait(VALUE scheduler, void* (*function)(void *), void *data, rb_unblock_function_t *unblock_function, void *data2, int flags, struct rb_fiber_scheduler_blocking_operation_state *state)
744 {
745  struct rb_blocking_operation_wait_arguments arguments = {
746  .function = function,
747  .data = data,
748  .unblock_function = unblock_function,
749  .data2 = data2,
750  .flags = flags,
751  .state = state
752  };
753 
754  VALUE proc = rb_proc_new(rb_fiber_scheduler_blocking_operation_wait_proc, (VALUE)&arguments);
755 
756  return rb_check_funcall(scheduler, id_blocking_operation_wait, 1, &proc);
757 }
758 
759 /*
760  * Document-method: Fiber::Scheduler#fiber
761  * call-seq: fiber(&block)
762  *
763  * Implementation of Fiber.schedule. The method is <em>expected</em> to immediately
764  * run the given block of code in a separate non-blocking fiber, and to return that Fiber.
765  *
766  * Minimal suggested implementation is:
767  *
768  *     def fiber(&block)
769  *       fiber = Fiber.new(blocking: false, &block)
770  *       fiber.resume
771  *       fiber
772  *     end
773  */
774 VALUE
775 rb_fiber_scheduler_fiber(VALUE scheduler, int argc, VALUE *argv, int kw_splat)
776 {
777  return rb_funcall_passing_block_kw(scheduler, id_fiber_schedule, argc, argv, kw_splat);
778 }