Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow channels to be closed on blocking select #8243

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions spec/std/concurrent/select_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,58 @@ describe "select" do
x.should eq 2
end

it "closing a channel over strict select does not yield a value" do
ready = Channel(Int32).new
done = Channel(Int32).new

ch1 = Channel(Int32).new
ch2 = Channel(Int32).new

f = spawn do
ready.send(1)
select
when ch1.receive
when b = ch2.receive
done.send(b)
end
end

ready.receive
sleep 0.1
ch1.close
sleep 0.1
ch2.send 42
done.receive.should eq(42)
end

it "closing all channels over strict select raise exception" do
ready = Channel(Int32).new
done = Channel(Int32).new

ch1 = Channel(Int32).new
ch2 = Channel(Int32).new

f = spawn do
ready.send(1)
begin
select
when ch1.receive
when ch2.receive
end
rescue e
done.send(42)
end
end

ready.receive
sleep 0.1
ch1.close
sleep 0.1
ch2.close
sleep 0.1
done.receive.should eq(42)
end

it "stress select with send/receive in multiple fibers" do
fibers = 4
msg_per_sender = 1000
Expand Down
89 changes: 64 additions & 25 deletions src/channel.cr
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class Channel(T)
abstract def lock_object_id
abstract def lock
abstract def unlock
abstract def closed? : Bool

def create_context_and_wait(state_ptr)
context = SelectContext.new(state_ptr, self)
Expand Down Expand Up @@ -286,42 +287,72 @@ class Channel(T)
.uniq(&.lock_object_id)
.sort_by(&.lock_object_id)

ops_locks.each &.lock
while true
ops_locks.each &.lock

ops.each_with_index do |op, index|
ignore = false
result = op.execute
ops.each_with_index do |op, index|
result = op.execute

unless result.is_a?(NotReady)
unless result.is_a?(NotReady)
ops_locks.each &.unlock
return index, result
end
end

if has_else
ops_locks.each &.unlock
return index, result
return ops.size, NotReady
end
end

if has_else
ops_locks.each &.unlock
return ops.size, NotReady
end
state = Atomic(SelectState).new(SelectState::Active)
all_closed = true
contexts = ops.map_with_index do |op, index|
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The index is never used?

# Avoid waiting over a closed channel
if op.closed?
nil
else
all_closed = false
op.create_context_and_wait(pointerof(state))
# Closing one of the channels will wakeup this path
end
end

state = Atomic(SelectState).new(SelectState::Active)
contexts = ops.map &.create_context_and_wait(pointerof(state))
if all_closed
ops_locks.each &.unlock

ops_locks.each &.unlock
Crystal::Scheduler.reschedule
# If channels are closed, there is no need to reschedule
# if there is no has_else an exception is more accurate probable
if has_else
return ops.size, NotReady
else
raise "All channels are closed on a blocking select"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should raise ClosedError.new, exactly as if you had used receive on any one of the closed channels.

ideally this would be designed to literally call ch.receive on the last channel to be closed, as that's semantically what's happening on my head, but that's not feasible.

end
end

ops.each do |op|
op.lock
op.unwait
op.unlock
end
ops_locks.each &.unlock
Crystal::Scheduler.reschedule

contexts.each_with_index do |context, index|
if context.activated?
return index, context.action.result
ops.each do |op|
op.lock
op.unwait
op.unlock
end

contexts.each_with_index do |context, index|
if context && context.activated?
return index, context.action.result
end
end
end

raise "BUG: Fiber was awaken from select but no action was activated"
# if this point is reached !all_closed and no context was activated
# so the wakeup was due to a close of some channels.
# has_else indicated a non-blocking select, so we can stop the execution
# right away. We were waiting for channels, we got a signal of some channel closing
# but no data is ready.
if has_else
return ops.size, NotReady
end
end
end

# :nodoc:
Expand Down Expand Up @@ -372,6 +403,10 @@ class Channel(T)
def unlock
@[email protected]
end

def closed? : Bool
@channel.closed?
end
end

# :nodoc:
Expand Down Expand Up @@ -409,5 +444,9 @@ class Channel(T)
def unlock
@[email protected]
end

def closed? : Bool
@channel.closed?
end
end
end