diff --git a/cmd/algorand-indexer/daemon.go b/cmd/algorand-indexer/daemon.go index 13cee1a31..d21fb26f3 100644 --- a/cmd/algorand-indexer/daemon.go +++ b/cmd/algorand-indexer/daemon.go @@ -243,7 +243,7 @@ var daemonCmd = &cobra.Command{ imp := importer.NewImporter(db) logger.Info("Initializing local ledger.") - proc, err := blockprocessor.MakeProcessorWithLedgerInit(logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock) + proc, err := blockprocessor.MakeProcessorWithLedgerInit(ctx, logger, catchpoint, &genesis, nextDBRound, opts, imp.ImportBlock) if err != nil { maybeFail(err, "blockprocessor.MakeProcessor() err %v", err) } diff --git a/processor/blockprocessor/block_processor.go b/processor/blockprocessor/block_processor.go index 5cacda195..c7661745a 100644 --- a/processor/blockprocessor/block_processor.go +++ b/processor/blockprocessor/block_processor.go @@ -1,7 +1,9 @@ package blockprocessor import ( + "context" "fmt" + log "github.com/sirupsen/logrus" "github.com/algorand/go-algorand/config" @@ -37,7 +39,7 @@ func MakeProcessorWithLedger(l *ledger.Ledger, handler func(block *ledgercore.Va } // MakeProcessorWithLedgerInit creates a block processor and initializes the ledger. -func MakeProcessorWithLedgerInit(logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { +func MakeProcessorWithLedgerInit(ctx context.Context, logger *log.Logger, catchpoint string, genesis *bookkeeping.Genesis, nextDBRound uint64, opts idb.IndexerDbOptions, handler func(block *ledgercore.ValidatedBlock) error) (processor.Processor, error) { if nextDBRound > 0 { if catchpoint != "" { round, _, err := ledgercore.ParseCatchpointLabel(catchpoint) @@ -47,19 +49,18 @@ func MakeProcessorWithLedgerInit(logger *log.Logger, catchpoint string, genesis if uint64(round) >= nextDBRound { logger.Warnf("round for given catchpoint is ahead of db round. skip fast catchup") } else { - err = InitializeLedgerFastCatchup(logger, catchpoint, opts.IndexerDatadir, *genesis) + err = InitializeLedgerFastCatchup(ctx, logger, catchpoint, opts.IndexerDatadir, *genesis) if err != nil { return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() fast catchup err: %w", err) } } - } - err := InitializeLedgerSimple(logger, nextDBRound-1, &opts) + err := InitializeLedgerSimple(ctx, logger, nextDBRound-1, &opts) if err != nil { return &blockProcessor{}, fmt.Errorf("MakeProcessorWithCatchup() slow catchup err: %w", err) } } - return MakeProcessor(logger, genesis, nextDBRound, opts.AlgodDataDir, handler) + return MakeProcessor(logger, genesis, nextDBRound, opts.IndexerDatadir, handler) } // MakeProcessor creates a block processor diff --git a/processor/blockprocessor/initialize.go b/processor/blockprocessor/initialize.go index 73e327eda..3d6f31227 100644 --- a/processor/blockprocessor/initialize.go +++ b/processor/blockprocessor/initialize.go @@ -4,9 +4,7 @@ import ( "context" "fmt" "os" - "os/signal" "path/filepath" - "syscall" "time" "github.com/algorand/go-algorand-sdk/client/v2/algod" @@ -26,21 +24,9 @@ import ( ) // InitializeLedgerSimple executes the migration core functionality. -func InitializeLedgerSimple(logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error { - ctx, cf := context.WithCancel(context.Background()) +func InitializeLedgerSimple(ctx context.Context, logger *log.Logger, round uint64, opts *idb.IndexerDbOptions) error { + ctx, cf := context.WithCancel(ctx) defer cf() - { - cancelCh := make(chan os.Signal, 1) - signal.Notify(cancelCh, syscall.SIGTERM, syscall.SIGINT) - go func() { - <-cancelCh - logger.Errorf("Ledger migration interrupted") - // exit process if migration is interrupted so that - // migration state doesn't get updated in db - os.Exit(1) - }() - } - var bot fetcher.Fetcher var err error if opts.IndexerDatadir == "" { @@ -81,9 +67,10 @@ func InitializeLedgerSimple(logger *log.Logger, round uint64, opts *idb.IndexerD return nil } -func fullNodeCatchup(logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { +func fullNodeCatchup(ctx context.Context, logger *log.Logger, round basics.Round, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { + ctx, cf := context.WithCancel(ctx) + defer cf() wrappedLogger := logging.NewWrappedLogger(logger) - node, err := node.MakeFull( wrappedLogger, dataDir, @@ -93,30 +80,45 @@ func fullNodeCatchup(logger *log.Logger, round basics.Round, catchpoint, dataDir if err != nil { return err } - // remove node directory after when exiting fast catchup mode - defer os.RemoveAll(filepath.Join(dataDir, genesis.ID())) node.Start() - time.Sleep(5 * time.Second) - logger.Info("algod node running") + defer func() { + node.Stop() + logger.Info("algod node stopped") + }() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(5 * time.Second): + logger.Info("algod node running") + } + status, err := node.Status() + if err != nil { + return err + } node.StartCatchup(catchpoint) - // If the node isn't in fast catchup mode, catchpoint will be empty. + logger.Infof("Running fast catchup using catchpoint %s", catchpoint) for status.LastRound < round { - time.Sleep(2 * time.Second) - status, err = node.Status() - if status.CatchpointCatchupTotalBlocks > 0 { - logger.Debugf("current round %d ", status.LastRound) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + status, err = node.Status() + logger.Infof("Catchpoint Catchup Total Accounts %d ", status.CatchpointCatchupTotalAccounts) + logger.Infof("Catchpoint Catchup Processed Accounts %d ", status.CatchpointCatchupProcessedAccounts) + logger.Infof("Catchpoint Catchup Verified Accounts %d ", status.CatchpointCatchupVerifiedAccounts) + logger.Infof("Catchpoint Catchup Total Blocks %d ", status.CatchpointCatchupTotalBlocks) + logger.Infof("Catchpoint Catchup Acquired Blocks %d ", status.CatchpointCatchupAcquiredBlocks) } + } - logger.Info("fast catchup completed") - node.Stop() - logger.Info("algod node stopped") + logger.Infof("fast catchup completed in %v", status.CatchupTime.Seconds()) return nil } // InitializeLedgerFastCatchup executes the migration core functionality. -func InitializeLedgerFastCatchup(logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { +func InitializeLedgerFastCatchup(ctx context.Context, logger *log.Logger, catchpoint, dataDir string, genesis bookkeeping.Genesis) error { if dataDir == "" { return fmt.Errorf("InitializeLedgerFastCatchup() err: indexer data directory missing") } @@ -128,11 +130,12 @@ func InitializeLedgerFastCatchup(logger *log.Logger, catchpoint, dataDir string, // TODO: switch to catchup service catchup. //err = internal.CatchupServiceCatchup(logger, round, catchpoint, dataDir, genesis) - err = fullNodeCatchup(logger, round, catchpoint, dataDir, genesis) + err = fullNodeCatchup(ctx, logger, round, catchpoint, dataDir, genesis) if err != nil { return fmt.Errorf("fullNodeCatchup() err: %w", err) } - + // remove node directory after fast catchup completes + defer os.RemoveAll(filepath.Join(dataDir, genesis.ID())) // move ledger to indexer directory ledgerFiles := []string{ "ledger.block.sqlite", @@ -185,6 +188,7 @@ func handleBlock(block *rpcs.EncodedBlockCert, proc processor.Processor) error { "block %d import failed", block.Block.Round()) return fmt.Errorf("handleBlock() err: %w", err) } + logger.Infof("Initialize Ledger: added block %d to ledger", block.Block.Round()) return nil } func getGenesis(client *algod.Client) (bookkeeping.Genesis, error) { diff --git a/processor/blockprocessor/initialize_test.go b/processor/blockprocessor/initialize_test.go index 25a10aef3..412305d68 100644 --- a/processor/blockprocessor/initialize_test.go +++ b/processor/blockprocessor/initialize_test.go @@ -1,6 +1,7 @@ package blockprocessor import ( + "context" "fmt" "os" "testing" @@ -64,7 +65,7 @@ func TestRunMigration(t *testing.T) { } // migrate 3 rounds - err = InitializeLedgerSimple(logrus.New(), 3, &opts) + err = InitializeLedgerSimple(context.Background(), logrus.New(), 3, &opts) assert.NoError(t, err) log, _ := test2.NewNullLogger() l, err := util.MakeLedger(log, false, &genesis, opts.IndexerDatadir) @@ -74,7 +75,7 @@ func TestRunMigration(t *testing.T) { l.Close() // migration continues from last round - err = InitializeLedgerSimple(logrus.New(), 6, &opts) + err = InitializeLedgerSimple(context.Background(), logrus.New(), 6, &opts) assert.NoError(t, err) l, err = util.MakeLedger(log, false, &genesis, opts.IndexerDatadir)