Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/ingest: reap lookup tables without blocking ingestion #5405

Merged
merged 18 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 77 additions & 24 deletions services/horizon/internal/db2/history/account_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,35 @@ import (

var errSealed = errors.New("cannot register more entries to Loader after calling Exec()")

// ConcurrencyMode is used to configure the level of thread-safety for a loader
type ConcurrencyMode int
tamirms marked this conversation as resolved.
Show resolved Hide resolved

func (cm ConcurrencyMode) String() string {
switch cm {
case ConcurrentInserts:
return "ConcurrentInserts"
case ConcurrentDeletes:
return "ConcurrentDeletes"
default:
return "unknown"
}
}

const (
_ ConcurrencyMode = iota
// ConcurrentInserts configures the loader to maintain safety when there are multiple loaders
// inserting into the same table concurrently. This ConcurrencyMode is suitable for parallel reingestion.
// Note while ConcurrentInserts is enabled it is not safe to have deletes occurring concurrently on the
// same table.
ConcurrentInserts
// ConcurrentDeletes configures the loader to maintain safety when there is another thread which is invoking
// reapLookupTable() to delete rows from the same table concurrently. This ConcurrencyMode is suitable for
// live ingestion when reaping of lookup tables is enabled.
// Note while ConcurrentDeletes is enabled it is not safe to have multiple threads inserting concurrently to the
// same table.
ConcurrentDeletes
)

// LoaderStats describes the result of executing a history lookup id Loader
type LoaderStats struct {
// Total is the number of elements registered to the Loader
Expand All @@ -38,7 +67,7 @@ type FutureAccountID = future[string, Account]
type AccountLoader = loader[string, Account]

// NewAccountLoader will construct a new AccountLoader instance.
func NewAccountLoader() *AccountLoader {
func NewAccountLoader(concurrencyMode ConcurrencyMode) *AccountLoader {
return &AccountLoader{
sealed: false,
set: set.Set[string]{},
Expand All @@ -58,20 +87,22 @@ func NewAccountLoader() *AccountLoader {
mappingFromRow: func(account Account) (string, int64) {
return account.Address, account.ID
},
less: cmp.Less[string],
less: cmp.Less[string],
concurrencyMode: concurrencyMode,
}
}

type loader[K comparable, T any] struct {
sealed bool
set set.Set[K]
ids map[K]int64
stats LoaderStats
name string
table string
columnsForKeys func([]K) []columnValues
mappingFromRow func(T) (K, int64)
less func(K, K) bool
sealed bool
set set.Set[K]
ids map[K]int64
stats LoaderStats
name string
table string
columnsForKeys func([]K) []columnValues
mappingFromRow func(T) (K, int64)
less func(K, K) bool
concurrencyMode ConcurrencyMode
}

type future[K comparable, T any] struct {
Expand Down Expand Up @@ -134,17 +165,34 @@ func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) er
return l.less(keys[i], keys[j])
})

if count, err := l.insert(ctx, q, keys); err != nil {
return err
} else {
l.stats.Total += count
l.stats.Inserted += count
}
if l.concurrencyMode == ConcurrentInserts {
if count, err := l.insert(ctx, q, keys); err != nil {
return err
} else {
l.stats.Total += count
l.stats.Inserted += count
}

if count, err := l.query(ctx, q, keys, false); err != nil {
return err
} else {
l.stats.Total += count
}
} else if l.concurrencyMode == ConcurrentDeletes {
if count, err := l.query(ctx, q, keys, true); err != nil {
return err
} else {
l.stats.Total += count
}

if count, err := l.query(ctx, q, keys); err != nil {
return err
if count, err := l.insert(ctx, q, keys); err != nil {
return err
} else {
l.stats.Total += count
l.stats.Inserted += count
}
} else {
l.stats.Total += count
return fmt.Errorf("concurrency mode %v is invalid", l.concurrencyMode)
tamirms marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
Expand Down Expand Up @@ -204,11 +252,15 @@ func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error)
return len(rows), nil
}

func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) {
func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K, lockRows bool) (int, error) {
tamirms marked this conversation as resolved.
Show resolved Hide resolved
keys = l.filter(keys)
if len(keys) == 0 {
return 0, nil
}
var suffix string
if lockRows {
suffix = "ORDER BY id ASC FOR KEY SHARE"
}

var rows []T
err := bulkGet(
Expand All @@ -217,6 +269,7 @@ func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) {
l.table,
l.columnsForKeys(keys),
&rows,
suffix,
)
if err != nil {
return 0, err
Expand Down Expand Up @@ -293,7 +346,7 @@ func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues,
)
}

func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error {
func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}, suffix string) error {
unnestPart := make([]string, 0, len(fields))
columns := make([]string, 0, len(fields))
pqArrays := make([]interface{}, 0, len(fields))
Expand Down Expand Up @@ -329,7 +382,7 @@ func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, res
)
}
sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN
(SELECT ` + strings.Join(unnestPart, ",") + `)`
(SELECT ` + strings.Join(unnestPart, ",") + `) ` + suffix

return q.SelectRaw(
ctx,
Expand All @@ -347,7 +400,7 @@ type AccountLoaderStub struct {

// NewAccountLoaderStub returns a new AccountLoaderStub instance
func NewAccountLoaderStub() AccountLoaderStub {
return AccountLoaderStub{Loader: NewAccountLoader()}
return AccountLoaderStub{Loader: NewAccountLoader(ConcurrentInserts)}
}

// Insert updates the wrapped AccountLoader so that the given account
Expand Down
12 changes: 9 additions & 3 deletions services/horizon/internal/db2/history/account_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/support/db"
)

func TestAccountLoader(t *testing.T) {
Expand All @@ -16,12 +17,18 @@ func TestAccountLoader(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
session := tt.HorizonSession()

testAccountLoader(t, session, ConcurrentInserts)
test.ResetHorizonDB(t, tt.HorizonDB)
testAccountLoader(t, session, ConcurrentDeletes)
}

func testAccountLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) {
var addresses []string
for i := 0; i < 100; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}

loader := NewAccountLoader()
loader := NewAccountLoader(mode)
for _, address := range addresses {
future := loader.GetFuture(address)
_, err := future.Value()
Expand Down Expand Up @@ -58,7 +65,7 @@ func TestAccountLoader(t *testing.T) {

// check that Loader works when all the previous values are already
// present in the db and also add 10 more rows to insert
loader = NewAccountLoader()
loader = NewAccountLoader(mode)
for i := 0; i < 10; i++ {
addresses = append(addresses, keypair.MustRandom().Address())
}
Expand All @@ -85,5 +92,4 @@ func TestAccountLoader(t *testing.T) {
assert.Equal(t, account.ID, internalId)
assert.Equal(t, account.Address, address)
}

}
5 changes: 3 additions & 2 deletions services/horizon/internal/db2/history/asset_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type FutureAssetID = future[AssetKey, Asset]
type AssetLoader = loader[AssetKey, Asset]

// NewAssetLoader will construct a new AssetLoader instance.
func NewAssetLoader() *AssetLoader {
func NewAssetLoader(concurrencyMode ConcurrencyMode) *AssetLoader {
return &AssetLoader{
sealed: false,
set: set.Set[AssetKey]{},
Expand Down Expand Up @@ -88,6 +88,7 @@ func NewAssetLoader() *AssetLoader {
less: func(a AssetKey, b AssetKey) bool {
return a.String() < b.String()
},
concurrencyMode: concurrencyMode,
}
}

Expand All @@ -99,7 +100,7 @@ type AssetLoaderStub struct {

// NewAssetLoaderStub returns a new AssetLoaderStub instance
func NewAssetLoaderStub() AssetLoaderStub {
return AssetLoaderStub{Loader: NewAssetLoader()}
return AssetLoaderStub{Loader: NewAssetLoader(ConcurrentInserts)}
}

// Insert updates the wrapped AssetLoaderStub so that the given asset
Expand Down
11 changes: 9 additions & 2 deletions services/horizon/internal/db2/history/asset_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stellar/go/keypair"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -40,6 +41,12 @@ func TestAssetLoader(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
session := tt.HorizonSession()

testAssetLoader(t, session, ConcurrentInserts)
test.ResetHorizonDB(t, tt.HorizonDB)
testAssetLoader(t, session, ConcurrentDeletes)
}

func testAssetLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) {
var keys []AssetKey
for i := 0; i < 100; i++ {
var key AssetKey
Expand All @@ -66,7 +73,7 @@ func TestAssetLoader(t *testing.T) {
keys = append(keys, key)
}

loader := NewAssetLoader()
loader := NewAssetLoader(mode)
for _, key := range keys {
future := loader.GetFuture(key)
_, err := future.Value()
Expand Down Expand Up @@ -109,7 +116,7 @@ func TestAssetLoader(t *testing.T) {

// check that Loader works when all the previous values are already
// present in the db and also add 10 more rows to insert
loader = NewAssetLoader()
loader = NewAssetLoader(mode)
for i := 0; i < 10; i++ {
var key AssetKey
if i%2 == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type FutureClaimableBalanceID = future[string, HistoryClaimableBalance]
type ClaimableBalanceLoader = loader[string, HistoryClaimableBalance]

// NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance.
func NewClaimableBalanceLoader() *ClaimableBalanceLoader {
func NewClaimableBalanceLoader(concurrencyMode ConcurrencyMode) *ClaimableBalanceLoader {
return &ClaimableBalanceLoader{
sealed: false,
set: set.Set[string]{},
Expand All @@ -39,6 +39,7 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader {
mappingFromRow: func(row HistoryClaimableBalance) (string, int64) {
return row.BalanceID, row.InternalID
},
less: cmp.Less[string],
less: cmp.Less[string],
concurrencyMode: concurrencyMode,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

Expand All @@ -17,6 +18,12 @@ func TestClaimableBalanceLoader(t *testing.T) {
test.ResetHorizonDB(t, tt.HorizonDB)
session := tt.HorizonSession()

testCBLoader(t, tt, session, ConcurrentInserts)
test.ResetHorizonDB(t, tt.HorizonDB)
testCBLoader(t, tt, session, ConcurrentDeletes)
}

func testCBLoader(t *testing.T, tt *test.T, session *db.Session, mode ConcurrencyMode) {
var ids []string
for i := 0; i < 100; i++ {
balanceID := xdr.ClaimableBalanceId{
Expand All @@ -28,7 +35,7 @@ func TestClaimableBalanceLoader(t *testing.T) {
ids = append(ids, id)
}

loader := NewClaimableBalanceLoader()
loader := NewClaimableBalanceLoader(mode)
var futures []FutureClaimableBalanceID
for _, id := range ids {
future := loader.GetFuture(id)
Expand Down Expand Up @@ -70,7 +77,7 @@ func TestClaimableBalanceLoader(t *testing.T) {

// check that Loader works when all the previous values are already
// present in the db and also add 10 more rows to insert
loader = NewClaimableBalanceLoader()
loader = NewClaimableBalanceLoader(mode)
for i := 100; i < 110; i++ {
balanceID := xdr.ClaimableBalanceId{
Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestAddEffect(t *testing.T) {

address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
accountLoader := NewAccountLoader()
accountLoader := NewAccountLoader(ConcurrentInserts)

builder := q.NewEffectBatchInsertBuilder()
sequence := int32(56)
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/db2/history/effect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestEffectsForLiquidityPool(t *testing.T) {
// Insert Effect
address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
accountLoader := NewAccountLoader()
accountLoader := NewAccountLoader(ConcurrentInserts)

builder := q.NewEffectBatchInsertBuilder()
sequence := int32(56)
Expand All @@ -47,7 +47,7 @@ func TestEffectsForLiquidityPool(t *testing.T) {

// Insert Liquidity Pool history
liquidityPoolID := "abcde"
lpLoader := NewLiquidityPoolLoader()
lpLoader := NewLiquidityPoolLoader(ConcurrentInserts)

operationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder()
tt.Assert.NoError(operationBuilder.Add(opID, lpLoader.GetFuture(liquidityPoolID)))
Expand Down Expand Up @@ -78,7 +78,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) {

address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY"
muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26"
accountLoader := NewAccountLoader()
accountLoader := NewAccountLoader(ConcurrentInserts)

builder := q.NewEffectBatchInsertBuilder()
sequence := int32(56)
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/internal/db2/history/fee_bump_scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture {
details, err = json.Marshal(map[string]interface{}{"new_seq": 98})
tt.Assert.NoError(err)

accountLoader := NewAccountLoader()
accountLoader := NewAccountLoader(ConcurrentInserts)

err = effectBuilder.Add(
accountLoader.GetFuture(account.Address()),
Expand Down
Loading
Loading