Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Batch liquidity pool updates/removals #3944

Merged
merged 2 commits into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,7 @@ func createLP(tt *test.T, q *history.Q) history.LiquidityPool {
LastModifiedLedger: 123,
}

builder := q.NewLiquidityPoolsBatchInsertBuilder(2)

err := builder.Add(tt.Ctx, lp)
tt.Assert.NoError(err)

err = builder.Exec(tt.Ctx)
err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp})
tt.Assert.NoError(err)
return lp
}
Expand Down
18 changes: 6 additions & 12 deletions services/horizon/internal/actions/liquidity_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,7 @@ func TestGetLiquidityPoolByID(t *testing.T) {
LastModifiedLedger: 100,
}

builder := q.NewLiquidityPoolsBatchInsertBuilder(2)
err := builder.Add(tt.Ctx, lp)
tt.Assert.NoError(err)
err = builder.Exec(tt.Ctx)
err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp})
tt.Assert.NoError(err)

handler := GetLiquidityPoolByIDHandler{}
Expand Down Expand Up @@ -95,8 +92,7 @@ func TestGetLiquidityPools(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
q := &history.Q{tt.HorizonSession()}

builder := q.NewLiquidityPoolsBatchInsertBuilder(2)
err := builder.Add(tt.Ctx, history.LiquidityPool{
lp1 := history.LiquidityPool{
PoolID: "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 30,
Expand All @@ -113,9 +109,8 @@ func TestGetLiquidityPools(t *testing.T) {
},
},
LastModifiedLedger: 100,
})
tt.Assert.NoError(err)
err = builder.Add(tt.Ctx, history.LiquidityPool{
}
lp2 := history.LiquidityPool{
PoolID: "d827bf10a721d217de3cd9ab3f10198a54de558c093a511ec426028618df2633",
Type: xdr.LiquidityPoolTypeLiquidityPoolConstantProduct,
Fee: 30,
Expand All @@ -132,9 +127,8 @@ func TestGetLiquidityPools(t *testing.T) {
},
},
LastModifiedLedger: 100,
})
tt.Assert.NoError(err)
err = builder.Exec(tt.Ctx)
}
err := q.UpsertLiquidityPools(tt.Ctx, []history.LiquidityPool{lp1, lp2})
tt.Assert.NoError(err)

handler := GetLiquidityPoolsHandler{}
Expand Down
97 changes: 30 additions & 67 deletions services/horizon/internal/db2/history/liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@ import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"strings"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/services/horizon/internal/db2"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand Down Expand Up @@ -80,16 +77,9 @@ func (lpar *LiquidityPoolAssetReserve) UnmarshalJSON(data []byte) error {
return nil
}

type LiquidityPoolsBatchInsertBuilder interface {
Add(ctx context.Context, lp LiquidityPool) error
Exec(ctx context.Context) error
}

// QLiquidityPools defines liquidity-pool-related queries.
type QLiquidityPools interface {
NewLiquidityPoolsBatchInsertBuilder(maxBatchSize int) LiquidityPoolsBatchInsertBuilder
UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error)
RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, lastModifiedLedger uint32) (int64, error)
UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error
GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]LiquidityPool, error)
GetAllLiquidityPools(ctx context.Context) ([]LiquidityPool, error)
CountLiquidityPools(ctx context.Context) (int, error)
Expand All @@ -98,25 +88,37 @@ type QLiquidityPools interface {
CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (int64, error)
}

// NewLiquidityPoolsBatchInsertBuilder constructs a new LiquidityPoolsBatchInsertBuilder instance
func (q *Q) NewLiquidityPoolsBatchInsertBuilder(maxBatchSize int) LiquidityPoolsBatchInsertBuilder {
cols := db.ColumnsForStruct(LiquidityPool{})
excludedCols := make([]string, len(cols))
for i, col := range cols {
excludedCols[i] = "EXCLUDED." + col
// UpsertLiquidityPools upserts a batch of liquidity pools in the liquidity_pools table.
// There's currently no limit of the number of liquidity pools this method can
// accept other than 2GB limit of the query string length what should be enough
// for each ledger with the current limits.
func (q *Q) UpsertLiquidityPools(ctx context.Context, lps []LiquidityPool) error {
var poolID, typ, fee, shareCount, trustlineCount,
assetReserves, lastModifiedLedger, deleted []interface{}

for _, lp := range lps {
poolID = append(poolID, lp.PoolID)
typ = append(typ, lp.Type)
fee = append(fee, lp.Fee)
trustlineCount = append(trustlineCount, lp.TrustlineCount)
shareCount = append(shareCount, lp.ShareCount)
assetReserves = append(assetReserves, lp.AssetReserves)
lastModifiedLedger = append(lastModifiedLedger, lp.LastModifiedLedger)
deleted = append(deleted, lp.Deleted)
}
suffix := fmt.Sprintf(
"ON CONFLICT (id) DO UPDATE SET (%s) = (%s)",
strings.Join(cols, ", "),
strings.Join(excludedCols, ", "),
)
return &liquidityPoolsBatchInsertBuilder{
builder: db.BatchInsertBuilder{
Table: q.GetTable("liquidity_pools"),
MaxBatchSize: maxBatchSize,
Suffix: suffix,
},

upsertFields := []upsertField{
{"id", "text", poolID},
{"type", "smallint", typ},
{"fee", "integer", fee},
{"trustline_count", "bigint", trustlineCount},
{"share_count", "bigint", shareCount},
{"asset_reserves", "jsonb", assetReserves},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"deleted", "boolean", deleted},
}

return q.upsertRows(ctx, "liquidity_pools", "id", upsertFields)
}

// CountLiquidityPools returns the total number of liquidity pools in the DB
Expand All @@ -140,33 +142,6 @@ func (q *Q) GetLiquidityPoolsByID(ctx context.Context, poolIDs []string) ([]Liqu
return liquidityPools, err
}

// UpdateLiquidityPool updates a row in the liquidity_pools table.
// Returns number of rows affected and error.
func (q *Q) UpdateLiquidityPool(ctx context.Context, lp LiquidityPool) (int64, error) {
updateBuilder := q.GetTable("liquidity_pools").Update()
result, err := updateBuilder.SetStruct(lp, []string{}).Where("id = ?", lp.PoolID).Exec(ctx)
if err != nil {
return 0, err
}

return result.RowsAffected()
}

// RemoveLiquidityPool marks the given liquidity pool as deleted.
// Returns number of rows affected and error.
func (q *Q) RemoveLiquidityPool(ctx context.Context, liquidityPoolID string, lastModifiedLedger uint32) (int64, error) {
sql := sq.Update("liquidity_pools").
Set("deleted", true).
Set("last_modified_ledger", lastModifiedLedger).
Where(sq.Eq{"id": liquidityPoolID})
result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
}

return result.RowsAffected()
}

// FindLiquidityPoolByID returns a liquidity pool.
func (q *Q) FindLiquidityPoolByID(ctx context.Context, liquidityPoolID string) (LiquidityPool, error) {
var lp LiquidityPool
Expand Down Expand Up @@ -234,18 +209,6 @@ func (q *Q) CompactLiquidityPools(ctx context.Context, cutOffSequence uint32) (i
return result.RowsAffected()
}

type liquidityPoolsBatchInsertBuilder struct {
builder db.BatchInsertBuilder
}

func (i *liquidityPoolsBatchInsertBuilder) Add(ctx context.Context, lp LiquidityPool) error {
return i.builder.RowStruct(ctx, lp)
}

func (i *liquidityPoolsBatchInsertBuilder) Exec(ctx context.Context) error {
return i.builder.Exec(ctx)
}

var liquidityPoolsSelectStatement = "lp.id, " +
"lp.type, " +
"lp.fee, " +
Expand Down

This file was deleted.

Loading