Skip to content

Commit

Permalink
remove prunesmallbatchesdb (#13337)
Browse files Browse the repository at this point in the history
  • Loading branch information
sudeepdino008 authored Jan 8, 2025
1 parent ecbe768 commit 05d72f0
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 105 deletions.
90 changes: 0 additions & 90 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,96 +841,6 @@ func (ac *AggregatorRoTx) CanUnwindBeforeBlockNum(blockNum uint64, tx kv.Tx) (un
return blockNum, true, nil
}

// PruneSmallBatchesDb prunes in a loop, running every batch in its own
// db.Update call, until Prune reports nothing more to do, the timeout elapses,
// ctx is cancelled, or a batch fails.
//
// The batch size depends on timeout:
//   - timeout > 5h ("furious"): a fixed limit of 10_000_000 per batch;
//   - 1m <= timeout <= 5h ("aggressive"): starts at 1_000 and is multiplied by 10
//     after a batch shorter than 2s, divided by 10 after a batch longer than
//     the 30s log period;
//   - otherwise: a fixed limit of 1_000 per batch.
//
// Returns:
//   - (false, nil) when Prune returned a nil stat, which this loop treats as "done";
//   - (true, nil) when the timeout elapsed while there was still work;
//   - (false, ctx.Err()) when ctx was cancelled (checked only between batches);
//   - (false, err) when db.Update or Prune failed.
func (ac *AggregatorRoTx) PruneSmallBatchesDb(ctx context.Context, timeout time.Duration, db kv.RwDB) (haveMore bool, err error) {
	// On tip-of-chain timeout is about `3sec`
	// On tip of chain: must be real-time - prune by small batches and prioritize exact-`timeout`
	// Not on tip of chain: must be aggressive (prune as much as possible) by bigger batches

	furiousPrune := timeout > 5*time.Hour
	aggressivePrune := !furiousPrune && timeout >= 1*time.Minute

	var pruneLimit uint64 = 1_000
	if furiousPrune {
		pruneLimit = 10_000_000
		/* disabling this feature for now - seems it doesn't cancel even after prune finished
		// start from a bit high limit to give time for warmup
		// will disable warmup after first iteration and will adjust pruneLimit based on `time`
		withWarmup = true
		*/
	}

	started := time.Now()
	// One-shot deadline: a Timer, not a Ticker. NewTicker panics on a
	// non-positive duration, while NewTimer simply fires immediately.
	localTimeout := time.NewTimer(timeout)
	defer localTimeout.Stop()
	logPeriod := 30 * time.Second
	logEvery := time.NewTicker(logPeriod)
	defer logEvery.Stop()
	aggLogEvery := time.NewTicker(600 * time.Second) // to hide specific domain/idx logging
	defer aggLogEvery.Stop()

	fullStat := newAggregatorPruneStat()
	// `context.Background()` is important here!
	// it allows keeping the DB consistent - prune all keys-related data or nothing;
	// ctrl+c can't interrupt a batch and leave dirt in the DB
	innerCtx := context.Background()
	goExit := false

	for {
		err = db.Update(innerCtx, func(tx kv.RwTx) error {
			iterationStarted := time.Now()
			stat, pruneErr := ac.Prune(innerCtx, tx, pruneLimit, aggLogEvery)
			if pruneErr != nil {
				ac.a.logger.Warn("[snapshots] PruneSmallBatches failed", "err", pruneErr)
				return pruneErr
			}
			if stat == nil {
				// A nil stat is treated as "nothing left": log the totals (if any) and stop.
				if fstat := fullStat.String(); fstat != "" {
					ac.a.logger.Info("[snapshots] PruneSmallBatches finished", "took", time.Since(started).String(), "stat", fstat)
				}
				goExit = true
				return nil
			}
			fullStat.Accumulate(stat)

			if aggressivePrune {
				took := time.Since(iterationStarted)
				if took < 2*time.Second {
					pruneLimit *= 10
				}
				if took > logPeriod {
					pruneLimit /= 10
					// Integer division must not drive the limit to zero.
					// NOTE(review): how Prune treats a zero limit is not visible here - confirm.
					if pruneLimit == 0 {
						pruneLimit = 1
					}
				}
			}

			// Non-blocking progress log, at most once per logPeriod.
			select {
			case <-logEvery.C:
				ac.a.logger.Info("[snapshots] pruning state",
					"until commit", time.Until(started.Add(timeout)).String(),
					"pruneLimit", pruneLimit,
					"aggregatedStep", ac.StepsInFiles(kv.StateDomains...),
					"stepsRangeInDB", ac.a.StepsRangeInDBAsStr(tx),
					"pruned", fullStat.String(),
				)
			default:
			}
			return nil
		})
		if err != nil {
			return false, err
		}

		// Checks are explicit and ordered: `select` picks randomly among ready
		// cases, so case order inside a single select gives no priority.
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		default:
		}
		// Completion wins over the deadline, so a finished run never reports haveMore.
		if goExit {
			return false, nil
		}
		select {
		case <-localTimeout.C:
			return true, nil
		default:
		}
	}
}

// PruneSmallBatches is not cancellable, it's over when it's over or failed.
// It fills the whole timeout by pruning in small batches (of 100 keys), making some progress.
func (ac *AggregatorRoTx) PruneSmallBatches(ctx context.Context, timeout time.Duration, tx kv.RwTx) (haveMore bool, err error) {
Expand Down
9 changes: 4 additions & 5 deletions erigon-lib/state/domain_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,11 +461,10 @@ func TestSharedDomain_StorageIter(t *testing.T) {
ac.Close()
ac = agg.BeginFilesRo()

//err = db.Update(ctx, func(tx kv.RwTx) error {
// _, err = ac.PruneSmallBatches(ctx, 1*time.Minute, tx)
// return err
//})
_, err = ac.PruneSmallBatchesDb(ctx, 1*time.Minute, db)
err = db.Update(ctx, func(tx kv.RwTx) error {
_, err = ac.PruneSmallBatches(ctx, 1*time.Minute, tx)
return err
})
require.NoError(t, err)

ac.Close()
Expand Down
26 changes: 16 additions & 10 deletions turbo/app/snapshots_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -1381,15 +1381,18 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
}

logger.Info("Prune state history")
ac := agg.BeginFilesRo()
defer ac.Close()
for hasMoreToPrune := true; hasMoreToPrune; {
hasMoreToPrune, err = ac.PruneSmallBatchesDb(ctx, 2*time.Minute, db)
if err != nil {
if err := db.Update(ctx, func(tx kv.RwTx) error {
ac := tx.(libstate.HasAggTx).AggTx().(*libstate.AggregatorRoTx)
hasMoreToPrune, err = ac.PruneSmallBatches(ctx, 2*time.Minute, tx)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
ac.Close()

logger.Info("Work on state history snapshots")
indexWorkers := estimate.IndexSnapshot.Workers()
Expand Down Expand Up @@ -1435,15 +1438,18 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
return err
}

ac = agg.BeginFilesRo()
defer ac.Close()
for hasMoreToPrune := true; hasMoreToPrune; {
hasMoreToPrune, err = ac.PruneSmallBatchesDb(context.Background(), 2*time.Minute, db)
if err != nil {
if err := db.Update(ctx, func(tx kv.RwTx) error {
ac := tx.(libstate.HasAggTx).AggTx().(*libstate.AggregatorRoTx)
hasMoreToPrune, err = ac.PruneSmallBatches(ctx, 2*time.Minute, tx)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
ac.Close()

if err = agg.MergeLoop(ctx); err != nil {
return err
Expand Down

0 comments on commit 05d72f0

Please sign in to comment.