Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Reap in batches of 100k ledgers per second, to play nicely with others #3823

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
* Fix bug in horizon reap system (used by `horizon db reap` command and when horizon is configured with `--history-retention-count`) which could lead to partial deletions. ([3754](https://github.com/stellar/go/pull/3754))
* Log debug messages from captive core at the appropriate log level. ([3746](https://github.com/stellar/go/pull/3746))
* Add a feature flag `--captive-core-reuse-storage-path`/`CAPTIVE_CORE_REUSE_STORAGE_PATH` that will reuse Captive Core's storage path for bucket files when applicable for better performance. ([3750](https://github.com/stellar/go/pull/3750))
* Limit reap to 100k ledgers/second, to prevent excess CPU usage ([3823](https://github.com/stellar/go/pull/3823)).

## v2.6.1

Expand Down
58 changes: 39 additions & 19 deletions services/horizon/internal/reap/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (r *System) DeleteUnretainedHistory(ctx context.Context) error {
return nil
}

err := r.clearBefore(ctx, targetElder)
err := r.clearBefore(ctx, latest.HistoryElder, targetElder)
if err != nil {
return err
}
Expand Down Expand Up @@ -70,28 +70,48 @@ func (r *System) runOnce(ctx context.Context) {
}
}

func (r *System) clearBefore(ctx context.Context, seq int32) error {
log.WithField("new_elder", seq).Info("reaper: clearing")
// Work backwards in 100k ledger blocks to prevent using all the CPU.
//
// This runs every hour, so we need to make sure it doesn't
// run for longer than an hour.
//
// Current ledger at 2021-08-12 is 36,827,497, so 100k means 368 batches. At 1
// batch/second, that seems like a reasonable balance between running well
// under an hour, and slowing it down enough to leave some CPU for other
// processes.
var batchSize = int32(100_000)
var sleep = 1 * time.Second

func (r *System) clearBefore(ctx context.Context, startSeq, endSeq int32) error {
for batchEndSeq := endSeq - 1; batchEndSeq >= startSeq; batchEndSeq -= batchSize {
batchStartSeq := batchEndSeq - batchSize
if batchStartSeq < startSeq {
batchStartSeq = startSeq
}
log.WithField("start_ledger", batchStartSeq).WithField("end_ledger", batchEndSeq).Info("reaper: clearing")

start, end, err := toid.LedgerRangeInclusive(1, seq-1)
if err != nil {
return err
}
batchStart, batchEnd, err := toid.LedgerRangeInclusive(batchStartSeq, batchEndSeq)
if err != nil {
return err
}

err = r.HistoryQ.Begin()
if err != nil {
return errors.Wrap(err, "Error in begin")
}
defer r.HistoryQ.Rollback()
err = r.HistoryQ.Begin()
if err != nil {
return errors.Wrap(err, "Error in begin")
}
defer r.HistoryQ.Rollback()

err = r.HistoryQ.DeleteRangeAll(ctx, start, end)
if err != nil {
return errors.Wrap(err, "Error in DeleteRangeAll")
}
err = r.HistoryQ.DeleteRangeAll(ctx, batchStart, batchEnd)
if err != nil {
return errors.Wrap(err, "Error in DeleteRangeAll")
}

err = r.HistoryQ.Commit()
if err != nil {
return errors.Wrap(err, "Error in commit")
err = r.HistoryQ.Commit()
if err != nil {
return errors.Wrap(err, "Error in commit")
}

time.Sleep(sleep)
paulbellamy marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
Expand Down
3 changes: 3 additions & 0 deletions services/horizon/internal/reap/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ func TestDeleteUnretainedHistory(t *testing.T) {

sys := New(0, db, ledgerState)

// Disable sleeps for this.
sleep = 0

var (
prev int
cur int
Expand Down