Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workaround for eventstream disconnects #116

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion clients/consensus/rpc/beaconstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,14 @@ func (bs *BeaconStream) startStream() {
Ready: true,
}
case err := <-stream.Errors:
bs.logger.Warnf("beacon block stream error: %v", err)
if strings.Contains(err.Error(), "INTERNAL_ERROR; received from peer") {
// this seems to be a go bug, silently reconnect to the stream
time.Sleep(10 * time.Millisecond)
stream.RetryNow()
} else {
bs.logger.Warnf("beacon block stream error: %v", err)
}

select {
case bs.ReadyChan <- &BeaconStreamStatus{
Ready: false,
Expand Down
16 changes: 15 additions & 1 deletion clients/consensus/rpc/eventstream/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package eventstream

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -34,6 +35,8 @@ type Stream struct {
isClosed bool
// isClosedMutex is a mutex protecting concurrent read/write access of isClosed
closeMutex sync.Mutex
// retrySleepCancel is a function that can be called to cancel the retry sleep
retrySleepCancel context.CancelFunc
}

type StreamEvent interface {
Expand Down Expand Up @@ -114,6 +117,13 @@ func (stream *Stream) Close() {
}()
}

// RetryNow will force the stream to reconnect a disconnected stream immediately.
func (stream *Stream) RetryNow() {
if cancelFn := stream.retrySleepCancel; cancelFn != nil {
cancelFn()
}
}

// Go's http package doesn't copy headers across when it encounters
// redirects so we need to do that manually.
func checkRedirect(req *http.Request, via []*http.Request) error {
Expand Down Expand Up @@ -215,7 +225,11 @@ func (stream *Stream) retryRestartStream() {
stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
}

time.Sleep(backoff)
ctx, cancel := context.WithTimeout(context.Background(), backoff)
stream.retrySleepCancel = cancel
<-ctx.Done()

stream.retrySleepCancel = nil

if stream.isClosed {
return
Expand Down
Loading