Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Inactive Reconciler Performance when --lookup-balance-by-block=false #66

Merged
merged 9 commits into from
Jul 15, 2020
19 changes: 18 additions & 1 deletion reconciler/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/coinbase/rosetta-sdk-go/parser"
"github.com/coinbase/rosetta-sdk-go/types"
)

// Option is used to overwrite default values in
Expand Down Expand Up @@ -59,7 +60,7 @@ func WithSeenAccounts(seen []*AccountCurrency) Option {
r.inactiveQueue = append(r.inactiveQueue, &InactiveEntry{
Entry: acct,
})
r.seenAccounts = append(r.seenAccounts, acct)
r.seenAccounts[types.Hash(acct)] = struct{}{}
}

fmt.Printf(
Expand All @@ -85,3 +86,19 @@ func WithLookupBalanceByBlock(lookup bool) Option {
r.lookupBalanceByBlock = lookup
}
}

// WithInactiveFrequency is how many blocks the reconciler
// should wait between inactive reconciliations on each account.
func WithInactiveFrequency(blocks int64) Option {
return func(r *Reconciler) {
r.inactiveFrequency = blocks
}
}

// WithDebugLogging determines if verbose logs should
// be printed.
func WithDebugLogging(debug bool) Option {
return func(r *Reconciler) {
r.debugLogging = debug
}
}
171 changes: 116 additions & 55 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
// that can be enqueued to reconcile before new
// requests are dropped.
// TODO: Make configurable
backlogThreshold = 1000
backlogThreshold = 50000

// waitToCheckDiff is the syncing difference (live-head)
// to retry instead of exiting. In other words, if the
Expand All @@ -62,13 +62,12 @@ const (

// inactiveReconciliationSleep is used as the time.Duration
// to sleep when there are no seen accounts to reconcile.
inactiveReconciliationSleep = 5 * time.Second
inactiveReconciliationSleep = 1 * time.Second

// inactiveReconciliationRequiredDepth is the minimum
// defaultInactiveFrequency is the minimum
// number of blocks the reconciler should wait between
// inactive reconciliations.
// TODO: make configurable
inactiveReconciliationRequiredDepth = 500
// inactive reconciliations for each account.
defaultInactiveFrequency = 200

// defaultLookupBalanceByBlock is the default setting
// for how to perform balance queries. It is preferable
Expand Down Expand Up @@ -175,6 +174,8 @@ type Reconciler struct {
lookupBalanceByBlock bool
interestingAccounts []*AccountCurrency
changeQueue chan *parser.BalanceChange
inactiveFrequency int64
debugLogging bool

// Reconciler concurrency is separated between
// active and inactive concurrency to allow for
Expand All @@ -198,7 +199,7 @@ type Reconciler struct {
// queue. If this is not done, it is possible a goroutine
// could be processing an account (not in the queue) when
// we do a lookup to determine if we should add to the queue.
seenAccounts []*AccountCurrency
seenAccounts map[string]struct{}
inactiveQueue []*InactiveEntry

// inactiveQueueMutex needed because we can't peek at the tip
Expand All @@ -219,10 +220,11 @@ func New(
helper: helper,
handler: handler,
fetcher: fetcher,
inactiveFrequency: defaultInactiveFrequency,
activeConcurrency: defaultReconcilerConcurrency,
inactiveConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: []*AccountCurrency{},
seenAccounts: map[string]struct{}{},
inactiveQueue: []*InactiveEntry{},

// When lookupBalanceByBlock is enabled, we check
Expand Down Expand Up @@ -269,23 +271,37 @@ func (r *Reconciler) QueueChanges(
})
}

if !r.lookupBalanceByBlock {
// All changes will have the same block. Return
// if we are too far behind to start reconciling.
if block.Index < r.highWaterMark {
return nil
for _, change := range balanceChanges {
// Add all seen accounts to inactive reconciler queue.
//
// Note: accounts are only added if they have not been seen before.
err := r.inactiveAccountQueue(false, &AccountCurrency{
Account: change.Account,
Currency: change.Currency,
}, block)
if err != nil {
return err
}

for _, change := range balanceChanges {
if !r.lookupBalanceByBlock {
// All changes will have the same block. Continue
// if we are too far behind to start reconciling.
//
// Note: we don't return here so that we can ensure
// all seen accounts are added to the inactiveAccountQueue.
if block.Index < r.highWaterMark {
continue
}

select {
case r.changeQueue <- change:
default:
log.Println("skipping active enqueue because backlog")
if r.debugLogging {
log.Println("skipping active enqueue because backlog")
}
}
}
} else {
// Block until all checked for a block or context is Done
for _, change := range balanceChanges {
} else {
// Block until all checked for a block or context is Done
select {
case r.changeQueue <- change:
case <-ctx.Done():
Expand Down Expand Up @@ -443,11 +459,13 @@ func (r *Reconciler) accountReconciliation(
}

// Don't wait to check if we are very far behind
log.Printf(
"Skipping reconciliation for %s: %d blocks behind\n",
types.PrettyPrintStruct(accountCurrency),
diff,
)
if r.debugLogging {
log.Printf(
"Skipping reconciliation for %s: %d blocks behind\n",
types.PrettyPrintStruct(accountCurrency),
diff,
)
}

// Set a highWaterMark to not accept any new
// reconciliation requests unless they happened
Expand Down Expand Up @@ -486,18 +504,13 @@ func (r *Reconciler) accountReconciliation(
liveAmount,
liveBlock,
)
if err != nil {
if err != nil { // error only returned if we should exit on failure
return err
}

return nil
}

err = r.inactiveAccountQueue(inactive, accountCurrency, liveBlock)
if err != nil {
return err
}

return r.handler.ReconciliationSucceeded(
ctx,
reconciliationType,
Expand All @@ -508,6 +521,7 @@ func (r *Reconciler) accountReconciliation(
)
}

// We return here if we gave up trying to reconcile an account.
return nil
}

Expand All @@ -516,22 +530,24 @@ func (r *Reconciler) inactiveAccountQueue(
accountCurrency *AccountCurrency,
liveBlock *types.BlockIdentifier,
) error {
r.inactiveQueueMutex.Lock()

// Only enqueue the first time we see an account on an active reconciliation.
shouldEnqueueInactive := false
if !inactive && !ContainsAccountCurrency(r.seenAccounts, accountCurrency) {
r.seenAccounts = append(r.seenAccounts, accountCurrency)
r.seenAccounts[types.Hash(accountCurrency)] = struct{}{}
shouldEnqueueInactive = true
}

if inactive || shouldEnqueueInactive {
r.inactiveQueueMutex.Lock()
r.inactiveQueue = append(r.inactiveQueue, &InactiveEntry{
Entry: accountCurrency,
LastCheck: liveBlock,
})
r.inactiveQueueMutex.Unlock()
}

r.inactiveQueueMutex.Unlock()

return nil
}

Expand Down Expand Up @@ -577,6 +593,35 @@ func (r *Reconciler) reconcileActiveAccounts(
}
}

// shouldAttemptInactiveReconciliation returns a boolean indicating whether
// inactive reconciliation should be attempted based on syncing status.
func (r *Reconciler) shouldAttemptInactiveReconciliation(
ctx context.Context,
) (bool, *types.BlockIdentifier) {
head, err := r.helper.CurrentBlock(ctx)
// When first start syncing, this loop may run before the genesis block is synced.
// If this is the case, we should sleep and try again later instead of exiting.
if err != nil {
if r.debugLogging {
log.Println("waiting to start intactive reconciliation until a block is synced...")
}

return false, nil
}

if head.Index < r.highWaterMark {
if r.debugLogging {
log.Println(
"waiting to continue intactive reconciliation until reaching high water mark...",
)
}

return false, nil
}

return true, head
}

// reconcileInactiveAccounts selects a random account
// from all previously seen accounts and reconciles
// the balance. This is useful for detecting balance
Expand All @@ -589,27 +634,25 @@ func (r *Reconciler) reconcileInactiveAccounts(
return ctx.Err()
}

head, err := r.helper.CurrentBlock(ctx)
// When first start syncing, this loop may run before the genesis block is synced.
// If this is the case, we should sleep and try again later instead of exiting.
if err != nil {
log.Println("waiting to start intactive reconciliation until a block is synced...")
shouldAttempt, head := r.shouldAttemptInactiveReconciliation(ctx)
if !shouldAttempt {
time.Sleep(inactiveReconciliationSleep)
continue
}

r.inactiveQueueMutex.Lock()
nextValidIndex := r.inactiveQueue[0].LastCheck.Index + r.inactiveFrequency
if len(r.inactiveQueue) > 0 &&
(r.inactiveQueue[0].LastCheck == nil || // block is set to nil when loaded from previous run
r.inactiveQueue[0].LastCheck.Index+inactiveReconciliationRequiredDepth < head.Index) {
randAcct := r.inactiveQueue[0]
nextValidIndex <= head.Index) {
nextAcct := r.inactiveQueue[0]
r.inactiveQueue = r.inactiveQueue[1:]
r.inactiveQueueMutex.Unlock()

block, amount, err := r.bestBalance(
ctx,
randAcct.Entry.Account,
randAcct.Entry.Currency,
nextAcct.Entry.Account,
nextAcct.Entry.Currency,
types.ConstructPartialBlockIdentifier(head),
)
if err != nil {
Expand All @@ -618,17 +661,32 @@ func (r *Reconciler) reconcileInactiveAccounts(

err = r.accountReconciliation(
ctx,
randAcct.Entry.Account,
randAcct.Entry.Currency,
nextAcct.Entry.Account,
nextAcct.Entry.Currency,
amount,
block,
true,
)
if err != nil {
return err
}

// Always re-enqueue accounts after they have been inactively
// reconciled. If we don't re-enqueue, we will never check
// these accounts again.
err = r.inactiveAccountQueue(true, nextAcct.Entry, block)
if err != nil {
return err
}
} else {
r.inactiveQueueMutex.Unlock()
if r.debugLogging {
log.Printf(
"no accounts ready for inactive reconciliation (%d accounts in queue, will reconcile next account at index %d)\n",
len(r.inactiveQueue),
nextValidIndex,
)
}
time.Sleep(inactiveReconciliationSleep)
}
}
Expand Down Expand Up @@ -671,22 +729,20 @@ func ExtractAmount(
return b, nil
}

return nil, fmt.Errorf("could not extract amount for %+v", currency)
return nil, fmt.Errorf(
"account balance response does could not contain currency %s",
types.PrettyPrintStruct(currency),
)
}

// ContainsAccountCurrency returns a boolean indicating if a
// AccountCurrency slice already contains an Account and Currency combination.
// AccountCurrency set already contains an Account and Currency combination.
func ContainsAccountCurrency(
arr []*AccountCurrency,
m map[string]struct{},
change *AccountCurrency,
) bool {
for _, a := range arr {
if types.Hash(a) == types.Hash(change) {
return true
}
}

return false
_, exists := m[types.Hash(change)]
return exists
}

// GetCurrencyBalance fetches the balance of a *types.AccountIdentifier
Expand All @@ -711,7 +767,12 @@ func GetCurrencyBalance(

liveAmount, err := ExtractAmount(liveBalances, currency)
if err != nil {
return nil, "", err
return nil, "", fmt.Errorf(
"%w: could not get %s currency balance for %s",
err,
types.PrettyPrintStruct(currency),
types.PrettyPrintStruct(account),
)
}

return liveBlock, liveAmount.Value, nil
Expand Down
Loading