diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 65971ee28..0192b5ee8 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -38,41 +38,46 @@ const ( func NewBlockClient( ctx context.Context, deps depinject.Config, -) (client.BlockClient, error) { - ctx, close := context.WithCancel(ctx) + opts ...client.BlockClientOption, +) (_ client.BlockClient, err error) { + ctx, cancel := context.WithCancel(ctx) + + // latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a + // new block, whether it comes from a direct query or an event subscription query. + latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10) + bClient := &blockReplayClient{ + latestBlockReplayObs: latestBlockReplayObs, + close: cancel, + } + + for _, opt := range opts { + opt(bClient) + } - eventsReplayClient, err := events.NewEventsReplayClient[client.Block]( + bClient.eventsReplayClient, err = events.NewEventsReplayClient[client.Block]( ctx, deps, committedBlocksQuery, UnmarshalNewBlock, defaultBlocksReplayLimit, + events.WithConnRetryLimit[client.Block](bClient.connRetryLimit), ) if err != nil { - close() + cancel() return nil, err } - // latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a - // new block, whether it comes from a direct query or an event subscription query. - latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10) - blockReplayClient := &blockReplayClient{ - eventsReplayClient: eventsReplayClient, - latestBlockReplayObs: latestBlockReplayObs, - close: close, - } - - if err := depinject.Inject(deps, &blockReplayClient.onStartQueryClient); err != nil { + if err := depinject.Inject(deps, &bClient.onStartQueryClient); err != nil { return nil, err } - blockReplayClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh) + bClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh) - if err := blockReplayClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil { + if err := bClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil { return nil, err } - return blockReplayClient, nil + return bClient, nil } // blockReplayClient is BlockClient implementation that combines a CometRPC client @@ -99,6 +104,11 @@ type blockReplayClient struct { // close is a function that cancels the context of the blockReplayClient. close context.CancelFunc + + // connRetryLimit is the number of times the underlying replay client + // should retry in the event that it encounters an error or its connection is interrupted. + // If connRetryLimit is < 0, it will retry indefinitely. + connRetryLimit int } // CommittedBlocksSequence returns a replay observable of new block events. diff --git a/pkg/client/block/client_test.go b/pkg/client/block/client_test.go index cd860d6c6..0817eb905 100644 --- a/pkg/client/block/client_test.go +++ b/pkg/client/block/client_test.go @@ -81,7 +81,8 @@ func TestBlockClient(t *testing.T) { Hash: expectedHash, }, }, nil - }) + }). + AnyTimes() deps := depinject.Supply(eventsQueryClient, cometClientMock) diff --git a/pkg/client/block/options.go b/pkg/client/block/options.go new file mode 100644 index 000000000..83a52357b --- /dev/null +++ b/pkg/client/block/options.go @@ -0,0 +1,13 @@ +package block + +import "github.com/pokt-network/poktroll/pkg/client" + +// WithConnRetryLimit returns an option function which sets the number +// of times the underlying replay client should retry in the event that it encounters +// an error or its connection is interrupted. +// If connRetryLimit is < 0, it will retry indefinitely. +func WithConnRetryLimit(limit int) client.BlockClientOption { + return func(client client.BlockClient) { + client.(*blockReplayClient).connRetryLimit = limit + } +} diff --git a/pkg/client/delegation/client.go b/pkg/client/delegation/client.go index 013b094df..00f45e2fa 100644 --- a/pkg/client/delegation/client.go +++ b/pkg/client/delegation/client.go @@ -46,18 +46,27 @@ const ( func NewDelegationClient( ctx context.Context, deps depinject.Config, -) (client.DelegationClient, error) { - client, err := events.NewEventsReplayClient[client.Redelegation]( + opts ...client.DelegationClientOption, +) (_ client.DelegationClient, err error) { + dClient := &delegationClient{} + + for _, opt := range opts { + opt(dClient) + } + + dClient.eventsReplayClient, err = events.NewEventsReplayClient[client.Redelegation]( ctx, deps, delegationEventQuery, newRedelegationEventFactoryFn(), defaultRedelegationsReplayLimit, + events.WithConnRetryLimit[client.Redelegation](dClient.connRetryLimit), ) if err != nil { return nil, err } - return &delegationClient{eventsReplayClient: client}, nil + + return dClient, nil } // delegationClient is a wrapper around an EventsReplayClient that implements @@ -69,6 +78,11 @@ type delegationClient struct { // These enable the EventsReplayClient to correctly map the raw event bytes // to Redelegation objects and to correctly return a RedelegationReplayObservable eventsReplayClient client.EventsReplayClient[client.Redelegation] + + // connRetryLimit is the number of times the underlying replay client + // should retry in the event that it encounters an error or its connection is interrupted. + // If connRetryLimit is < 0, it will retry indefinitely. + connRetryLimit int } // RedelegationsSequence returns a replay observable of Redelgation events diff --git a/pkg/client/delegation/options.go b/pkg/client/delegation/options.go new file mode 100644 index 000000000..b618a0b3b --- /dev/null +++ b/pkg/client/delegation/options.go @@ -0,0 +1,13 @@ +package delegation + +import "github.com/pokt-network/poktroll/pkg/client" + +// WithConnRetryLimit returns an option function which sets the number +// of times the underlying replay client should retry in the event that it encounters +// an error or its connection is interrupted. +// If connRetryLimit is < 0, it will retry indefinitely. +func WithConnRetryLimit(limit int) client.DelegationClientOption { + return func(client client.DelegationClient) { + client.(*delegationClient).connRetryLimit = limit + } +} diff --git a/pkg/client/events/options.go b/pkg/client/events/options.go index 41fca7548..b3a959fb0 100644 --- a/pkg/client/events/options.go +++ b/pkg/client/events/options.go @@ -9,3 +9,18 @@ func WithDialer(dialer client.Dialer) client.EventsQueryClientOption { evtClient.(*eventsQueryClient).dialer = dialer } } + +// WithConnRetryLimit returns an option function which sets the number +// of times the replay client should retry in the event that it encounters +// an error or its connection is interrupted. +// If connRetryLimit is < 0, it will retry indefinitely. +func WithConnRetryLimit[T any](limit int) client.EventsReplayClientOption[T] { + return func(client client.EventsReplayClient[T]) { + // Ignore the zero value because limit may be provided via a partially + // configured config struct (i.e. no retry limit set). + // The default will be used instead. + if limit != 0 { + client.(*replayClient[T]).connRetryLimit = limit + } + } +} diff --git a/pkg/client/events/replay_client.go b/pkg/client/events/replay_client.go index abf8ec9fc..73db3acd1 100644 --- a/pkg/client/events/replay_client.go +++ b/pkg/client/events/replay_client.go @@ -15,12 +15,21 @@ import ( ) const ( + // DefaultConnRetryLimit is used to indicate how many times the + // underlying replay client should attempt to retry if it encounters an error + // or its connection is interrupted. + // + // TODO_IMPROVE: this should be configurable but can be overridden at compile-time: + // go build -ldflags "-X github.com/pokt-network/poktroll/DefaultConnRetryLimit=value". + DefaultConnRetryLimit = 10 + // eventsBytesRetryDelay is the delay between retry attempts when the events // bytes observable returns an error. eventsBytesRetryDelay = time.Second // eventsBytesRetryLimit is the maximum number of times to attempt to // re-establish the events query bytes subscription when the events bytes // observable returns an error or closes. + // TODO_TECHDEBT: to make this a customizable parameter in the appgateserver and relayminer config files. eventsBytesRetryLimit = 10 eventsBytesRetryResetTimeout = 10 * time.Second // replayObsCacheBufferSize is the replay buffer size of the @@ -81,6 +90,10 @@ type replayClient[T any] struct { // replayClientCancelCtx is the function to cancel the context of the replay client. // It is called when the replay client is closed. replayClientCancelCtx func() + // connRetryLimit is the number of times the replay client should retry + // in the event that it encounters an error or its connection is interrupted. + // If connRetryLimit is < 0, it will retry indefinitely. + connRetryLimit int } // NewEventsReplayClient creates a new EventsReplayClient from the given @@ -98,6 +111,7 @@ func NewEventsReplayClient[T any]( queryString string, newEventFn NewEventsFn[T], replayObsBufferSize int, + opts ...client.EventsReplayClientOption[T], ) (client.EventsReplayClient[T], error) { ctx, cancel := context.WithCancel(ctx) @@ -107,7 +121,13 @@ func NewEventsReplayClient[T any]( eventDecoder: newEventFn, replayObsBufferSize: replayObsBufferSize, replayClientCancelCtx: cancel, + connRetryLimit: DefaultConnRetryLimit, + } + + for _, opt := range opts { + opt(rClient) } + // TODO_REFACTOR(@h5law): Look into making this a regular observable as // we may no longer depend on it being replayable. replayObsCache, replayObsCachePublishCh := channel.NewReplayObservable[observable.ReplayObservable[T]]( @@ -189,26 +209,26 @@ func (rClient *replayClient[T]) Close() { // goPublishEvents runs the work function returned by retryPublishEventsFactory, // re-invoking it according to the arguments to retry.OnError when the events bytes // observable returns an asynchronous error. -// This function is intended to be called in a goroutine. func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) { // React to errors by getting a new events bytes observable, re-mapping it, // and send it to replayObsCachePublishCh such that // replayObsCache.Last(ctx, 1) will return it. - publishError := retry.OnError( + publishErr := retry.OnError( ctx, - eventsBytesRetryLimit, + rClient.connRetryLimit, eventsBytesRetryDelay, eventsBytesRetryResetTimeout, "goPublishEvents", rClient.retryPublishEventsFactory(ctx), ) - // If we get here, the retry limit was reached and the retry loop exited. // Since this function runs in a goroutine, we can't return the error to the // caller. Instead, we panic. - if publishError != nil { - panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishError)) + if publishErr != nil { + panic(fmt.Errorf("EventsReplayClient[%T].goPublishEvents should never reach this spot: %w", *new(T), publishErr)) } + + return } // retryPublishEventsFactory returns a function which is intended to be passed @@ -217,20 +237,24 @@ func (rClient *replayClient[T]) goPublishEvents(ctx context.Context) { // replayObsCache replay observable. func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) func() chan error { return func() chan error { + eventsBzCtx, cancelEventsBzObs := context.WithCancel(ctx) errCh := make(chan error, 1) - eventsBytesObs, err := rClient.eventsClient.EventsBytes(ctx, rClient.queryString) + + eventsBytesObs, err := rClient.eventsClient.EventsBytes(eventsBzCtx, rClient.queryString) if err != nil { + // No need to cancel eventsBytesObs in the case of a synchronous error. errCh <- err return errCh } // NB: must cast back to generic observable type to use with Map. eventsBzObs := observable.Observable[either.Either[[]byte]](eventsBytesObs) + typedObs := channel.MapReplay( - ctx, + eventsBzCtx, replayObsCacheBufferSize, eventsBzObs, - rClient.newMapEventsBytesToTFn(errCh), + rClient.newMapEventsBytesToTFn(errCh, cancelEventsBzObs), ) // Subscribe to the eventBzObs and block until the channel closes. @@ -269,12 +293,12 @@ func (rClient *replayClient[T]) retryPublishEventsFactory(ctx context.Context) f // If deserialisation failed because the event bytes were for a different event // type, this value is also skipped. If deserialisation failed for some other // reason, this function panics. -func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func( - context.Context, - either.Bytes, -) (T, bool) { +func (rClient *replayClient[T]) newMapEventsBytesToTFn( + errCh chan<- error, + cancelEventsBzObs context.CancelFunc, +) func(context.Context, either.Bytes) (T, bool) { return func( - _ context.Context, + ctx context.Context, eitherEventBz either.Bytes, ) (_ T, skip bool) { eventBz, err := eitherEventBz.ValueOrError() @@ -296,10 +320,15 @@ func (rClient *replayClient[T]) newMapEventsBytesToTFn(errCh chan<- error) func( return *new(T), true } - panic(fmt.Sprintf( - "unexpected error deserialising event: %v; eventBz: %s", - err, string(eventBz), - )) + // Don't publish (skip) if there was some other kind of error, + // and send that error on the errCh. + errCh <- err + + // The source observable may not necessarily close automatically in this case, + // cancel its context to ensure its closure and prevent a memory/goroutine leak. + cancelEventsBzObs() + + return *new(T), true } return event, false } diff --git a/pkg/client/interface.go b/pkg/client/interface.go index b9515ecd8..3649cd995 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -229,6 +229,15 @@ type TxClientOption func(TxClient) // SupplierClientOption defines a function type that modifies the SupplierClient. type SupplierClientOption func(SupplierClient) +// DelegationClientOption defines a function type that modifies the DelegationClient. +type DelegationClientOption func(DelegationClient) + +// BlockClientOption defines a function type that modifies the BlockClient. +type BlockClientOption func(BlockClient) + +// EventsReplayClientOption defines a function type that modifies the ReplayClient. +type EventsReplayClientOption[T any] func(EventsReplayClient[T]) + // AccountQueryClient defines an interface that enables the querying of the // on-chain account information type AccountQueryClient interface { diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index 12af116ef..430f71aaf 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -100,6 +100,11 @@ type txClient struct { // is used to ensure that transactions error channels receive and close in the event // that they have not already by the given timeout height. txTimeoutPool txTimeoutPool + + // connRetryLimit is the number of times the underlying replay client + // should retry in the event that it encounters an error or its connection is interrupted. + // If connRetryLimit is < 0, it will retry indefinitely. + connRetryLimit int } type ( @@ -167,6 +172,7 @@ func NewTxClient( eventQuery, UnmarshalTxResult, defaultTxReplayLimit, + events.WithConnRetryLimit[*abci.TxResult](txnClient.connRetryLimit), ) if err != nil { return nil, err diff --git a/pkg/client/tx/options.go b/pkg/client/tx/options.go index 5364008a5..46e3dbcd2 100644 --- a/pkg/client/tx/options.go +++ b/pkg/client/tx/options.go @@ -18,3 +18,13 @@ func WithSigningKeyName(keyName string) client.TxClientOption { client.(*txClient).signingKeyName = keyName } } + +// WithConnRetryLimit returns an option function which sets the number +// of times the underlying replay client should retry in the event that it encounters +// an error or its connection is interrupted. +// If connRetryLimit is < 0, it will retry indefinitely. +func WithConnRetryLimit(limit int) client.TxClientOption { + return func(client client.TxClient) { + client.(*txClient).connRetryLimit = limit + } +} diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index d356ee44b..277b9721d 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -5,6 +5,7 @@ import ( "sync" "cosmossdk.io/depinject" + cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/observable" @@ -43,6 +44,9 @@ type relayerSessionsManager struct { // supplierClient is used to create claims and submit proofs for sessions. supplierClient client.SupplierClient + // blockQueryClient is the CometBFT RPC client used to query blocks + blockQueryClient cosmosclient.CometRPC + // storesDirectory points to a path on disk where KVStore data files are created. storesDirectory string } @@ -59,7 +63,7 @@ func NewRelayerSessions( ctx context.Context, deps depinject.Config, opts ...relayer.RelayerSessionsManagerOption, -) (relayer.RelayerSessionsManager, error) { +) (_ relayer.RelayerSessionsManager, err error) { rs := &relayerSessionsManager{ logger: polylog.Ctx(ctx), sessionsTrees: make(sessionsTreesMap), @@ -69,6 +73,7 @@ func NewRelayerSessions( if err := depinject.Inject( deps, &rs.blockClient, + &rs.blockQueryClient, &rs.supplierClient, ); err != nil { return nil, err diff --git a/pkg/relayer/session/session_test.go b/pkg/relayer/session/session_test.go index 642456ada..5b8e13ff9 100644 --- a/pkg/relayer/session/session_test.go +++ b/pkg/relayer/session/session_test.go @@ -7,6 +7,7 @@ import ( "time" "cosmossdk.io/depinject" + "github.com/golang/mock/gomock" "github.com/pokt-network/smt" "github.com/stretchr/testify/require" @@ -15,6 +16,7 @@ import ( "github.com/pokt-network/poktroll/pkg/polylog/polyzero" "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/pkg/relayer/session" + "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testclient/testblock" "github.com/pokt-network/poktroll/testutil/testclient/testsupplier" "github.com/pokt-network/poktroll/testutil/testpolylog" @@ -40,7 +42,14 @@ func TestRelayerSessionsManager_Start(t *testing.T) { blockClient := testblock.NewAnyTimesCommittedBlocksSequenceBlockClient(t, emptyBlockHash, blocksObs) supplierClient := testsupplier.NewOneTimeClaimProofSupplierClient(ctx, t) - deps := depinject.Supply(blockClient, supplierClient) + ctrl := gomock.NewController(t) + blockQueryClientMock := mockclient.NewMockCometRPC(ctrl) + blockQueryClientMock.EXPECT(). + Block(gomock.Any(), gomock.AssignableToTypeOf((*int64)(nil))). + Return(nil, nil). + AnyTimes() + + deps := depinject.Supply(blockClient, blockQueryClientMock, supplierClient) storesDirectoryOpt := testrelayer.WithTempStoresDirectory(t) // Create a new relayer sessions manager. diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index bd06af941..3f8f244aa 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -58,7 +58,9 @@ func OnError( return nil } - if retryCount >= retryLimit { + // Return error if retry limit reached + // A negative retryLimit allows limitless retries + if retryLimit >= 0 && retryCount >= retryLimit { return err } diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go index c9dab961c..a99761a68 100644 --- a/pkg/retry/retry_test.go +++ b/pkg/retry/retry_test.go @@ -11,7 +11,6 @@ import ( "bytes" "context" "fmt" - "log" "strings" "sync/atomic" "testing" @@ -19,6 +18,8 @@ import ( "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/pkg/polylog/polyzero" + _ "github.com/pokt-network/poktroll/pkg/polylog/polyzero" "github.com/pokt-network/poktroll/pkg/retry" ) @@ -33,7 +34,7 @@ func TestOnError(t *testing.T) { // Setting up the test variables. var ( // logOutput captures the log output for verification of logged messages. - logOutput bytes.Buffer + logOutput = new(bytes.Buffer) // expectedRetryDelay is the duration we expect between retries. expectedRetryDelay = time.Millisecond // expectedRetryLimit is the maximum number of retries the test expects. @@ -48,8 +49,10 @@ func TestOnError(t *testing.T) { ctx = context.Background() ) - // Redirect the standard logger's output to our custom buffer for later verification. - log.SetOutput(&logOutput) + // Redirect the log output for verification later + logOpt := polyzero.WithOutput(logOutput) + // Construct a new polylog logger & attach it to the context. + ctx = polyzero.NewLogger(logOpt).WithContext(ctx) // Define testFn, a function that simulates a failing operation and logs its invocation times. testFn := func() chan error { @@ -118,7 +121,7 @@ func TestOnError(t *testing.T) { } // Verify the error messages logged during the retries. - expectedErrLine := "ERROR: retrying TestOnError after error: test error" + expectedErrLine := `"error":"test error"` trimmedLogOutput := strings.Trim(logOutput.String(), "\n") logOutputLines := strings.Split(trimmedLogOutput, "\n") require.Lenf(t, logOutputLines, expectedRetryLimit, "unexpected number of log lines") @@ -137,7 +140,7 @@ func TestOnError_ExitsWhenErrChCloses(t *testing.T) { // Setup test variables and log capture var ( - logOutput bytes.Buffer + logOutput = new(bytes.Buffer) testFnCallCount int32 expectedRetryDelay = time.Millisecond expectedRetryLimit = 3 @@ -148,7 +151,9 @@ func TestOnError_ExitsWhenErrChCloses(t *testing.T) { ) // Redirect the log output for verification later - log.SetOutput(&logOutput) + logOpt := polyzero.WithOutput(logOutput) + // Construct a new polylog logger & attach it to the context. + ctx = polyzero.NewLogger(logOpt).WithContext(ctx) // Define the test function that simulates an error and counts its invocations testFn := func() chan error { @@ -216,8 +221,8 @@ func TestOnError_ExitsWhenErrChCloses(t *testing.T) { logOutputLines = strings.Split(strings.Trim(logOutput.String(), "\n"), "\n") errorLines = logOutputLines[:len(logOutputLines)-1] warnLine = logOutputLines[len(logOutputLines)-1] - expectedWarnMsg = "WARN: error channel for TestOnError_ExitsWhenErrChCloses closed, will no longer retry on error" - expectedErrMsg = "ERROR: retrying TestOnError_ExitsWhenErrChCloses after error: test error" + expectedWarnMsg = "error channel closed, will no longer retry on error" + expectedErrMsg = `"error":"test error"` ) require.Lenf( @@ -238,7 +243,7 @@ func TestOnError_RetryCountResetTimeout(t *testing.T) { // Setup test variables and log capture var ( - logOutput bytes.Buffer + logOutput = new(bytes.Buffer) testFnCallCount int32 expectedRetryDelay = time.Millisecond expectedRetryLimit = 9 @@ -249,7 +254,9 @@ func TestOnError_RetryCountResetTimeout(t *testing.T) { ) // Redirect the log output for verification later - log.SetOutput(&logOutput) + logOpt := polyzero.WithOutput(logOutput) + // Construct a new polylog logger & attach it to the context. + ctx = polyzero.NewLogger(logOpt).WithContext(ctx) // Define the test function that simulates an error and counts its invocations testFn := func() chan error { @@ -315,7 +322,7 @@ func TestOnError_RetryCountResetTimeout(t *testing.T) { // Verify the logged error messages var ( logOutputLines = strings.Split(strings.Trim(logOutput.String(), "\n"), "\n") - expectedPrefix = "ERROR: retrying TestOnError after error: test error" + expectedErrMsg = `"error":"test error"` ) select { @@ -332,6 +339,103 @@ func TestOnError_RetryCountResetTimeout(t *testing.T) { expectedRetryLimit-1, len(logOutputLines), ) for _, line := range logOutputLines { - require.Contains(t, line, expectedPrefix) + require.Contains(t, line, expectedErrMsg) + } +} + +// assert that a negative retry limit continually calls workFn +func TestOnError_NegativeRetryLimit(t *testing.T) { + t.Skip("TODO_TECHDEBT(@bryanchriswhite): this test should pass but contains a race condition around the logOutput buffer") + + // Setup test variables and log capture + var ( + logOutput = new(bytes.Buffer) + testFnCallCount int32 + minimumCallCount = 90 + expectedRetryDelay = time.Millisecond + retryLimit = -1 + retryResetTimeout = 3 * time.Millisecond + testFnCallTimeCh = make(chan time.Time, minimumCallCount) + ctx = context.Background() + ) + + // Redirect the log output for verification later + logOpt := polyzero.WithOutput(logOutput) + // Construct a new polylog logger & attach it to the context. + ctx = polyzero.NewLogger(logOpt).WithContext(ctx) + + // Define the test function that simulates an error and counts its invocations + testFn := func() chan error { + // Track the invocation time + testFnCallTimeCh <- time.Now() + + errCh := make(chan error, 1) + + // Increment the invocation count atomically + count := atomic.AddInt32(&testFnCallCount, 1) - 1 + if count == int32(retryLimit) { + go func() { + time.Sleep(retryResetTimeout) + errCh <- testErr + }() + } else { + errCh <- testErr + } + return errCh + } + + retryOnErrorErrCh := make(chan error, 1) + // Spawn a goroutine to test the OnError function + go func() { + retryOnErrorErrCh <- retry.OnError( + ctx, + retryLimit, + expectedRetryDelay, + retryResetTimeout, + "TestOnError", + testFn, + ) + }() + + // Wait for the OnError function to execute and retry the expected number of times + totalExpectedDelay := expectedRetryDelay * time.Duration(minimumCallCount) + time.Sleep(totalExpectedDelay + 100*time.Millisecond) + + // Assert that the test function was called the expected number of times + require.GreaterOrEqual(t, minimumCallCount, int(testFnCallCount)) + + // Assert that the retry delay between function calls matches the expected delay + var prevCallTime = new(time.Time) + for i := 0; i < minimumCallCount; i++ { + select { + case nextCallTime := <-testFnCallTimeCh: + if i != 0 { + actualRetryDelay := nextCallTime.Sub(*prevCallTime) + require.GreaterOrEqual(t, actualRetryDelay, expectedRetryDelay) + } + + *prevCallTime = nextCallTime + default: + t.Fatalf( + "expected %d calls to testFn, but only received %d", + minimumCallCount, i+1, + ) + } + } + + // Verify the logged error messages + var ( + logOutputLines = strings.Split(strings.Trim(logOutput.String(), "\n"), "\n") + expectedErrMsg = `"error":"test error"` + ) + + require.Lenf( + t, logOutputLines, + minimumCallCount-1, + "expected %d log lines, got %d", + minimumCallCount-1, len(logOutputLines), + ) + for _, line := range logOutputLines { + require.Contains(t, line, expectedErrMsg) } } diff --git a/pkg/sdk/deps_builder.go b/pkg/sdk/deps_builder.go index 351ea962b..f3f5bf35f 100644 --- a/pkg/sdk/deps_builder.go +++ b/pkg/sdk/deps_builder.go @@ -46,8 +46,10 @@ func (sdk *poktrollSDK) buildDeps( eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL) deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) + blockClientConnRetryLimitOpt := block.WithConnRetryLimit(sdk.config.ConnRetryLimit) + // Create and supply the block client that depends on the events query client - blockClient, err := block.NewBlockClient(ctx, deps) + blockClient, err := block.NewBlockClient(ctx, deps, blockClientConnRetryLimitOpt) if err != nil { return nil, err } @@ -91,8 +93,10 @@ func (sdk *poktrollSDK) buildDeps( } deps = depinject.Configs(deps, depinject.Supply(sessionQuerier)) + delegationClientConnRetryLimitOpt := delegation.WithConnRetryLimit(sdk.config.ConnRetryLimit) + // Create and supply the delegation client - delegationClient, err := delegation.NewDelegationClient(ctx, deps) + delegationClient, err := delegation.NewDelegationClient(ctx, deps, delegationClientConnRetryLimitOpt) if err != nil { return nil, err } diff --git a/pkg/sdk/sdk.go b/pkg/sdk/sdk.go index 5cc4d1458..790173abb 100644 --- a/pkg/sdk/sdk.go +++ b/pkg/sdk/sdk.go @@ -26,6 +26,7 @@ var _ POKTRollSDK = (*poktrollSDK)(nil) type POKTRollSDKConfig struct { QueryNodeGRPCUrl *url.URL QueryNodeUrl *url.URL + ConnRetryLimit int PrivateKey cryptotypes.PrivKey Deps depinject.Config }