Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LRU Cache for Boxes/KVs #4275

Merged
merged 23 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions ledger/accountdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,15 @@ func (prd *persistedResourcesData) AccountResource() ledgercore.AccountResource
return ret
}

//msgp:ignore persistedKVData
type persistedKVData struct {
// kv value
value *string
// the round number that is associated with the kv value. This field is the corresponding one to the round field
// in persistedAccountData, and serves the same purpose.
round basics.Round
}

// resourceDelta is used as part of the compactResourcesDeltas to describe a change to a single resource.
type resourceDelta struct {
oldResource persistedResourcesData
Expand Down Expand Up @@ -2567,12 +2576,7 @@ func (qs *accountsDbQueries) listCreatables(maxIdx basics.CreatableIndex, maxRes
return
}

type persistedValue struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just renamed and moved this but I don't mind undoing it if there's an objection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@algoidurovic I think the rename touches on a central question: What is box-specific about the cache?

I realize the question prompts a potentially large renaming exercise, but I'm wondering if the cache ought to be kv themed rather than box themed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think it should. Anything below the ledger tries to talk about kv rather than about boxes. But I don't think it changes much code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@algoidurovic From my perspective, based on renames to KV, the thread is now resolved. Since I didn't open it, I'm leaving it open for you to resolve.

value *string
round basics.Round
}

func (qs *accountsDbQueries) lookupKeyValue(key string) (pv persistedValue, err error) {
func (qs *accountsDbQueries) lookupKeyValue(key string) (pv persistedKVData, err error) {
err = db.Retry(func() error {
var v sql.NullString
// Cast to []byte to avoid interpretation as character string, see note in upsertKvPair
Expand Down Expand Up @@ -3397,7 +3401,7 @@ func accountsNewRound(
tx *sql.Tx,
updates compactAccountDeltas, resources compactResourcesDeltas, kvPairs map[string]modifiedValue, creatables map[basics.CreatableIndex]ledgercore.ModifiedCreatable,
proto config.ConsensusParams, lastUpdateRound basics.Round,
) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, err error) {
) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, updatedKVs map[string]persistedKVData, err error) {
hasAccounts := updates.len() > 0
hasResources := resources.len() > 0
hasKvPairs := len(kvPairs) > 0
Expand Down Expand Up @@ -3435,7 +3439,7 @@ func accountsNewRoundImpl(
writer accountsWriter,
updates compactAccountDeltas, resources compactResourcesDeltas, kvPairs map[string]modifiedValue, creatables map[basics.CreatableIndex]ledgercore.ModifiedCreatable,
proto config.ConsensusParams, lastUpdateRound basics.Round,
) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, err error) {
) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, updatedKVs map[string]persistedKVData, err error) {
updatedAccounts = make([]persistedAccountData, updates.len())
updatedAccountIdx := 0
newAddressesRowIDs := make(map[basics.Address]int64)
Expand Down Expand Up @@ -3633,11 +3637,14 @@ func accountsNewRoundImpl(
}
}

updatedKVs = make(map[string]persistedKVData, len(kvPairs))
for key, value := range kvPairs {
if value.data != nil {
err = writer.upsertKvPair(key, *value.data)
updatedKVs[key] = persistedKVData{value: value.data, round: lastUpdateRound}
} else {
err = writer.deleteKvPair(key)
updatedKVs[key] = persistedKVData{value: nil, round: lastUpdateRound}
}
if err != nil {
return
Expand Down Expand Up @@ -4762,6 +4769,12 @@ func (prd *persistedResourcesData) before(other *persistedResourcesData) bool {
return prd.round < other.round
}

// before compares the round numbers of two persistedKVData and determines if the current persistedKVData
// happened before the other.
func (prd *persistedKVData) before(other *persistedKVData) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since they are small objects, I might not use pointers for the receiver or the arg.

return prd.round < other.round
}

// before compares the round numbers of two persistedAccountData and determines if the current persistedAccountData
// happened before the other.
func (pac *persistedOnlineAccountData) before(other *persistedOnlineAccountData) bool {
Expand Down
168 changes: 161 additions & 7 deletions ledger/accountdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,15 @@ func TestAccountDBRound(t *testing.T) {
require.NoError(t, err)
expectedOnlineRoundParams = append(expectedOnlineRoundParams, onlineRoundParams)

updatedAccts, updatesResources, err := accountsNewRound(tx, updatesCnt, resourceUpdatesCnt, nil, ctbsWithDeletes, proto, basics.Round(i))
updatedAccts, updatesResources, updatedBoxes, err := accountsNewRound(tx, updatesCnt, resourceUpdatesCnt, nil, ctbsWithDeletes, proto, basics.Round(i))
require.NoError(t, err)
require.Equal(t, updatesCnt.len(), len(updatedAccts))
numResUpdates := 0
for _, rs := range updatesResources {
numResUpdates += len(rs)
}
require.Equal(t, resourceUpdatesCnt.len(), numResUpdates)
require.Empty(t, updatedBoxes)

updatedOnlineAccts, err := onlineAccountsNewRound(tx, updatesOnlineCnt, proto, basics.Round(i))
require.NoError(t, err)
Expand Down Expand Up @@ -448,7 +449,7 @@ func TestAccountDBInMemoryAcct(t *testing.T) {
err = outResourcesDeltas.resourcesLoadOld(tx, knownAddresses)
require.NoError(t, err)

updatedAccts, updatesResources, err := accountsNewRound(tx, outAccountDeltas, outResourcesDeltas, nil, nil, proto, basics.Round(lastRound))
updatedAccts, updatesResources, updatedBoxes, err := accountsNewRound(tx, outAccountDeltas, outResourcesDeltas, nil, nil, proto, basics.Round(lastRound))
require.NoError(t, err)
require.Equal(t, 1, len(updatedAccts)) // we store empty even for deleted accounts
require.Equal(t,
Expand All @@ -461,6 +462,8 @@ func TestAccountDBInMemoryAcct(t *testing.T) {
persistedResourcesData{addrid: 0, aidx: 100, data: makeResourcesData(0), round: basics.Round(lastRound)},
updatesResources[addr][0],
)

require.Empty(t, updatedBoxes)
})
}
}
Expand Down Expand Up @@ -2885,12 +2888,13 @@ func TestAccountUnorderedUpdates(t *testing.T) {
t.Run(fmt.Sprintf("acct-perm-%d|res-perm-%d", i, j), func(t *testing.T) {
a := require.New(t)
mock2 := mock.clone()
updatedAccounts, updatedResources, err := accountsNewRoundImpl(
updatedAccounts, updatedResources, updatedBoxes, err := accountsNewRoundImpl(
&mock2, acctVariant, resVariant, nil, nil, config.ConsensusParams{}, latestRound,
)
a.NoError(err)
a.Equal(3, len(updatedAccounts))
a.Equal(3, len(updatedResources))
a.Len(updatedAccounts, 3)
a.Len(updatedResources, 3)
a.Empty(updatedBoxes)
})
}
}
Expand Down Expand Up @@ -2967,12 +2971,13 @@ func TestAccountsNewRoundDeletedResourceEntries(t *testing.T) {
a.Equal(1, len(resDeltas.misses)) // (addr2, aidx) does not exist
a.Equal(2, resDeltas.len()) // (addr1, aidx) found

updatedAccounts, updatedResources, err := accountsNewRoundImpl(
updatedAccounts, updatedResources, updatedBoxes, err := accountsNewRoundImpl(
&mock, acctDeltas, resDeltas, nil, nil, config.ConsensusParams{}, latestRound,
)
a.NoError(err)
a.Equal(3, len(updatedAccounts))
a.Equal(2, len(updatedResources))
a.Equal(0, len(updatedBoxes))

// one deletion entry for pre-existing account addr1, and one entry for in-memory account addr2
// in base accounts updates and in resources updates
Expand All @@ -2996,6 +3001,155 @@ func TestAccountsNewRoundDeletedResourceEntries(t *testing.T) {
}
}

func BenchmarkLRUResources(b *testing.B) {
var baseResources lruResources
baseResources.init(nil, 1000, 850)

var data persistedResourcesData
var has bool
addrs := make([]basics.Address, 850)
for i := 0; i < 850; i++ {
data.data.ApprovalProgram = make([]byte, 8096*4)
data.aidx = basics.CreatableIndex(1)
addrBytes := ([]byte(fmt.Sprintf("%d", i)))[:32]
var addr basics.Address
for i, b := range addrBytes {
addr[i] = b
}
addrs[i] = addr
baseResources.write(data, addr)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
pos := i % 850
data, has = baseResources.read(addrs[pos], basics.CreatableIndex(1))
require.True(b, has)
}
}

func initBoxDatabase(b *testing.B, totalBoxes, boxSize int) (db.Pair, func(), error) {
batchCount := 100
if batchCount > totalBoxes {
batchCount = 1
}

proto := config.Consensus[protocol.ConsensusCurrentVersion]
dbs, fn := dbOpenTest(b, false)
setDbLogging(b, dbs)
cleanup := func() {
cleanupTestDb(dbs, fn, false)
}

tx, err := dbs.Wdb.Handle.Begin()
require.NoError(b, err)
_, err = accountsInit(tx, make(map[basics.Address]basics.AccountData), proto)
require.NoError(b, err)
err = tx.Commit()
require.NoError(b, err)
err = dbs.Wdb.SetSynchronousMode(context.Background(), db.SynchronousModeOff, false)
require.NoError(b, err)

cnt := 0
for batch := 0; batch <= batchCount; batch++ {
tx, err = dbs.Wdb.Handle.Begin()
require.NoError(b, err)
writer, err := makeAccountsSQLWriter(tx, false, false, true, false)
require.NoError(b, err)
for boxIdx := 0; boxIdx < totalBoxes/batchCount; boxIdx++ {
err = writer.upsertKvPair(fmt.Sprintf("%d", cnt), string(make([]byte, boxSize)))
require.NoError(b, err)
cnt++
}

err = tx.Commit()
require.NoError(b, err)
writer.close()
}
err = dbs.Wdb.SetSynchronousMode(context.Background(), db.SynchronousModeFull, true)
return dbs, cleanup, err
}

func BenchmarkBoxDatabaseRead(b *testing.B) {
getBoxNamePermutation := func(totalBoxes int) []int {
rand.Seed(time.Now().UnixNano())
boxNames := make([]int, totalBoxes)
for i := 0; i < totalBoxes; i++ {
boxNames[i] = i
}
rand.Shuffle(len(boxNames), func(x, y int) { boxNames[x], boxNames[y] = boxNames[y], boxNames[x] })
return boxNames
}

boxCnt := []int{10, 1000, 100000}
boxSizes := []int{2, 2048, 4 * 8096}
for _, totalBoxes := range boxCnt {
for _, boxSize := range boxSizes {
b.Run(fmt.Sprintf("totalBoxes=%d/boxSize=%d", totalBoxes, boxSize), func(b *testing.B) {
b.StopTimer()

dbs, cleanup, err := initBoxDatabase(b, totalBoxes, boxSize)
require.NoError(b, err)

boxNames := getBoxNamePermutation(totalBoxes)
lookupStmt, err := dbs.Wdb.Handle.Prepare("SELECT rnd, value FROM acctrounds LEFT JOIN kvstore ON key = ? WHERE id='acctbase';")
require.NoError(b, err)
var v sql.NullString
for i := 0; i < b.N; i++ {
var pv persistedKVData
boxName := boxNames[i%totalBoxes]
b.StartTimer()
err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v)
b.StopTimer()
require.NoError(b, err)
require.True(b, v.Valid)
}

cleanup()
})
}
}

// test caching performance
lookbacks := []int{1, 32, 256, 2048}
for _, lookback := range lookbacks {
for _, boxSize := range boxSizes {
totalBoxes := 100000

b.Run(fmt.Sprintf("lookback=%d/boxSize=%d", lookback, boxSize), func(b *testing.B) {
b.StopTimer()

dbs, cleanup, err := initBoxDatabase(b, totalBoxes, boxSize)
require.NoError(b, err)

boxNames := getBoxNamePermutation(totalBoxes)
lookupStmt, err := dbs.Wdb.Handle.Prepare("SELECT rnd, value FROM acctrounds LEFT JOIN kvstore ON key = ? WHERE id='acctbase';")
require.NoError(b, err)
var v sql.NullString
for i := 0; i < b.N+lookback; i++ {
var pv persistedKVData
boxName := boxNames[i%totalBoxes]
err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v)
require.NoError(b, err)
require.True(b, v.Valid)

// benchmark reading the potentially cached value that was read lookback boxes ago
if i >= lookback {
boxName = boxNames[(i-lookback)%totalBoxes]
b.StartTimer()
err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v)
b.StopTimer()
require.NoError(b, err)
require.True(b, v.Valid)
}
}

cleanup()
})
}
}
}

// TestAccountTopOnline ensures accountsOnlineTop return a right subset of accounts
// from the history table.
// Start with two online accounts A, B at round 1
Expand Down Expand Up @@ -3116,7 +3270,7 @@ func TestAccountOnlineQueries(t *testing.T) {

err = accountsPutTotals(tx, totals, false)
require.NoError(t, err)
updatedAccts, _, err := accountsNewRound(tx, updatesCnt, compactResourcesDeltas{}, nil, nil, proto, rnd)
updatedAccts, _, _, err := accountsNewRound(tx, updatesCnt, compactResourcesDeltas{}, nil, nil, proto, rnd)
require.NoError(t, err)
require.Equal(t, updatesCnt.len(), len(updatedAccts))

Expand Down
Loading