Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Inactive Account Reconciliation #13

Merged
merged 2 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/check_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func runCheckAccountCmd(cmd *cobra.Command, args []string) {
ServerURL,
fetcher.WithBlockConcurrency(BlockConcurrency),
fetcher.WithTransactionConcurrency(TransactionConcurrency),
fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime),
)

primaryNetwork, _, err := fetcher.InitializeAsserter(ctx)
Expand Down
1 change: 1 addition & 0 deletions cmd/check_complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func runCheckCompleteCmd(cmd *cobra.Command, args []string) {
ServerURL,
fetcher.WithBlockConcurrency(BlockConcurrency),
fetcher.WithTransactionConcurrency(TransactionConcurrency),
fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime),
)

// TODO: sync and reconcile on subnetworks, if they exist.
Expand Down
1 change: 1 addition & 0 deletions cmd/check_quick.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func runCheckQuickCmd(cmd *cobra.Command, args []string) {
ServerURL,
fetcher.WithBlockConcurrency(BlockConcurrency),
fetcher.WithTransactionConcurrency(TransactionConcurrency),
fetcher.WithRetryElapsedTime(ExtendedRetryElapsedTime),
)

primaryNetwork, _, err := fetcher.InitializeAsserter(ctx)
Expand Down
11 changes: 11 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,23 @@ import (
"io/ioutil"
"log"
"path"
"time"

"github.com/coinbase/rosetta-cli/internal/reconciler"

"github.com/spf13/cobra"
)

const (
// ExtendedRetryElapsedTime is used to override the default fetcher
// retry elapsed time. In practice, extending the retry elapsed time
// has prevented retry exhaustion errors when many goroutines are
// used to fetch data from the Rosetta server.
//
// TODO: make configurable
ExtendedRetryElapsedTime = 5 * time.Minute
)

var (
rootCmd = &cobra.Command{
Use: "rosetta-cli",
Expand Down
69 changes: 52 additions & 17 deletions internal/reconciler/stateful_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"

"github.com/coinbase/rosetta-cli/internal/logger"
Expand Down Expand Up @@ -66,6 +66,12 @@ const (
// inactiveReconciliationSleep is used as the time.Duration
// to sleep when there are no seen accounts to reconcile.
inactiveReconciliationSleep = 5 * time.Second

// inactiveReconciliationRequiredDepth is the minimum
// number of blocks the reconciler should wait between
// inactive reconciliations.
// TODO: make configurable
inactiveReconciliationRequiredDepth = 500
)

var (
Expand Down Expand Up @@ -106,7 +112,12 @@ type StatefulReconciler struct {

// seenAccts are stored for inactive account
// reconciliation.
seenAccts []*AccountCurrency
seenAccts []*AccountCurrency
inactiveQueue []*storage.BalanceChange

// inactiveQueueMutex needed because we can't peek at the tip
// of a channel to determine when it is ready to look at.
inactiveQueueMutex sync.Mutex
}

// NewStateful creates a new StatefulReconciler.
Expand All @@ -130,6 +141,7 @@ func NewStateful(
changeQueue: make(chan *storage.BalanceChange, backlogThreshold),
highWaterMark: -1,
seenAccts: make([]*AccountCurrency, 0),
inactiveQueue: make([]*storage.BalanceChange, 0),
}
}

Expand All @@ -152,7 +164,7 @@ func (r *StatefulReconciler) QueueChanges(
select {
case r.changeQueue <- change:
default:
log.Printf("skipping enqueue because backlog\n")
log.Println("skipping active enqueue because backlog")
}
}

Expand Down Expand Up @@ -349,10 +361,7 @@ func (r *StatefulReconciler) accountReconciliation(
return nil
}

if !inactive && !ContainsAccountCurrency(r.seenAccts, accountCurrency) {
r.seenAccts = append(r.seenAccts, accountCurrency)
}

r.inactiveAccountQueue(inactive, accountCurrency, liveBlock)
return r.logger.ReconcileSuccessStream(
ctx,
reconciliationType,
Expand All @@ -368,6 +377,29 @@ func (r *StatefulReconciler) accountReconciliation(
return nil
}

func (r *StatefulReconciler) inactiveAccountQueue(
inactive bool,
accountCurrency *AccountCurrency,
liveBlock *types.BlockIdentifier,
) {
// Only enqueue the first time we see an account on an active reconciliation.
shouldEnqueueInactive := false
if !inactive && !ContainsAccountCurrency(r.seenAccts, accountCurrency) {
r.seenAccts = append(r.seenAccts, accountCurrency)
shouldEnqueueInactive = true
}

if inactive || shouldEnqueueInactive {
r.inactiveQueueMutex.Lock()
r.inactiveQueue = append(r.inactiveQueue, &storage.BalanceChange{
Account: accountCurrency.Account,
Currency: accountCurrency.Currency,
Block: liveBlock,
})
r.inactiveQueueMutex.Unlock()
}
}

// simpleAccountCurrency returns a string that is a simple
// representation of an AccountCurrency struct.
func simpleAccountCurrency(
Expand Down Expand Up @@ -432,18 +464,20 @@ func (r *StatefulReconciler) reconcileActiveAccounts(
func (r *StatefulReconciler) reconcileInactiveAccounts(
ctx context.Context,
) error {
randSource := rand.NewSource(time.Now().UnixNano())
randGenerator := rand.New(randSource)
for ctx.Err() == nil {
if len(r.seenAccts) > 0 {
randAcct := r.seenAccts[randGenerator.Intn(len(r.seenAccts))]
txn := r.storage.NewDatabaseTransaction(ctx, false)
head, err := r.storage.GetHeadBlockIdentifier(ctx, txn)
if err != nil {
return err
}
txn.Discard(ctx)

txn := r.storage.NewDatabaseTransaction(ctx, false)
head, err := r.storage.GetHeadBlockIdentifier(ctx, txn)
if err != nil {
return err
}
txn.Discard(ctx)
r.inactiveQueueMutex.Lock()
if len(r.inactiveQueue) > 0 &&
r.inactiveQueue[0].Block.Index+inactiveReconciliationRequiredDepth < head.Index {
randAcct := r.inactiveQueue[0]
r.inactiveQueue = r.inactiveQueue[1:]
r.inactiveQueueMutex.Unlock()

block, amount, err := r.bestBalance(
ctx,
Expand All @@ -467,6 +501,7 @@ func (r *StatefulReconciler) reconcileInactiveAccounts(
return err
}
} else {
r.inactiveQueueMutex.Unlock()
time.Sleep(inactiveReconciliationSleep)
}
}
Expand Down