Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

event: fix Resubscribe deadlock when unsubscribing after inner sub ends #28359

Merged
merged 4 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion event/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscriptio
backoffMax: backoffMax,
fn: fn,
err: make(chan error),
unsub: make(chan struct{}),
unsub: make(chan struct{}, 1),
}
go s.loop()
return s
Expand Down
20 changes: 20 additions & 0 deletions event/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,23 @@ func TestResubscribeWithErrorHandler(t *testing.T) {
t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs)
}
}

func TestResubscribeWithCompletedSubscription(t *testing.T) {
t.Parallel()

innerSubDone := make(chan struct{}, 1)
sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) {
return NewSubscription(func(unsubscribed <-chan struct{}) error {
select {
case <-time.After(2 * time.Second):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's weird to have a timeout here. You can achieve the necessary synchronization event with another channel, like so

quitInner := make(chan struct{})
...
    select {
    case <-quitInner:
...

close(quitInner)
<-innerSubDone
sub.Unsubscribe()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good suggestion. Made the change.

innerSubDone <- struct{}{}
return nil
case <-unsubscribed:
return nil
}
}), nil
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not too familiar with how the semantics of subscriptions work in any depth, but I've been trying to figure out if this is 'correct' or not. The docs for NewSubscription says

NewSubscription runs a producer function as a subscription in a new goroutine. The
channel given to the producer is closed when Unsubscribe is called. If fn returns an
error, it is sent on the subscription's error channel.

In other words, unsubscribed will be closed when externalities wants the producer to stop. In this testcase, however, the producer just stops producing, and exiting without returning any error or closing any channel. And yeah, that intuitively seems like something that could lead to a deadlock.

If the select is changed into

			select {
			case <-time.After(2 * time.Second):
				innerSubDone <- struct{}{}
				return errors.New("time to go")
			case <-unsubscribed:
				return nil
			}

Then the deadlock disappears.

So my thinking goes: this testcase is based on a flawed producer. But also, the NewSubscription documentation should mention something like "the producer must either exit with an error or keep listening to the given channel".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Clarifying the behavior of producers would be a better fix in that case. Will do that instead. Perhaps it'll be good to add defensive checks against nil producer errors by panicking in that case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note, though: I'm no authority here. @fjl would know

Copy link
Contributor Author

@Inphi Inphi Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that this documentation already clarifies the intended behavior based on if fn returns an error; implying that producers are permitted to return nil errors. And Subscription explicitly checks for a nil error here. While this behavior doesn't compose well with Resubscriptions, we should maintain the current API contract.


<-innerSubDone
sub.Unsubscribe()
}