Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channel: call ops.unwait as soon as fiber is resumed/enqueued; fixes select segfault #7414

Closed

Conversation

forksaber
Copy link

Fixes #3900

A select call registers multiple wait events (one for each of its branches) and yields. When any of the branches is ready, select fiber resumes and unwaits the remaining events by calling
ops.each &.unwait. This works correctly with unbuffered channels as upon readiness, they immediately switch to the waiting/selecting fiber.

However, since buffered channels don't block unless buffer is full(send blocks) or empty(receive blocks), the selecting fiber is not resumed upon readiness. Hence, unwait is not called on time. By the time unwait is called, the selecting fiber may be enqueued multiple times.

Resuming this fiber after it has finished executing causes a segfault.

So, the fix here to call ops.each &.unwait as soon as the first resume/enqueue happens.

src/channel.cr Outdated Show resolved Hide resolved
src/channel.cr Outdated Show resolved Hide resolved
@straight-shoota
Copy link
Member

Hey @forksaber, thanks for your initiative!

This PR most likely conflicts with #7407 which also applies substantial changes to Channel.select. That PR has a far broader scope on multithreading concurrency, and it's quite heavy but definitely worth a look!

/cc @ysbaddaden

@ysbaddaden
Copy link
Contributor

I think the approach is correct: the ready fiber must unwait/cancel other clauses before yielding to another fiber. The wrapper class should be a struct, thought, to avoid an allocation on each and every channel operation, which would noticeably slow down channels, even outside of select.

@ysbaddaden
Copy link
Contributor

For example:

struct SelectFiber
  def initialize(@fiber = Fiber.current)
    @callback = nil
  end

  def initialize(@fiber = Fiber.current, &@callback : ->)
  end

  def resume
    @callback.try(&.call)
    @fiber.resume
  end

  def enqueue
    @callback.try(&.call)
    Crystal::Scheduler.enqueue(@fiber)
  end
end

f1 = SelectFiber.new
f2 = SelectFiber.new { p 123 }

Or maybe add it to Fiber itself, as a self erasing callback:

class Fiber
  # :nodoc:
  setter callback : Proc(Nil, Nil)?

  def resume
    exec_callback
    Crystal::Scheduler.resume(self)
  end

  def enqueue
    exec_callback
    Crystal::Scheduler.enqueue(self)
  end

  private def exec_callback
    if callback = @callback
      @callback = nil
      callback.call
    end
  end
end

f = Fiber.current
f.callback = -> { ops.each(&.unwait(f)) }

@forksaber
Copy link
Author

Thanks @ysbaddaden for your input. I prefer adding the self erasing callback to Fiber itself. I also had the same approach in mind....but was reluctant to make a change to Fiber . Now with your go-ahead, I'll update this PR with changes to Fiber.

@ysbaddaden
Copy link
Contributor

Yes, the patch will be smaller, and it's pushing more of the scheduling API to Fiber itself, which I think if good (less calls to Crystal::Scheduler).

@forksaber forksaber force-pushed the bugfix/3900-select-segfault branch from 9f3960c to bc9fb87 Compare February 13, 2019 03:17
@forksaber
Copy link
Author

updated PR to store callback in Fiber object

src/channel.cr Outdated Show resolved Hide resolved
@forksaber forksaber force-pushed the bugfix/3900-select-segfault branch from bc9fb87 to 7c79fea Compare February 13, 2019 06:01
@forksaber forksaber force-pushed the bugfix/3900-select-segfault branch from 7c79fea to 8ba6ba3 Compare February 13, 2019 06:23
@Sija
Copy link
Contributor

Sija commented Mar 2, 2019

Is this GTG? Would be nice to have it 🚢 :)

@forksaber
Copy link
Author

It's GTG from my side.

Copy link
Contributor

@ysbaddaden ysbaddaden left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for me, but I'd like @RX14 or @bcardiff to review as well.

@straight-shoota straight-shoota requested review from RX14 and bcardiff March 5, 2019 23:36
@@ -60,6 +60,9 @@ class Fiber
# :nodoc:
property previous : Fiber?

# :nodoc:
setter callback : Proc(Nil)?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add a comment to explain the purpose of this?
Comments are welcome in other places as well ;)

@straight-shoota straight-shoota added kind:bug A bug in the code. Does not apply to documentation, specs, etc. topic:stdlib:concurrency labels Mar 5, 2019
@@ -111,9 +117,11 @@ abstract class Channel(T)
return ops.size, nil
end

fiber = Fiber.current
fiber.callback = ->{ ops.each &.unwait(fiber) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This forms a closure and so it allocates memory, which is not ideal. Please change it to store a reference to ops directly in Fiber. For example you can have record UnwaitCallback, ops : ... and then callback : UnwaitCallback or something like that (and if we need more callbacks we create a union and son on).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asterite I tried implementing your suggestion but I am not able to figure out the type for ops. If I use Tuple | Array, I get the following error:

can't use Array(T) in unions yet, use a more specific type

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right, it can't be implemented. Nevermind.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would converting the tuple to an array be more efficient than a closure?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that w don't know the types inside the array

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, right...
Yet we know it's all Channel, we just don't know the generic arguments. But that's actually irrelevant for calling #unwait. Just thinking here, but we could hack around it by introducing a module which is included in Channel:

abstract class Channel(T)
  module Unwaitable
    def unwait(fiber : Fiber)
    end
  end
    
  include Unwaitable
end

record UnwaitCallback, ops : Array(Channel::Unwaitable)

It works. Question is, is it worth the effort of having an additional type and needing to explicitly cast tuple and array arguments to Array(Channel::Unwaitable)? Probably not.

@straight-shoota straight-shoota added the pr:needs-work A PR requires modifications by the author. label Mar 25, 2019
@asterite
Copy link
Member

asterite commented Sep 2, 2019

Closing because #3900 was closed because of #8112

@asterite asterite closed this Sep 2, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind:bug A bug in the code. Does not apply to documentation, specs, etc. pr:needs-work A PR requires modifications by the author. topic:stdlib:concurrency
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Crash in fiber switching with select
5 participants