Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Reap history object tables when ingestion is idle #4518

Merged
merged 9 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder(maxBatchSize int) TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -825,6 +826,145 @@ func (q *Q) CloneIngestionQ() IngestionQ {
return &Q{q.Clone()}
}

type tableObjectFieldPair struct {
// name is a table name of history table
name string
// objectField is a name of object field in history table which uses
// the lookup table.
objectField string
}

// ReapLookupTables removes rows from lookup tables like history_claimable_balances
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, error) {
if q.GetTx() == nil {
return nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 10000

if offsets == nil {
offsets = make(map[string]int64)
}

for table, historyTables := range map[string][]tableObjectFieldPair{
"history_claimable_balances": {
{
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
} {
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, errors.Wrap(err, "error constructing a query")
}

_, err = q.ExecRaw(
context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType),
query,
)
if err != nil {
return nil, errors.Wrapf(err, "error running query: %s", query)
}

offsets[table] += batchSize

// Check if offset exceeds table size and then reset it
var count int64
err = q.GetRaw(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", table))
if err != nil {
return nil, err
}

if offsets[table] > count {
offsets[table] = 0
}
}
return offsets, nil
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
// as an example):
//
// delete from history_claimable_balances where id in
// (select id from
// (select id,
// (select count(*) from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id) as c1,
// (select count(*) from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id) as c2,
// 1 as cx,
// from history_claimable_balances hcb order by id limit 100 offset 1000)
// as sub where c1 = 0 and c2 = 0 and 1=1);
//
// In short it checks the 100 rows omiting 1000 row of history_claimable_balances
// and counts occurences of each row in corresponding history tables.
// If there are no history rows for a given id, the row in
// history_claimable_balances is removed.
Comment on lines +917 to +920
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting way to determine "age." It creates a dependence between the history tables, right? Is there a reason we don't rely on ledger close time, instead? I guess probably because history_claimable_balances doesn't have that row 😞

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really about age. The rows are sorted by id which is just a sequence integer value assigned to specific ledger object (like claimable balance). The limit ... offset listing here is just to ensure we iterate over entire table in multiple cycles.

//
// The offset param should be increased before each execution. Given that
// some rows will be removed and some will be added by ingestion it's
// possible that rows will be skipped from deletion. But offset is reset
// when it reaches the table size so eventually all orphaned rows are
// deleted.
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) (string, error) {
var sb strings.Builder
var err error
_, err = fmt.Fprintf(&sb, "delete from %s where id IN (select id from (select id, ", table)
if err != nil {
return "", err
}

for i, historyTable := range historyTables {
_, err = fmt.Fprintf(
&sb,
`(select count(*) from %s where %s = hcb.id) as c%d, `,
historyTable.name,
historyTable.objectField,
i,
)
if err != nil {
return "", err
}
}

_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb order by id limit %d offset %d) as sub where ", table, batchSize, offset)
if err != nil {
return "", err
}

for i := range historyTables {
_, err = fmt.Fprintf(&sb, "c%d = 0 and ", i)
if err != nil {
return "", err
}
}

_, err = sb.WriteString("1=1);")
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return "", err
}

return sb.String(), nil
}

// DeleteRangeAll deletes a range of rows from all history tables between
// `start` and `end` (exclusive).
func (q *Q) DeleteRangeAll(ctx context.Context, start, end int64) error {
Expand Down
44 changes: 44 additions & 0 deletions services/horizon/internal/db2/history/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLatestLedger(t *testing.T) {
Expand Down Expand Up @@ -66,3 +68,45 @@ func TestElderLedger(t *testing.T) {
tt.Assert.Equal(1, seq)
}
}

func TestConstructReapLookupTablesQuery(t *testing.T) {
query, err := constructReapLookupTablesQuery(
"history_accounts",
[]tableObjectFieldPair{
{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
10,
0,
)

require.NoError(t, err)
assert.Equal(t,
"delete from history_accounts where id IN "+
"(select id from "+
"(select id, (select count(*) from history_effects where history_account_id = hcb.id) as c0, "+
"(select count(*) from history_operation_participants where history_account_id = hcb.id) as c1, "+
"(select count(*) from history_trades where base_account_id = hcb.id) as c2, "+
"(select count(*) from history_trades where counter_account_id = hcb.id) as c3, "+
"(select count(*) from history_transaction_participants where history_account_id = hcb.id) as c4, "+
"1 as cx from history_accounts hcb order by id limit 10 offset 0) as sub "+
"where c0 = 0 and c1 = 0 and c2 = 0 and c3 = 0 and c4 = 0 and 1=1);", query)
}
74 changes: 74 additions & 0 deletions services/horizon/internal/db2/history/reap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package history_test

import (
"testing"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/reap"
"github.com/stellar/go/services/horizon/internal/test"
)

func TestReapLookupTables(t *testing.T) {
tt := test.Start(t)
defer tt.Finish()
ledgerState := &ledger.State{}
ledgerState.SetStatus(tt.Scenario("kahuna"))

db := tt.HorizonSession()

sys := reap.New(0, db, ledgerState)

var (
prevLedgers, curLedgers int
prevClaimableBalances, curClaimableBalances int
prevLiquidityPools, curLiquidityPools int
)

// Prev
{
err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
tt.Require.NoError(err)
}

ledgerState.SetStatus(tt.LoadLedgerStatus())
sys.RetentionCount = 1
err := sys.DeleteUnretainedHistory(tt.Ctx)
tt.Require.NoError(err)

q := &history.Q{tt.HorizonSession()}

err = q.Begin()
tt.Require.NoError(err)

newOffsets, err := q.ReapLookupTables(tt.Ctx, nil)
tt.Require.NoError(err)

err = q.Commit()
tt.Require.NoError(err)

// cur
{
err := db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`)
tt.Require.NoError(err)
err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`)
tt.Require.NoError(err)
}

tt.Assert.Equal(61, prevLedgers, "prevLedgers")
tt.Assert.Equal(1, curLedgers, "curLedgers")
tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances")
tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances")
tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools")
tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools")

tt.Assert.Len(newOffsets, 2)
tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"])
tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"])
}
1 change: 1 addition & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ func (r resumeState) run(s *system) (transition, error) {
localLog.Info("Processed ledger")

s.maybeVerifyState(ingestLedger)
s.maybeReapLookupTables(ingestLedger)

return resumeImmediately(ingestLedger), nil
}
Expand Down
62 changes: 62 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ type Metrics struct {
// duration of rebuilding trade aggregation buckets.
LedgerIngestionTradeAggregationDuration prometheus.Summary

// LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and
// duration of reaping lookup tables.
LedgerIngestionReapLookupTablesDuration prometheus.Summary

// StateVerifyDuration exposes timing metrics about the rate and
// duration of state verification.
StateVerifyDuration prometheus.Summary
Expand Down Expand Up @@ -201,6 +205,8 @@ type system struct {
disableStateVerification bool

checkpointManager historyarchive.CheckpointManager

reapOffsets map[string]int64
}

func NewSystem(config Config) (System, error) {
Expand Down Expand Up @@ -313,6 +319,11 @@ func (s *system) initMetrics() {
Help: "ledger ingestion trade aggregation rebuild durations, sliding window = 10m",
})

s.metrics.LedgerIngestionReapLookupTablesDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "ledger_ingestion_reap_lookup_tables_duration_seconds",
Help: "ledger ingestion reap lookup tables durations, sliding window = 10m",
})

s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds",
Help: "state verification durations, sliding window = 10m",
Expand Down Expand Up @@ -445,6 +456,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(s.metrics.LocalLatestLedger)
registry.MustRegister(s.metrics.LedgerIngestionDuration)
registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration)
registry.MustRegister(s.metrics.LedgerIngestionReapLookupTablesDuration)
registry.MustRegister(s.metrics.StateVerifyDuration)
registry.MustRegister(s.metrics.StateInvalidGauge)
registry.MustRegister(s.metrics.LedgerStatsCounter)
Expand Down Expand Up @@ -663,6 +675,56 @@ func (s *system) maybeVerifyState(lastIngestedLedger uint32) {
}
}

func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) {
// Check if lastIngestedLedger is the last one available in the backend
sequence, err := s.ledgerBackend.GetLatestLedgerSequence(s.ctx)
if err != nil {
log.WithField("err", err).Error("Error getting latest ledger sequence from backend")
return
}

if sequence != lastIngestedLedger {
// Catching up - skip reaping tables in this cycle.
return
}

err = s.historyQ.Begin()
if err != nil {
log.WithField("err", err).Error("Error starting a transaction")
return
}
defer s.historyQ.Rollback()

// If so block ingestion in the cluster to reap tables
_, err = s.historyQ.GetLastLedgerIngest(s.ctx)
if err != nil {
log.WithField("err", err).Error(getLastIngestedErrMsg)
return
}

// Make sure reaping will not take more than 5s, which is average ledger
// closing time.
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
defer cancel()

reapStart := time.Now()
newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets)
if err != nil {
log.WithField("err", err).Warn("Error reaping lookup tables")
return
}

err = s.historyQ.Commit()
if err != nil {
log.WithField("err", err).Error("Error commiting a transaction")
return
}

s.reapOffsets = newOffsets
reapDuration := time.Since(reapStart).Seconds()
s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration))
}

func (s *system) incrementStateVerificationErrors() int {
s.stateVerificationMutex.Lock()
defer s.stateVerificationMutex.Unlock()
Expand Down
Loading