From 18655f2170e37cb8048ae96edc3f6132d4ff5afc Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 15 Jul 2020 15:56:13 -0700 Subject: [PATCH 1/4] Add CounterStorage in storage package --- cmd/check.go | 4 +- internal/storage/block_storage.go | 1 - internal/storage/block_storage_test.go | 10 +- internal/storage/counter_storage.go | 120 +++++++++++++++++++++++ internal/storage/counter_storage_test.go | 71 ++++++++++++++ 5 files changed, 198 insertions(+), 8 deletions(-) create mode 100644 internal/storage/counter_storage.go create mode 100644 internal/storage/counter_storage_test.go diff --git a/cmd/check.go b/cmd/check.go index 27a62bbe..34f3c9ea 100644 --- a/cmd/check.go +++ b/cmd/check.go @@ -369,7 +369,7 @@ func findMissingOps( nil, ) - blockStorage := storage.NewBlockStorage(ctx, localStore, blockStorageHelper) + blockStorage := storage.NewBlockStorage(localStore, blockStorageHelper) // Ensure storage is in correct state for starting at index if err = blockStorage.SetNewStartIndex(ctx, startIndex); err != nil { @@ -544,7 +544,7 @@ func runCheckCmd(cmd *cobra.Command, args []string) { exemptAccounts, ) - blockStorage := storage.NewBlockStorage(ctx, localStore, blockStorageHelper) + blockStorage := storage.NewBlockStorage(localStore, blockStorageHelper) // Bootstrap balances if provided if len(BootstrapBalances) > 0 { diff --git a/internal/storage/block_storage.go b/internal/storage/block_storage.go index 7b880175..a90c4f80 100644 --- a/internal/storage/block_storage.go +++ b/internal/storage/block_storage.go @@ -160,7 +160,6 @@ type BlockStorage struct { // NewBlockStorage returns a new BlockStorage. func NewBlockStorage( - ctx context.Context, db Database, helper Helper, ) *BlockStorage { diff --git a/internal/storage/block_storage_test.go b/internal/storage/block_storage_test.go index 2549e6bf..d319af61 100644 --- a/internal/storage/block_storage_test.go +++ b/internal/storage/block_storage_test.go @@ -55,7 +55,7 @@ func TestHeadBlockIdentifier(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{}) + storage := NewBlockStorage(database, &MockBlockStorageHelper{}) t.Run("No head block set", func(t *testing.T) { blockIdentifier, err := storage.GetHeadBlockIdentifier(ctx) @@ -268,7 +268,7 @@ func TestBlock(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{}) + storage := NewBlockStorage(database, &MockBlockStorageHelper{}) t.Run("Set and get block", func(t *testing.T) { _, err := storage.StoreBlock(ctx, newBlock) @@ -477,7 +477,7 @@ func TestBalance(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(ctx, database, mockHelper) + storage := NewBlockStorage(database, mockHelper) t.Run("Get unset balance", func(t *testing.T) { amount, block, err := storage.GetBalance(ctx, account, currency, newBlock) @@ -800,7 +800,7 @@ func TestBootstrapBalances(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{}) + storage := NewBlockStorage(database, &MockBlockStorageHelper{}) bootstrapBalancesFile := path.Join(newDir, "balances.csv") t.Run("File doesn't exist", func(t *testing.T) { @@ -944,7 +944,7 @@ func TestCreateBlockCache(t *testing.T) { assert.NoError(t, err) defer database.Close(ctx) - storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{}) + storage := NewBlockStorage(database, &MockBlockStorageHelper{}) t.Run("no blocks processed", func(t *testing.T) { assert.Equal(t, []*types.BlockIdentifier{}, storage.CreateBlockCache(ctx)) diff --git a/internal/storage/counter_storage.go b/internal/storage/counter_storage.go new file mode 100644 index 00000000..e16d2317 --- /dev/null +++ b/internal/storage/counter_storage.go @@ -0,0 +1,120 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "fmt" + "math/big" +) + +const ( + // BlockCounter is the number of added blocks. + BlockCounter = "blocks" + + // OrphansCounter is the number of orphaned blocks. + OrphansCounter = "orphans" + + // TransactionCounter is the number of processed transactions. + TransactionCounter = "transactions" + + // OperationCounter is the number of processed operations. + OperationCounter = "operations" + + // AccountsCounter is the number of seen accounts. + AccountsCounter = "accounts" + + // ActiveReconciliationCounter is the number of active + // reconciliations performed. + ActiveReconciliationCounter = "active_reconciliations" + + // InactiveReconciliationCounter is the number of inactive + // reconciliations performed. + InactiveReconciliationCounter = "inactive_reconciliations" + + // counterNamespace is preprended to any counter. + counterNamespace = "counter" +) + +// CounterStorage implements counter-specific storage methods +// on top of a Database and DatabaseTransaction interface. +type CounterStorage struct { + db Database +} + +// NewCounterStorage returns a new CounterStorage. +func NewCounterStorage( + db Database, +) *CounterStorage { + return &CounterStorage{ + db: db, + } +} + +func getCounterKey(counter string) []byte { + return []byte(fmt.Sprintf("%s/%s", counterNamespace, counter)) +} + +func transactionalGet( + ctx context.Context, + counter string, + txn DatabaseTransaction, +) (*big.Int, error) { + exists, val, err := txn.Get(ctx, getCounterKey(counter)) + if err != nil { + return nil, err + } + + if !exists { + return big.NewInt(0), nil + } + + return new(big.Int).SetBytes(val), nil +} + +// Update updates the value of a counter by amount and returns the new value. +func (c *CounterStorage) Update( + ctx context.Context, + counter string, + amount *big.Int, +) (*big.Int, error) { + transaction := c.db.NewDatabaseTransaction(ctx, true) + defer transaction.Discard(ctx) + + val, err := transactionalGet(ctx, counter, transaction) + if err != nil { + return nil, err + } + + newVal := new(big.Int).Add(val, amount) + + if err := transaction.Set(ctx, getCounterKey(counter), newVal.Bytes()); err != nil { + return nil, err + } + + if err := transaction.Commit(ctx); err != nil { + return nil, err + } + + return newVal, nil +} + +// Get returns the current value of a counter. +func (c *CounterStorage) Get(ctx context.Context, counter string) (*big.Int, error) { + transaction := c.db.NewDatabaseTransaction(ctx, false) + defer transaction.Discard(ctx) + + return transactionalGet(ctx, counter, transaction) +} diff --git a/internal/storage/counter_storage_test.go b/internal/storage/counter_storage_test.go new file mode 100644 index 00000000..78ab45a8 --- /dev/null +++ b/internal/storage/counter_storage_test.go @@ -0,0 +1,71 @@ +// Copyright 2020 Coinbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "math/big" + "testing" + + "github.com/coinbase/rosetta-cli/internal/utils" + + "github.com/stretchr/testify/assert" +) + +func TestCounterStorage(t *testing.T) { + ctx := context.Background() + + newDir, err := utils.CreateTempDir() + assert.NoError(t, err) + defer utils.RemoveTempDir(newDir) + + database, err := NewBadgerStorage(ctx, newDir) + assert.NoError(t, err) + defer database.Close(ctx) + + c := NewCounterStorage(database) + + t.Run("get unset counter", func(t *testing.T) { + v, err := c.Get(ctx, "blah") + assert.NoError(t, err) + assert.Equal(t, v, big.NewInt(0)) + }) + + t.Run("increase counter", func(t *testing.T) { + v, err := c.Update(ctx, "blah", big.NewInt(100)) + assert.NoError(t, err) + assert.Equal(t, v, big.NewInt(100)) + + v, err = c.Get(ctx, "blah") + assert.NoError(t, err) + assert.Equal(t, v, big.NewInt(100)) + }) + + t.Run("decrement counter", func(t *testing.T) { + v, err := c.Update(ctx, "blah", big.NewInt(-50)) + assert.NoError(t, err) + assert.Equal(t, v, big.NewInt(50)) + + v, err = c.Get(ctx, "blah") + assert.NoError(t, err) + assert.Equal(t, v, big.NewInt(50)) + }) + + t.Run("get unset counter after update", func(t *testing.T) { + v, err := c.Get(ctx, "blah2") + assert.NoError(t, err) + assert.Equal(t, v, big.NewInt(0)) + }) +} From dc6c157615da03f37f6872eab400257d3771e095 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 15 Jul 2020 16:46:46 -0700 Subject: [PATCH 2/4] Add periodic logging --- cmd/check.go | 36 ++++++++++++++- internal/logger/logger.go | 57 ++++++++++++++++++++++++ internal/processor/reconciler_handler.go | 9 ++++ internal/processor/syncer_handler.go | 28 +++++++----- internal/storage/counter_storage.go | 7 +-- 5 files changed, 121 insertions(+), 16 deletions(-) diff --git a/cmd/check.go b/cmd/check.go index 34f3c9ea..dea3ce70 100644 --- a/cmd/check.go +++ b/cmd/check.go @@ -21,6 +21,7 @@ import ( "fmt" "io/ioutil" "log" + "math/big" "os" "os/signal" "path" @@ -354,7 +355,10 @@ func findMissingOps( return nil, fmt.Errorf("%w: unable to initialize database", err) } + counterStorage := storage.NewCounterStorage(localStore) + logger := logger.NewLogger( + counterStorage, tmpDir, false, false, @@ -529,7 +533,10 @@ func runCheckCmd(cmd *cobra.Command, args []string) { } defer localStore.Close(ctx) + counterStorage := storage.NewCounterStorage(localStore) + logger := logger.NewLogger( + counterStorage, DataDir, LogBlocks, LogTransactions, @@ -609,6 +616,15 @@ func runCheckCmd(cmd *cobra.Command, args []string) { g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + for ctx.Err() == nil { + logger.LogCounterStorage(ctx) + time.Sleep(10 * time.Second) + } + + return nil + }) + g.Go(func() error { return r.Reconcile(ctx) }) @@ -662,8 +678,26 @@ func runCheckCmd(cmd *cobra.Command, args []string) { } if err == nil || err == context.Canceled { // err == context.Canceled when --end - color.Green("Check succeeded") + newCtx := context.Background() + activeReconciliations, err := counterStorage.Get(newCtx, storage.ActiveReconciliationCounter) + if err != nil { + color.Green("Check succeeded") + os.Exit(0) + } + + inactiveReconciliations, err := counterStorage.Get(newCtx, storage.InactiveReconciliationCounter) + if err != nil { + color.Green("Check succeeded") + os.Exit(0) + } + + if new(big.Int).Add(activeReconciliations, inactiveReconciliations).Sign() == 0 { + color.Yellow("Check succeeded, however, no reconciliations were performed!") + } else { + color.Green("Check succeeded") + } os.Exit(0) + } color.Red("Check failed: %s", err.Error()) diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 116e1f72..31267cd0 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -18,9 +18,12 @@ import ( "context" "fmt" "log" + "math/big" "os" "path" + "github.com/coinbase/rosetta-cli/internal/storage" + "github.com/coinbase/rosetta-sdk-go/parser" "github.com/coinbase/rosetta-sdk-go/reconciler" "github.com/coinbase/rosetta-sdk-go/types" @@ -66,10 +69,14 @@ type Logger struct { logTransactions bool logBalanceChanges bool logReconciliation bool + + // CounterStorage is some initialized CounterStorage. + CounterStorage *storage.CounterStorage } // NewLogger constructs a new Logger. func NewLogger( + counterStorage *storage.CounterStorage, logDir string, logBlocks bool, logTransactions bool, @@ -77,6 +84,7 @@ func NewLogger( logReconciliation bool, ) *Logger { return &Logger{ + CounterStorage: counterStorage, logDir: logDir, logBlocks: logBlocks, logTransactions: logTransactions, @@ -85,6 +93,55 @@ func NewLogger( } } +// LogCounterStorage logs all values in CounterStorage. +func (l *Logger) LogCounterStorage(ctx context.Context) error { + blocks, err := l.CounterStorage.Get(ctx, storage.BlockCounter) + if err != nil { + return fmt.Errorf("%w cannot get block counter", err) + } + + if blocks.Sign() == 0 { // wait for at least 1 block to be processed + return nil + } + + orphans, err := l.CounterStorage.Get(ctx, storage.OrphanCounter) + if err != nil { + return fmt.Errorf("%w cannot get orphan counter", err) + } + + txs, err := l.CounterStorage.Get(ctx, storage.TransactionCounter) + if err != nil { + return fmt.Errorf("%w cannot get transaction counter", err) + } + + ops, err := l.CounterStorage.Get(ctx, storage.OperationCounter) + if err != nil { + return fmt.Errorf("%w cannot get operations counter", err) + } + + activeReconciliations, err := l.CounterStorage.Get(ctx, storage.ActiveReconciliationCounter) + if err != nil { + return fmt.Errorf("%w cannot get active reconciliations counter", err) + } + + inactiveReconciliations, err := l.CounterStorage.Get(ctx, storage.InactiveReconciliationCounter) + if err != nil { + return fmt.Errorf("%w cannot get inactive reconciliations counter", err) + } + + color.Cyan( + "[STATS] Blocks: %s (Orphaned: %s) Transactions: %s Operations: %s Reconciliations: %s (Inactive: %s)", + blocks.String(), + orphans.String(), + txs.String(), + ops.String(), + new(big.Int).Add(activeReconciliations, inactiveReconciliations).String(), + inactiveReconciliations.String(), + ) + + return nil +} + // AddBlockStream writes the next processed block to the end of the // blockStreamFile output file. func (l *Logger) AddBlockStream( diff --git a/internal/processor/reconciler_handler.go b/internal/processor/reconciler_handler.go index 38bba22b..67f07956 100644 --- a/internal/processor/reconciler_handler.go +++ b/internal/processor/reconciler_handler.go @@ -17,8 +17,10 @@ package processor import ( "context" "errors" + "math/big" "github.com/coinbase/rosetta-cli/internal/logger" + "github.com/coinbase/rosetta-cli/internal/storage" "github.com/coinbase/rosetta-sdk-go/reconciler" "github.com/coinbase/rosetta-sdk-go/types" @@ -99,6 +101,13 @@ func (h *ReconcilerHandler) ReconciliationSucceeded( balance string, block *types.BlockIdentifier, ) error { + // Update counters + if reconciliationType == reconciler.InactiveReconciliation { + h.logger.CounterStorage.Update(ctx, storage.InactiveReconciliationCounter, big.NewInt(1)) + } else { + h.logger.CounterStorage.Update(ctx, storage.ActiveReconciliationCounter, big.NewInt(1)) + } + return h.logger.ReconcileSuccessStream( ctx, reconciliationType, diff --git a/internal/processor/syncer_handler.go b/internal/processor/syncer_handler.go index 130b48fa..d5dc3d76 100644 --- a/internal/processor/syncer_handler.go +++ b/internal/processor/syncer_handler.go @@ -16,7 +16,7 @@ package processor import ( "context" - "log" + "math/big" "github.com/coinbase/rosetta-cli/internal/logger" "github.com/coinbase/rosetta-cli/internal/storage" @@ -29,7 +29,7 @@ import ( // SyncerHandler implements the syncer.Handler interface. type SyncerHandler struct { - storage *storage.BlockStorage + blockStorage *storage.BlockStorage logger *logger.Logger reconciler *reconciler.Reconciler fetcher *fetcher.Fetcher @@ -38,14 +38,14 @@ type SyncerHandler struct { // NewSyncerHandler returns a new SyncerHandler. func NewSyncerHandler( - storage *storage.BlockStorage, + blockStorage *storage.BlockStorage, logger *logger.Logger, reconciler *reconciler.Reconciler, fetcher *fetcher.Fetcher, interestingAccount *reconciler.AccountCurrency, ) *SyncerHandler { return &SyncerHandler{ - storage: storage, + blockStorage: blockStorage, logger: logger, reconciler: reconciler, fetcher: fetcher, @@ -59,14 +59,12 @@ func (h *SyncerHandler) BlockAdded( ctx context.Context, block *types.Block, ) error { - log.Printf("Adding block %+v\n", block.BlockIdentifier) - // Log processed blocks and balance changes if err := h.logger.AddBlockStream(ctx, block); err != nil { return nil } - balanceChanges, err := h.storage.StoreBlock(ctx, block) + balanceChanges, err := h.blockStorage.StoreBlock(ctx, block) if err != nil { return err } @@ -97,6 +95,15 @@ func (h *SyncerHandler) BlockAdded( } } + // Update Counters + h.logger.CounterStorage.Update(ctx, storage.BlockCounter, big.NewInt(1)) + h.logger.CounterStorage.Update(ctx, storage.TransactionCounter, big.NewInt(int64(len(block.Transactions)))) + opCount := int64(0) + for _, txn := range block.Transactions { + opCount += int64(len(txn.Operations)) + } + h.logger.CounterStorage.Update(ctx, storage.OperationCounter, big.NewInt(opCount)) + // Mark accounts for reconciliation...this may be // blocking return h.reconciler.QueueChanges(ctx, block.BlockIdentifier, balanceChanges) @@ -108,14 +115,12 @@ func (h *SyncerHandler) BlockRemoved( ctx context.Context, blockIdentifier *types.BlockIdentifier, ) error { - log.Printf("Orphaning block %+v\n", blockIdentifier) - // Log processed blocks and balance changes if err := h.logger.RemoveBlockStream(ctx, blockIdentifier); err != nil { return nil } - balanceChanges, err := h.storage.RemoveBlock(ctx, blockIdentifier) + balanceChanges, err := h.blockStorage.RemoveBlock(ctx, blockIdentifier) if err != nil { return err } @@ -124,6 +129,9 @@ func (h *SyncerHandler) BlockRemoved( return nil } + // Update Counters + h.logger.CounterStorage.Update(ctx, storage.OrphanCounter, big.NewInt(1)) + // We only attempt to reconciler changes when blocks are added, // not removed return nil diff --git a/internal/storage/counter_storage.go b/internal/storage/counter_storage.go index e16d2317..4014114e 100644 --- a/internal/storage/counter_storage.go +++ b/internal/storage/counter_storage.go @@ -24,8 +24,8 @@ const ( // BlockCounter is the number of added blocks. BlockCounter = "blocks" - // OrphansCounter is the number of orphaned blocks. - OrphansCounter = "orphans" + // OrphanCounter is the number of orphaned blocks. + OrphanCounter = "orphans" // TransactionCounter is the number of processed transactions. TransactionCounter = "transactions" @@ -33,9 +33,6 @@ const ( // OperationCounter is the number of processed operations. OperationCounter = "operations" - // AccountsCounter is the number of seen accounts. - AccountsCounter = "accounts" - // ActiveReconciliationCounter is the number of active // reconciliations performed. ActiveReconciliationCounter = "active_reconciliations" From de0dd53f3fa350466cf73082d8f12c0ff5db99a8 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 15 Jul 2020 16:52:15 -0700 Subject: [PATCH 3/4] nits --- cmd/check.go | 55 ++++++++++++++++-------- internal/processor/reconciler_handler.go | 4 +- internal/processor/syncer_handler.go | 12 ++++-- 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/cmd/check.go b/cmd/check.go index dea3ce70..54dc1759 100644 --- a/cmd/check.go +++ b/cmd/check.go @@ -57,6 +57,12 @@ const ( // InactiveFailureLookbackWindow blocks (this process continues // until the client halts the search or the block is found). InactiveFailureLookbackWindow = 250 + + // PeriodicLoggingFrequency is the frequency that stats are printed + // to the terminal. + // + // TODO: make configurable + PeriodicLoggingFrequency = 10 * time.Second ) var ( @@ -618,8 +624,8 @@ func runCheckCmd(cmd *cobra.Command, args []string) { g.Go(func() error { for ctx.Err() == nil { - logger.LogCounterStorage(ctx) - time.Sleep(10 * time.Second) + _ = logger.LogCounterStorage(ctx) + time.Sleep(PeriodicLoggingFrequency) } return nil @@ -670,7 +676,22 @@ func runCheckCmd(cmd *cobra.Command, args []string) { } }() - err = g.Wait() + handleCheckResult(g, counterStorage, reconcilerHandler, sigListeners) +} + +// handleCheckResult interprets the check exectution result +// and terminates with the correct exit status. +func handleCheckResult( + g *errgroup.Group, + counterStorage *storage.CounterStorage, + reconcilerHandler *processor.ReconcilerHandler, + sigListeners []context.CancelFunc, +) { + // Initialize new context because calling context + // will no longer be usable when after termination. + ctx := context.Background() + + err := g.Wait() if signalReceived { color.Red("Check halted") os.Exit(1) @@ -678,26 +699,22 @@ func runCheckCmd(cmd *cobra.Command, args []string) { } if err == nil || err == context.Canceled { // err == context.Canceled when --end - newCtx := context.Background() - activeReconciliations, err := counterStorage.Get(newCtx, storage.ActiveReconciliationCounter) - if err != nil { - color.Green("Check succeeded") - os.Exit(0) - } + activeReconciliations, activeErr := counterStorage.Get( + ctx, + storage.ActiveReconciliationCounter, + ) + inactiveReconciliations, inactiveErr := counterStorage.Get( + ctx, + storage.InactiveReconciliationCounter, + ) - inactiveReconciliations, err := counterStorage.Get(newCtx, storage.InactiveReconciliationCounter) - if err != nil { + if activeErr != nil || inactiveErr != nil || + new(big.Int).Add(activeReconciliations, inactiveReconciliations).Sign() != 0 { color.Green("Check succeeded") - os.Exit(0) - } - - if new(big.Int).Add(activeReconciliations, inactiveReconciliations).Sign() == 0 { + } else { // warn caller when check succeeded but no reconciliations performed (as issues may still exist) color.Yellow("Check succeeded, however, no reconciliations were performed!") - } else { - color.Green("Check succeeded") } os.Exit(0) - } color.Red("Check failed: %s", err.Error()) @@ -714,7 +731,7 @@ func runCheckCmd(cmd *cobra.Command, args []string) { color.Red("Searching for block with missing operations...hold tight") badBlock, err := findMissingOps( - context.Background(), + ctx, &sigListeners, reconcilerHandler.InactiveFailure, reconcilerHandler.InactiveFailureBlock.Index-InactiveFailureLookbackWindow, diff --git a/internal/processor/reconciler_handler.go b/internal/processor/reconciler_handler.go index 67f07956..b00445cf 100644 --- a/internal/processor/reconciler_handler.go +++ b/internal/processor/reconciler_handler.go @@ -103,9 +103,9 @@ func (h *ReconcilerHandler) ReconciliationSucceeded( ) error { // Update counters if reconciliationType == reconciler.InactiveReconciliation { - h.logger.CounterStorage.Update(ctx, storage.InactiveReconciliationCounter, big.NewInt(1)) + _, _ = h.logger.CounterStorage.Update(ctx, storage.InactiveReconciliationCounter, big.NewInt(1)) } else { - h.logger.CounterStorage.Update(ctx, storage.ActiveReconciliationCounter, big.NewInt(1)) + _, _ = h.logger.CounterStorage.Update(ctx, storage.ActiveReconciliationCounter, big.NewInt(1)) } return h.logger.ReconcileSuccessStream( diff --git a/internal/processor/syncer_handler.go b/internal/processor/syncer_handler.go index d5dc3d76..374cb9e6 100644 --- a/internal/processor/syncer_handler.go +++ b/internal/processor/syncer_handler.go @@ -96,13 +96,17 @@ func (h *SyncerHandler) BlockAdded( } // Update Counters - h.logger.CounterStorage.Update(ctx, storage.BlockCounter, big.NewInt(1)) - h.logger.CounterStorage.Update(ctx, storage.TransactionCounter, big.NewInt(int64(len(block.Transactions)))) + _, _ = h.logger.CounterStorage.Update(ctx, storage.BlockCounter, big.NewInt(1)) + _, _ = h.logger.CounterStorage.Update( + ctx, + storage.TransactionCounter, + big.NewInt(int64(len(block.Transactions))), + ) opCount := int64(0) for _, txn := range block.Transactions { opCount += int64(len(txn.Operations)) } - h.logger.CounterStorage.Update(ctx, storage.OperationCounter, big.NewInt(opCount)) + _, _ = h.logger.CounterStorage.Update(ctx, storage.OperationCounter, big.NewInt(opCount)) // Mark accounts for reconciliation...this may be // blocking @@ -130,7 +134,7 @@ func (h *SyncerHandler) BlockRemoved( } // Update Counters - h.logger.CounterStorage.Update(ctx, storage.OrphanCounter, big.NewInt(1)) + _, _ = h.logger.CounterStorage.Update(ctx, storage.OrphanCounter, big.NewInt(1)) // We only attempt to reconciler changes when blocks are added, // not removed From b85c333856e781cda156b330d102c32b23e73ac5 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Wed, 15 Jul 2020 17:12:04 -0700 Subject: [PATCH 4/4] Print temporary directory if used --- internal/utils/utils.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 69879bfc..f00821e4 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -20,6 +20,7 @@ import ( "os" "github.com/coinbase/rosetta-sdk-go/types" + "github.com/fatih/color" ) // CreateTempDir creates a directory in @@ -30,6 +31,7 @@ func CreateTempDir() (string, error) { return "", err } + color.Cyan("Using temporary directory %s", storageDir) return storageDir, nil }