Skip to content

Commit

Permalink
RFC 2: Skeleton for ExecutionContext (#15350)
Browse files Browse the repository at this point in the history
Integrates the skeleton as per crystal-lang/rfcs#2

- Add the `ExecutionContext` module;
- Add the `ExecutionContext::Scheduler` module;
- Add the `execution_context` compile-time flag.

When the `execution_context` flag is set:

- Don't load `Crystal::Scheduler`;
- Plug `ExecutionContext` instead of `Crystal::Scheduler` in `spawn`, `Fiber`, ...

This is only the skeleton: there are no implementations (yet). Trying to compile anything with `-Dexecution_context` will fail until the ST and/or MT context are implemented.

Co-authored-by: Johannes Müller <[email protected]>
  • Loading branch information
ysbaddaden and straight-shoota authored Feb 22, 2025
1 parent f420f49 commit 0582376
Show file tree
Hide file tree
Showing 15 changed files with 486 additions and 52 deletions.
41 changes: 31 additions & 10 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
require "fiber"
require "channel"
require "crystal/scheduler"
require "crystal/tracing"

{% if flag?(:execution_context) %}
require "fiber/execution_context"
{% else %}
require "crystal/scheduler"
{% end %}

# Blocks the current fiber for the specified number of seconds.
#
# While this fiber is waiting this time, other ready-to-execute
Expand All @@ -12,25 +17,36 @@ def sleep(seconds : Number) : Nil
if seconds < 0
raise ArgumentError.new "Sleep seconds must be positive"
end

Crystal::Scheduler.sleep(seconds.seconds)
sleep(seconds.seconds)
end

# Blocks the current Fiber for the specified time span.
#
# While this fiber is waiting this time, other ready-to-execute
# fibers might start their execution.
def sleep(time : Time::Span) : Nil
Crystal::Scheduler.sleep(time)
Crystal.trace :sched, "sleep", for: time

{% if flag?(:execution_context) %}
Fiber.current.resume_event.add(time)
Fiber::ExecutionContext.reschedule
{% else %}
Crystal::Scheduler.sleep(time)
{% end %}
end

# Blocks the current fiber forever.
#
# Meanwhile, other ready-to-execute fibers might start their execution.
def sleep : Nil
Crystal::Scheduler.reschedule
{% if flag?(:execution_context) %}
Fiber::ExecutionContext.reschedule
{% else %}
Crystal::Scheduler.reschedule
{% end %}
end

{% begin %}
# Spawns a new fiber.
#
# NOTE: The newly created fiber doesn't run as soon as spawned.
Expand Down Expand Up @@ -64,12 +80,17 @@ end
# wg.wait
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% if flag?(:execution_context) %}
Fiber::ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block)
{% else %}
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% end %}
end
{% end %}

# Spawns a fiber by first creating a `Proc`, passing the *call*'s
# expressions to it, and letting the `Proc` finally invoke the *call*.
Expand Down
21 changes: 18 additions & 3 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop

@[AlwaysInline]
def self.current : self
Crystal::Scheduler.event_loop
{% if flag?(:execution_context) %}
Fiber::ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop
{% end %}
end

@[AlwaysInline]
def self.current? : self?
Crystal::Scheduler.event_loop?
def self.current? : self | Nil
{% if flag?(:execution_context) %}
Fiber::ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop?
{% end %}
end

# Runs the loop.
Expand All @@ -46,6 +54,13 @@ abstract class Crystal::EventLoop
# events.
abstract def run(blocking : Bool) : Bool

{% if flag?(:execution_context) %}
# Same as `#run` but collects runnable fibers into *queue* instead of
# enqueueing in parallel, so the caller is responsible and in control for
# when and how the fibers will be enqueued.
abstract def run(queue : Fiber::List*, blocking : Bool) : Nil
{% end %}

# Tells a blocking run loop to no longer wait for events to activate. It may
# for example enqueue a NOOP event with an immediate (or past) timeout. Having
# activated an event, the loop shall return, allowing the blocked thread to
Expand Down
8 changes: 8 additions & 0 deletions src/crystal/event_loop/iocp.cr
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
iocp
end

# thread unsafe
def run(blocking : Bool) : Bool
enqueued = false

Expand All @@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
enqueued
end

{% if flag?(:execution_context) %}
# thread unsafe
def run(queue : Fiber::List*, blocking : Bool) : Nil
run_impl(blocking) { |fiber| queue.value.push(fiber) }
end
{% end %}

# Runs the event loop and enqueues the fiber for the next upcoming event or
# completion.
private def run_impl(blocking : Bool, &) : Nil
Expand Down
43 changes: 36 additions & 7 deletions src/crystal/event_loop/libevent.cr
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,55 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
event_base.loop(flags)
end

{% if flag?(:execution_context) %}
def run(queue : Fiber::List*, blocking : Bool) : Nil
Crystal.trace :evloop, "run", fiber: fiber, blocking: blocking
@runnables = queue
run(blocking)
ensure
@runnables = nil
end

def callback_enqueue(fiber : Fiber) : Nil
if queue = @runnables
queue.value.push(fiber)
else
raise "BUG: libevent callback executed outside of #run(queue*, blocking) call"
end
end
{% end %}

def interrupt : Nil
event_base.loop_exit
end

# Create a new resume event for a fiber.
# Create a new resume event for a fiber (sleep).
def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
data.as(Fiber).enqueue
f = data.as(Fiber)
{% if flag?(:execution_context) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(f)
{% else %}
f.enqueue
{% end %}
end
end

# Creates a timeout_event.
# Creates a timeout event (timeout action of select expression).
def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
f = data.as(Fiber)
if (select_action = f.timeout_select_action)
if select_action = f.timeout_select_action
f.timeout_select_action = nil
select_action.time_expired(f)
else
f.enqueue
if select_action.time_expired?
{% if flag?(:execution_context) %}
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
event_loop.callback_enqueue(f)
{% else %}
f.enqueue
{% end %}
end
end
end
end
Expand Down
27 changes: 23 additions & 4 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,25 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
end
{% end %}

# NOTE: thread unsafe
# thread unsafe
def run(blocking : Bool) : Bool
system_run(blocking) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end
true
end

{% if flag?(:execution_context) %}
# thread unsafe
def run(queue : Fiber::List*, blocking : Bool) : Nil
system_run(blocking) { |fiber| queue.value.push(fiber) }
end
{% end %}

# fiber interface, see Crystal::EventLoop

def create_resume_event(fiber : Fiber) : FiberEvent
Expand Down Expand Up @@ -327,13 +338,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
Polling.arena.free(index) do |pd|
pd.value.@readers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

pd.value.@writers.ready_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

Expand Down
3 changes: 2 additions & 1 deletion src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{% skip_file if flag?(:execution_context) %}

require "crystal/event_loop"
require "crystal/system/print_error"
require "fiber"
Expand Down Expand Up @@ -66,7 +68,6 @@ class Crystal::Scheduler
end

def self.sleep(time : Time::Span) : Nil
Crystal.trace :sched, "sleep", for: time
Thread.current.scheduler.sleep(time)
end

Expand Down
49 changes: 41 additions & 8 deletions src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,47 @@ class Thread

getter name : String?

{% if flag?(:execution_context) %}
# :nodoc:
getter! execution_context : Fiber::ExecutionContext

# :nodoc:
property! scheduler : Fiber::ExecutionContext::Scheduler

# :nodoc:
def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext
main_fiber.execution_context = execution_context
end

# When a fiber terminates we can't release its stack until we swap context
# to another fiber. We can't free/unmap nor push it to a shared stack pool,
# that would result in a segfault.
@dead_fiber_stack : Fiber::Stack?

# :nodoc:
def dying_fiber(fiber : Fiber) : Fiber::Stack?
stack = @dead_fiber_stack
@dead_fiber_stack = fiber.@stack
stack
end

# :nodoc:
def dead_fiber_stack? : Fiber::Stack?
if stack = @dead_fiber_stack
@dead_fiber_stack = nil
stack
end
end
{% else %}
# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end
{% end %}

def self.unsafe_each(&)
# nothing to iterate when @@threads is nil + don't lazily allocate in a
# method called from a GC collection callback!
Expand Down Expand Up @@ -165,14 +206,6 @@ class Thread
thread.name = name
end

# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end

protected def start
Thread.threads.push(self)
Thread.current = self
Expand Down
10 changes: 10 additions & 0 deletions src/crystal/tracing.cr
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ module Crystal
write value.name || '?'
end

{% if flag?(:execution_context) %}
def write(value : Fiber::ExecutionContext) : Nil
write value.name
end

def write(value : Fiber::ExecutionContext::Scheduler) : Nil
write value.name
end
{% end %}

def write(value : Pointer) : Nil
write "0x"
write System.to_int_slice(@int_buf.to_slice, value.address, 16, true, 2)
Expand Down
Loading

0 comments on commit 0582376

Please sign in to comment.