-
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
channel: call ops.unwait as soon as fiber is resumed/enqueued; fixes select segfault #7414
channel: call ops.unwait as soon as fiber is resumed/enqueued; fixes select segfault #7414
Conversation
8f85246
to
18138c4
Compare
Hey @forksaber, thanks for your initiative! This PR most likely conflicts with #7407 which also applies substantial changes to /cc @ysbaddaden |
18138c4
to
9f3960c
Compare
I think the approach is correct: the ready fiber must unwait/cancel other clauses before yielding to another fiber. The wrapper class should be a |
For example: struct SelectFiber
def initialize(@fiber = Fiber.current)
@callback = nil
end
def initialize(@fiber = Fiber.current, &@callback : ->)
end
def resume
@callback.try(&.call)
@fiber.resume
end
def enqueue
@callback.try(&.call)
Crystal::Scheduler.enqueue(@fiber)
end
end
f1 = SelectFiber.new
f2 = SelectFiber.new { p 123 } Or maybe add it to class Fiber
# :nodoc:
setter callback : Proc(Nil, Nil)?
def resume
exec_callback
Crystal::Scheduler.resume(self)
end
def enqueue
exec_callback
Crystal::Scheduler.enqueue(self)
end
private def exec_callback
if callback = @callback
@callback = nil
callback.call
end
end
end
f = Fiber.current
f.callback = -> { ops.each(&.unwait(f)) } |
Thanks @ysbaddaden for your input. I prefer adding the self erasing callback to |
Yes, the patch will be smaller, and it's pushing more of the scheduling API to Fiber itself, which I think if good (less calls to |
9f3960c
to
bc9fb87
Compare
updated PR to store callback in |
bc9fb87
to
7c79fea
Compare
7c79fea
to
8ba6ba3
Compare
Is this GTG? Would be nice to have it 🚢 :) |
It's GTG from my side. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -60,6 +60,9 @@ class Fiber | |||
# :nodoc: | |||
property previous : Fiber? | |||
|
|||
# :nodoc: | |||
setter callback : Proc(Nil)? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add a comment to explain the purpose of this?
Comments are welcome in other places as well ;)
@@ -111,9 +117,11 @@ abstract class Channel(T) | |||
return ops.size, nil | |||
end | |||
|
|||
fiber = Fiber.current | |||
fiber.callback = ->{ ops.each &.unwait(fiber) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This forms a closure and so it allocates memory, which is not ideal. Please change it to store a reference to ops
directly in Fiber
. For example you can have record UnwaitCallback, ops : ...
and then callback : UnwaitCallback
or something like that (and if we need more callbacks we create a union and son on).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@asterite I tried implementing your suggestion but I am not able to figure out the type
for ops
. If I use Tuple | Array
, I get the following error:
can't use Array(T) in unions yet, use a more specific type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, right, it can't be implemented. Nevermind.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would converting the tuple to an array be more efficient than a closure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that w don't know the types inside the array
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, right...
Yet we know it's all Channel
, we just don't know the generic arguments. But that's actually irrelevant for calling #unwait
. Just thinking here, but we could hack around it by introducing a module which is included in Channel
:
abstract class Channel(T)
module Unwaitable
def unwait(fiber : Fiber)
end
end
include Unwaitable
end
record UnwaitCallback, ops : Array(Channel::Unwaitable)
It works. Question is, is it worth the effort of having an additional type and needing to explicitly cast tuple and array arguments to Array(Channel::Unwaitable)
? Probably not.
Fixes #3900
A
select
call registers multiple wait events (one for each of its branches) and yields. When any of the branches is ready,select
fiber resumes and unwaits the remaining events by callingops.each &.unwait
. This works correctly with unbuffered channels as upon readiness, they immediately switch to the waiting/selecting fiber.However, since buffered channels don't block unless buffer is full(send blocks) or empty(receive blocks), the
select
ing fiber is not resumed upon readiness. Hence,unwait
is not called on time. By the timeunwait
is called, theselect
ing fiber may be enqueued multiple times.Resuming this fiber after it has finished executing causes a segfault.
So, the fix here to call
ops.each &.unwait
as soon as the first resume/enqueue happens.