From 4964f1953ed7d0e013e492d488869eb5867e69d6 Mon Sep 17 00:00:00 2001 From: inphi Date: Mon, 16 Oct 2023 23:02:28 -0400 Subject: [PATCH 1/4] event: fix deadlock during Unsubscribe A goroutine is used to manage the lifetime of subscriptions managed by resubscriptions. When the subscription ends with no error, the resub goroutine ends as well. However, the resub goroutine needs to live long enough to read from the unsub channel. Otherwise, an Unsubscribe call deadlocks when writing to the unsub channel. --- event/subscription.go | 17 ++++++++++++----- event/subscription_test.go | 20 ++++++++++++++++++++ 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/event/subscription.go b/event/subscription.go index 6c62874719f2..1c1626b16555 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -153,14 +153,21 @@ func (s *resubscribeSub) Err() <-chan error { } func (s *resubscribeSub) loop() { - defer close(s.err) var done bool + var unsubbed bool + defer func() { + close(s.err) + // Read unsub chan to avoid blocking Unsubscribe + if !unsubbed { + <-s.unsub + } + }() for !done { sub := s.subscribe() if sub == nil { break } - done = s.waitForError(sub) + done, unsubbed = s.waitForError(sub) sub.Unsubscribe() } } @@ -197,14 +204,14 @@ func (s *resubscribeSub) subscribe() Subscription { } } -func (s *resubscribeSub) waitForError(sub Subscription) bool { +func (s *resubscribeSub) waitForError(sub Subscription) (bool, bool) { defer sub.Unsubscribe() select { case err := <-sub.Err(): s.lastSubErr = err - return err == nil + return err == nil, false case <-s.unsub: - return true + return true, true } } diff --git a/event/subscription_test.go b/event/subscription_test.go index ba081705c44c..06b4a4941bf9 100644 --- a/event/subscription_test.go +++ b/event/subscription_test.go @@ -154,3 +154,23 @@ func TestResubscribeWithErrorHandler(t *testing.T) { t.Fatalf("unexpected subscription errors %v, want %v", subErrs, expectedSubErrs) } } + +func 
TestResubscribeWithCompletedSubscription(t *testing.T) { + t.Parallel() + + innerSubDone := make(chan struct{}, 1) + sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) { + return NewSubscription(func(unsubscribed <-chan struct{}) error { + select { + case <-time.After(2 * time.Second): + innerSubDone <- struct{}{} + return nil + case <-unsubscribed: + return nil + } + }), nil + }) + + <-innerSubDone + sub.Unsubscribe() +} From ad2fa66e91f636a980c10231f944b34a35e8ebb9 Mon Sep 17 00:00:00 2001 From: inphi Date: Wed, 18 Oct 2023 15:55:58 -0400 Subject: [PATCH 2/4] make the Resub unsub channel buffered --- event/subscription.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/event/subscription.go b/event/subscription.go index 1c1626b16555..bacfc7766470 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -120,7 +120,7 @@ func ResubscribeErr(backoffMax time.Duration, fn ResubscribeErrFunc) Subscriptio backoffMax: backoffMax, fn: fn, err: make(chan error), - unsub: make(chan struct{}), + unsub: make(chan struct{}, 1), } go s.loop() return s @@ -154,20 +154,13 @@ func (s *resubscribeSub) Err() <-chan error { func (s *resubscribeSub) loop() { var done bool - var unsubbed bool - defer func() { - close(s.err) - // Read unsub chan to avoid blocking Unsubscribe - if !unsubbed { - <-s.unsub - } - }() + defer close(s.err) for !done { sub := s.subscribe() if sub == nil { break } - done, unsubbed = s.waitForError(sub) + done = s.waitForError(sub) sub.Unsubscribe() } } @@ -204,14 +197,14 @@ func (s *resubscribeSub) subscribe() Subscription { } } -func (s *resubscribeSub) waitForError(sub Subscription) (bool, bool) { +func (s *resubscribeSub) waitForError(sub Subscription) bool { defer sub.Unsubscribe() select { case err := <-sub.Err(): s.lastSubErr = err - return err == nil, false + return err == nil case <-s.unsub: - return true, true + return true } } From 
9b3bf9ac3af3a13f2941ee37c3dfd83eeaa7094b Mon Sep 17 00:00:00 2001 From: inphi Date: Wed, 18 Oct 2023 16:02:35 -0400 Subject: [PATCH 3/4] reorder close statement --- event/subscription.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event/subscription.go b/event/subscription.go index bacfc7766470..07e059c6db30 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -153,8 +153,8 @@ func (s *resubscribeSub) Err() <-chan error { } func (s *resubscribeSub) loop() { - var done bool defer close(s.err) + var done bool for !done { sub := s.subscribe() if sub == nil { From 6f9a6504e44b1ce1b969daa850fb7bb1d4347959 Mon Sep 17 00:00:00 2001 From: inphi Date: Thu, 19 Oct 2023 12:54:09 -0400 Subject: [PATCH 4/4] replace timers with chan signals --- event/subscription_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/event/subscription_test.go b/event/subscription_test.go index 06b4a4941bf9..743d0bf67de0 100644 --- a/event/subscription_test.go +++ b/event/subscription_test.go @@ -158,12 +158,14 @@ func TestResubscribeWithErrorHandler(t *testing.T) { func TestResubscribeWithCompletedSubscription(t *testing.T) { t.Parallel() - innerSubDone := make(chan struct{}, 1) + quitProducerAck := make(chan struct{}) + quitProducer := make(chan struct{}) + sub := ResubscribeErr(100*time.Millisecond, func(ctx context.Context, lastErr error) (Subscription, error) { return NewSubscription(func(unsubscribed <-chan struct{}) error { select { - case <-time.After(2 * time.Second): - innerSubDone <- struct{}{} + case <-quitProducer: + quitProducerAck <- struct{}{} return nil case <-unsubscribed: return nil @@ -171,6 +173,8 @@ func TestResubscribeWithCompletedSubscription(t *testing.T) { }), nil }) - <-innerSubDone + // Ensure producer has started and exited before Unsubscribe + close(quitProducer) + <-quitProducerAck sub.Unsubscribe() }