Skip to content

Commit

Permalink
Merge pull request #1287 from herwinw/fiber_scheduler_sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
seven1m authored Oct 14, 2023
2 parents 678f484 + f18a2fb commit be285d3
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 5 deletions.
1 change: 1 addition & 0 deletions include/natalie/fiber_object.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class FiberObject : public Object {
static Value refeq(Env *env, Value, Value);
Value resume(Env *env, Args args);
static Value scheduler();
static bool scheduler_is_relevant();
static Value set_scheduler(Env *, Value);
Value set_storage(Env *, Value);
Value storage(Env *) const;
Expand Down
4 changes: 4 additions & 0 deletions src/fiber_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ Value FiberObject::scheduler() {
return s_scheduler;
}

bool FiberObject::scheduler_is_relevant() {
return !FiberObject::current()->is_blocking() && FiberObject::scheduler() && !FiberObject::scheduler()->is_nil();
}

Value FiberObject::set_scheduler(Env *env, Value scheduler) {
if (scheduler->is_nil()) {
s_scheduler = nullptr;
Expand Down
32 changes: 27 additions & 5 deletions src/kernel_module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,11 +580,12 @@ Value KernelModule::remove_instance_variable(Env *env, Value name_val) {
Value KernelModule::sleep(Env *env, Value length) {
if (!length) {
while (true) {
if (FiberObject::scheduler_is_relevant())
FiberObject::scheduler()->send(env, "kernel_sleep"_s);
::sleep(1000);
}
NAT_UNREACHABLE();
}
timespec ts;
float secs;
if (length->is_integer()) {
secs = length->as_integer()->to_nat_int_t();
Expand All @@ -601,14 +602,35 @@ Value KernelModule::sleep(Env *env, Value length) {
}
if (secs < 0.0)
env->raise("ArgumentError", "time interval must not be negative");
timespec ts, t_begin, t_end;
ts.tv_sec = ::floor(secs);
ts.tv_nsec = (secs - ts.tv_sec) * 1000000000;
timespec t_begin, t_end;
if (::clock_gettime(CLOCK_MONOTONIC, &t_begin) < 0)
env->raise_errno();
nanosleep(&ts, nullptr);
if (::clock_gettime(CLOCK_MONOTONIC, &t_end) < 0)
env->raise_errno();
if (FiberObject::scheduler_is_relevant()) {
ts.tv_sec += t_begin.tv_sec;
ts.tv_nsec += t_begin.tv_nsec;
if (ts.tv_nsec >= 1000000000) {
ts.tv_sec++;
ts.tv_nsec -= 1000000000;
}
while (true) {
FiberObject::scheduler()->send(env, "kernel_sleep"_s, { length });
// When we get here, the scheduler should have checked our timeout. But loop in case we got
// restarted too early
if (::clock_gettime(CLOCK_MONOTONIC, &t_end) < 0)
env->raise_errno();
if (t_end.tv_sec > ts.tv_sec || (t_end.tv_sec == ts.tv_sec && t_end.tv_nsec > ts.tv_nsec))
break;
secs = t_end.tv_sec - ts.tv_sec;
secs += (t_end.tv_nsec - ts.tv_nsec) / 1000000000.0;
length = new FloatObject { secs };
}
} else {
nanosleep(&ts, nullptr);
if (::clock_gettime(CLOCK_MONOTONIC, &t_end) < 0)
env->raise_errno();
}
int elapsed = t_end.tv_sec - t_begin.tv_sec;
if (t_end.tv_nsec < t_begin.tv_nsec) elapsed--;
return IntegerObject::create(elapsed);
Expand Down
97 changes: 97 additions & 0 deletions test/natalie/fiber/scheduler_sleep_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
require_relative '../../spec_helper'
require_relative 'shared/scheduler'

describe 'Fiber.scheduler with Kernel.sleep' do
after :each do
Fiber.set_scheduler(nil)
end

it 'can interleave fibers with Kernel.sleep with a duration' do
scheduler = Scheduler.new
Fiber.set_scheduler(scheduler)
events = []

sleeper = Fiber.new do
events << 'Going to sleep'
sleep(0.01)
events << 'Woken up'
end

barista = Fiber.new do
events << 'Coffee'
end

sleeper.resume
barista.resume

scheduler.close

events.should == ['Going to sleep', 'Coffee', 'Woken up']
end

it 'can interleave fibers with Kernel.sleep without a duration' do
scheduler = Scheduler.new
Fiber.set_scheduler(scheduler)
events = []

sleeper = Fiber.new do
events << 'Going to sleep'
sleep
events << 'Woken up'
end

barista = Fiber.new do
events << 'Coffee'
end

sleeper.resume
barista.resume

scheduler.close

events.should == ['Going to sleep', 'Coffee']
end

it 'does not interleave blocking fibers' do
scheduler = Scheduler.new
Fiber.set_scheduler(scheduler)
events = []

sleeper = Fiber.new(blocking: true) do
events << 'Going to sleep'
sleep(0.01)
events << 'Woken up'
end

barista = Fiber.new do
events << 'Coffee'
end

sleeper.resume
barista.resume

scheduler.close

events.should == ['Going to sleep', 'Woken up', 'Coffee']
end

it 'does not interleave fibers without scheduler' do
events = []

sleeper = Fiber.new do
events << 'Going to sleep'
sleep(0.01)
events << 'Woken up'
end

barista = Fiber.new do
events << 'Coffee'
end

sleeper.resume
barista.resume

events.should == ['Going to sleep', 'Woken up', 'Coffee']
end
end

43 changes: 43 additions & 0 deletions test/natalie/fiber/shared/scheduler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
require 'fiber'

class Scheduler
def initialize
@waiting = {}
end

def run
until @waiting.empty?
obj_id, (fiber, _) = @waiting.find { |_, (fiber, timeout)| fiber.alive? && timeout <= current_time }

unless obj_id.nil?
@waiting.delete(obj_id)
fiber.resume
end
end
end

def close
run
end

def kernel_sleep(duration = nil)
# NATFIXME: We don't have any mechanism to stop a fiber with an infinite sleep, so we should not
# resume that one.
if duration.nil?
Fiber.yield
return
end

# NATFIXME: Issues when using Fiber objects as hash key, use object_id for the time being.
@waiting[Fiber.current.object_id] = [Fiber.current, current_time + duration]
Fiber.yield
end

def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

def io_wait(io, events, duration) end
def block(blocker, timeout = nil) end
def unblock(blocker, fiber) end
end

0 comments on commit be285d3

Please sign in to comment.