-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
Copy pathfiber.cr
334 lines (292 loc) · 9.91 KB
/
fiber.cr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
require "crystal/system/thread_linked_list"
require "crystal/print_buffered"
require "./fiber/context"
# :nodoc:
@[NoInline]
fun _fiber_get_stack_top : Void*
dummy = uninitialized Int32
pointerof(dummy).as(Void*)
end
# A `Fiber` is a light-weight execution unit managed by the Crystal runtime.
#
# It is conceptually similar to an operating system thread but with less
# overhead and completely internal to the Crystal process. The runtime includes
# a scheduler which schedules execution of fibers.
#
# A `Fiber` has a stack size of `8 MiB` which is usually also assigned
# to an operating system thread. But only `4KiB` are actually allocated at first
# so the memory footprint is very small.
#
# Communication between fibers is usually passed through `Channel`.
#
# ## Cooperative
#
# Fibers are cooperative. That means execution can only be drawn from a fiber
# when it offers it. It can't be interrupted in its execution at random.
# In order to make concurrency work, fibers must make sure to occasionally
# provide hooks for the scheduler to swap in other fibers.
# IO operations like reading from a file descriptor are natural implementations
# for this and the developer does not need to take further action on that. When
# IO access can't be served immediately by a buffer, the fiber will
# automatically wait and yield execution. When IO is ready it's going to be
# resumed through the event loop.
#
# When a computation-intensive task has none or only rare IO operations, a fiber
# should explicitly offer to yield execution from time to time using
# `Fiber.yield` to break up tight loops. The frequency of this call depends on
# the application and concurrency model.
#
# ## Event loop
#
# The event loop is responsible for keeping track of sleeping fibers waiting for
# notifications that IO is ready or a timeout reached. When a fiber can be woken,
# the event loop enqueues it in the scheduler
class Fiber
# :nodoc:
protected class_getter(fibers) { Thread::LinkedList(Fiber).new }
@context : Context
@stack : Void*
@resume_event : Crystal::EventLoop::Event?
@timeout_event : Crystal::EventLoop::Event?
# :nodoc:
property timeout_select_action : Channel::TimeoutAction?
protected property stack_bottom : Void*
# The name of the fiber, used as internal reference.
property name : String?
@alive = true
{% if flag?(:preview_mt) %} @current_thread = Atomic(Thread?).new(nil) {% end %}
# :nodoc:
property next : Fiber?
# :nodoc:
property previous : Fiber?
# :nodoc:
def self.inactive(fiber : Fiber)
fibers.delete(fiber)
end
# :nodoc:
def self.unsafe_each(&)
# nothing to iterate when @@fibers is nil + don't lazily allocate in a
# method called from a GC collection callback!
@@fibers.try(&.unsafe_each { |fiber| yield fiber })
end
# :nodoc:
def self.each(&)
fibers.each { |fiber| yield fiber }
end
# Creates a new `Fiber` instance.
#
# When the fiber is executed, it runs *proc* in its context.
#
# *name* is an optional and used only as an internal reference.
def initialize(@name : String? = nil, &@proc : ->)
@context = Context.new
@stack, @stack_bottom =
{% if flag?(:interpreted) %}
{Pointer(Void).null, Pointer(Void).null}
{% else %}
Crystal::Scheduler.stack_pool.checkout
{% end %}
fiber_main = ->(f : Fiber) { f.run }
# FIXME: This line shouldn't be necessary (#7975)
stack_ptr = nil
{% if flag?(:win32) %}
# align stack bottom to 16 bytes
@stack_bottom = Pointer(Void).new(@stack_bottom.address & ~0x0f_u64)
# It's the caller's responsibility to allocate 32 bytes of "shadow space" on the stack right
# before calling the function (regardless of the actual number of parameters used)
stack_ptr = @stack_bottom - sizeof(Void*) * 6
{% else %}
# point to first addressable pointer on the stack (@stack_bottom points past
# the stack because the stack grows down):
stack_ptr = @stack_bottom - sizeof(Void*)
{% end %}
# align the stack pointer to 16 bytes:
stack_ptr = Pointer(Void*).new(stack_ptr.address & ~0x0f_u64)
makecontext(stack_ptr, fiber_main)
Fiber.fibers.push(self)
end
# :nodoc:
def initialize(@stack : Void*, thread)
@proc = Proc(Void).new { }
# TODO: should creating a new context for the main fiber also be platform specific?
# It's the same for all platforms except for the interpreted mode.
@context =
{% if flag?(:interpreted) %}
# In interpreted mode the stack_top variable actually points to a fiber
Context.new(Crystal::Interpreter.current_fiber)
{% else %}
Context.new(_fiber_get_stack_top)
{% end %}
thread.gc_thread_handler, @stack_bottom = GC.current_thread_stack_bottom
@name = "main"
{% if flag?(:preview_mt) %} @current_thread.set(thread) {% end %}
Fiber.fibers.push(self)
end
# :nodoc:
def run
GC.unlock_read
@proc.call
rescue ex
if name = @name
Crystal.print_buffered("Unhandled exception in spawn(name: %s)", name, exception: ex, to: STDERR)
else
Crystal.print_buffered("Unhandled exception in spawn", exception: ex, to: STDERR)
end
ensure
# Remove the current fiber from the linked list
Fiber.inactive(self)
# Delete the resume event if it was used by `yield` or `sleep`
@resume_event.try &.free
@timeout_event.try &.free
@timeout_select_action = nil
@alive = false
{% unless flag?(:interpreted) %}
Crystal::Scheduler.stack_pool.release(@stack)
{% end %}
Fiber.suspend
end
# Returns the current fiber.
def self.current : Fiber
Thread.current.current_fiber
end
# The fiber's proc is currently running or didn't fully save its context. The
# fiber can't be resumed.
def running? : Bool
@context.resumable == 0
end
# The fiber's proc is currently not running and fully saved its context. The
# fiber can be resumed safely.
def resumable? : Bool
@context.resumable == 1
end
# The fiber's proc has terminated, and the fiber is now considered dead. The
# fiber is impossible to resume, ever.
def dead? : Bool
!@alive
end
# Immediately resumes execution of this fiber.
#
# There are no provisions for resuming the current fiber (where this
# method is called). Unless it is explicitly added for rescheduling (for
# example using `#enqueue`) the current fiber won't ever reach any instructions
# after the call to this method.
#
# ```
# fiber = Fiber.new do
# puts "in fiber"
# end
# fiber.resume
# puts "never reached"
# ```
def resume : Nil
Crystal::Scheduler.resume(self)
end
# Adds this fiber to the scheduler's runnables queue for the current thread.
#
# This signals to the scheduler that the fiber is eligible for being resumed
# the next time it has the opportunity to reschedule to another fiber. There
# are no guarantees when that will happen.
def enqueue : Nil
Crystal::Scheduler.enqueue(self)
end
# :nodoc:
def resume_event : Crystal::EventLoop::Event
@resume_event ||= Crystal::EventLoop.current.create_resume_event(self)
end
# :nodoc:
def timeout_event : Crystal::EventLoop::Event
@timeout_event ||= Crystal::EventLoop.current.create_timeout_event(self)
end
# :nodoc:
def timeout(timeout : Time::Span, select_action : Channel::TimeoutAction) : Nil
@timeout_select_action = select_action
timeout_event.add(timeout)
end
# :nodoc:
def cancel_timeout : Nil
return unless @timeout_select_action
@timeout_select_action = nil
@timeout_event.try &.delete
end
# :nodoc:
#
# The current fiber will resume after a period of time.
# The timeout can be cancelled with `cancel_timeout`
def self.timeout(timeout : Time::Span, select_action : Channel::TimeoutAction) : Nil
Fiber.current.timeout(timeout, select_action)
end
# :nodoc:
def self.cancel_timeout : Nil
Fiber.current.cancel_timeout
end
# Yields to the scheduler and allows it to swap execution to other
# waiting fibers.
#
# This is equivalent to `sleep 0.seconds`. It gives the scheduler an option
# to interrupt the current fiber's execution. If no other fibers are ready to
# be resumed, it immediately resumes the current fiber.
#
# This method is particularly useful to break up tight loops which are only
# computation intensive and don't offer natural opportunities for swapping
# fibers as with IO operations.
#
# ```
# counter = 0
# spawn name: "status" do
# loop do
# puts "Status: #{counter}"
# sleep(2.seconds)
# end
# end
#
# while counter < Int32::MAX
# counter += 1
# if counter % 1_000_000 == 0
# # Without this, there would never be an opportunity to resume the status fiber
# Fiber.yield
# end
# end
# ```
def self.yield : Nil
Crystal::Scheduler.yield
end
# Suspends execution of the current fiber indefinitely.
#
# Unlike `Fiber.yield` the current fiber is not automatically
# reenqueued and can only be resumed whith an explicit call to `#enqueue`.
#
# This is equivalent to `sleep` without a time.
#
# This method is meant to be used in concurrency primitives. It's particularly
# useful if the fiber needs to wait for something to happen (for example an IO
# event, a message is ready in a channel, etc.) which triggers a re-enqueue.
def self.suspend : Nil
Crystal::Scheduler.reschedule
end
def to_s(io : IO) : Nil
io << "#<" << self.class.name << ":0x"
object_id.to_s(io, 16)
if name = @name
io << ": " << name
end
io << '>'
end
def inspect(io : IO) : Nil
to_s(io)
end
# :nodoc:
def push_gc_roots : Nil
# Push the used section of the stack
GC.push_stack @context.stack_top, @stack_bottom
end
{% if flag?(:preview_mt) %}
# :nodoc:
def set_current_thread(thread = Thread.current) : Thread
@current_thread.set(thread)
end
# :nodoc:
def get_current_thread : Thread?
@current_thread.lazy_get
end
{% end %}
end