diff --git a/README.md b/README.md index 046e860e..640b4fa2 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,9 @@ Flags: --block-concurrency uint concurrency to use while fetching blocks (default 8) --data-dir string folder used to store logs and any data used to perform validation (default "./validator-data") --end int block index to stop syncing (default -1) + --exempt-accounts string Absolute path to a file listing all accounts to exempt from balance + tracking and reconciliation. Look at the examples directory for an example of + how to structure this file. --halt-on-reconciliation-error Determines if block processing should halt on a reconciliation error. It can be beneficial to collect all reconciliation errors or silence reconciliation errors during development. (default true) diff --git a/cmd/check_account.go b/cmd/check_account.go index a397b166..eebce43b 100644 --- a/cmd/check_account.go +++ b/cmd/check_account.go @@ -16,10 +16,7 @@ package cmd import ( "context" - "encoding/json" - "io/ioutil" "log" - "path" "github.com/coinbase/rosetta-cli/internal/logger" "github.com/coinbase/rosetta-cli/internal/reconciler" @@ -67,22 +64,15 @@ func runCheckAccountCmd(cmd *cobra.Command, args []string) { // TODO: unify startup logic with stateless ctx, cancel := context.WithCancel(context.Background()) - // Try to load interesting accounts - interestingAccountsRaw, err := ioutil.ReadFile(path.Clean(accountFile)) + interestingAccounts, err := loadAccounts(accountFile) if err != nil { log.Fatal(err) } - interestingAccounts := []*reconciler.AccountCurrency{} - if err := json.Unmarshal(interestingAccountsRaw, &interestingAccounts); err != nil { - log.Fatal(err) - } - - accts, err := json.MarshalIndent(interestingAccounts, "", " ") + exemptAccounts, err := loadAccounts(ExemptFile) if err != nil { log.Fatal(err) } - log.Printf("Checking: %s\n", string(accts)) fetcher := fetcher.New( ServerURL, @@ -121,6 +111,7 @@ func runCheckAccountCmd(cmd *cobra.Command, args []string) { syncHandler := syncer.NewBaseHandler( logger, r, + exemptAccounts, ) statelessSyncer := syncer.NewStateless( diff --git a/cmd/check_complete.go b/cmd/check_complete.go index ce1eea5a..32f91215 100644 --- a/cmd/check_complete.go +++ b/cmd/check_complete.go @@ -77,6 +77,11 @@ historical balance lookup should set this to false.`, func runCheckCompleteCmd(cmd *cobra.Command, args []string) { ctx, cancel := context.WithCancel(context.Background()) + exemptAccounts, err := loadAccounts(ExemptFile) + if err != nil { + log.Fatal(err) + } + fetcher := fetcher.New( ServerURL, fetcher.WithBlockConcurrency(BlockConcurrency), @@ -133,6 +138,7 @@ func runCheckCompleteCmd(cmd *cobra.Command, args []string) { syncHandler := syncer.NewBaseHandler( logger, r, + exemptAccounts, ) statefulSyncer := syncer.NewStateful( diff --git a/cmd/check_quick.go b/cmd/check_quick.go index cf8b0cea..e6c5f918 100644 --- a/cmd/check_quick.go +++ b/cmd/check_quick.go @@ -56,6 +56,11 @@ use the check:complete command.`, func runCheckQuickCmd(cmd *cobra.Command, args []string) { ctx, cancel := context.WithCancel(context.Background()) + exemptAccounts, err := loadAccounts(ExemptFile) + if err != nil { + log.Fatal(err) + } + fetcher := fetcher.New( ServerURL, fetcher.WithBlockConcurrency(BlockConcurrency), @@ -93,6 +98,7 @@ func runCheckQuickCmd(cmd *cobra.Command, args []string) { syncHandler := syncer.NewBaseHandler( logger, r, + exemptAccounts, ) statelessSyncer := syncer.NewStateless( diff --git a/cmd/root.go b/cmd/root.go index 31446934..f0f49f06 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -15,6 +15,13 @@ package cmd import ( + "encoding/json" + "io/ioutil" + "log" + "path" + + "github.com/coinbase/rosetta-cli/internal/reconciler" + "github.com/spf13/cobra" ) @@ -171,3 +178,27 @@ how to structure this file.`, rootCmd.AddCommand(checkQuickCmd) rootCmd.AddCommand(checkAccountCmd) } + +func loadAccounts(filePath string) ([]*reconciler.AccountCurrency, error) { + if len(filePath) == 0 { + return []*reconciler.AccountCurrency{}, nil + } + + accountsRaw, err := ioutil.ReadFile(path.Clean(filePath)) + if err != nil { + return nil, err + } + + accounts := []*reconciler.AccountCurrency{} + if err := json.Unmarshal(accountsRaw, &accounts); err != nil { + return nil, err + } + + prettyAccounts, err := json.MarshalIndent(accounts, "", " ") + if err != nil { + return nil, err + } + log.Printf("Found %d accounts at %s: %s\n", len(accounts), filePath, string(prettyAccounts)) + + return accounts, nil +} diff --git a/internal/syncer/base_handler.go b/internal/syncer/base_handler.go index a04b7604..136beaa8 100644 --- a/internal/syncer/base_handler.go +++ b/internal/syncer/base_handler.go @@ -28,18 +28,21 @@ import ( // BaseHandler logs processed blocks // and reconciles modified balances. type BaseHandler struct { - logger *logger.Logger - reconciler reconciler.Reconciler + logger *logger.Logger + reconciler reconciler.Reconciler + exemptAccounts []*reconciler.AccountCurrency } // NewBaseHandler constructs a basic Handler. func NewBaseHandler( logger *logger.Logger, reconciler reconciler.Reconciler, + exemptAccounts []*reconciler.AccountCurrency, ) Handler { return &BaseHandler{ - logger: logger, - reconciler: reconciler, + logger: logger, + reconciler: reconciler, + exemptAccounts: exemptAccounts, } } @@ -70,3 +73,20 @@ func (h *BaseHandler) BlockProcessed( // blocking return h.reconciler.QueueChanges(ctx, block.BlockIdentifier, balanceChanges) } + +// AccountExempt returns a boolean indicating if the provided +// account and currency are exempt from balance tracking and +// reconciliation. +func (h *BaseHandler) AccountExempt( + ctx context.Context, + account *types.AccountIdentifier, + currency *types.Currency, +) bool { + return reconciler.ContainsAccountCurrency( + h.exemptAccounts, + &reconciler.AccountCurrency{ + Account: account, + Currency: currency, + }, + ) +} diff --git a/internal/syncer/stateful_syncer.go b/internal/syncer/stateful_syncer.go index 6b52f566..52d0d44f 100644 --- a/internal/syncer/stateful_syncer.go +++ b/internal/syncer/stateful_syncer.go @@ -126,6 +126,7 @@ func (s *StatefulSyncer) storeBlockBalanceChanges( s.fetcher.Asserter, block, orphan, + s.handler, ) if err != nil { return nil, err diff --git a/internal/syncer/stateless_syncer.go b/internal/syncer/stateless_syncer.go index 9c3e1996..66e5f76c 100644 --- a/internal/syncer/stateless_syncer.go +++ b/internal/syncer/stateless_syncer.go @@ -95,6 +95,7 @@ func (s *StatelessSyncer) SyncRange( s.fetcher.Asserter, block, false, + s.handler, ) if err != nil { return err diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index bf337c84..b43d7c85 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -152,6 +152,12 @@ type Handler interface { orphan bool, changes []*storage.BalanceChange, ) error + + AccountExempt( + ctx context.Context, + account *types.AccountIdentifier, + current *types.Currency, + ) bool } // BalanceChanges returns all balance changes for @@ -165,6 +171,7 @@ func BalanceChanges( asserter *asserter.Asserter, block *types.Block, orphan bool, + handler Handler, ) ([]*storage.BalanceChange, error) { balanceChanges := map[string]*storage.BalanceChange{} for _, tx := range block.Transactions { @@ -183,6 +190,14 @@ func BalanceChanges( continue } + // Exempting account in BalanceChanges ensures that storage is not updated + // and that the account is not reconciled. If a handler is not provided, + // no account will be marked exempt. + if handler != nil && handler.AccountExempt(ctx, op.Account, op.Amount.Currency) { + log.Printf("Skipping exempt account %+v\n", op.Account) + continue + } + amount := op.Amount blockIdentifier := block.BlockIdentifier if orphan { diff --git a/internal/syncer/syncer_test.go b/internal/syncer/syncer_test.go index 537653ed..3fa7e92c 100644 --- a/internal/syncer/syncer_test.go +++ b/internal/syncer/syncer_test.go @@ -18,6 +18,7 @@ import ( "context" "testing" + "github.com/coinbase/rosetta-cli/internal/reconciler" "github.com/coinbase/rosetta-cli/internal/storage" "github.com/coinbase/rosetta-sdk-go/asserter" @@ -56,10 +57,11 @@ func simpleTransactionFactory( func TestBalanceChanges(t *testing.T) { var tests = map[string]struct { - block *types.Block - orphan bool - changes []*storage.BalanceChange - err error + block *types.Block + orphan bool + changes []*storage.BalanceChange + exemptAccounts []*reconciler.AccountCurrency + err error }{ "simple block": { block: &types.Block{ @@ -90,6 +92,31 @@ func TestBalanceChanges(t *testing.T) { }, err: nil, }, + "simple block account exempt": { + block: &types.Block{ + BlockIdentifier: &types.BlockIdentifier{ + Hash: "1", + Index: 1, + }, + ParentBlockIdentifier: &types.BlockIdentifier{ + Hash: "0", + Index: 0, + }, + Transactions: []*types.Transaction{ + recipientTransaction, + }, + Timestamp: asserter.MinUnixEpoch + 1, + }, + orphan: false, + changes: []*storage.BalanceChange{}, + exemptAccounts: []*reconciler.AccountCurrency{ + { + Account: recipient, + Currency: currency, + }, + }, + err: nil, + }, "single account sum block": { block: &types.Block{ BlockIdentifier: &types.BlockIdentifier{ @@ -191,11 +218,13 @@ func TestBalanceChanges(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { + handler := NewBaseHandler(nil, nil, test.exemptAccounts) changes, err := BalanceChanges( ctx, asserter, test.block, test.orphan, + handler, ) assert.ElementsMatch(t, test.changes, changes)