Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print Periodic Stats #58

Merged
merged 4 commits into from
Jul 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 56 additions & 5 deletions cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io/ioutil"
"log"
"math/big"
"os"
"os/signal"
"path"
Expand Down Expand Up @@ -56,6 +57,12 @@ const (
// InactiveFailureLookbackWindow blocks (this process continues
// until the client halts the search or the block is found).
InactiveFailureLookbackWindow = 250

// PeriodicLoggingFrequency is the frequency that stats are printed
// to the terminal.
//
// TODO: make configurable
PeriodicLoggingFrequency = 10 * time.Second
)

var (
Expand Down Expand Up @@ -354,7 +361,10 @@ func findMissingOps(
return nil, fmt.Errorf("%w: unable to initialize database", err)
}

counterStorage := storage.NewCounterStorage(localStore)

logger := logger.NewLogger(
counterStorage,
tmpDir,
false,
false,
Expand All @@ -369,7 +379,7 @@ func findMissingOps(
nil,
)

blockStorage := storage.NewBlockStorage(ctx, localStore, blockStorageHelper)
blockStorage := storage.NewBlockStorage(localStore, blockStorageHelper)

// Ensure storage is in correct state for starting at index
if err = blockStorage.SetNewStartIndex(ctx, startIndex); err != nil {
Expand Down Expand Up @@ -529,7 +539,10 @@ func runCheckCmd(cmd *cobra.Command, args []string) {
}
defer localStore.Close(ctx)

counterStorage := storage.NewCounterStorage(localStore)

logger := logger.NewLogger(
counterStorage,
DataDir,
LogBlocks,
LogTransactions,
Expand All @@ -544,7 +557,7 @@ func runCheckCmd(cmd *cobra.Command, args []string) {
exemptAccounts,
)

blockStorage := storage.NewBlockStorage(ctx, localStore, blockStorageHelper)
blockStorage := storage.NewBlockStorage(localStore, blockStorageHelper)

// Bootstrap balances if provided
if len(BootstrapBalances) > 0 {
Expand Down Expand Up @@ -609,6 +622,15 @@ func runCheckCmd(cmd *cobra.Command, args []string) {

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
for ctx.Err() == nil {
_ = logger.LogCounterStorage(ctx)
time.Sleep(PeriodicLoggingFrequency)
}

return nil
})

g.Go(func() error {
return r.Reconcile(ctx)
})
Expand Down Expand Up @@ -654,15 +676,44 @@ func runCheckCmd(cmd *cobra.Command, args []string) {
}
}()

err = g.Wait()
handleCheckResult(g, counterStorage, reconcilerHandler, sigListeners)
}

// handleCheckResult interprets the check exectution result
// and terminates with the correct exit status.
func handleCheckResult(
g *errgroup.Group,
counterStorage *storage.CounterStorage,
reconcilerHandler *processor.ReconcilerHandler,
sigListeners []context.CancelFunc,
) {
// Initialize new context because calling context
// will no longer be usable when after termination.
ctx := context.Background()

err := g.Wait()
if signalReceived {
color.Red("Check halted")
os.Exit(1)
return
}

if err == nil || err == context.Canceled { // err == context.Canceled when --end
color.Green("Check succeeded")
activeReconciliations, activeErr := counterStorage.Get(
ctx,
storage.ActiveReconciliationCounter,
)
inactiveReconciliations, inactiveErr := counterStorage.Get(
ctx,
storage.InactiveReconciliationCounter,
)

if activeErr != nil || inactiveErr != nil ||
new(big.Int).Add(activeReconciliations, inactiveReconciliations).Sign() != 0 {
color.Green("Check succeeded")
} else { // warn caller when check succeeded but no reconciliations performed (as issues may still exist)
color.Yellow("Check succeeded, however, no reconciliations were performed!")
}
os.Exit(0)
}

Expand All @@ -680,7 +731,7 @@ func runCheckCmd(cmd *cobra.Command, args []string) {

color.Red("Searching for block with missing operations...hold tight")
badBlock, err := findMissingOps(
context.Background(),
ctx,
&sigListeners,
reconcilerHandler.InactiveFailure,
reconcilerHandler.InactiveFailureBlock.Index-InactiveFailureLookbackWindow,
Expand Down
57 changes: 57 additions & 0 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ import (
"context"
"fmt"
"log"
"math/big"
"os"
"path"

"github.com/coinbase/rosetta-cli/internal/storage"

"github.com/coinbase/rosetta-sdk-go/parser"
"github.com/coinbase/rosetta-sdk-go/reconciler"
"github.com/coinbase/rosetta-sdk-go/types"
Expand Down Expand Up @@ -66,17 +69,22 @@ type Logger struct {
logTransactions bool
logBalanceChanges bool
logReconciliation bool

// CounterStorage is some initialized CounterStorage.
CounterStorage *storage.CounterStorage
}

// NewLogger constructs a new Logger.
func NewLogger(
counterStorage *storage.CounterStorage,
logDir string,
logBlocks bool,
logTransactions bool,
logBalanceChanges bool,
logReconciliation bool,
) *Logger {
return &Logger{
CounterStorage: counterStorage,
logDir: logDir,
logBlocks: logBlocks,
logTransactions: logTransactions,
Expand All @@ -85,6 +93,55 @@ func NewLogger(
}
}

// LogCounterStorage logs all values in CounterStorage.
func (l *Logger) LogCounterStorage(ctx context.Context) error {
blocks, err := l.CounterStorage.Get(ctx, storage.BlockCounter)
if err != nil {
return fmt.Errorf("%w cannot get block counter", err)
}

if blocks.Sign() == 0 { // wait for at least 1 block to be processed
return nil
}

orphans, err := l.CounterStorage.Get(ctx, storage.OrphanCounter)
if err != nil {
return fmt.Errorf("%w cannot get orphan counter", err)
}

txs, err := l.CounterStorage.Get(ctx, storage.TransactionCounter)
if err != nil {
return fmt.Errorf("%w cannot get transaction counter", err)
}

ops, err := l.CounterStorage.Get(ctx, storage.OperationCounter)
if err != nil {
return fmt.Errorf("%w cannot get operations counter", err)
}

activeReconciliations, err := l.CounterStorage.Get(ctx, storage.ActiveReconciliationCounter)
if err != nil {
return fmt.Errorf("%w cannot get active reconciliations counter", err)
}

inactiveReconciliations, err := l.CounterStorage.Get(ctx, storage.InactiveReconciliationCounter)
if err != nil {
return fmt.Errorf("%w cannot get inactive reconciliations counter", err)
}

color.Cyan(
"[STATS] Blocks: %s (Orphaned: %s) Transactions: %s Operations: %s Reconciliations: %s (Inactive: %s)",
blocks.String(),
orphans.String(),
txs.String(),
ops.String(),
new(big.Int).Add(activeReconciliations, inactiveReconciliations).String(),
inactiveReconciliations.String(),
)

return nil
}

// AddBlockStream writes the next processed block to the end of the
// blockStreamFile output file.
func (l *Logger) AddBlockStream(
Expand Down
9 changes: 9 additions & 0 deletions internal/processor/reconciler_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package processor
import (
"context"
"errors"
"math/big"

"github.com/coinbase/rosetta-cli/internal/logger"
"github.com/coinbase/rosetta-cli/internal/storage"

"github.com/coinbase/rosetta-sdk-go/reconciler"
"github.com/coinbase/rosetta-sdk-go/types"
Expand Down Expand Up @@ -99,6 +101,13 @@ func (h *ReconcilerHandler) ReconciliationSucceeded(
balance string,
block *types.BlockIdentifier,
) error {
// Update counters
if reconciliationType == reconciler.InactiveReconciliation {
_, _ = h.logger.CounterStorage.Update(ctx, storage.InactiveReconciliationCounter, big.NewInt(1))
} else {
_, _ = h.logger.CounterStorage.Update(ctx, storage.ActiveReconciliationCounter, big.NewInt(1))
}

return h.logger.ReconcileSuccessStream(
ctx,
reconciliationType,
Expand Down
32 changes: 22 additions & 10 deletions internal/processor/syncer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package processor

import (
"context"
"log"
"math/big"

"github.com/coinbase/rosetta-cli/internal/logger"
"github.com/coinbase/rosetta-cli/internal/storage"
Expand All @@ -29,7 +29,7 @@ import (

// SyncerHandler implements the syncer.Handler interface.
type SyncerHandler struct {
storage *storage.BlockStorage
blockStorage *storage.BlockStorage
logger *logger.Logger
reconciler *reconciler.Reconciler
fetcher *fetcher.Fetcher
Expand All @@ -38,14 +38,14 @@ type SyncerHandler struct {

// NewSyncerHandler returns a new SyncerHandler.
func NewSyncerHandler(
storage *storage.BlockStorage,
blockStorage *storage.BlockStorage,
logger *logger.Logger,
reconciler *reconciler.Reconciler,
fetcher *fetcher.Fetcher,
interestingAccount *reconciler.AccountCurrency,
) *SyncerHandler {
return &SyncerHandler{
storage: storage,
blockStorage: blockStorage,
logger: logger,
reconciler: reconciler,
fetcher: fetcher,
Expand All @@ -59,14 +59,12 @@ func (h *SyncerHandler) BlockAdded(
ctx context.Context,
block *types.Block,
) error {
log.Printf("Adding block %+v\n", block.BlockIdentifier)

// Log processed blocks and balance changes
if err := h.logger.AddBlockStream(ctx, block); err != nil {
return nil
}

balanceChanges, err := h.storage.StoreBlock(ctx, block)
balanceChanges, err := h.blockStorage.StoreBlock(ctx, block)
if err != nil {
return err
}
Expand Down Expand Up @@ -97,6 +95,19 @@ func (h *SyncerHandler) BlockAdded(
}
}

// Update Counters
_, _ = h.logger.CounterStorage.Update(ctx, storage.BlockCounter, big.NewInt(1))
_, _ = h.logger.CounterStorage.Update(
ctx,
storage.TransactionCounter,
big.NewInt(int64(len(block.Transactions))),
)
opCount := int64(0)
for _, txn := range block.Transactions {
opCount += int64(len(txn.Operations))
}
_, _ = h.logger.CounterStorage.Update(ctx, storage.OperationCounter, big.NewInt(opCount))

// Mark accounts for reconciliation...this may be
// blocking
return h.reconciler.QueueChanges(ctx, block.BlockIdentifier, balanceChanges)
Expand All @@ -108,14 +119,12 @@ func (h *SyncerHandler) BlockRemoved(
ctx context.Context,
blockIdentifier *types.BlockIdentifier,
) error {
log.Printf("Orphaning block %+v\n", blockIdentifier)

// Log processed blocks and balance changes
if err := h.logger.RemoveBlockStream(ctx, blockIdentifier); err != nil {
return nil
}

balanceChanges, err := h.storage.RemoveBlock(ctx, blockIdentifier)
balanceChanges, err := h.blockStorage.RemoveBlock(ctx, blockIdentifier)
if err != nil {
return err
}
Expand All @@ -124,6 +133,9 @@ func (h *SyncerHandler) BlockRemoved(
return nil
}

// Update Counters
_, _ = h.logger.CounterStorage.Update(ctx, storage.OrphanCounter, big.NewInt(1))

// We only attempt to reconciler changes when blocks are added,
// not removed
return nil
Expand Down
1 change: 0 additions & 1 deletion internal/storage/block_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ type BlockStorage struct {

// NewBlockStorage returns a new BlockStorage.
func NewBlockStorage(
ctx context.Context,
db Database,
helper Helper,
) *BlockStorage {
Expand Down
10 changes: 5 additions & 5 deletions internal/storage/block_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestHeadBlockIdentifier(t *testing.T) {
assert.NoError(t, err)
defer database.Close(ctx)

storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{})
storage := NewBlockStorage(database, &MockBlockStorageHelper{})

t.Run("No head block set", func(t *testing.T) {
blockIdentifier, err := storage.GetHeadBlockIdentifier(ctx)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestBlock(t *testing.T) {
assert.NoError(t, err)
defer database.Close(ctx)

storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{})
storage := NewBlockStorage(database, &MockBlockStorageHelper{})

t.Run("Set and get block", func(t *testing.T) {
_, err := storage.StoreBlock(ctx, newBlock)
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestBalance(t *testing.T) {
assert.NoError(t, err)
defer database.Close(ctx)

storage := NewBlockStorage(ctx, database, mockHelper)
storage := NewBlockStorage(database, mockHelper)

t.Run("Get unset balance", func(t *testing.T) {
amount, block, err := storage.GetBalance(ctx, account, currency, newBlock)
Expand Down Expand Up @@ -800,7 +800,7 @@ func TestBootstrapBalances(t *testing.T) {
assert.NoError(t, err)
defer database.Close(ctx)

storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{})
storage := NewBlockStorage(database, &MockBlockStorageHelper{})
bootstrapBalancesFile := path.Join(newDir, "balances.csv")

t.Run("File doesn't exist", func(t *testing.T) {
Expand Down Expand Up @@ -944,7 +944,7 @@ func TestCreateBlockCache(t *testing.T) {
assert.NoError(t, err)
defer database.Close(ctx)

storage := NewBlockStorage(ctx, database, &MockBlockStorageHelper{})
storage := NewBlockStorage(database, &MockBlockStorageHelper{})

t.Run("no blocks processed", func(t *testing.T) {
assert.Equal(t, []*types.BlockIdentifier{}, storage.CreateBlockCache(ctx))
Expand Down
Loading