diff --git a/ledger/accountdb.go b/ledger/accountdb.go
index bab53168f0..1450d37557 100644
--- a/ledger/accountdb.go
+++ b/ledger/accountdb.go
@@ -271,6 +271,15 @@ func (prd *persistedResourcesData) AccountResource() ledgercore.AccountResource
return ret
}
+//msgp:ignore persistedKVData
+type persistedKVData struct {
+ // kv value
+ value *string
+ // the round number that is associated with the kv value. This field corresponds to the round
+ // field in persistedAccountData, and serves the same purpose.
+ round basics.Round
+}
+
// resourceDelta is used as part of the compactResourcesDeltas to describe a change to a single resource.
type resourceDelta struct {
oldResource persistedResourcesData
@@ -2596,12 +2605,7 @@ func (qs *accountsDbQueries) listCreatables(maxIdx basics.CreatableIndex, maxRes
return
}
-type persistedValue struct {
- value *string
- round basics.Round
-}
-
-func (qs *accountsDbQueries) lookupKeyValue(key string) (pv persistedValue, err error) {
+func (qs *accountsDbQueries) lookupKeyValue(key string) (pv persistedKVData, err error) {
err = db.Retry(func() error {
var v sql.NullString
// Cast to []byte to avoid interpretation as character string, see note in upsertKvPair
@@ -3444,7 +3448,7 @@ func accountsNewRound(
tx *sql.Tx,
updates compactAccountDeltas, resources compactResourcesDeltas, kvPairs map[string]modifiedValue, creatables map[basics.CreatableIndex]ledgercore.ModifiedCreatable,
proto config.ConsensusParams, lastUpdateRound basics.Round,
-) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, err error) {
+) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, updatedKVs map[string]persistedKVData, err error) {
hasAccounts := updates.len() > 0
hasResources := resources.len() > 0
hasKvPairs := len(kvPairs) > 0
@@ -3482,7 +3486,7 @@ func accountsNewRoundImpl(
writer accountsWriter,
updates compactAccountDeltas, resources compactResourcesDeltas, kvPairs map[string]modifiedValue, creatables map[basics.CreatableIndex]ledgercore.ModifiedCreatable,
proto config.ConsensusParams, lastUpdateRound basics.Round,
-) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, err error) {
+) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, updatedKVs map[string]persistedKVData, err error) {
updatedAccounts = make([]persistedAccountData, updates.len())
updatedAccountIdx := 0
newAddressesRowIDs := make(map[basics.Address]int64)
@@ -3680,11 +3684,14 @@ func accountsNewRoundImpl(
}
}
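+ // collect the kv changes applied in this round so the caller can use them to update
+ // the baseKVs LRU cache; deletions are recorded with a nil value.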
+ updatedKVs = make(map[string]persistedKVData, len(kvPairs))
for key, value := range kvPairs {
if value.data != nil {
err = writer.upsertKvPair(key, *value.data)
+ updatedKVs[key] = persistedKVData{value: value.data, round: lastUpdateRound}
} else {
err = writer.deleteKvPair(key)
+ updatedKVs[key] = persistedKVData{value: nil, round: lastUpdateRound}
}
if err != nil {
return
@@ -4809,6 +4816,12 @@ func (prd *persistedResourcesData) before(other *persistedResourcesData) bool {
return prd.round < other.round
}
+// before compares the round numbers of two persistedKVData and determines if the current persistedKVData
+// happened before the other.
+func (prd persistedKVData) before(other *persistedKVData) bool {
+ return prd.round < other.round
+}
+
// before compares the round numbers of two persistedAccountData and determines if the current persistedAccountData
// happened before the other.
func (pac *persistedOnlineAccountData) before(other *persistedOnlineAccountData) bool {
diff --git a/ledger/accountdb_test.go b/ledger/accountdb_test.go
index 65987429e5..9a48853e48 100644
--- a/ledger/accountdb_test.go
+++ b/ledger/accountdb_test.go
@@ -314,7 +314,7 @@ func TestAccountDBRound(t *testing.T) {
require.NoError(t, err)
expectedOnlineRoundParams = append(expectedOnlineRoundParams, onlineRoundParams)
- updatedAccts, updatesResources, err := accountsNewRound(tx, updatesCnt, resourceUpdatesCnt, nil, ctbsWithDeletes, proto, basics.Round(i))
+ updatedAccts, updatesResources, updatedKVs, err := accountsNewRound(tx, updatesCnt, resourceUpdatesCnt, nil, ctbsWithDeletes, proto, basics.Round(i))
require.NoError(t, err)
require.Equal(t, updatesCnt.len(), len(updatedAccts))
numResUpdates := 0
@@ -322,6 +322,7 @@ func TestAccountDBRound(t *testing.T) {
numResUpdates += len(rs)
}
require.Equal(t, resourceUpdatesCnt.len(), numResUpdates)
+ require.Empty(t, updatedKVs)
updatedOnlineAccts, err := onlineAccountsNewRound(tx, updatesOnlineCnt, proto, basics.Round(i))
require.NoError(t, err)
@@ -451,7 +452,7 @@ func TestAccountDBInMemoryAcct(t *testing.T) {
err = outResourcesDeltas.resourcesLoadOld(tx, knownAddresses)
require.NoError(t, err)
- updatedAccts, updatesResources, err := accountsNewRound(tx, outAccountDeltas, outResourcesDeltas, nil, nil, proto, basics.Round(lastRound))
+ updatedAccts, updatesResources, updatedKVs, err := accountsNewRound(tx, outAccountDeltas, outResourcesDeltas, nil, nil, proto, basics.Round(lastRound))
require.NoError(t, err)
require.Equal(t, 1, len(updatedAccts)) // we store empty even for deleted accounts
require.Equal(t,
@@ -464,6 +465,8 @@ func TestAccountDBInMemoryAcct(t *testing.T) {
persistedResourcesData{addrid: 0, aidx: 100, data: makeResourcesData(0), round: basics.Round(lastRound)},
updatesResources[addr][0],
)
+
+ require.Empty(t, updatedKVs)
})
}
}
@@ -2888,12 +2891,13 @@ func TestAccountUnorderedUpdates(t *testing.T) {
t.Run(fmt.Sprintf("acct-perm-%d|res-perm-%d", i, j), func(t *testing.T) {
a := require.New(t)
mock2 := mock.clone()
- updatedAccounts, updatedResources, err := accountsNewRoundImpl(
+ updatedAccounts, updatedResources, updatedKVs, err := accountsNewRoundImpl(
&mock2, acctVariant, resVariant, nil, nil, config.ConsensusParams{}, latestRound,
)
a.NoError(err)
- a.Equal(3, len(updatedAccounts))
- a.Equal(3, len(updatedResources))
+ a.Len(updatedAccounts, 3)
+ a.Len(updatedResources, 3)
+ a.Empty(updatedKVs)
})
}
}
@@ -2970,12 +2974,13 @@ func TestAccountsNewRoundDeletedResourceEntries(t *testing.T) {
a.Equal(1, len(resDeltas.misses)) // (addr2, aidx) does not exist
a.Equal(2, resDeltas.len()) // (addr1, aidx) found
- updatedAccounts, updatedResources, err := accountsNewRoundImpl(
+ updatedAccounts, updatedResources, updatedKVs, err := accountsNewRoundImpl(
&mock, acctDeltas, resDeltas, nil, nil, config.ConsensusParams{}, latestRound,
)
a.NoError(err)
a.Equal(3, len(updatedAccounts))
a.Equal(2, len(updatedResources))
+ a.Equal(0, len(updatedKVs))
// one deletion entry for pre-existing account addr1, and one entry for in-memory account addr2
// in base accounts updates and in resources updates
@@ -2999,6 +3004,155 @@ func TestAccountsNewRoundDeletedResourceEntries(t *testing.T) {
}
}
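+
+// BenchmarkLRUResources measures read throughput of the lruResources cache after it has been
+// populated with 850 large resource entries.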
+func BenchmarkLRUResources(b *testing.B) {
+ var baseResources lruResources
+ baseResources.init(nil, 1000, 850)
+
+ var data persistedResourcesData
+ var has bool
+ addrs := make([]basics.Address, 850)
+ for i := 0; i < 850; i++ {
+ data.data.ApprovalProgram = make([]byte, 8096*4)
+ data.aidx = basics.CreatableIndex(1)
+ // build a deterministic 32-byte address from the loop index; copying into the
+ // fixed-size array avoids slicing the converted string beyond its length.
+ var addr basics.Address
+ copy(addr[:], fmt.Sprintf("%d", i))
+ addrs[i] = addr
+ baseResources.write(data, addr)
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ pos := i % 850
+ data, has = baseResources.read(addrs[pos], basics.CreatableIndex(1))
+ require.True(b, has)
+ }
+}
+
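+// initBoxDatabase creates a test database, initializes the accounts schema, and fills the
+// kvstore with roughly totalBoxes boxes of boxSize bytes each, committed in batches.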
+func initBoxDatabase(b *testing.B, totalBoxes, boxSize int) (db.Pair, func(), error) {
+ batchCount := 100
+ if batchCount > totalBoxes {
+ batchCount = 1
+ }
+
+ proto := config.Consensus[protocol.ConsensusCurrentVersion]
+ dbs, fn := dbOpenTest(b, false)
+ setDbLogging(b, dbs)
+ cleanup := func() {
+ cleanupTestDb(dbs, fn, false)
+ }
+
+ tx, err := dbs.Wdb.Handle.Begin()
+ require.NoError(b, err)
+ _, err = accountsInit(tx, make(map[basics.Address]basics.AccountData), proto)
+ require.NoError(b, err)
+ err = tx.Commit()
+ require.NoError(b, err)
+ err = dbs.Wdb.SetSynchronousMode(context.Background(), db.SynchronousModeOff, false)
+ require.NoError(b, err)
+
+ cnt := 0
+ for batch := 0; batch <= batchCount; batch++ {
+ tx, err = dbs.Wdb.Handle.Begin()
+ require.NoError(b, err)
+ writer, err := makeAccountsSQLWriter(tx, false, false, true, false)
+ require.NoError(b, err)
+ for boxIdx := 0; boxIdx < totalBoxes/batchCount; boxIdx++ {
+ err = writer.upsertKvPair(fmt.Sprintf("%d", cnt), string(make([]byte, boxSize)))
+ require.NoError(b, err)
+ cnt++
+ }
+
+ err = tx.Commit()
+ require.NoError(b, err)
+ writer.close()
+ }
+ err = dbs.Wdb.SetSynchronousMode(context.Background(), db.SynchronousModeFull, true)
+ return dbs, cleanup, err
+}
+
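+// BenchmarkBoxDatabaseRead measures direct SQLite reads of box values for various box counts
+// and sizes, and then, for several lookback distances, re-reads recently accessed boxes to
+// gauge how much a cache in front of the database could help.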
+func BenchmarkBoxDatabaseRead(b *testing.B) {
+ getBoxNamePermutation := func(totalBoxes int) []int {
+ rand.Seed(time.Now().UnixNano())
+ boxNames := make([]int, totalBoxes)
+ for i := 0; i < totalBoxes; i++ {
+ boxNames[i] = i
+ }
+ rand.Shuffle(len(boxNames), func(x, y int) { boxNames[x], boxNames[y] = boxNames[y], boxNames[x] })
+ return boxNames
+ }
+
+ boxCnt := []int{10, 1000, 100000}
+ boxSizes := []int{2, 2048, 4 * 8096}
+ for _, totalBoxes := range boxCnt {
+ for _, boxSize := range boxSizes {
+ b.Run(fmt.Sprintf("totalBoxes=%d/boxSize=%d", totalBoxes, boxSize), func(b *testing.B) {
+ b.StopTimer()
+
+ dbs, cleanup, err := initBoxDatabase(b, totalBoxes, boxSize)
+ require.NoError(b, err)
+
+ boxNames := getBoxNamePermutation(totalBoxes)
+ lookupStmt, err := dbs.Wdb.Handle.Prepare("SELECT rnd, value FROM acctrounds LEFT JOIN kvstore ON key = ? WHERE id='acctbase';")
+ require.NoError(b, err)
+ var v sql.NullString
+ for i := 0; i < b.N; i++ {
+ var pv persistedKVData
+ boxName := boxNames[i%totalBoxes]
+ b.StartTimer()
+ err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v)
+ b.StopTimer()
+ require.NoError(b, err)
+ require.True(b, v.Valid)
+ }
+
+ cleanup()
+ })
+ }
+ }
+
+ // test caching performance
+ lookbacks := []int{1, 32, 256, 2048}
+ for _, lookback := range lookbacks {
+ for _, boxSize := range boxSizes {
+ totalBoxes := 100000
+
+ b.Run(fmt.Sprintf("lookback=%d/boxSize=%d", lookback, boxSize), func(b *testing.B) {
+ b.StopTimer()
+
+ dbs, cleanup, err := initBoxDatabase(b, totalBoxes, boxSize)
+ require.NoError(b, err)
+
+ boxNames := getBoxNamePermutation(totalBoxes)
+ lookupStmt, err := dbs.Wdb.Handle.Prepare("SELECT rnd, value FROM acctrounds LEFT JOIN kvstore ON key = ? WHERE id='acctbase';")
+ require.NoError(b, err)
+ var v sql.NullString
+ for i := 0; i < b.N+lookback; i++ {
+ var pv persistedKVData
+ boxName := boxNames[i%totalBoxes]
+ err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v)
+ require.NoError(b, err)
+ require.True(b, v.Valid)
+
+ // benchmark reading the potentially cached value that was read lookback boxes ago
+ if i >= lookback {
+ boxName = boxNames[(i-lookback)%totalBoxes]
+ b.StartTimer()
+ err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v)
+ b.StopTimer()
+ require.NoError(b, err)
+ require.True(b, v.Valid)
+ }
+ }
+
+ cleanup()
+ })
+ }
+ }
+}
+
// TestAccountTopOnline ensures accountsOnlineTop return a right subset of accounts
// from the history table.
// Start with two online accounts A, B at round 1
@@ -3119,7 +3273,7 @@ func TestAccountOnlineQueries(t *testing.T) {
err = accountsPutTotals(tx, totals, false)
require.NoError(t, err)
- updatedAccts, _, err := accountsNewRound(tx, updatesCnt, compactResourcesDeltas{}, nil, nil, proto, rnd)
+ updatedAccts, _, _, err := accountsNewRound(tx, updatesCnt, compactResourcesDeltas{}, nil, nil, proto, rnd)
require.NoError(t, err)
require.Equal(t, updatesCnt.len(), len(updatedAccts))
diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go
index a56b729617..1cba218fce 100644
--- a/ledger/acctupdates.go
+++ b/ledger/acctupdates.go
@@ -68,6 +68,15 @@ const baseResourcesPendingAccountsBufferSize = 100000
// is being flushed into the main base resources cache.
const baseResourcesPendingAccountsWarnThreshold = 85000
+// baseKVPendingBufferSize defines the size of the base KVs pending buffer.
+// At the beginning of a new round, the entries from this buffer are flushed into the base KVs cache.
+const baseKVPendingBufferSize = 5000
+
+// baseKVPendingWarnThreshold defines the number of pending KV entries beyond which the lruKV
+// generates a warning. The warning is emitted when the pending KV data is flushed into the
+// main base KV cache.
+const baseKVPendingWarnThreshold = 4250
+
// initializeCachesReadaheadBlocksStream defines how many block we're going to attempt to queue for the
// initializeCaches method before it can process and store the account changes to disk.
const initializeCachesReadaheadBlocksStream = 4
@@ -206,6 +215,9 @@ type accountUpdates struct {
// baseResources stores the most recently used resources, at exactly dbRound
baseResources lruResources
+ // baseKVs stores the most recently used KVs, at exactly dbRound
+ baseKVs lruKV
+
// logAccountUpdatesMetrics is a flag for enable/disable metrics logging
logAccountUpdatesMetrics bool
@@ -310,6 +322,7 @@ func (au *accountUpdates) close() {
}
au.baseAccounts.prune(0)
au.baseResources.prune(0)
+ au.baseKVs.prune(0)
}
func (au *accountUpdates) LookupResource(rnd basics.Round, addr basics.Address, aidx basics.CreatableIndex, ctype basics.CreatableType) (ledgercore.AccountResource, basics.Round, error) {
@@ -374,7 +387,13 @@ func (au *accountUpdates) lookupKv(rnd basics.Round, key string, synchronized bo
rnd = currentDbRound + basics.Round(currentDeltaLen)
}
- // OTHER LOOKUPS USE "base" caches here.
+ // check the baseKV cache
+ if pbd, has := au.baseKVs.read(key); has {
+ // we don't technically need this, since it's already in the baseKVs cache; however, writing it
+ // again ensures the entry gets promoted toward the front of the LRU list.
+ au.baseKVs.writePending(pbd, key)
+ return pbd.value, nil
+ }
if synchronized {
au.accountsMu.RUnlock()
@@ -386,7 +405,15 @@ func (au *accountUpdates) lookupKv(rnd basics.Round, key string, synchronized bo
// on-disk DB.
persistedData, err := au.accountsq.lookupKeyValue(key)
+ if err != nil {
+ return nil, err
+ }
+
if persistedData.round == currentDbRound {
+ // if we read actual data, cache and return it. This includes deleted values,
+ // where persistedData.value == nil, so that repeated lookups of deleted KVs
+ // don't hit the database unnecessarily.
+ au.baseKVs.writePending(persistedData, key)
return persistedData.value, nil
}
@@ -504,6 +531,10 @@ func (au *accountUpdates) lookupKeysByPrefix(round basics.Round, keyPrefix strin
needUnlock = false
}
+ // NOTE: the kv cache isn't used here because the data structure doesn't support range
+ // queries. It may be preferable to increase the SQLite cache size if these reads become
+ // too slow.
+
// Finishing searching updates of this account in kvDeltas, keep going: use on-disk DB
// to find the rest matching keys in DB.
dbRound, _err := au.accountsq.lookupKeysByPrefix(keyPrefix, maxKeyNum, results, resultCount)
@@ -923,6 +954,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou
au.baseAccounts.init(au.log, baseAccountsPendingAccountsBufferSize, baseAccountsPendingAccountsWarnThreshold)
au.baseResources.init(au.log, baseResourcesPendingAccountsBufferSize, baseResourcesPendingAccountsWarnThreshold)
+ au.baseKVs.init(au.log, baseKVPendingBufferSize, baseKVPendingWarnThreshold)
return
}
@@ -947,6 +979,7 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S
au.baseAccounts.flushPendingWrites()
au.baseResources.flushPendingWrites()
+ au.baseKVs.flushPendingWrites()
for i := 0; i < delta.Accts.Len(); i++ {
addr, data := delta.Accts.GetByIdx(i)
@@ -1001,6 +1034,8 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S
au.baseAccounts.prune(newBaseAccountSize)
newBaseResourcesSize := (len(au.resources) + 1) + baseResourcesPendingAccountsBufferSize
au.baseResources.prune(newBaseResourcesSize)
+ newBaseKVSize := (len(au.kvStore) + 1) + baseKVPendingBufferSize
+ au.baseKVs.prune(newBaseKVSize)
}
// lookupLatest returns the account data for a given address for the latest round.
@@ -1626,7 +1661,7 @@ func (au *accountUpdates) commitRound(ctx context.Context, tx *sql.Tx, dcc *defe
// the updates of the actual account data is done last since the accountsNewRound would modify the compactDeltas old values
// so that we can update the base account back.
- dcc.updatedPersistedAccounts, dcc.updatedPersistedResources, err = accountsNewRound(tx, dcc.compactAccountDeltas, dcc.compactResourcesDeltas, dcc.compactKvDeltas, dcc.compactCreatableDeltas, dcc.genesisProto, dbRound+basics.Round(offset))
+ dcc.updatedPersistedAccounts, dcc.updatedPersistedResources, dcc.updatedPersistedKVs, err = accountsNewRound(tx, dcc.compactAccountDeltas, dcc.compactResourcesDeltas, dcc.compactKvDeltas, dcc.compactCreatableDeltas, dcc.genesisProto, dbRound+basics.Round(offset))
if err != nil {
return err
}
@@ -1717,6 +1752,10 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon
}
}
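+ // feed the kv changes (including deletions, which are stored as nil values) that were
+ // just flushed to disk into the baseKVs cache.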
+ for key, persistedKV := range dcc.updatedPersistedKVs {
+ au.baseKVs.write(persistedKV, key)
+ }
+
for cidx, modCrt := range dcc.compactCreatableDeltas {
cnt := modCrt.Ndeltas
mcreat, ok := au.creatables[cidx]
@@ -1846,6 +1885,7 @@ func (au *accountUpdates) vacuumDatabase(ctx context.Context) (err error) {
// rowid are flushed.
au.baseAccounts.prune(0)
au.baseResources.prune(0)
+ au.baseKVs.prune(0)
startTime := time.Now()
vacuumExitCh := make(chan struct{}, 1)
diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go
index 6bb6e7c8e2..7ac098ebea 100644
--- a/ledger/acctupdates_test.go
+++ b/ledger/acctupdates_test.go
@@ -1216,7 +1216,7 @@ func TestListCreatables(t *testing.T) {
// sync with the database
var updates compactAccountDeltas
var resUpdates compactResourcesDeltas
- _, _, err = accountsNewRound(tx, updates, resUpdates, nil, ctbsWithDeletes, proto, basics.Round(1))
+ _, _, _, err = accountsNewRound(tx, updates, resUpdates, nil, ctbsWithDeletes, proto, basics.Round(1))
require.NoError(t, err)
// nothing left in cache
au.creatables = make(map[basics.CreatableIndex]ledgercore.ModifiedCreatable)
@@ -1232,7 +1232,7 @@ func TestListCreatables(t *testing.T) {
// ******* Results are obtained from the database and from the cache *******
// ******* Deletes are in the database and in the cache *******
// sync with the database. This has deletes synced to the database.
- _, _, err = accountsNewRound(tx, updates, resUpdates, nil, au.creatables, proto, basics.Round(1))
+ _, _, _, err = accountsNewRound(tx, updates, resUpdates, nil, au.creatables, proto, basics.Round(1))
require.NoError(t, err)
// get new creatables in the cache. There will be deletes in the cache from the previous batch.
au.creatables = randomCreatableSampling(3, ctbsList, randomCtbs,
@@ -1357,6 +1357,183 @@ func TestBoxNamesByAppIDs(t *testing.T) {
}
}
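+
+// TestKVCache verifies that the baseKVs LRU cache tracks kv inserts, updates, and deletes,
+// and that entries show up in the cache only once their block has been committed to the
+// database.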
+func TestKVCache(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ t.Parallel()
+
+ initialBlocksCount := 1
+ accts := make(map[basics.Address]basics.AccountData)
+
+ protoParams := config.Consensus[protocol.ConsensusCurrentVersion]
+ ml := makeMockLedgerForTracker(t, true, initialBlocksCount, protocol.ConsensusCurrentVersion,
+ []map[basics.Address]basics.AccountData{accts},
+ )
+ defer ml.Close()
+
+ conf := config.GetDefaultLocal()
+ au, _ := newAcctUpdates(t, ml, conf)
+ defer au.close()
+
+ knownCreatables := make(map[basics.CreatableIndex]bool)
+ opts := auNewBlockOpts{ledgercore.AccountDeltas{}, protocol.ConsensusCurrentVersion, protoParams, knownCreatables}
+
+ kvCnt := 1000
+ kvsPerBlock := 100
+ curKV := 0
+ var currentRound basics.Round
+ currentDBRound := basics.Round(1)
+
+ kvMap := make(map[string]string)
+ for i := 0; i < kvCnt; i++ {
+ kvMap[fmt.Sprintf("%d", i)] = fmt.Sprintf("value%d", i)
+ }
+
+ // add kvsPerBlock KVs on each iteration. The first kvCnt/kvsPerBlock iterations
+ // each produce a block with kvsPerBlock kv manipulations. The last
+ // conf.MaxAcctLookback iterations produce empty blocks, so that every kv-containing
+ // block gets committed and the contents of the cache can be verified.
+ for i := 0; i < kvCnt/kvsPerBlock+int(conf.MaxAcctLookback); i++ {
+ currentRound = currentRound + 1
+ kvMods := make(map[string]*string)
+ if i < kvCnt/kvsPerBlock {
+ for j := 0; j < kvsPerBlock; j++ {
+ name := fmt.Sprintf("%d", curKV)
+ curKV++
+ val := kvMap[name]
+ kvMods[name] = &val
+ }
+ }
+
+ auNewBlock(t, currentRound, au, accts, opts, kvMods)
+ auCommitSync(t, currentRound, au, ml)
+
+ // ensure rounds
+ rnd := au.latest()
+ require.Equal(t, currentRound, rnd)
+ if uint64(currentRound) > conf.MaxAcctLookback {
+ require.Equal(t, basics.Round(uint64(currentRound)-conf.MaxAcctLookback), au.cachedDBRound)
+ } else {
+ require.Equal(t, basics.Round(0), au.cachedDBRound)
+ }
+
+ // verify cache doesn't contain the new kvs until committed to DB.
+ for name := range kvMods {
+ _, has := au.baseKVs.read(name)
+ require.False(t, has)
+ }
+
+ // verify committed kvs appear in the kv cache
+ for ; currentDBRound <= au.cachedDBRound; currentDBRound++ {
+ startKV := (currentDBRound - 1) * basics.Round(kvsPerBlock)
+ for j := 0; j < kvsPerBlock; j++ {
+ name := fmt.Sprintf("%d", uint64(startKV)+uint64(j))
+ persistedValue, has := au.baseKVs.read(name)
+ require.True(t, has)
+ require.Equal(t, kvMap[name], *persistedValue.value)
+ }
+ }
+ }
+
+ // updating inserted KVs
+ curKV = 0
+ for i := 0; i < kvCnt/kvsPerBlock+int(conf.MaxAcctLookback); i++ {
+ currentRound = currentRound + 1
+
+ kvMods := make(map[string]*string)
+ if i < kvCnt/kvsPerBlock {
+ for j := 0; j < kvsPerBlock; j++ {
+ name := fmt.Sprintf("%d", curKV)
+ val := fmt.Sprintf("modified value%d", curKV)
+ kvMods[name] = &val
+ curKV++
+ }
+ }
+
+ auNewBlock(t, currentRound, au, accts, opts, kvMods)
+ auCommitSync(t, currentRound, au, ml)
+
+ // ensure rounds
+ rnd := au.latest()
+ require.Equal(t, currentRound, rnd)
+ require.Equal(t, basics.Round(uint64(currentRound)-conf.MaxAcctLookback), au.cachedDBRound)
+
+ // verify cache doesn't contain updated kv values that haven't been committed to db
+ if i < kvCnt/kvsPerBlock {
+ for name := range kvMods {
+ persistedValue, has := au.baseKVs.read(name)
+ require.True(t, has)
+ require.Equal(t, kvMap[name], *persistedValue.value)
+ }
+ }
+
+ // verify committed updated kv values appear in the kv cache
+ for ; currentDBRound <= au.cachedDBRound; currentDBRound++ {
+ lookback := basics.Round(kvCnt/kvsPerBlock + int(conf.MaxAcctLookback) + 1)
+ if currentDBRound < lookback {
+ continue
+ }
+
+ startKV := (currentDBRound - lookback) * basics.Round(kvsPerBlock)
+ for j := 0; j < kvsPerBlock; j++ {
+ name := fmt.Sprintf("%d", uint64(startKV)+uint64(j))
+ persistedValue, has := au.baseKVs.read(name)
+ require.True(t, has)
+ expectedValue := fmt.Sprintf("modified value%s", name)
+ require.Equal(t, expectedValue, *persistedValue.value)
+ }
+ }
+ }
+
+ // deleting KVs
+ curKV = 0
+ for i := 0; i < kvCnt/kvsPerBlock+int(conf.MaxAcctLookback); i++ {
+ currentRound = currentRound + 1
+
+ kvMods := make(map[string]*string)
+ if i < kvCnt/kvsPerBlock {
+ for j := 0; j < kvsPerBlock; j++ {
+ name := fmt.Sprintf("%d", curKV)
+ kvMods[name] = nil
+ curKV++
+ }
+ }
+
+ auNewBlock(t, currentRound, au, accts, opts, kvMods)
+ auCommitSync(t, currentRound, au, ml)
+
+ // ensure rounds
+ rnd := au.latest()
+ require.Equal(t, currentRound, rnd)
+ require.Equal(t, basics.Round(uint64(currentRound)-conf.MaxAcctLookback), au.cachedDBRound)
+
+ // verify cache still holds the previous values for kvs whose deletion hasn't been committed to db
+ if i < kvCnt/kvsPerBlock {
+ for name := range kvMods {
+ persistedValue, has := au.baseKVs.read(name)
+ require.True(t, has)
+ value := fmt.Sprintf("modified value%s", name)
+ require.Equal(t, value, *persistedValue.value)
+ }
+ }
+
+ // verify committed kv deletions appear in the kv cache as nil values
+ for ; currentDBRound <= au.cachedDBRound; currentDBRound++ {
+ lookback := basics.Round(2*(kvCnt/kvsPerBlock+int(conf.MaxAcctLookback)) + 1)
+ if currentDBRound < lookback {
+ continue
+ }
+
+ startKV := (currentDBRound - lookback) * basics.Round(kvsPerBlock)
+ for j := 0; j < kvsPerBlock; j++ {
+ name := fmt.Sprintf("%d", uint64(startKV)+uint64(j))
+ persistedValue, has := au.baseKVs.read(name)
+ require.True(t, has)
+ require.True(t, persistedValue.value == nil)
+ }
+ }
+ }
+}
+
func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) {
rows, err := tx.Query("SELECT rowid, address, data FROM accountbase")
if err != nil {
@@ -1436,7 +1613,7 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) {
}
err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
- _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1))
+ _, _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1))
return
})
require.NoError(b, err)
diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go
index 5285bf69fd..4a08d9e1f9 100644
--- a/ledger/catchpointtracker_test.go
+++ b/ledger/catchpointtracker_test.go
@@ -363,7 +363,7 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) {
i++
}
- _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1))
+ _, _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1))
if err != nil {
return
}
diff --git a/ledger/lrukv.go b/ledger/lrukv.go
new file mode 100644
index 0000000000..45f4f5027d
--- /dev/null
+++ b/ledger/lrukv.go
@@ -0,0 +1,132 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+import (
+ "github.com/algorand/go-algorand/logging"
+)
+
+//msgp:ignore cachedKVData
+type cachedKVData struct {
+ persistedKVData
+
+ // kv key
+ key string
+}
+
+// lruKV provides a storage class for the most recently used kv data.
+// It doesn't have any synchronization primitives of its own and must be
+// synchronized by the caller.
+type lruKV struct {
+ // kvList contain the list of persistedKVData, where the front ones are the most "fresh"
+ // and the ones on the back are the oldest.
+ kvList *persistedKVDataList
+
+ // kvs provides fast access to the various elements in the list by using the key
+ kvs map[string]*persistedKVDataListNode
+
+ // pendingKVs are used as a way to avoid taking a write-lock. When the caller needs to "materialize" these,
+ // it would call flushPendingWrites and these would be merged into the kvs/kvList
+ pendingKVs chan cachedKVData
+
+ // log interface; used for logging the threshold event.
+ log logging.Logger
+
+ // pendingWritesWarnThreshold is the threshold beyond which a warning is logged for the number of pending pendingKVs entries
+ pendingWritesWarnThreshold int
+}
+
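+// Typical usage mirrors lruAccounts and lruResources: init is called once, readers add
+// entries via writePending without holding the write lock, and the writer periodically
+// calls flushPendingWrites, write, and prune to bound the cache size.
+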
+// init initializes the lruKV for use.
+// thread locking semantics : write lock
+func (m *lruKV) init(log logging.Logger, pendingWrites int, pendingWritesWarnThreshold int) {
+ m.kvList = newPersistedKVList().allocateFreeNodes(pendingWrites)
+ m.kvs = make(map[string]*persistedKVDataListNode, pendingWrites)
+ m.pendingKVs = make(chan cachedKVData, pendingWrites)
+ m.log = log
+ m.pendingWritesWarnThreshold = pendingWritesWarnThreshold
+}
+
+// read the persistedKVData object that the lruKV has for the given key.
+// thread locking semantics : read lock
+func (m *lruKV) read(key string) (data persistedKVData, has bool) {
+ if el := m.kvs[key]; el != nil {
+ return el.Value.persistedKVData, true
+ }
+ return persistedKVData{}, false
+}
+
+// flushPendingWrites flushes the pending writes to the main lruKV cache.
+// thread locking semantics : write lock
+func (m *lruKV) flushPendingWrites() {
+ pendingEntriesCount := len(m.pendingKVs)
+ if pendingEntriesCount >= m.pendingWritesWarnThreshold {
+ m.log.Warnf("lruKV: number of entries in pendingKVs(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold)
+ }
+ for ; pendingEntriesCount > 0; pendingEntriesCount-- {
+ select {
+ case pendingKVData := <-m.pendingKVs:
+ m.write(pendingKVData.persistedKVData, pendingKVData.key)
+ default:
+ return
+ }
+ }
+}
+
+// writePending writes a single persistedKVData entry to the pendingKVs buffer.
+// the function doesn't block, and in case of a buffer overflow the entry would not be added.
+// thread locking semantics : no lock is required.
+func (m *lruKV) writePending(kv persistedKVData, key string) {
+ select {
+ case m.pendingKVs <- cachedKVData{persistedKVData: kv, key: key}:
+ default:
+ }
+}
+
+// write a single persistedKVData to the lruKV cache.
+// when writing the entry, the round number would be used to determine if it's a newer
+// version of what's already on the cache or not. In all cases, the entry is going
+// to be promoted to the front of the list.
+// thread locking semantics : write lock
+func (m *lruKV) write(kvData persistedKVData, key string) {
+ if el := m.kvs[key]; el != nil {
+ // already exists; is it a newer version?
+ if el.Value.before(&kvData) {
+ // we update with a newer version.
+ el.Value = &cachedKVData{persistedKVData: kvData, key: key}
+ }
+ m.kvList.moveToFront(el)
+ } else {
+ // new entry.
+ m.kvs[key] = m.kvList.pushFront(&cachedKVData{persistedKVData: kvData, key: key})
+ }
+}
+
+// prune adjusts the current size of the lruKV cache by dropping the least
+// recently used entries.
+// thread locking semantics : write lock
+func (m *lruKV) prune(newSize int) (removed int) {
+ for {
+ if len(m.kvs) <= newSize {
+ break
+ }
+ back := m.kvList.back()
+ delete(m.kvs, back.Value.key)
+ m.kvList.remove(back)
+ removed++
+ }
+ return
+}
diff --git a/ledger/lrukv_test.go b/ledger/lrukv_test.go
new file mode 100644
index 0000000000..8eebb1459c
--- /dev/null
+++ b/ledger/lrukv_test.go
@@ -0,0 +1,240 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/go-algorand/crypto"
+ "github.com/algorand/go-algorand/data/basics"
+ "github.com/algorand/go-algorand/logging"
+ "github.com/algorand/go-algorand/test/partitiontest"
+)
+
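+// TestLRUBasicKV exercises write, read, and prune on the lruKV cache.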
+func TestLRUBasicKV(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var baseKV lruKV
+ baseKV.init(logging.TestingLog(t), 10, 5)
+
+ kvNum := 50
+ // write 50 KVs
+ for i := 0; i < kvNum; i++ {
+ kvValue := fmt.Sprintf("kv %d value", i)
+ kv := persistedKVData{
+ value: &kvValue,
+ round: basics.Round(i),
+ }
+ baseKV.write(kv, fmt.Sprintf("key%d", i))
+ }
+
+ // verify that all these KVs are truly there.
+ for i := 0; i < kvNum; i++ {
+ kv, has := baseKV.read(fmt.Sprintf("key%d", i))
+ require.True(t, has)
+ require.Equal(t, basics.Round(i), kv.round)
+ require.Equal(t, fmt.Sprintf("kv %d value", i), *(kv.value))
+ }
+
+ // verify expected missing entries
+ for i := kvNum; i < kvNum*2; i++ {
+ kv, has := baseKV.read(fmt.Sprintf("key%d", i))
+ require.False(t, has)
+ require.Equal(t, persistedKVData{}, kv)
+ }
+
+ baseKV.prune(kvNum / 2)
+
+ // verify expected (missing/existing) entries
+ for i := 0; i < kvNum*2; i++ {
+ kv, has := baseKV.read(fmt.Sprintf("key%d", i))
+
+ if i >= kvNum/2 && i < kvNum {
+ // expected to have it.
+ require.True(t, has)
+ require.Equal(t, basics.Round(i), kv.round)
+ require.Equal(t, fmt.Sprintf("kv %d value", i), *(kv.value))
+ } else {
+ require.False(t, has)
+ require.Equal(t, persistedKVData{}, kv)
+ }
+ }
+}
+
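+// TestLRUKVPendingWrites verifies that entries written concurrently via writePending are
+// eventually moved into the main cache by flushPendingWrites.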
+func TestLRUKVPendingWrites(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var baseKV lruKV
+ kvNum := 250
+ baseKV.init(logging.TestingLog(t), kvNum*2, kvNum)
+
+ for i := 0; i < kvNum; i++ {
+ go func(i int) {
+ time.Sleep(time.Duration((crypto.RandUint64() % 50)) * time.Millisecond)
+ kvValue := fmt.Sprintf("kv %d value", i)
+ kv := persistedKVData{
+ value: &kvValue,
+ round: basics.Round(i),
+ }
+ baseKV.writePending(kv, fmt.Sprintf("key%d", i))
+ }(i)
+ }
+ testStarted := time.Now()
+ for {
+ baseKV.flushPendingWrites()
+
+ // check if all kvs were loaded into "main" cache.
+ allKVsLoaded := true
+ for i := 0; i < kvNum; i++ {
+ _, has := baseKV.read(fmt.Sprintf("key%d", i))
+ if !has {
+ allKVsLoaded = false
+ break
+ }
+ }
+ if allKVsLoaded {
+ break
+ }
+ if time.Since(testStarted).Seconds() > 20 {
+ require.Fail(t, "failed after waiting for 20 second")
+ }
+ // not yet, keep looping.
+ }
+}
+
+type lruKVTestLogger struct {
+ logging.Logger
+ WarnfCallback func(string, ...interface{})
+ warnMsgCount int
+}
+
+func (cl *lruKVTestLogger) Warnf(s string, args ...interface{}) {
+ cl.warnMsgCount++
+}
+
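+// TestLRUKVPendingWritesWarning verifies that a warning is logged whenever the number of
+// pending writes reaches the configured threshold.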
+func TestLRUKVPendingWritesWarning(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var baseKV lruKV
+ pendingWritesBuffer := 50
+ pendingWritesThreshold := 40
+ log := &lruKVTestLogger{Logger: logging.TestingLog(t)}
+ baseKV.init(log, pendingWritesBuffer, pendingWritesThreshold)
+ for j := 0; j < 50; j++ {
+ for i := 0; i < j; i++ {
+ kvValue := fmt.Sprintf("kv %d value", i)
+ kv := persistedKVData{
+ value: &kvValue,
+ round: basics.Round(i),
+ }
+ baseKV.writePending(kv, fmt.Sprintf("key%d", i))
+ }
+ baseKV.flushPendingWrites()
+ if j >= pendingWritesThreshold {
+ // expect a warning in the log
+ require.Equal(t, 1+j-pendingWritesThreshold, log.warnMsgCount)
+ }
+ }
+}
+
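+// TestLRUKVOmittedPendingWrites verifies that writes beyond the pending buffer capacity are
+// silently dropped rather than blocking the writer.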
+func TestLRUKVOmittedPendingWrites(t *testing.T) {
+ partitiontest.PartitionTest(t)
+
+ var baseKV lruKV
+ pendingWritesBuffer := 50
+ pendingWritesThreshold := 40
+ log := &lruKVTestLogger{Logger: logging.TestingLog(t)}
+ baseKV.init(log, pendingWritesBuffer, pendingWritesThreshold)
+
+ for i := 0; i < pendingWritesBuffer*2; i++ {
+ kvValue := fmt.Sprintf("kv %d value", i)
+ kv := persistedKVData{
+ value: &kvValue,
+ round: basics.Round(i),
+ }
+ baseKV.writePending(kv, fmt.Sprintf("key%d", i))
+ }
+
+ baseKV.flushPendingWrites()
+
+ // verify that all these kvs are truly there.
+ for i := 0; i < pendingWritesBuffer; i++ {
+ kv, has := baseKV.read(fmt.Sprintf("key%d", i))
+ require.True(t, has)
+ require.Equal(t, basics.Round(i), kv.round)
+ require.Equal(t, fmt.Sprintf("kv %d value", i), *(kv.value))
+ }
+
+ // verify expected missing entries
+ for i := pendingWritesBuffer; i < pendingWritesBuffer*2; i++ {
+ kv, has := baseKV.read(fmt.Sprintf("key%d", i))
+ require.False(t, has)
+ require.Equal(t, persistedKVData{}, kv)
+ }
+}
+
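+// BenchmarkLRUKVWrite measures the cost of writing a batch of kv entries, half of which
+// already exist in a pre-filled cache.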
+func BenchmarkLRUKVWrite(b *testing.B) {
+ numTestKV := 5000
+ // there are 2500 kvs that overlap
+ fillerKVs := generatePersistedKVData(0, 97500)
+ kvs := generatePersistedKVData(97500-numTestKV/2, 97500+numTestKV/2)
+
+ benchLruWriteKVs(b, fillerKVs, kvs)
+}
+
+func benchLruWriteKVs(b *testing.B, fillerKVs []cachedKVData, kvs []cachedKVData) {
+ b.ResetTimer()
+ b.StopTimer()
+ var baseKV lruKV
+ // setting up the baseKV with a predefined cache size
+ baseKV.init(logging.TestingLog(b), baseKVPendingBufferSize, baseKVPendingWarnThreshold)
+ for i := 0; i < b.N; i++ {
+ baseKV = fillLRUKV(baseKV, fillerKVs)
+
+ b.StartTimer()
+ fillLRUKV(baseKV, kvs)
+ b.StopTimer()
+ baseKV.prune(0)
+ }
+}
+
+func fillLRUKV(baseKV lruKV, fillerKVs []cachedKVData) lruKV {
+ for _, entry := range fillerKVs {
+ baseKV.write(entry.persistedKVData, entry.key)
+ }
+ return baseKV
+}
+
+func generatePersistedKVData(startRound, endRound int) []cachedKVData {
+ kvs := make([]cachedKVData, endRound-startRound)
+ for i := startRound; i < endRound; i++ {
+ kvValue := fmt.Sprintf("kv %d value", i)
+
+ kvs[i-startRound] = cachedKVData{
+ persistedKVData: persistedKVData{
+ value: &kvValue,
+ round: basics.Round(i + startRound),
+ },
+ key: fmt.Sprintf("key%d", i),
+ }
+ }
+ return kvs
+}
diff --git a/ledger/persistedkvs.go b/ledger/persistedkvs.go
new file mode 100644
index 0000000000..34f3c36ecb
--- /dev/null
+++ b/ledger/persistedkvs.go
@@ -0,0 +1,143 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+// persistedKVDataList represents a doubly linked list.
+// must be initialized with newPersistedKVList.
+type persistedKVDataList struct {
+ root persistedKVDataListNode // sentinel list element, only &root, root.prev, and root.next are used
+ freeList *persistedKVDataListNode // preallocated nodes location
+}
+
+type persistedKVDataListNode struct {
+ // Next and previous pointers in the doubly-linked list of elements.
+ // To simplify the implementation, internally a list l is implemented
+ // as a ring, such that &l.root is both the next element of the last
+ // list element (l.Back()) and the previous element of the first list
+ // element (l.Front()).
+ next, prev *persistedKVDataListNode
+
+ Value *cachedKVData
+}
+
+func newPersistedKVList() *persistedKVDataList {
+ l := new(persistedKVDataList)
+ l.root.next = &l.root
+ l.root.prev = &l.root
+ // used as a helper but does not store value
+ l.freeList = new(persistedKVDataListNode)
+
+ return l
+}
+
+func (l *persistedKVDataList) insertNodeToFreeList(otherNode *persistedKVDataListNode) {
+ otherNode.next = l.freeList.next
+ otherNode.prev = nil
+ otherNode.Value = nil
+
+ l.freeList.next = otherNode
+}
+
+func (l *persistedKVDataList) getNewNode() *persistedKVDataListNode {
+ if l.freeList.next == nil {
+ return new(persistedKVDataListNode)
+ }
+ newNode := l.freeList.next
+ l.freeList.next = newNode.next
+
+ return newNode
+}
+
+func (l *persistedKVDataList) allocateFreeNodes(numAllocs int) *persistedKVDataList {
+ if l.freeList == nil {
+ return l
+ }
+ for i := 0; i < numAllocs; i++ {
+ l.insertNodeToFreeList(new(persistedKVDataListNode))
+ }
+
+ return l
+}
+
+// back returns the last element of list l or nil if the list is empty.
+func (l *persistedKVDataList) back() *persistedKVDataListNode {
+ isEmpty := func(list *persistedKVDataList) bool {
+ // assumes we are inserting correctly to the list - using pushFront.
+ return list.root.next == &list.root
+ }
+ if isEmpty(l) {
+ return nil
+ }
+ return l.root.prev
+}
+
+// remove removes e from l if e is an element of list l.
+// The removed node is returned to the free list.
+// The element must not be nil.
+func (l *persistedKVDataList) remove(e *persistedKVDataListNode) {
+ e.prev.next = e.next
+ e.next.prev = e.prev
+ e.next = nil // avoid memory leaks
+ e.prev = nil // avoid memory leaks
+
+ l.insertNodeToFreeList(e)
+}
+
+// pushFront inserts a new element e with value v at the front of list l and returns e.
+func (l *persistedKVDataList) pushFront(v *cachedKVData) *persistedKVDataListNode {
+ newNode := l.getNewNode()
+ newNode.Value = v
+ return l.insertValue(newNode, &l.root)
+}
+
+// insertValue inserts newNode after at and returns newNode.
+func (l *persistedKVDataList) insertValue(newNode *persistedKVDataListNode, at *persistedKVDataListNode) *persistedKVDataListNode {
+ n := at.next
+ at.next = newNode
+ newNode.prev = at
+ newNode.next = n
+ n.prev = newNode
+
+ return newNode
+}
+
+// moveToFront moves element e to the front of list l.
+// If e is not an element of l, the list is not modified.
+// The element must not be nil.
+func (l *persistedKVDataList) moveToFront(e *persistedKVDataListNode) {
+ if l.root.next == e {
+ return
+ }
+ l.move(e, &l.root)
+}
+
+// move moves e to the position right after at, and returns e.
+func (l *persistedKVDataList) move(e, at *persistedKVDataListNode) *persistedKVDataListNode {
+ if e == at {
+ return e
+ }
+ e.prev.next = e.next
+ e.next.prev = e.prev
+
+ n := at.next
+ at.next = e
+ e.prev = at
+ e.next = n
+ n.prev = e
+
+ return e
+}
diff --git a/ledger/persistedkvs_test.go b/ledger/persistedkvs_test.go
new file mode 100644
index 0000000000..eb5ed9dff7
--- /dev/null
+++ b/ledger/persistedkvs_test.go
@@ -0,0 +1,175 @@
+// Copyright (C) 2019-2022 Algorand, Inc.
+// This file is part of go-algorand
+//
+// go-algorand is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// go-algorand is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
+
+package ledger
+
+import (
+ "testing"
+
+ "github.com/algorand/go-algorand/test/partitiontest"
+)
+
+func (l *persistedKVDataList) getRoot() dataListNode {
+ return &l.root
+}
+
+func (l *persistedKVDataListNode) getNext() dataListNode {
+ // avoid returning a typed nil wrapped in an interface, so that i = x.getNext(); i != nil works as expected.
+ if l.next == nil {
+ return nil
+ }
+ return l.next
+}
+
+func (l *persistedKVDataListNode) getPrev() dataListNode {
+ if l.prev == nil {
+ return nil
+ }
+ return l.prev
+}
+
+// checkListPointersBD verifies that the list contents and pointers match the given slice of nodes.
+func checkListPointersBD(t *testing.T, l *persistedKVDataList, es []*persistedKVDataListNode) {
+ es2 := make([]dataListNode, len(es))
+ for i, el := range es {
+ es2[i] = el
+ }
+
+ checkListPointers(t, l, es2)
+}
+
+func TestRemoveFromListBD(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ l := newPersistedKVList()
+ e1 := l.pushFront(&cachedKVData{key: "key1"})
+ e2 := l.pushFront(&cachedKVData{key: "key2"})
+ e3 := l.pushFront(&cachedKVData{key: "key3"})
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e2, e1})
+
+ l.remove(e2)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e1})
+ l.remove(e3)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e1})
+}
+
+func TestAddingNewNodeWithAllocatedFreeListBD(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ l := newPersistedKVList().allocateFreeNodes(10)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{})
+ if countListSize(l.freeList) != 10 {
+ t.Errorf("free list did not allocate nodes")
+ return
+ }
+ // test elements
+ e1 := l.pushFront(&cachedKVData{key: "key1"})
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e1})
+
+ if countListSize(l.freeList) != 9 {
+ t.Errorf("free list did not provide a node on new list entry")
+ return
+ }
+}
+
+func TestMultielementListPositioningBD(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ l := newPersistedKVList()
+ checkListPointersBD(t, l, []*persistedKVDataListNode{})
+ // test elements
+ e2 := l.pushFront(&cachedKVData{key: "key1"})
+ e1 := l.pushFront(&cachedKVData{key: "key2"})
+ e3 := l.pushFront(&cachedKVData{key: "key3"})
+ e4 := l.pushFront(&cachedKVData{key: "key4"})
+ e5 := l.pushFront(&cachedKVData{key: "key5"})
+
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e5, e4, e3, e1, e2})
+
+ l.move(e4, e1)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e5, e3, e1, e4, e2})
+
+ l.remove(e5)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e1, e4, e2})
+
+ l.move(e1, e4) // swap in middle
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e4, e1, e2})
+
+ l.moveToFront(e4)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e4, e3, e1, e2})
+
+ l.remove(e2)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e4, e3, e1})
+
+ l.moveToFront(e3) // move from middle
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e4, e1})
+
+ l.moveToFront(e1) // move from end
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e1, e3, e4})
+
+ l.moveToFront(e1) // no movement
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e1, e3, e4})
+
+ e2 = l.pushFront(&cachedKVData{key: "key2"})
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1, e3, e4})
+
+ l.remove(e3) // removing from middle
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1, e4})
+
+ l.remove(e4) // removing from end
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1})
+
+ l.move(e2, e1) // swapping between two elements
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e1, e2})
+
+ l.remove(e1) // removing front
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e2})
+
+ l.move(e2, l.back()) // swapping element with itself.
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e2})
+
+ l.remove(e2) // remove last one
+ checkListPointersBD(t, l, []*persistedKVDataListNode{})
+}
+
+func TestSingleElementListPositioningBD(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ l := newPersistedKVList()
+ checkListPointersBD(t, l, []*persistedKVDataListNode{})
+ e := l.pushFront(&cachedKVData{key: "key1"})
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e})
+ l.moveToFront(e)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e})
+ l.remove(e)
+ checkListPointersBD(t, l, []*persistedKVDataListNode{})
+}
+
+func TestRemovedNodeShouldBeMovedToFreeListBD(t *testing.T) {
+ partitiontest.PartitionTest(t)
+ l := newPersistedKVList()
+ e1 := l.pushFront(&cachedKVData{key: "key1"})
+ e2 := l.pushFront(&cachedKVData{key: "key2"})
+
+ checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1})
+
+ e := l.back()
+ l.remove(e)
+
+ for i := l.freeList.next; i != nil; i = i.next {
+ if i == e {
+ // stopping the test with good results.
+ return
+ }
+ }
+ t.Error("expected the removed node to appear at the freelist")
+}
diff --git a/ledger/persistedresources_list.go b/ledger/persistedresources_list.go
index 57b0cdc44c..baa7ac351a 100644
--- a/ledger/persistedresources_list.go
+++ b/ledger/persistedresources_list.go
@@ -17,7 +17,7 @@
package ledger
// persistedResourcesDataList represents a doubly linked list.
-// must initiate with newPersistedAccountList.
+// must initiate with newPersistedResourcesList.
type persistedResourcesDataList struct {
root persistedResourcesDataListNode // sentinel list element, only &root, root.prev, and root.next are used
freeList *persistedResourcesDataListNode // preallocated nodes location
diff --git a/ledger/tracker.go b/ledger/tracker.go
index b51fc666e6..4d0e7582df 100644
--- a/ledger/tracker.go
+++ b/ledger/tracker.go
@@ -248,6 +248,7 @@ type deferredCommitContext struct {
updatedPersistedAccounts []persistedAccountData
updatedPersistedResources map[basics.Address][]persistedResourcesData
+ updatedPersistedKVs map[string]persistedKVData
compactOnlineAccountDeltas compactOnlineAccountDeltas
updatedPersistedOnlineAccounts []persistedOnlineAccountData