From 16ff25a85d4f0e9635ac28920d4fb084c1fbbcfc Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Tue, 28 Apr 2020 18:08:10 -0700 Subject: [PATCH 1/2] First pass at sophistacted inactive reconciliation --- internal/reconciler/stateful_reconciler.go | 69 ++++++++++++++++------ 1 file changed, 52 insertions(+), 17 deletions(-) diff --git a/internal/reconciler/stateful_reconciler.go b/internal/reconciler/stateful_reconciler.go index d120ac43..7bde38c3 100644 --- a/internal/reconciler/stateful_reconciler.go +++ b/internal/reconciler/stateful_reconciler.go @@ -19,7 +19,7 @@ import ( "errors" "fmt" "log" - "math/rand" + "sync" "time" "github.com/coinbase/rosetta-cli/internal/logger" @@ -66,6 +66,12 @@ const ( // inactiveReconciliationSleep is used as the time.Duration // to sleep when there are no seen accounts to reconcile. inactiveReconciliationSleep = 5 * time.Second + + // inactiveReconciliationRequiredDepth is the minimum + // number of blocks the reconciler should wait between + // inactive reconciliations. + // TODO: make configurable + inactiveReconciliationRequiredDepth = 500 ) var ( @@ -106,7 +112,12 @@ type StatefulReconciler struct { // seenAccts are stored for inactive account // reconciliation. - seenAccts []*AccountCurrency + seenAccts []*AccountCurrency + inactiveQueue []*storage.BalanceChange + + // inactiveQueueMutex needed because we can't peek at the tip + // of a channel to determine when it is ready to look at. + inactiveQueueMutex sync.Mutex } // NewStateful creates a new StatefulReconciler. @@ -130,6 +141,7 @@ func NewStateful( changeQueue: make(chan *storage.BalanceChange, backlogThreshold), highWaterMark: -1, seenAccts: make([]*AccountCurrency, 0), + inactiveQueue: make([]*storage.BalanceChange, 0), } } @@ -152,7 +164,7 @@ func (r *StatefulReconciler) QueueChanges( select { case r.changeQueue <- change: default: - log.Printf("skipping enqueue because backlog\n") + log.Println("skipping active enqueue because backlog") } } @@ -349,10 +361,7 @@ func (r *StatefulReconciler) accountReconciliation( return nil } - if !inactive && !ContainsAccountCurrency(r.seenAccts, accountCurrency) { - r.seenAccts = append(r.seenAccts, accountCurrency) - } - + r.inactiveAccountQueue(inactive, accountCurrency, liveBlock) return r.logger.ReconcileSuccessStream( ctx, reconciliationType, @@ -368,6 +377,29 @@ func (r *StatefulReconciler) accountReconciliation( return nil } +func (r *StatefulReconciler) inactiveAccountQueue( + inactive bool, + accountCurrency *AccountCurrency, + liveBlock *types.BlockIdentifier, +) { + // Only enqueue the first time we see an account on an active reconciliation. + shouldEnqueueInactive := false + if !inactive && !ContainsAccountCurrency(r.seenAccts, accountCurrency) { + r.seenAccts = append(r.seenAccts, accountCurrency) + shouldEnqueueInactive = true + } + + if inactive || shouldEnqueueInactive { + r.inactiveQueueMutex.Lock() + r.inactiveQueue = append(r.inactiveQueue, &storage.BalanceChange{ + Account: accountCurrency.Account, + Currency: accountCurrency.Currency, + Block: liveBlock, + }) + r.inactiveQueueMutex.Unlock() + } +} + // simpleAccountCurrency returns a string that is a simple // representation of an AccountCurrency struct. func simpleAccountCurrency( @@ -432,18 +464,20 @@ func (r *StatefulReconciler) reconcileActiveAccounts( func (r *StatefulReconciler) reconcileInactiveAccounts( ctx context.Context, ) error { - randSource := rand.NewSource(time.Now().UnixNano()) - randGenerator := rand.New(randSource) for ctx.Err() == nil { - if len(r.seenAccts) > 0 { - randAcct := r.seenAccts[randGenerator.Intn(len(r.seenAccts))] + txn := r.storage.NewDatabaseTransaction(ctx, false) + head, err := r.storage.GetHeadBlockIdentifier(ctx, txn) + if err != nil { + return err + } + txn.Discard(ctx) - txn := r.storage.NewDatabaseTransaction(ctx, false) - head, err := r.storage.GetHeadBlockIdentifier(ctx, txn) - if err != nil { - return err - } - txn.Discard(ctx) + r.inactiveQueueMutex.Lock() + if len(r.inactiveQueue) > 0 && + r.inactiveQueue[0].Block.Index+inactiveReconciliationRequiredDepth < head.Index { + randAcct := r.inactiveQueue[0] + r.inactiveQueue = r.inactiveQueue[1:] + r.inactiveQueueMutex.Unlock() block, amount, err := r.bestBalance( ctx, @@ -467,6 +501,7 @@ func (r *StatefulReconciler) reconcileInactiveAccounts( return err } } else { + r.inactiveQueueMutex.Unlock() time.Sleep(inactiveReconciliationSleep) } } From baa79071415bb8a5897eaefa0bd3d733ad05c12e Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Thu, 30 Apr 2020 13:15:54 -0700 Subject: [PATCH 2/2] Extend retry time --- cmd/check_account.go | 1 + cmd/check_complete.go | 1 + cmd/check_quick.go | 1 + cmd/root.go | 11 +++++++++++ 4 files changed, 14 insertions(+) diff --git a/cmd/check_account.go b/cmd/check_account.go index eebce43b..9dfdb8fb 100644 --- a/cmd/check_account.go +++ b/cmd/check_account.go @@ -78,6 +78,7 @@ func runCheckAccountCmd(cmd *cobra.Command, args []string) { ServerURL, fetcher.WithBlockConcurrency(BlockConcurrency), fetcher.WithTransactionConcurrency(TransactionConcurrency), + fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime), ) primaryNetwork, _, err := fetcher.InitializeAsserter(ctx) diff --git a/cmd/check_complete.go b/cmd/check_complete.go index 32f91215..f8a05cfe 100644 --- a/cmd/check_complete.go +++ b/cmd/check_complete.go @@ -86,6 +86,7 @@ func runCheckCompleteCmd(cmd *cobra.Command, args []string) { ServerURL, fetcher.WithBlockConcurrency(BlockConcurrency), fetcher.WithTransactionConcurrency(TransactionConcurrency), + fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime), ) // TODO: sync and reconcile on subnetworks, if they exist. diff --git a/cmd/check_quick.go b/cmd/check_quick.go index e6c5f918..365dde60 100644 --- a/cmd/check_quick.go +++ b/cmd/check_quick.go @@ -65,6 +65,7 @@ func runCheckQuickCmd(cmd *cobra.Command, args []string) { ServerURL, fetcher.WithBlockConcurrency(BlockConcurrency), fetcher.WithTransactionConcurrency(TransactionConcurrency), + fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime), ) primaryNetwork, _, err := fetcher.InitializeAsserter(ctx) diff --git a/cmd/root.go b/cmd/root.go index f0f49f06..6ea99150 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -19,12 +19,23 @@ import ( "io/ioutil" "log" "path" + "time" "github.com/coinbase/rosetta-cli/internal/reconciler" "github.com/spf13/cobra" ) +const ( + // ExtendedRetryElapsedTime is used to override the default fetcher + // retry elapsed time. In practice, extending the retry elapsed time + // has prevented retry exhaustion errors when many goroutines are + // used to fetch data from the Rosetta server. + // + // TODO: make configurable + ExtendedRetryElapsedTime = 5 * time.Minute +) + var ( rootCmd = &cobra.Command{ Use: "rosetta-cli",