Skip to content

Commit

Permalink
Add separate configuration for active/inactive reconciliation
Browse files Browse the repository at this point in the history
concurrency
  • Loading branch information
patrick-ogrady committed May 16, 2020
1 parent 2278020 commit 0bdff7d
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 61 deletions.
14 changes: 11 additions & 3 deletions reconciler/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ import (
// falls back to the default value.
type Option func(r *Reconciler)

// WithReconcilerConcurrency overrides the default reconciler
// WithInactiveConcurrency overrides the default inactive
// concurrency.
func WithReconcilerConcurrency(concurrency int) Option {
func WithInactiveConcurrency(concurrency int) Option {
return func(r *Reconciler) {
r.reconcilerConcurrency = concurrency
r.inactiveConcurrency = concurrency
}
}

// WithActiveConcurrency overrides the default inactive
// concurrency.
func WithActiveConcurrency(concurrency int) Option {
return func(r *Reconciler) {
r.activeConcurrency = concurrency
}
}

Expand Down
70 changes: 37 additions & 33 deletions reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,6 @@ type Handler interface {
balance string,
block *types.BlockIdentifier,
) error

NewAccountSeen(
ctx context.Context,
account *types.AccountIdentifier,
currency *types.Currency,
) error
}

// InactiveEntry is used to track the last
Expand All @@ -174,21 +168,36 @@ type AccountCurrency struct {
// types.AccountIdentifiers returned in types.Operations
// by a Rosetta Server.
type Reconciler struct {
network *types.NetworkIdentifier
helper Helper
handler Handler
fetcher *fetcher.Fetcher
reconcilerConcurrency int
lookupBalanceByBlock bool
interestingAccounts []*AccountCurrency
changeQueue chan *parser.BalanceChange
network *types.NetworkIdentifier
helper Helper
handler Handler
fetcher *fetcher.Fetcher
lookupBalanceByBlock bool
interestingAccounts []*AccountCurrency
changeQueue chan *parser.BalanceChange

// Reconciler concurrency is separated between
// active and inactive concurrency to allow for
// fine-grained tuning of reconciler behavior.
// When there are many transactions in a block
// on a resource-constrained machine (laptop),
// it is useful to allocate more resources to
// active reconciliation as it is synchronous
// (when lookupBalanceByBlock is enabled).
activeConcurrency int
inactiveConcurrency int

// highWaterMark is used to skip requests when
// we are very far behind the live head.
highWaterMark int64

// seenAccounts are stored for inactive account
// reconciliation.
// reconciliation. seenAccounts must be stored
// separately from inactiveQueue to prevent duplicate
// accounts from being added to the inactive reconciliation
// queue. If this is not done, it is possible a goroutine
// could be processing an account (not in the queue) when
// we do a lookup to determine if we should add to the queue.
seenAccounts []*AccountCurrency
inactiveQueue []*InactiveEntry

Expand All @@ -206,14 +215,15 @@ func New(
options ...Option,
) *Reconciler {
r := &Reconciler{
network: network,
helper: helper,
handler: handler,
fetcher: fetcher,
reconcilerConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: []*AccountCurrency{},
inactiveQueue: []*InactiveEntry{},
network: network,
helper: helper,
handler: handler,
fetcher: fetcher,
activeConcurrency: defaultReconcilerConcurrency,
inactiveConcurrency: defaultReconcilerConcurrency,
highWaterMark: -1,
seenAccounts: []*AccountCurrency{},
inactiveQueue: []*InactiveEntry{},

// When lookupBalanceByBlock is enabled, we check
// balance changes synchronously.
Expand Down Expand Up @@ -508,15 +518,6 @@ func (r *Reconciler) inactiveAccountQueue(
if !inactive && !ContainsAccountCurrency(r.seenAccounts, accountCurrency) {
r.seenAccounts = append(r.seenAccounts, accountCurrency)
shouldEnqueueInactive = true

err := r.handler.NewAccountSeen(
ctx,
accountCurrency.Account,
accountCurrency.Currency,
)
if err != nil {
return err
}
}

if inactive || shouldEnqueueInactive {
Expand Down Expand Up @@ -632,11 +633,14 @@ func (r *Reconciler) reconcileInactiveAccounts(
// If any goroutine errors, the function will return an error.
func (r *Reconciler) Reconcile(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)
for j := 0; j < r.reconcilerConcurrency/2; j++ {
for j := 0; j < r.activeConcurrency; j++ {
g.Go(func() error {
return r.reconcileActiveAccounts(ctx)
})

}

for j := 0; j < r.inactiveConcurrency; j++ {
g.Go(func() error {
return r.reconcileInactiveAccounts(ctx)
})
Expand Down
32 changes: 7 additions & 25 deletions reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func TestNewReconciler(t *testing.T) {
},
"with reconciler concurrency": {
options: []Option{
WithReconcilerConcurrency(100),
WithInactiveConcurrency(100),
WithActiveConcurrency(200),
},
expected: func() *Reconciler {
r := templateReconciler()
r.reconcilerConcurrency = 100
r.inactiveConcurrency = 100
r.activeConcurrency = 200

return r
}(),
Expand Down Expand Up @@ -113,7 +115,8 @@ func TestNewReconciler(t *testing.T) {
assert.ElementsMatch(t, test.expected.inactiveQueue, result.inactiveQueue)
assert.ElementsMatch(t, test.expected.seenAccounts, result.seenAccounts)
assert.ElementsMatch(t, test.expected.interestingAccounts, result.interestingAccounts)
assert.Equal(t, test.expected.reconcilerConcurrency, result.reconcilerConcurrency)
assert.Equal(t, test.expected.inactiveConcurrency, result.inactiveConcurrency)
assert.Equal(t, test.expected.activeConcurrency, result.activeConcurrency)
assert.Equal(t, test.expected.lookupBalanceByBlock, result.lookupBalanceByBlock)
assert.Equal(t, cap(test.expected.changeQueue), cap(result.changeQueue))
})
Expand Down Expand Up @@ -526,7 +529,6 @@ func TestInactiveAccountQueue(t *testing.T) {
block,
)
assert.Nil(t, err)
assert.Equal(t, handler.LastAccountCurrency, accountCurrency)
assert.ElementsMatch(t, r.seenAccounts, []*AccountCurrency{accountCurrency})
assert.ElementsMatch(t, r.inactiveQueue, []*InactiveEntry{
{
Expand All @@ -544,7 +546,6 @@ func TestInactiveAccountQueue(t *testing.T) {
block2,
)
assert.Nil(t, err)
assert.Equal(t, handler.LastAccountCurrency, accountCurrency2)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -564,7 +565,6 @@ func TestInactiveAccountQueue(t *testing.T) {

t.Run("previous account in active reconciliation", func(t *testing.T) {
r.inactiveQueue = []*InactiveEntry{}
handler.LastAccountCurrency = nil

err := r.inactiveAccountQueue(
context.Background(),
Expand All @@ -573,7 +573,6 @@ func TestInactiveAccountQueue(t *testing.T) {
block,
)
assert.Nil(t, err)
assert.Nil(t, handler.LastAccountCurrency)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -590,7 +589,6 @@ func TestInactiveAccountQueue(t *testing.T) {
block,
)
assert.Nil(t, err)
assert.Nil(t, handler.LastAccountCurrency)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -612,7 +610,6 @@ func TestInactiveAccountQueue(t *testing.T) {
block2,
)
assert.Nil(t, err)
assert.Nil(t, handler.LastAccountCurrency)
assert.ElementsMatch(
t,
r.seenAccounts,
Expand All @@ -635,9 +632,7 @@ func templateReconciler() *Reconciler {
return New(nil, nil, nil, nil)
}

type MockReconcilerHandler struct {
LastAccountCurrency *AccountCurrency
}
type MockReconcilerHandler struct{}

func (h *MockReconcilerHandler) ReconciliationFailed(
ctx context.Context,
Expand All @@ -662,19 +657,6 @@ func (h *MockReconcilerHandler) ReconciliationSucceeded(
return nil
}

func (h *MockReconcilerHandler) NewAccountSeen(
ctx context.Context,
account *types.AccountIdentifier,
currency *types.Currency,
) error {
h.LastAccountCurrency = &AccountCurrency{
Account: account,
Currency: currency,
}

return nil
}

type MockReconcilerHelper struct {
HeadBlock *types.BlockIdentifier
StoredBlocks map[string]*types.Block
Expand Down

0 comments on commit 0bdff7d

Please sign in to comment.