Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/db2/history: Include lower bound on descending history queries #5465

Merged
merged 6 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions services/horizon/internal/actions/effects.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (handler GetEffectsHandler) GetResourcePage(w HeaderWriter, r *http.Request
return nil, err
}

records, err := loadEffectRecords(r.Context(), historyQ, qp, pq)
records, err := loadEffectRecords(r.Context(), historyQ, qp, pq, handler.LedgerState.CurrentStatus().HistoryElder)
if err != nil {
return nil, errors.Wrap(err, "loading transaction records")
}
Expand All @@ -94,20 +94,20 @@ func (handler GetEffectsHandler) GetResourcePage(w HeaderWriter, r *http.Request
return result, nil
}

func loadEffectRecords(ctx context.Context, hq *history.Q, qp EffectsQuery, pq db2.PageQuery) ([]history.Effect, error) {
func loadEffectRecords(ctx context.Context, hq *history.Q, qp EffectsQuery, pq db2.PageQuery, oldestLedger int32) ([]history.Effect, error) {
switch {
case qp.AccountID != "":
return hq.EffectsForAccount(ctx, qp.AccountID, pq)
return hq.EffectsForAccount(ctx, qp.AccountID, pq, oldestLedger)
case qp.LiquidityPoolID != "":
return hq.EffectsForLiquidityPool(ctx, qp.LiquidityPoolID, pq)
return hq.EffectsForLiquidityPool(ctx, qp.LiquidityPoolID, pq, oldestLedger)
case qp.OperationID > 0:
urvisavla marked this conversation as resolved.
Show resolved Hide resolved
return hq.EffectsForOperation(ctx, int64(qp.OperationID), pq)
case qp.LedgerID > 0:
return hq.EffectsForLedger(ctx, int32(qp.LedgerID), pq)
case qp.TxHash != "":
return hq.EffectsForTransaction(ctx, qp.TxHash, pq)
default:
return hq.Effects(ctx, pq)
return hq.Effects(ctx, pq, oldestLedger)
}
}

Expand Down
3 changes: 2 additions & 1 deletion services/horizon/internal/actions/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func (handler GetLedgersHandler) GetResourcePage(w HeaderWriter, r *http.Request
}

var records []history.Ledger
if err = historyQ.Ledgers().Page(pq).Select(r.Context(), &records); err != nil {
err = historyQ.Ledgers().Page(pq, handler.LedgerState.CurrentStatus().HistoryElder).Select(r.Context(), &records)
if err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/actions/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (handler GetOperationsHandler) GetResourcePage(w HeaderWriter, r *http.Requ
query.OnlyPayments()
}

ops, txs, err := query.Page(pq).Fetch(ctx)
ops, txs, err := query.Page(pq, handler.LedgerState.CurrentStatus().HistoryElder).Fetch(ctx)
if err != nil {
return nil, err
}
Expand Down
9 changes: 5 additions & 4 deletions services/horizon/internal/actions/trade.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,20 @@ func (handler GetTradesHandler) GetResourcePage(w HeaderWriter, r *http.Request)
return nil, err
}

oldestLedger := handler.LedgerState.CurrentStatus().HistoryElder
if baseAsset != nil {
counterAsset, err = qp.Counter()
if err != nil {
return nil, err
}

records, err = historyQ.GetTradesForAssets(ctx, pq, qp.AccountID, qp.TradeType, *baseAsset, *counterAsset)
records, err = historyQ.GetTradesForAssets(ctx, pq, oldestLedger, qp.AccountID, qp.TradeType, *baseAsset, *counterAsset)
} else if qp.OfferID != 0 {
records, err = historyQ.GetTradesForOffer(ctx, pq, int64(qp.OfferID))
records, err = historyQ.GetTradesForOffer(ctx, pq, oldestLedger, int64(qp.OfferID))
} else if qp.PoolID != "" {
records, err = historyQ.GetTradesForLiquidityPool(ctx, pq, qp.PoolID)
records, err = historyQ.GetTradesForLiquidityPool(ctx, pq, oldestLedger, qp.PoolID)
} else {
records, err = historyQ.GetTrades(ctx, pq, qp.AccountID, qp.TradeType)
records, err = historyQ.GetTrades(ctx, pq, oldestLedger, qp.AccountID, qp.TradeType)
}
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/actions/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (handler GetTransactionsHandler) GetResourcePage(w HeaderWriter, r *http.Re
return nil, err
}

records, err := loadTransactionRecords(ctx, historyQ, qp, pq)
records, err := loadTransactionRecords(ctx, historyQ, qp, pq, handler.LedgerState.CurrentStatus().HistoryElder)
if err != nil {
return nil, errors.Wrap(err, "loading transaction records")
}
Expand All @@ -141,7 +141,7 @@ func (handler GetTransactionsHandler) GetResourcePage(w HeaderWriter, r *http.Re
// loadTransactionRecords returns a slice of transaction records of an
// account/ledger identified by accountID/ledgerID based on pq and
// includeFailedTx.
func loadTransactionRecords(ctx context.Context, hq *history.Q, qp TransactionsQuery, pq db2.PageQuery) ([]history.Transaction, error) {
func loadTransactionRecords(ctx context.Context, hq *history.Q, qp TransactionsQuery, pq db2.PageQuery, oldestLedger int32) ([]history.Transaction, error) {
var records []history.Transaction

txs := hq.Transactions()
Expand All @@ -160,7 +160,7 @@ func loadTransactionRecords(ctx context.Context, hq *history.Q, qp TransactionsQ
txs.IncludeFailed()
}

err := txs.Page(pq).Select(ctx, &records)
err := txs.Page(pq, oldestLedger).Select(ctx, &records)
if err != nil {
return nil, errors.Wrap(err, "executing transaction records query")
}
Expand Down
31 changes: 24 additions & 7 deletions services/horizon/internal/db2/history/effect.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/stellar/go/toid"
)

const genesisLedger = 2

// UnmarshalDetails unmarshals the details of this effect into `dest`
func (r *Effect) UnmarshalDetails(dest interface{}) error {
if !r.DetailsString.Valid {
Expand Down Expand Up @@ -70,7 +72,7 @@ func (r *Effect) PagingToken() string {
}

// Effects returns a page of effects without any filters besides the cursor
func (q *Q) Effects(ctx context.Context, page db2.PageQuery) ([]Effect, error) {
func (q *Q) Effects(ctx context.Context, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
op, idx, err := parseEffectsCursor(page)
if err != nil {
return nil, err
Expand All @@ -87,6 +89,9 @@ func (q *Q) Effects(ctx context.Context, page db2.PageQuery) ([]Effect, error) {
Where("(heff.history_operation_id, heff.order) > (?, ?)", op, idx).
OrderBy("heff.history_operation_id asc, heff.order asc")
case "desc":
if lowerBound := lowestLedgerBound(oldestLedger); lowerBound > 0 {
query = query.Where("heff.history_operation_id > ?", lowerBound)
}
query = query.
Where("(heff.history_operation_id, heff.order) < (?, ?)", op, idx).
OrderBy("heff.history_operation_id desc, heff.order desc")
Expand All @@ -101,14 +106,14 @@ func (q *Q) Effects(ctx context.Context, page db2.PageQuery) ([]Effect, error) {
}

// EffectsForAccount returns a page of effects for a given account
func (q *Q) EffectsForAccount(ctx context.Context, aid string, page db2.PageQuery) ([]Effect, error) {
func (q *Q) EffectsForAccount(ctx context.Context, aid string, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
var account Account
if err := q.AccountByAddress(ctx, &account, aid); err != nil {
return nil, err
}

query := selectEffect.Where("heff.history_account_id = ?", account.ID)
return q.selectEffectsPage(ctx, query, page)
return q.selectEffectsPage(ctx, query, page, oldestLedger)
}

// EffectsForLedger returns a page of effects for a given ledger sequence
Expand All @@ -125,7 +130,7 @@ func (q *Q) EffectsForLedger(ctx context.Context, seq int32, page db2.PageQuery)
start.ToInt64(),
end.ToInt64(),
)
return q.selectEffectsPage(ctx, query, page)
return q.selectEffectsPage(ctx, query, page, 0)
}

// EffectsForOperation returns a page of effects for a given operation id.
Expand All @@ -138,11 +143,11 @@ func (q *Q) EffectsForOperation(ctx context.Context, id int64, page db2.PageQuer
start.ToInt64(),
end.ToInt64(),
)
return q.selectEffectsPage(ctx, query, page)
return q.selectEffectsPage(ctx, query, page, 0)
}

// EffectsForLiquidityPool returns a page of effects for a given liquidity pool.
func (q *Q) EffectsForLiquidityPool(ctx context.Context, id string, page db2.PageQuery) ([]Effect, error) {
func (q *Q) EffectsForLiquidityPool(ctx context.Context, id string, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
op, _, err := page.CursorInt64Pair(db2.DefaultPairSep)
if err != nil {
return nil, err
Expand Down Expand Up @@ -173,6 +178,7 @@ func (q *Q) EffectsForLiquidityPool(ctx context.Context, id string, page db2.Pag
"heff.history_operation_id": liquidityPoolOperationIDs,
}),
page,
oldestLedger,
)
}

Expand All @@ -194,6 +200,7 @@ func (q *Q) EffectsForTransaction(ctx context.Context, hash string, page db2.Pag
end.ToInt64(),
),
page,
0,
)
}

Expand All @@ -209,7 +216,14 @@ func parseEffectsCursor(page db2.PageQuery) (int64, int64, error) {
return op, idx, nil
}

func (q *Q) selectEffectsPage(ctx context.Context, query sq.SelectBuilder, page db2.PageQuery) ([]Effect, error) {
func lowestLedgerBound(oldestLedger int32) int64 {
if oldestLedger <= genesisLedger {
return 0
}
return toid.AfterLedger(oldestLedger - 1).ToInt64()
}

func (q *Q) selectEffectsPage(ctx context.Context, query sq.SelectBuilder, page db2.PageQuery, oldestLedger int32) ([]Effect, error) {
op, idx, err := parseEffectsCursor(page)
if err != nil {
return nil, err
Expand All @@ -230,6 +244,9 @@ func (q *Q) selectEffectsPage(ctx context.Context, query sq.SelectBuilder, page
))`, op, op, op, idx).
OrderBy("heff.history_operation_id asc, heff.order asc")
case "desc":
if lowerBound := lowestLedgerBound(oldestLedger); lowerBound > 0 {
query = query.Where("heff.history_operation_id > ?", lowerBound)
}
query = query.
Where(`(
heff.history_operation_id <= ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"encoding/json"
"fmt"
"testing"

"github.com/guregu/null"
Expand All @@ -16,7 +17,7 @@ func TestAddEffect(t *testing.T) {
defer tt.Finish()
test.ResetHorizonDB(t, tt.HorizonDB)
q := &Q{tt.HorizonSession()}
tt.Assert.NoError(q.Begin(tt.Ctx))
tt.Require.NoError(q.Begin(tt.Ctx))

address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
Expand All @@ -37,25 +38,42 @@ func TestAddEffect(t *testing.T) {
3,
details,
)
tt.Assert.NoError(err)
tt.Require.NoError(err)

tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Assert.NoError(builder.Exec(tt.Ctx, q))
tt.Assert.NoError(q.Commit())
tt.Require.NoError(accountLoader.Exec(tt.Ctx, q))
tt.Require.NoError(builder.Exec(tt.Ctx, q))
tt.Require.NoError(q.Commit())

effects, err := q.Effects(tt.Ctx, db2.PageQuery{
Cursor: "0-0",
Order: "asc",
Limit: 200,
})
}, 0)
tt.Require.NoError(err)
tt.Assert.Len(effects, 1)
tt.Require.Len(effects, 1)

effect := effects[0]
tt.Assert.Equal(address, effect.Account)
tt.Assert.Equal(muxedAddres, effect.AccountMuxed.String)
tt.Assert.Equal(int64(240518172673), effect.HistoryOperationID)
tt.Assert.Equal(int32(1), effect.Order)
tt.Assert.Equal(EffectType(3), effect.Type)
tt.Assert.Equal("{\"amount\": \"1000.0000000\", \"asset_type\": \"native\"}", effect.DetailsString.String)
tt.Require.Equal(address, effect.Account)
tt.Require.Equal(muxedAddres, effect.AccountMuxed.String)
tt.Require.Equal(int64(240518172673), effect.HistoryOperationID)
tt.Require.Equal(int32(1), effect.Order)
tt.Require.Equal(EffectType(3), effect.Type)
tt.Require.Equal("{\"amount\": \"1000.0000000\", \"asset_type\": \"native\"}", effect.DetailsString.String)

effects, err = q.Effects(tt.Ctx, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+2, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence-3)
tt.Require.NoError(err)
tt.Require.Len(effects, 1)
tt.Require.Equal(effects[0], effect)

effects, err = q.Effects(tt.Ctx, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+5, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence+2)
tt.Require.NoError(err)
tt.Require.Empty(effects)
}
29 changes: 23 additions & 6 deletions services/horizon/internal/db2/history/effect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,34 @@ func TestEffectsForLiquidityPool(t *testing.T) {

tt.Assert.NoError(q.Commit())

var result []Effect
result, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
var effects []Effect
effects, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
Cursor: "0-0",
Order: "asc",
Limit: 10,
})
}, 0)
tt.Assert.NoError(err)

tt.Assert.Len(result, 1)
tt.Assert.Equal(result[0].Account, address)
tt.Assert.Len(effects, 1)
effect := effects[0]
tt.Assert.Equal(effect.Account, address)

effects, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+2, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence-3)
tt.Require.NoError(err)
tt.Require.Len(effects, 1)
tt.Require.Equal(effects[0], effect)

effects, err = q.EffectsForLiquidityPool(tt.Ctx, liquidityPoolID, db2.PageQuery{
Cursor: fmt.Sprintf("%d-0", toid.New(sequence+5, 0, 0).ToInt64()),
Order: "desc",
Limit: 200,
}, sequence+2)
tt.Require.NoError(err)
tt.Require.Empty(effects)
}

func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) {
Expand Down Expand Up @@ -160,7 +177,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) {
Cursor: "0-0",
Order: "asc",
Limit: 200,
})
}, 0)
tt.Require.NoError(err)
tt.Require.Len(results, len(tests))

Expand Down
6 changes: 5 additions & 1 deletion services/horizon/internal/db2/history/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,15 @@ func (q *Q) LedgerCapacityUsageStats(ctx context.Context, currentSeq int32, dest
}

// Page specifies the paging constraints for the query being built by `q`.
func (q *LedgersQ) Page(page db2.PageQuery) *LedgersQ {
func (q *LedgersQ) Page(page db2.PageQuery, oldestLedger int32) *LedgersQ {
if q.Err != nil {
return q
}

if lowerBound := lowestLedgerBound(oldestLedger); lowerBound > 0 && page.Order == "desc" {
q.sql = q.sql.
Where("hl.id > ?", lowerBound)
}
q.sql, q.Err = page.ApplyTo(q.sql, "hl.id")
return q
}
Expand Down
12 changes: 7 additions & 5 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,7 @@ type OperationsQ struct {
opIdCol string
includeFailed bool
includeTransactions bool
boundedIdQuery bool
}

// Q is a helper struct on which to hang common_trades queries against a history
Expand Down Expand Up @@ -878,11 +879,12 @@ func (t *Transaction) HasPreconditions() bool {
// TransactionsQ is a helper struct to aid in configuring queries that loads
// slices of transaction structs.
type TransactionsQ struct {
Err error
parent *Q
sql sq.SelectBuilder
includeFailed bool
txIdCol string
Err error
parent *Q
sql sq.SelectBuilder
includeFailed bool
txIdCol string
boundedIdQuery bool
}

// TrustLine is row of data from the `trust_lines` table from horizon DB
Expand Down
Loading
Loading