diff --git a/reconciler/configuration.go b/reconciler/configuration.go index bca875ef..0b03c5b7 100644 --- a/reconciler/configuration.go +++ b/reconciler/configuration.go @@ -18,6 +18,7 @@ import ( "fmt" "github.com/coinbase/rosetta-sdk-go/parser" + "github.com/coinbase/rosetta-sdk-go/types" ) // Option is used to overwrite default values in @@ -59,7 +60,7 @@ func WithSeenAccounts(seen []*AccountCurrency) Option { r.inactiveQueue = append(r.inactiveQueue, &InactiveEntry{ Entry: acct, }) - r.seenAccounts = append(r.seenAccounts, acct) + r.seenAccounts[types.Hash(acct)] = struct{}{} } fmt.Printf( @@ -85,3 +86,19 @@ func WithLookupBalanceByBlock(lookup bool) Option { r.lookupBalanceByBlock = lookup } } + +// WithInactiveFrequency is how many blocks the reconciler +// should wait between inactive reconciliations on each account. +func WithInactiveFrequency(blocks int64) Option { + return func(r *Reconciler) { + r.inactiveFrequency = blocks + } +} + +// WithDebugLogging determines if verbose logs should +// be printed. +func WithDebugLogging(debug bool) Option { + return func(r *Reconciler) { + r.debugLogging = debug + } +} diff --git a/reconciler/reconciler.go b/reconciler/reconciler.go index 42ecc0a6..6b194439 100644 --- a/reconciler/reconciler.go +++ b/reconciler/reconciler.go @@ -43,7 +43,7 @@ const ( // that can be enqueued to reconcile before new // requests are dropped. // TODO: Make configurable - backlogThreshold = 1000 + backlogThreshold = 50000 // waitToCheckDiff is the syncing difference (live-head) // to retry instead of exiting. In other words, if the @@ -62,13 +62,12 @@ const ( // inactiveReconciliationSleep is used as the time.Duration // to sleep when there are no seen accounts to reconcile. - inactiveReconciliationSleep = 5 * time.Second + inactiveReconciliationSleep = 1 * time.Second - // inactiveReconciliationRequiredDepth is the minimum + // defaultInactiveFrequency is the minimum // number of blocks the reconciler should wait between - // inactive reconciliations. - // TODO: make configurable - inactiveReconciliationRequiredDepth = 500 + // inactive reconciliations for each account. + defaultInactiveFrequency = 200 // defaultLookupBalanceByBlock is the default setting // for how to perform balance queries. It is preferable @@ -175,6 +174,8 @@ type Reconciler struct { lookupBalanceByBlock bool interestingAccounts []*AccountCurrency changeQueue chan *parser.BalanceChange + inactiveFrequency int64 + debugLogging bool // Reconciler concurrency is separated between // active and inactive concurrency to allow for @@ -198,7 +199,7 @@ type Reconciler struct { // queue. If this is not done, it is possible a goroutine // could be processing an account (not in the queue) when // we do a lookup to determine if we should add to the queue. - seenAccounts []*AccountCurrency + seenAccounts map[string]struct{} inactiveQueue []*InactiveEntry // inactiveQueueMutex needed because we can't peek at the tip @@ -219,10 +220,11 @@ func New( helper: helper, handler: handler, fetcher: fetcher, + inactiveFrequency: defaultInactiveFrequency, activeConcurrency: defaultReconcilerConcurrency, inactiveConcurrency: defaultReconcilerConcurrency, highWaterMark: -1, - seenAccounts: []*AccountCurrency{}, + seenAccounts: map[string]struct{}{}, inactiveQueue: []*InactiveEntry{}, // When lookupBalanceByBlock is enabled, we check @@ -269,23 +271,37 @@ func (r *Reconciler) QueueChanges( }) } - if !r.lookupBalanceByBlock { - // All changes will have the same block. Return - // if we are too far behind to start reconciling. - if block.Index < r.highWaterMark { - return nil + for _, change := range balanceChanges { + // Add all seen accounts to inactive reconciler queue. + // + // Note: accounts are only added if they have not been seen before. + err := r.inactiveAccountQueue(false, &AccountCurrency{ + Account: change.Account, + Currency: change.Currency, + }, block) + if err != nil { + return err } - for _, change := range balanceChanges { + if !r.lookupBalanceByBlock { + // All changes will have the same block. Continue + // if we are too far behind to start reconciling. + // + // Note: we don't return here so that we can ensure + // all seen accounts are added to the inactiveAccountQueue. + if block.Index < r.highWaterMark { + continue + } + select { case r.changeQueue <- change: default: - log.Println("skipping active enqueue because backlog") + if r.debugLogging { + log.Println("skipping active enqueue because backlog") + } } - } - } else { - // Block until all checked for a block or context is Done - for _, change := range balanceChanges { + } else { + // Block until all checked for a block or context is Done select { case r.changeQueue <- change: case <-ctx.Done(): @@ -443,11 +459,13 @@ func (r *Reconciler) accountReconciliation( } // Don't wait to check if we are very far behind - log.Printf( - "Skipping reconciliation for %s: %d blocks behind\n", - types.PrettyPrintStruct(accountCurrency), - diff, - ) + if r.debugLogging { + log.Printf( + "Skipping reconciliation for %s: %d blocks behind\n", + types.PrettyPrintStruct(accountCurrency), + diff, + ) + } // Set a highWaterMark to not accept any new // reconciliation requests unless they happened @@ -486,18 +504,13 @@ func (r *Reconciler) accountReconciliation( liveAmount, liveBlock, ) - if err != nil { + if err != nil { // error only returned if we should exit on failure return err } return nil } - err = r.inactiveAccountQueue(inactive, accountCurrency, liveBlock) - if err != nil { - return err - } - return r.handler.ReconciliationSucceeded( ctx, reconciliationType, @@ -508,6 +521,7 @@ func (r *Reconciler) accountReconciliation( ) } + // We return here if we gave up trying to reconcile an account. return nil } @@ -516,22 +530,24 @@ func (r *Reconciler) inactiveAccountQueue( accountCurrency *AccountCurrency, liveBlock *types.BlockIdentifier, ) error { + r.inactiveQueueMutex.Lock() + // Only enqueue the first time we see an account on an active reconciliation. shouldEnqueueInactive := false if !inactive && !ContainsAccountCurrency(r.seenAccounts, accountCurrency) { - r.seenAccounts = append(r.seenAccounts, accountCurrency) + r.seenAccounts[types.Hash(accountCurrency)] = struct{}{} shouldEnqueueInactive = true } if inactive || shouldEnqueueInactive { - r.inactiveQueueMutex.Lock() r.inactiveQueue = append(r.inactiveQueue, &InactiveEntry{ Entry: accountCurrency, LastCheck: liveBlock, }) - r.inactiveQueueMutex.Unlock() } + r.inactiveQueueMutex.Unlock() + return nil } @@ -577,6 +593,35 @@ func (r *Reconciler) reconcileActiveAccounts( } } +// shouldAttemptInactiveReconciliation returns a boolean indicating whether +// inactive reconciliation should be attempted based on syncing status. +func (r *Reconciler) shouldAttemptInactiveReconciliation( + ctx context.Context, +) (bool, *types.BlockIdentifier) { + head, err := r.helper.CurrentBlock(ctx) + // When first start syncing, this loop may run before the genesis block is synced. + // If this is the case, we should sleep and try again later instead of exiting. + if err != nil { + if r.debugLogging { + log.Println("waiting to start intactive reconciliation until a block is synced...") + } + + return false, nil + } + + if head.Index < r.highWaterMark { + if r.debugLogging { + log.Println( + "waiting to continue intactive reconciliation until reaching high water mark...", + ) + } + + return false, nil + } + + return true, head +} + // reconcileInactiveAccounts selects a random account // from all previously seen accounts and reconciles // the balance. This is useful for detecting balance @@ -589,27 +634,25 @@ func (r *Reconciler) reconcileInactiveAccounts( return ctx.Err() } - head, err := r.helper.CurrentBlock(ctx) - // When first start syncing, this loop may run before the genesis block is synced. - // If this is the case, we should sleep and try again later instead of exiting. - if err != nil { - log.Println("waiting to start intactive reconciliation until a block is synced...") + shouldAttempt, head := r.shouldAttemptInactiveReconciliation(ctx) + if !shouldAttempt { time.Sleep(inactiveReconciliationSleep) continue } r.inactiveQueueMutex.Lock() + nextValidIndex := r.inactiveQueue[0].LastCheck.Index + r.inactiveFrequency if len(r.inactiveQueue) > 0 && (r.inactiveQueue[0].LastCheck == nil || // block is set to nil when loaded from previous run - r.inactiveQueue[0].LastCheck.Index+inactiveReconciliationRequiredDepth < head.Index) { - randAcct := r.inactiveQueue[0] + nextValidIndex <= head.Index) { + nextAcct := r.inactiveQueue[0] r.inactiveQueue = r.inactiveQueue[1:] r.inactiveQueueMutex.Unlock() block, amount, err := r.bestBalance( ctx, - randAcct.Entry.Account, - randAcct.Entry.Currency, + nextAcct.Entry.Account, + nextAcct.Entry.Currency, types.ConstructPartialBlockIdentifier(head), ) if err != nil { @@ -618,8 +661,8 @@ func (r *Reconciler) reconcileInactiveAccounts( err = r.accountReconciliation( ctx, - randAcct.Entry.Account, - randAcct.Entry.Currency, + nextAcct.Entry.Account, + nextAcct.Entry.Currency, amount, block, true, @@ -627,8 +670,23 @@ func (r *Reconciler) reconcileInactiveAccounts( if err != nil { return err } + + // Always re-enqueue accounts after they have been inactively + // reconciled. If we don't re-enqueue, we will never check + // these accounts again. + err = r.inactiveAccountQueue(true, nextAcct.Entry, block) + if err != nil { + return err + } } else { r.inactiveQueueMutex.Unlock() + if r.debugLogging { + log.Printf( + "no accounts ready for inactive reconciliation (%d accounts in queue, will reconcile next account at index %d)\n", + len(r.inactiveQueue), + nextValidIndex, + ) + } time.Sleep(inactiveReconciliationSleep) } } @@ -671,22 +729,20 @@ func ExtractAmount( return b, nil } - return nil, fmt.Errorf("could not extract amount for %+v", currency) + return nil, fmt.Errorf( + "account balance response does could not contain currency %s", + types.PrettyPrintStruct(currency), + ) } // ContainsAccountCurrency returns a boolean indicating if a -// AccountCurrency slice already contains an Account and Currency combination. +// AccountCurrency set already contains an Account and Currency combination. func ContainsAccountCurrency( - arr []*AccountCurrency, + m map[string]struct{}, change *AccountCurrency, ) bool { - for _, a := range arr { - if types.Hash(a) == types.Hash(change) { - return true - } - } - - return false + _, exists := m[types.Hash(change)] + return exists } // GetCurrencyBalance fetches the balance of a *types.AccountIdentifier @@ -711,7 +767,12 @@ func GetCurrencyBalance( liveAmount, err := ExtractAmount(liveBalances, currency) if err != nil { - return nil, "", err + return nil, "", fmt.Errorf( + "%w: could not get %s currency balance for %s", + err, + types.PrettyPrintStruct(currency), + types.PrettyPrintStruct(account), + ) } return liveBlock, liveAmount.Value, nil diff --git a/reconciler/reconciler_test.go b/reconciler/reconciler_test.go index fb41688b..0d9773e5 100644 --- a/reconciler/reconciler_test.go +++ b/reconciler/reconciler_test.go @@ -88,8 +88,8 @@ func TestNewReconciler(t *testing.T) { Entry: accountCurrency, }, } - r.seenAccounts = []*AccountCurrency{ - accountCurrency, + r.seenAccounts = map[string]struct{}{ + types.Hash(accountCurrency): struct{}{}, } return r @@ -113,7 +113,7 @@ func TestNewReconciler(t *testing.T) { t.Run(name, func(t *testing.T) { result := New(nil, nil, nil, nil, test.options...) assert.ElementsMatch(t, test.expected.inactiveQueue, result.inactiveQueue) - assert.ElementsMatch(t, test.expected.seenAccounts, result.seenAccounts) + assert.Equal(t, test.expected.seenAccounts, result.seenAccounts) assert.ElementsMatch(t, test.expected.interestingAccounts, result.interestingAccounts) assert.Equal(t, test.expected.inactiveConcurrency, result.inactiveConcurrency) assert.Equal(t, test.expected.activeConcurrency, result.activeConcurrency) @@ -132,7 +132,7 @@ func TestContainsAccountCurrency(t *testing.T) { Symbol: "Blah2", Decimals: 2, } - accts := []*AccountCurrency{ + acctSlice := []*AccountCurrency{ { Account: &types.AccountIdentifier{ Address: "test", @@ -162,6 +162,11 @@ func TestContainsAccountCurrency(t *testing.T) { }, } + accts := map[string]struct{}{} + for _, acct := range acctSlice { + accts[types.Hash(acct)] = struct{}{} + } + t.Run("Non-existent account", func(t *testing.T) { assert.False(t, ContainsAccountCurrency(accts, &AccountCurrency{ Account: &types.AccountIdentifier{ @@ -268,7 +273,14 @@ func TestExtractAmount(t *testing.T) { t.Run("Non-existent currency", func(t *testing.T) { result, err := ExtractAmount(balances, badCurr) assert.Nil(t, result) - assert.EqualError(t, err, fmt.Errorf("could not extract amount for %+v", badCurr).Error()) + assert.EqualError( + t, + err, + fmt.Errorf( + "account balance response does could not contain currency %s", + types.PrettyPrintStruct(badCurr), + ).Error(), + ) }) t.Run("Simple account", func(t *testing.T) { @@ -489,6 +501,13 @@ func TestCompareBalance(t *testing.T) { }) } +func assertContainsAllAccounts(t *testing.T, m map[string]struct{}, a []*AccountCurrency) { + for _, account := range a { + _, exists := m[types.Hash(account)] + assert.True(t, exists) + } +} + func TestInactiveAccountQueue(t *testing.T) { var ( handler = &MockReconcilerHandler{} @@ -528,7 +547,7 @@ func TestInactiveAccountQueue(t *testing.T) { block, ) assert.Nil(t, err) - assert.ElementsMatch(t, r.seenAccounts, []*AccountCurrency{accountCurrency}) + assertContainsAllAccounts(t, r.seenAccounts, []*AccountCurrency{accountCurrency}) assert.ElementsMatch(t, r.inactiveQueue, []*InactiveEntry{ { Entry: accountCurrency, @@ -544,7 +563,7 @@ func TestInactiveAccountQueue(t *testing.T) { block2, ) assert.Nil(t, err) - assert.ElementsMatch( + assertContainsAllAccounts( t, r.seenAccounts, []*AccountCurrency{accountCurrency, accountCurrency2}, @@ -570,7 +589,7 @@ func TestInactiveAccountQueue(t *testing.T) { block, ) assert.Nil(t, err) - assert.ElementsMatch( + assertContainsAllAccounts( t, r.seenAccounts, []*AccountCurrency{accountCurrency, accountCurrency2}, @@ -585,7 +604,7 @@ func TestInactiveAccountQueue(t *testing.T) { block, ) assert.Nil(t, err) - assert.ElementsMatch( + assertContainsAllAccounts( t, r.seenAccounts, []*AccountCurrency{accountCurrency, accountCurrency2}, @@ -605,7 +624,7 @@ func TestInactiveAccountQueue(t *testing.T) { block2, ) assert.Nil(t, err) - assert.ElementsMatch( + assertContainsAllAccounts( t, r.seenAccounts, []*AccountCurrency{accountCurrency, accountCurrency2},