Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow channels to be closed on blocking select #8243

Closed
wants to merge 2 commits into from

Conversation

bcardiff
Copy link
Member

This PR might deserve a bit of discussion. On the MT recent work, we focus on non-blocking select and I think there were some cases for blocking select that might have slipped.

_, r = Channel.select(ch1.receive_select_action, ch2.receive_select_action)

is equivalent to

select
when r = ch1.receive
when r = ch2.receive
end

and to

_, r = Channel.select([ch1.receive_select_action, ch2.receive_select_action], false)

And it is expected to wait for a send over any ch1 and ch2.

Then, there is a construct for non-blocking select (when an else is present):

select
when r = ch1.receive
when r = ch2.receive
else
  r = Channel::NotReady # this is to match the behaviour of the next example.
end

Or equivalent:

_, r = Channel.select([ch1.receive_select_action, ch2.receive_select_action], true)

For non-blocking select there is no change in the behaviour in this PR.

For blocking select, since closing a channel awakes the fiber, there is a need to loop and re-wait over the channels. This is what is mainly added in this PR.

A decision was made regarding what should happen on blocking select over closed channels: It will generate a runtime exception with the message "All channels are closed on a blocking select".

Additionally, we avoid waiting over already closed channels.

Related to #8231, but the opening issue will still raise since it was a blocking select over a closed channel. But it pushed the think exposed here in the specs.

@bcardiff bcardiff added kind:bug A bug in the code. Does not apply to documentation, specs, etc. topic:stdlib:concurrency labels Sep 27, 2019
@bcardiff bcardiff requested a review from waj September 27, 2019 15:23
@bcardiff
Copy link
Member Author

As a comparison to Go,

select {
case m, e := <-c:
   // ...
}

when the channel is closed, e will inform so. There is an explicit way to know that the channel is been closed. I think in Crystal we prefer to use unions or typed messages when needed to signal that. Otherwise every Channel(T)#receive should return T | Channel::Closed.

Copy link
Member

@RX14 RX14 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no spec for all channels being closed running the else?

end
state = Atomic(SelectState).new(SelectState::Active)
all_closed = true
contexts = ops.map_with_index do |op, index|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The index is never used?

if has_else
return ops.size, NotReady
else
raise "All channels are closed on a blocking select"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should raise ClosedError.new, exactly as if you had used receive on any one of the closed channels.

ideally this would be designed to literally call ch.receive on the last channel to be closed, as that's semantically what's happening on my head, but that's not feasible.

@bcardiff
Copy link
Member Author

bcardiff commented Oct 1, 2019

We were discussing with @waj about the design.

Another idea is that instead of going the route of this PR, the select could remain loopless and when an awaited channel is closed the select will exit. If the select was using ch.receive then an exception will be raised when closing the channel. If the select was using ch.receive? then a distinguished value will be sent when the channel is closed.

This will be actually more similar to Go in the sense that a close event will awake the select, but will keep the simple API for when the user does not care about channel been closed.

A select with ch.receive? will allow the user to know about which channel is been closed.

Regarding the Channel#receive? there are two alternatives:

a) It returns a Nil when the channel is closed. This will be cumbersome for Channel(Nil) (and any nilable value) since there will be no way to distinguish a message from the closing

b) It returns a Channel::Closed value. So Channel(T)#receive : T and Channel(T)#receive? : T | Closed. The user will need a m = ch.receive? ; if m.is_a?(Channel::Closed) ... which is a bit more code than if m.

@j8r
Copy link
Contributor

j8r commented Oct 1, 2019

I didn't see select documented anywhere, could you add documentation? (It doesn't have to be in this PR)

It would also be great to document more Channel in the API docs, and the book. People are certainly missing powerful Channel features.

I did try, but I don't know the topic well enough 🤐

@ysbaddaden
Copy link
Contributor

@bcardiff I like the simple design: wake up the select, let receive raise and receive? return nil.

For the alternatives, the case of Channel(Nil) is independent of select; we never know whether receive? returns because the channel is closed or you got a value. One must manually check for #closed? or use another type (e.g. Int32). I don't see any benefit for passing an union.

@bcardiff
Copy link
Member Author

bcardiff commented Oct 2, 2019

@ysbaddaden that won't work. It is not possible to effectively use close? to check if the state of the channel when received? returned. It could change in between.

That why the alternative is to either

a) use nil as a signal of the closed channel. Which will disallow, in practice, the usage of nilable types as channel messages.

b) introduce Channel::Closed to allow nil as a kind of data.

Either way, an independent ch.receive or ch.receive? (ie: wihout a select) will be blocking.
The non-blocking behaviour is brought by the select.
So the difference between ch.receive and ch.receive? is about how to handle the closing reaction (ie: exception or explicit value)

@asterite
Copy link
Member

asterite commented Oct 2, 2019

An alternative is ch.receive { ... } which will invoke the block if the channel is closed. That way you can return any type in the block. This is similar to Hash#fetch, where we have [] (raises), []? (returns nil, not useful if values can be nil) and fetch (if values are nil you can know because the block will or won't be invoked).

@bcardiff
Copy link
Member Author

bcardiff commented Oct 8, 2019

I've been iterating on this. As said before

  • the difference between ch.receive and ch.receive? is about how to handle the closing reaction
  • there should be no difference whether the channel was closed or is closed while the current fiber is suspended
  • closing a channel that is been wait will awake the fiber
    • if that raises an exception or not it depends on whether receive or receive? was used
  • there will be no loop in the implementation of select
  • the implementation of blocking and non-blocking select will need to be different functions because their types need to defer.
    • Channel.select is considered a private API, it should have :nodoc: and its usage should be mainly from a select statement.

The current review of what types is expected can be viewed at: https://gist.github.com/bcardiff/289953a80eb3a0512a2a2f8c8dfeb1db

Closing this PR. I should submit a new one with the above implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind:bug A bug in the code. Does not apply to documentation, specs, etc. topic:stdlib:concurrency
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants