Skip to content

Commit

Permalink
services/horizon: Reap in batches of 100k ledgers per second, to play…
Browse files Browse the repository at this point in the history
… nicely with others (#3823)

* Reap in batches of 100k ledgers per second, to play nicely with others

* Update changelog

* use historyElder, and improve reaper logging
  • Loading branch information
Paul Bellamy authored Aug 13, 2021
1 parent 98cfc8e commit 78dbff6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 19 deletions.
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
* Fix bug in horizon reap system (used by `horizon db reap` command and when horizon is configured with `--history-retention-count`) which could lead to partial deletions. ([3754](https://github.com/stellar/go/pull/3754))
* Log debug messages from captive core at the appropriate log level. ([3746](https://github.com/stellar/go/pull/3746))
* Add a feature flag `--captive-core-reuse-storage-path`/`CAPTIVE_CORE_REUSE_STORAGE_PATH` that will reuse Captive Core's storage path for bucket files when applicable for better performance. ([3750](https://github.com/stellar/go/pull/3750))
* Limit reap to 100k ledgers/second, to prevent excess CPU usage ([3823](https://github.com/stellar/go/pull/3823)).

## v2.6.1

Expand Down
58 changes: 39 additions & 19 deletions services/horizon/internal/reap/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (r *System) DeleteUnretainedHistory(ctx context.Context) error {
return nil
}

err := r.clearBefore(ctx, targetElder)
err := r.clearBefore(ctx, latest.HistoryElder, targetElder)
if err != nil {
return err
}
Expand Down Expand Up @@ -70,28 +70,48 @@ func (r *System) runOnce(ctx context.Context) {
}
}

func (r *System) clearBefore(ctx context.Context, seq int32) error {
log.WithField("new_elder", seq).Info("reaper: clearing")
// Work backwards in 100k ledger blocks to prevent using all the CPU.
//
// This runs every hour, so we need to make sure it doesn't
// run for longer than an hour.
//
// Current ledger at 2021-08-12 is 36,827,497, so 100k means 368 batches. At 1
// batch/second, that seems like a reasonable balance between running well
// under an hour, and slowing it down enough to leave some CPU for other
// processes.
var batchSize = int32(100_000)
var sleep = 1 * time.Second

func (r *System) clearBefore(ctx context.Context, startSeq, endSeq int32) error {
for batchEndSeq := endSeq - 1; batchEndSeq >= startSeq; batchEndSeq -= batchSize {
batchStartSeq := batchEndSeq - batchSize
if batchStartSeq < startSeq {
batchStartSeq = startSeq
}
log.WithField("start_ledger", batchStartSeq).WithField("end_ledger", batchEndSeq).Info("reaper: clearing")

start, end, err := toid.LedgerRangeInclusive(1, seq-1)
if err != nil {
return err
}
batchStart, batchEnd, err := toid.LedgerRangeInclusive(batchStartSeq, batchEndSeq)
if err != nil {
return err
}

err = r.HistoryQ.Begin()
if err != nil {
return errors.Wrap(err, "Error in begin")
}
defer r.HistoryQ.Rollback()
err = r.HistoryQ.Begin()
if err != nil {
return errors.Wrap(err, "Error in begin")
}
defer r.HistoryQ.Rollback()

err = r.HistoryQ.DeleteRangeAll(ctx, start, end)
if err != nil {
return errors.Wrap(err, "Error in DeleteRangeAll")
}
err = r.HistoryQ.DeleteRangeAll(ctx, batchStart, batchEnd)
if err != nil {
return errors.Wrap(err, "Error in DeleteRangeAll")
}

err = r.HistoryQ.Commit()
if err != nil {
return errors.Wrap(err, "Error in commit")
err = r.HistoryQ.Commit()
if err != nil {
return errors.Wrap(err, "Error in commit")
}

time.Sleep(sleep)
}

return nil
Expand Down
3 changes: 3 additions & 0 deletions services/horizon/internal/reap/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func TestDeleteUnretainedHistory(t *testing.T) {

sys := New(0, db, ledgerState)

// Disable sleeps for this.
sleep = 0

var (
prev int
cur int
Expand Down

0 comments on commit 78dbff6

Please sign in to comment.