diff --git a/ledger/accountdb.go b/ledger/accountdb.go index bab53168f0..1450d37557 100644 --- a/ledger/accountdb.go +++ b/ledger/accountdb.go @@ -271,6 +271,15 @@ func (prd *persistedResourcesData) AccountResource() ledgercore.AccountResource return ret } +//msgp:ignore persistedKVData +type persistedKVData struct { + // kv value + value *string + // the round number that is associated with the kv value. This field is the corresponding one to the round field + // in persistedAccountData, and serves the same purpose. + round basics.Round +} + // resourceDelta is used as part of the compactResourcesDeltas to describe a change to a single resource. type resourceDelta struct { oldResource persistedResourcesData @@ -2596,12 +2605,7 @@ func (qs *accountsDbQueries) listCreatables(maxIdx basics.CreatableIndex, maxRes return } -type persistedValue struct { - value *string - round basics.Round -} - -func (qs *accountsDbQueries) lookupKeyValue(key string) (pv persistedValue, err error) { +func (qs *accountsDbQueries) lookupKeyValue(key string) (pv persistedKVData, err error) { err = db.Retry(func() error { var v sql.NullString // Cast to []byte to avoid interpretation as character string, see note in upsertKvPair @@ -3444,7 +3448,7 @@ func accountsNewRound( tx *sql.Tx, updates compactAccountDeltas, resources compactResourcesDeltas, kvPairs map[string]modifiedValue, creatables map[basics.CreatableIndex]ledgercore.ModifiedCreatable, proto config.ConsensusParams, lastUpdateRound basics.Round, -) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, err error) { +) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, updatedKVs map[string]persistedKVData, err error) { hasAccounts := updates.len() > 0 hasResources := resources.len() > 0 hasKvPairs := len(kvPairs) > 0 @@ -3482,7 +3486,7 @@ func accountsNewRoundImpl( writer accountsWriter, updates compactAccountDeltas, resources compactResourcesDeltas, kvPairs map[string]modifiedValue, creatables map[basics.CreatableIndex]ledgercore.ModifiedCreatable, proto config.ConsensusParams, lastUpdateRound basics.Round, -) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, err error) { +) (updatedAccounts []persistedAccountData, updatedResources map[basics.Address][]persistedResourcesData, updatedKVs map[string]persistedKVData, err error) { updatedAccounts = make([]persistedAccountData, updates.len()) updatedAccountIdx := 0 newAddressesRowIDs := make(map[basics.Address]int64) @@ -3680,11 +3684,14 @@ func accountsNewRoundImpl( } } + updatedKVs = make(map[string]persistedKVData, len(kvPairs)) for key, value := range kvPairs { if value.data != nil { err = writer.upsertKvPair(key, *value.data) + updatedKVs[key] = persistedKVData{value: value.data, round: lastUpdateRound} } else { err = writer.deleteKvPair(key) + updatedKVs[key] = persistedKVData{value: nil, round: lastUpdateRound} } if err != nil { return @@ -4809,6 +4816,12 @@ func (prd *persistedResourcesData) before(other *persistedResourcesData) bool { return prd.round < other.round } +// before compares the round numbers of two persistedKVData and determines if the current persistedKVData +// happened before the other. +func (prd persistedKVData) before(other *persistedKVData) bool { + return prd.round < other.round +} + // before compares the round numbers of two persistedAccountData and determines if the current persistedAccountData // happened before the other. func (pac *persistedOnlineAccountData) before(other *persistedOnlineAccountData) bool { diff --git a/ledger/accountdb_test.go b/ledger/accountdb_test.go index 65987429e5..9a48853e48 100644 --- a/ledger/accountdb_test.go +++ b/ledger/accountdb_test.go @@ -314,7 +314,7 @@ func TestAccountDBRound(t *testing.T) { require.NoError(t, err) expectedOnlineRoundParams = append(expectedOnlineRoundParams, onlineRoundParams) - updatedAccts, updatesResources, err := accountsNewRound(tx, updatesCnt, resourceUpdatesCnt, nil, ctbsWithDeletes, proto, basics.Round(i)) + updatedAccts, updatesResources, updatedKVs, err := accountsNewRound(tx, updatesCnt, resourceUpdatesCnt, nil, ctbsWithDeletes, proto, basics.Round(i)) require.NoError(t, err) require.Equal(t, updatesCnt.len(), len(updatedAccts)) numResUpdates := 0 @@ -322,6 +322,7 @@ func TestAccountDBRound(t *testing.T) { numResUpdates += len(rs) } require.Equal(t, resourceUpdatesCnt.len(), numResUpdates) + require.Empty(t, updatedKVs) updatedOnlineAccts, err := onlineAccountsNewRound(tx, updatesOnlineCnt, proto, basics.Round(i)) require.NoError(t, err) @@ -451,7 +452,7 @@ func TestAccountDBInMemoryAcct(t *testing.T) { err = outResourcesDeltas.resourcesLoadOld(tx, knownAddresses) require.NoError(t, err) - updatedAccts, updatesResources, err := accountsNewRound(tx, outAccountDeltas, outResourcesDeltas, nil, nil, proto, basics.Round(lastRound)) + updatedAccts, updatesResources, updatedKVs, err := accountsNewRound(tx, outAccountDeltas, outResourcesDeltas, nil, nil, proto, basics.Round(lastRound)) require.NoError(t, err) require.Equal(t, 1, len(updatedAccts)) // we store empty even for deleted accounts require.Equal(t, @@ -464,6 +465,8 @@ func TestAccountDBInMemoryAcct(t *testing.T) { persistedResourcesData{addrid: 0, aidx: 100, data: makeResourcesData(0), round: basics.Round(lastRound)}, updatesResources[addr][0], ) + + require.Empty(t, updatedKVs) }) } } @@ -2888,12 +2891,13 @@ func TestAccountUnorderedUpdates(t *testing.T) { t.Run(fmt.Sprintf("acct-perm-%d|res-perm-%d", i, j), func(t *testing.T) { a := require.New(t) mock2 := mock.clone() - updatedAccounts, updatedResources, err := accountsNewRoundImpl( + updatedAccounts, updatedResources, updatedKVs, err := accountsNewRoundImpl( &mock2, acctVariant, resVariant, nil, nil, config.ConsensusParams{}, latestRound, ) a.NoError(err) - a.Equal(3, len(updatedAccounts)) - a.Equal(3, len(updatedResources)) + a.Len(updatedAccounts, 3) + a.Len(updatedResources, 3) + a.Empty(updatedKVs) }) } } @@ -2970,12 +2974,13 @@ func TestAccountsNewRoundDeletedResourceEntries(t *testing.T) { a.Equal(1, len(resDeltas.misses)) // (addr2, aidx) does not exist a.Equal(2, resDeltas.len()) // (addr1, aidx) found - updatedAccounts, updatedResources, err := accountsNewRoundImpl( + updatedAccounts, updatedResources, updatedKVs, err := accountsNewRoundImpl( &mock, acctDeltas, resDeltas, nil, nil, config.ConsensusParams{}, latestRound, ) a.NoError(err) a.Equal(3, len(updatedAccounts)) a.Equal(2, len(updatedResources)) + a.Equal(0, len(updatedKVs)) // one deletion entry for pre-existing account addr1, and one entry for in-memory account addr2 // in base accounts updates and in resources updates @@ -2999,6 +3004,155 @@ func TestAccountsNewRoundDeletedResourceEntries(t *testing.T) { } } +func BenchmarkLRUResources(b *testing.B) { + var baseResources lruResources + baseResources.init(nil, 1000, 850) + + var data persistedResourcesData + var has bool + addrs := make([]basics.Address, 850) + for i := 0; i < 850; i++ { + data.data.ApprovalProgram = make([]byte, 8096*4) + data.aidx = basics.CreatableIndex(1) + addrBytes := ([]byte(fmt.Sprintf("%d", i)))[:32] + var addr basics.Address + for i, b := range addrBytes { + addr[i] = b + } + addrs[i] = addr + baseResources.write(data, addr) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + pos := i % 850 + data, has = baseResources.read(addrs[pos], basics.CreatableIndex(1)) + require.True(b, has) + } +} + +func initBoxDatabase(b *testing.B, totalBoxes, boxSize int) (db.Pair, func(), error) { + batchCount := 100 + if batchCount > totalBoxes { + batchCount = 1 + } + + proto := config.Consensus[protocol.ConsensusCurrentVersion] + dbs, fn := dbOpenTest(b, false) + setDbLogging(b, dbs) + cleanup := func() { + cleanupTestDb(dbs, fn, false) + } + + tx, err := dbs.Wdb.Handle.Begin() + require.NoError(b, err) + _, err = accountsInit(tx, make(map[basics.Address]basics.AccountData), proto) + require.NoError(b, err) + err = tx.Commit() + require.NoError(b, err) + err = dbs.Wdb.SetSynchronousMode(context.Background(), db.SynchronousModeOff, false) + require.NoError(b, err) + + cnt := 0 + for batch := 0; batch <= batchCount; batch++ { + tx, err = dbs.Wdb.Handle.Begin() + require.NoError(b, err) + writer, err := makeAccountsSQLWriter(tx, false, false, true, false) + require.NoError(b, err) + for boxIdx := 0; boxIdx < totalBoxes/batchCount; boxIdx++ { + err = writer.upsertKvPair(fmt.Sprintf("%d", cnt), string(make([]byte, boxSize))) + require.NoError(b, err) + cnt++ + } + + err = tx.Commit() + require.NoError(b, err) + writer.close() + } + err = dbs.Wdb.SetSynchronousMode(context.Background(), db.SynchronousModeFull, true) + return dbs, cleanup, err +} + +func BenchmarkBoxDatabaseRead(b *testing.B) { + getBoxNamePermutation := func(totalBoxes int) []int { + rand.Seed(time.Now().UnixNano()) + boxNames := make([]int, totalBoxes) + for i := 0; i < totalBoxes; i++ { + boxNames[i] = i + } + rand.Shuffle(len(boxNames), func(x, y int) { boxNames[x], boxNames[y] = boxNames[y], boxNames[x] }) + return boxNames + } + + boxCnt := []int{10, 1000, 100000} + boxSizes := []int{2, 2048, 4 * 8096} + for _, totalBoxes := range boxCnt { + for _, boxSize := range boxSizes { + b.Run(fmt.Sprintf("totalBoxes=%d/boxSize=%d", totalBoxes, boxSize), func(b *testing.B) { + b.StopTimer() + + dbs, cleanup, err := initBoxDatabase(b, totalBoxes, boxSize) + require.NoError(b, err) + + boxNames := getBoxNamePermutation(totalBoxes) + lookupStmt, err := dbs.Wdb.Handle.Prepare("SELECT rnd, value FROM acctrounds LEFT JOIN kvstore ON key = ? WHERE id='acctbase';") + require.NoError(b, err) + var v sql.NullString + for i := 0; i < b.N; i++ { + var pv persistedKVData + boxName := boxNames[i%totalBoxes] + b.StartTimer() + err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v) + b.StopTimer() + require.NoError(b, err) + require.True(b, v.Valid) + } + + cleanup() + }) + } + } + + // test caching performance + lookbacks := []int{1, 32, 256, 2048} + for _, lookback := range lookbacks { + for _, boxSize := range boxSizes { + totalBoxes := 100000 + + b.Run(fmt.Sprintf("lookback=%d/boxSize=%d", lookback, boxSize), func(b *testing.B) { + b.StopTimer() + + dbs, cleanup, err := initBoxDatabase(b, totalBoxes, boxSize) + require.NoError(b, err) + + boxNames := getBoxNamePermutation(totalBoxes) + lookupStmt, err := dbs.Wdb.Handle.Prepare("SELECT rnd, value FROM acctrounds LEFT JOIN kvstore ON key = ? WHERE id='acctbase';") + require.NoError(b, err) + var v sql.NullString + for i := 0; i < b.N+lookback; i++ { + var pv persistedKVData + boxName := boxNames[i%totalBoxes] + err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v) + require.NoError(b, err) + require.True(b, v.Valid) + + // benchmark reading the potentially cached value that was read lookback boxes ago + if i >= lookback { + boxName = boxNames[(i-lookback)%totalBoxes] + b.StartTimer() + err = lookupStmt.QueryRow([]byte(fmt.Sprintf("%d", boxName))).Scan(&pv.round, &v) + b.StopTimer() + require.NoError(b, err) + require.True(b, v.Valid) + } + } + + cleanup() + }) + } + } +} + // TestAccountTopOnline ensures accountsOnlineTop return a right subset of accounts // from the history table. // Start with two online accounts A, B at round 1 @@ -3119,7 +3273,7 @@ func TestAccountOnlineQueries(t *testing.T) { err = accountsPutTotals(tx, totals, false) require.NoError(t, err) - updatedAccts, _, err := accountsNewRound(tx, updatesCnt, compactResourcesDeltas{}, nil, nil, proto, rnd) + updatedAccts, _, _, err := accountsNewRound(tx, updatesCnt, compactResourcesDeltas{}, nil, nil, proto, rnd) require.NoError(t, err) require.Equal(t, updatesCnt.len(), len(updatedAccts)) diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go index a56b729617..1cba218fce 100644 --- a/ledger/acctupdates.go +++ b/ledger/acctupdates.go @@ -68,6 +68,15 @@ const baseResourcesPendingAccountsBufferSize = 100000 // is being flushed into the main base resources cache. const baseResourcesPendingAccountsWarnThreshold = 85000 +// baseKVPendingBufferSize defines the size of the base KVs pending buffer size. +// At the beginning of a new round, the entries from this buffer are being flushed into the base KVs map. +const baseKVPendingBufferSize = 5000 + +// baseKVPendingWarnThreshold defines the threshold at which the lruKV would generate a warning +// after we've surpassed a given pending kv size. The warning is being generated when the pending kv data +// is being flushed into the main base kv cache. +const baseKVPendingWarnThreshold = 4250 + // initializeCachesReadaheadBlocksStream defines how many block we're going to attempt to queue for the // initializeCaches method before it can process and store the account changes to disk. const initializeCachesReadaheadBlocksStream = 4 @@ -206,6 +215,9 @@ type accountUpdates struct { // baseResources stores the most recently used resources, at exactly dbRound baseResources lruResources + // baseKVs stores the most recently used KV, at exactly dbRound + baseKVs lruKV + // logAccountUpdatesMetrics is a flag for enable/disable metrics logging logAccountUpdatesMetrics bool @@ -310,6 +322,7 @@ func (au *accountUpdates) close() { } au.baseAccounts.prune(0) au.baseResources.prune(0) + au.baseKVs.prune(0) } func (au *accountUpdates) LookupResource(rnd basics.Round, addr basics.Address, aidx basics.CreatableIndex, ctype basics.CreatableType) (ledgercore.AccountResource, basics.Round, error) { @@ -374,7 +387,13 @@ func (au *accountUpdates) lookupKv(rnd basics.Round, key string, synchronized bo rnd = currentDbRound + basics.Round(currentDeltaLen) } - // OTHER LOOKUPS USE "base" caches here. + // check the baseKV cache + if pbd, has := au.baseKVs.read(key); has { + // we don't technically need this, since it's already in the baseKV, however, writing this over + // would ensure that we promote this field. + au.baseKVs.writePending(pbd, key) + return pbd.value, nil + } if synchronized { au.accountsMu.RUnlock() @@ -386,7 +405,15 @@ func (au *accountUpdates) lookupKv(rnd basics.Round, key string, synchronized bo // on-disk DB. persistedData, err := au.accountsq.lookupKeyValue(key) + if err != nil { + return nil, err + } + if persistedData.round == currentDbRound { + // if we read actual data return it. This includes deleted values + // where persistedData.value == nil to avoid unnecessary db lookups + // for deleted KVs. + au.baseKVs.writePending(persistedData, key) return persistedData.value, nil } @@ -504,6 +531,10 @@ func (au *accountUpdates) lookupKeysByPrefix(round basics.Round, keyPrefix strin needUnlock = false } + // NOTE: the kv cache isn't used here because the data structure doesn't support range + // queries. It may be preferable to increase the SQLite cache size if these reads become + // too slow. + // Finishing searching updates of this account in kvDeltas, keep going: use on-disk DB // to find the rest matching keys in DB. dbRound, _err := au.accountsq.lookupKeysByPrefix(keyPrefix, maxKeyNum, results, resultCount) @@ -923,6 +954,7 @@ func (au *accountUpdates) initializeFromDisk(l ledgerForTracker, lastBalancesRou au.baseAccounts.init(au.log, baseAccountsPendingAccountsBufferSize, baseAccountsPendingAccountsWarnThreshold) au.baseResources.init(au.log, baseResourcesPendingAccountsBufferSize, baseResourcesPendingAccountsWarnThreshold) + au.baseKVs.init(au.log, baseKVPendingBufferSize, baseKVPendingWarnThreshold) return } @@ -947,6 +979,7 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S au.baseAccounts.flushPendingWrites() au.baseResources.flushPendingWrites() + au.baseKVs.flushPendingWrites() for i := 0; i < delta.Accts.Len(); i++ { addr, data := delta.Accts.GetByIdx(i) @@ -1001,6 +1034,8 @@ func (au *accountUpdates) newBlockImpl(blk bookkeeping.Block, delta ledgercore.S au.baseAccounts.prune(newBaseAccountSize) newBaseResourcesSize := (len(au.resources) + 1) + baseResourcesPendingAccountsBufferSize au.baseResources.prune(newBaseResourcesSize) + newBaseKVSize := (len(au.kvStore) + 1) + baseKVPendingBufferSize + au.baseKVs.prune(newBaseKVSize) } // lookupLatest returns the account data for a given address for the latest round. @@ -1626,7 +1661,7 @@ func (au *accountUpdates) commitRound(ctx context.Context, tx *sql.Tx, dcc *defe // the updates of the actual account data is done last since the accountsNewRound would modify the compactDeltas old values // so that we can update the base account back. - dcc.updatedPersistedAccounts, dcc.updatedPersistedResources, err = accountsNewRound(tx, dcc.compactAccountDeltas, dcc.compactResourcesDeltas, dcc.compactKvDeltas, dcc.compactCreatableDeltas, dcc.genesisProto, dbRound+basics.Round(offset)) + dcc.updatedPersistedAccounts, dcc.updatedPersistedResources, dcc.updatedPersistedKVs, err = accountsNewRound(tx, dcc.compactAccountDeltas, dcc.compactResourcesDeltas, dcc.compactKvDeltas, dcc.compactCreatableDeltas, dcc.genesisProto, dbRound+basics.Round(offset)) if err != nil { return err } @@ -1717,6 +1752,10 @@ func (au *accountUpdates) postCommit(ctx context.Context, dcc *deferredCommitCon } } + for key, persistedKV := range dcc.updatedPersistedKVs { + au.baseKVs.write(persistedKV, key) + } + for cidx, modCrt := range dcc.compactCreatableDeltas { cnt := modCrt.Ndeltas mcreat, ok := au.creatables[cidx] @@ -1846,6 +1885,7 @@ func (au *accountUpdates) vacuumDatabase(ctx context.Context) (err error) { // rowid are flushed. au.baseAccounts.prune(0) au.baseResources.prune(0) + au.baseKVs.prune(0) startTime := time.Now() vacuumExitCh := make(chan struct{}, 1) diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index 6bb6e7c8e2..7ac098ebea 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -1216,7 +1216,7 @@ func TestListCreatables(t *testing.T) { // sync with the database var updates compactAccountDeltas var resUpdates compactResourcesDeltas - _, _, err = accountsNewRound(tx, updates, resUpdates, nil, ctbsWithDeletes, proto, basics.Round(1)) + _, _, _, err = accountsNewRound(tx, updates, resUpdates, nil, ctbsWithDeletes, proto, basics.Round(1)) require.NoError(t, err) // nothing left in cache au.creatables = make(map[basics.CreatableIndex]ledgercore.ModifiedCreatable) @@ -1232,7 +1232,7 @@ func TestListCreatables(t *testing.T) { // ******* Results are obtained from the database and from the cache ******* // ******* Deletes are in the database and in the cache ******* // sync with the database. This has deletes synced to the database. - _, _, err = accountsNewRound(tx, updates, resUpdates, nil, au.creatables, proto, basics.Round(1)) + _, _, _, err = accountsNewRound(tx, updates, resUpdates, nil, au.creatables, proto, basics.Round(1)) require.NoError(t, err) // get new creatables in the cache. There will be deletes in the cache from the previous batch. au.creatables = randomCreatableSampling(3, ctbsList, randomCtbs, @@ -1357,6 +1357,183 @@ func TestBoxNamesByAppIDs(t *testing.T) { } } +func TestKVCache(t *testing.T) { + partitiontest.PartitionTest(t) + t.Parallel() + + initialBlocksCount := 1 + accts := make(map[basics.Address]basics.AccountData) + + protoParams := config.Consensus[protocol.ConsensusCurrentVersion] + ml := makeMockLedgerForTracker(t, true, initialBlocksCount, protocol.ConsensusCurrentVersion, + []map[basics.Address]basics.AccountData{accts}, + ) + defer ml.Close() + + conf := config.GetDefaultLocal() + au, _ := newAcctUpdates(t, ml, conf) + defer au.close() + + knownCreatables := make(map[basics.CreatableIndex]bool) + opts := auNewBlockOpts{ledgercore.AccountDeltas{}, protocol.ConsensusCurrentVersion, protoParams, knownCreatables} + + kvCnt := 1000 + kvsPerBlock := 100 + curKV := 0 + var currentRound basics.Round + currentDBRound := basics.Round(1) + + kvMap := make(map[string]string) + for i := 0; i < kvCnt; i++ { + kvMap[fmt.Sprintf("%d", i)] = fmt.Sprintf("value%d", i) + } + + // add kvsPerBlock KVs on each iteration. The first kvCnt/kvsPerBlock + // iterations produce a block with kvCnt kv manipulations. The last + // conf.MaxAcctLookback iterations are meant to verify the contents of the cache + // are correct after every kv containing block has been committed. + for i := 0; i < kvCnt/kvsPerBlock+int(conf.MaxAcctLookback); i++ { + currentRound = currentRound + 1 + kvMods := make(map[string]*string) + if i < kvCnt/kvsPerBlock { + for j := 0; j < kvsPerBlock; j++ { + name := fmt.Sprintf("%d", curKV) + curKV++ + val := kvMap[name] + kvMods[name] = &val + } + } + + auNewBlock(t, currentRound, au, accts, opts, kvMods) + auCommitSync(t, currentRound, au, ml) + + // ensure rounds + rnd := au.latest() + require.Equal(t, currentRound, rnd) + if uint64(currentRound) > conf.MaxAcctLookback { + require.Equal(t, basics.Round(uint64(currentRound)-conf.MaxAcctLookback), au.cachedDBRound) + } else { + require.Equal(t, basics.Round(0), au.cachedDBRound) + } + + // verify cache doesn't contain the new kvs until committed to DB. + for name := range kvMods { + _, has := au.baseKVs.read(name) + require.False(t, has) + } + + // verify commited kvs appear in the kv cache + for ; currentDBRound <= au.cachedDBRound; currentDBRound++ { + startKV := (currentDBRound - 1) * basics.Round(kvsPerBlock) + for j := 0; j < kvsPerBlock; j++ { + name := fmt.Sprintf("%d", uint64(startKV)+uint64(j)) + persistedValue, has := au.baseKVs.read(name) + require.True(t, has) + require.Equal(t, kvMap[name], *persistedValue.value) + } + } + } + + // updating inserted KVs + curKV = 0 + for i := 0; i < kvCnt/kvsPerBlock+int(conf.MaxAcctLookback); i++ { + currentRound = currentRound + 1 + + kvMods := make(map[string]*string) + if i < kvCnt/kvsPerBlock { + for j := 0; j < kvsPerBlock; j++ { + name := fmt.Sprintf("%d", curKV) + val := fmt.Sprintf("modified value%d", curKV) + kvMods[name] = &val + curKV++ + } + } + + auNewBlock(t, currentRound, au, accts, opts, kvMods) + auCommitSync(t, currentRound, au, ml) + + // ensure rounds + rnd := au.latest() + require.Equal(t, currentRound, rnd) + require.Equal(t, basics.Round(uint64(currentRound)-conf.MaxAcctLookback), au.cachedDBRound) + + // verify cache doesn't contain updated kv values that haven't been committed to db + if i < kvCnt/kvsPerBlock { + for name := range kvMods { + persistedValue, has := au.baseKVs.read(name) + require.True(t, has) + require.Equal(t, kvMap[name], *persistedValue.value) + } + } + + // verify commited updated kv values appear in the kv cache + for ; currentDBRound <= au.cachedDBRound; currentDBRound++ { + lookback := basics.Round(kvCnt/kvsPerBlock + int(conf.MaxAcctLookback) + 1) + if currentDBRound < lookback { + continue + } + + startKV := (currentDBRound - lookback) * basics.Round(kvsPerBlock) + for j := 0; j < kvsPerBlock; j++ { + name := fmt.Sprintf("%d", uint64(startKV)+uint64(j)) + persistedValue, has := au.baseKVs.read(name) + require.True(t, has) + expectedValue := fmt.Sprintf("modified value%s", name) + require.Equal(t, expectedValue, *persistedValue.value) + } + } + } + + // deleting KVs + curKV = 0 + for i := 0; i < kvCnt/kvsPerBlock+int(conf.MaxAcctLookback); i++ { + currentRound = currentRound + 1 + + kvMods := make(map[string]*string) + if i < kvCnt/kvsPerBlock { + for j := 0; j < kvsPerBlock; j++ { + name := fmt.Sprintf("%d", curKV) + kvMods[name] = nil + curKV++ + } + } + + auNewBlock(t, currentRound, au, accts, opts, kvMods) + auCommitSync(t, currentRound, au, ml) + + // ensure rounds + rnd := au.latest() + require.Equal(t, currentRound, rnd) + require.Equal(t, basics.Round(uint64(currentRound)-conf.MaxAcctLookback), au.cachedDBRound) + + // verify cache doesn't contain updated kv values that haven't been committed to db + if i < kvCnt/kvsPerBlock { + for name := range kvMods { + persistedValue, has := au.baseKVs.read(name) + require.True(t, has) + value := fmt.Sprintf("modified value%s", name) + require.Equal(t, value, *persistedValue.value) + } + } + + // verify commited updated kv values appear in the kv cache + for ; currentDBRound <= au.cachedDBRound; currentDBRound++ { + lookback := basics.Round(2*(kvCnt/kvsPerBlock+int(conf.MaxAcctLookback)) + 1) + if currentDBRound < lookback { + continue + } + + startKV := (currentDBRound - lookback) * basics.Round(kvsPerBlock) + for j := 0; j < kvsPerBlock; j++ { + name := fmt.Sprintf("%d", uint64(startKV)+uint64(j)) + persistedValue, has := au.baseKVs.read(name) + require.True(t, has) + require.True(t, persistedValue.value == nil) + } + } + } +} + func accountsAll(tx *sql.Tx) (bals map[basics.Address]basics.AccountData, err error) { rows, err := tx.Query("SELECT rowid, address, data FROM accountbase") if err != nil { @@ -1436,7 +1613,7 @@ func BenchmarkLargeMerkleTrieRebuild(b *testing.B) { } err := ml.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { - _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1)) + _, _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1)) return }) require.NoError(b, err) diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index 5285bf69fd..4a08d9e1f9 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -363,7 +363,7 @@ func BenchmarkLargeCatchpointDataWriting(b *testing.B) { i++ } - _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1)) + _, _, _, err = accountsNewRound(tx, updates, compactResourcesDeltas{}, nil, nil, proto, basics.Round(1)) if err != nil { return } diff --git a/ledger/lrukv.go b/ledger/lrukv.go new file mode 100644 index 0000000000..45f4f5027d --- /dev/null +++ b/ledger/lrukv.go @@ -0,0 +1,132 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "github.com/algorand/go-algorand/logging" +) + +//msgp:ignore cachedKVData +type cachedKVData struct { + persistedKVData + + // kv key + key string +} + +// lruKV provides a storage class for the most recently used kv data. +// It doesn't have any synchronization primitive on it's own and require to be +// syncronized by the caller. +type lruKV struct { + // kvList contain the list of persistedKVData, where the front ones are the most "fresh" + // and the ones on the back are the oldest. + kvList *persistedKVDataList + + // kvs provides fast access to the various elements in the list by using the key + kvs map[string]*persistedKVDataListNode + + // pendingKVs are used as a way to avoid taking a write-lock. When the caller needs to "materialize" these, + // it would call flushPendingWrites and these would be merged into the kvs/kvList + pendingKVs chan cachedKVData + + // log interface; used for logging the threshold event. + log logging.Logger + + // pendingWritesWarnThreshold is the threshold beyond we would write a warning for exceeding the number of pendingKVs entries + pendingWritesWarnThreshold int +} + +// init initializes the lruKV for use. +// thread locking semantics : write lock +func (m *lruKV) init(log logging.Logger, pendingWrites int, pendingWritesWarnThreshold int) { + m.kvList = newPersistedKVList().allocateFreeNodes(pendingWrites) + m.kvs = make(map[string]*persistedKVDataListNode, pendingWrites) + m.pendingKVs = make(chan cachedKVData, pendingWrites) + m.log = log + m.pendingWritesWarnThreshold = pendingWritesWarnThreshold +} + +// read the persistedKVData object that the lruKV has for the given key. +// thread locking semantics : read lock +func (m *lruKV) read(key string) (data persistedKVData, has bool) { + if el := m.kvs[key]; el != nil { + return el.Value.persistedKVData, true + } + return persistedKVData{}, false +} + +// flushPendingWrites flushes the pending writes to the main lruKV cache. +// thread locking semantics : write lock +func (m *lruKV) flushPendingWrites() { + pendingEntriesCount := len(m.pendingKVs) + if pendingEntriesCount >= m.pendingWritesWarnThreshold { + m.log.Warnf("lruKV: number of entries in pendingKVs(%d) exceed the warning threshold of %d", pendingEntriesCount, m.pendingWritesWarnThreshold) + } + for ; pendingEntriesCount > 0; pendingEntriesCount-- { + select { + case pendingKVData := <-m.pendingKVs: + m.write(pendingKVData.persistedKVData, pendingKVData.key) + default: + return + } + } +} + +// writePending write a single persistedKVData entry to the pendingKVs buffer. +// the function doesn't block, and in case of a buffer overflow the entry would not be added. +// thread locking semantics : no lock is required. +func (m *lruKV) writePending(kv persistedKVData, key string) { + select { + case m.pendingKVs <- cachedKVData{persistedKVData: kv, key: key}: + default: + } +} + +// write a single persistedKVData to the lruKV cache. +// when writing the entry, the round number would be used to determine if it's a newer +// version of what's already on the cache or not. In all cases, the entry is going +// to be promoted to the front of the list. +// thread locking semantics : write lock +func (m *lruKV) write(kvData persistedKVData, key string) { + if el := m.kvs[key]; el != nil { + // already exists; is it a newer ? + if el.Value.before(&kvData) { + // we update with a newer version. + el.Value = &cachedKVData{persistedKVData: kvData, key: key} + } + m.kvList.moveToFront(el) + } else { + // new entry. + m.kvs[key] = m.kvList.pushFront(&cachedKVData{persistedKVData: kvData, key: key}) + } +} + +// prune adjust the current size of the lruKV cache, by dropping the least +// recently used entries. +// thread locking semantics : write lock +func (m *lruKV) prune(newSize int) (removed int) { + for { + if len(m.kvs) <= newSize { + break + } + back := m.kvList.back() + delete(m.kvs, back.Value.key) + m.kvList.remove(back) + removed++ + } + return +} diff --git a/ledger/lrukv_test.go b/ledger/lrukv_test.go new file mode 100644 index 0000000000..8eebb1459c --- /dev/null +++ b/ledger/lrukv_test.go @@ -0,0 +1,240 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/test/partitiontest" +) + +func TestLRUBasicKV(t *testing.T) { + partitiontest.PartitionTest(t) + + var baseKV lruKV + baseKV.init(logging.TestingLog(t), 10, 5) + + kvNum := 50 + // write 50 KVs + for i := 0; i < kvNum; i++ { + kvValue := fmt.Sprintf("kv %d value", i) + kv := persistedKVData{ + value: &kvValue, + round: basics.Round(i), + } + baseKV.write(kv, fmt.Sprintf("key%d", i)) + } + + // verify that all these KVs are truly there. + for i := 0; i < kvNum; i++ { + kv, has := baseKV.read(fmt.Sprintf("key%d", i)) + require.True(t, has) + require.Equal(t, basics.Round(i), kv.round) + require.Equal(t, fmt.Sprintf("kv %d value", i), *(kv.value)) + } + + // verify expected missing entries + for i := kvNum; i < kvNum*2; i++ { + kv, has := baseKV.read(fmt.Sprintf("key%d", i)) + require.False(t, has) + require.Equal(t, persistedKVData{}, kv) + } + + baseKV.prune(kvNum / 2) + + // verify expected (missing/existing) entries + for i := 0; i < kvNum*2; i++ { + kv, has := baseKV.read(fmt.Sprintf("key%d", i)) + + if i >= kvNum/2 && i < kvNum { + // expected to have it. + require.True(t, has) + require.Equal(t, basics.Round(i), kv.round) + require.Equal(t, fmt.Sprintf("kv %d value", i), *(kv.value)) + } else { + require.False(t, has) + require.Equal(t, persistedKVData{}, kv) + } + } +} + +func TestLRUKVPendingWrites(t *testing.T) { + partitiontest.PartitionTest(t) + + var baseKV lruKV + kvNum := 250 + baseKV.init(logging.TestingLog(t), kvNum*2, kvNum) + + for i := 0; i < kvNum; i++ { + go func(i int) { + time.Sleep(time.Duration((crypto.RandUint64() % 50)) * time.Millisecond) + kvValue := fmt.Sprintf("kv %d value", i) + kv := persistedKVData{ + value: &kvValue, + round: basics.Round(i), + } + baseKV.writePending(kv, fmt.Sprintf("key%d", i)) + }(i) + } + testStarted := time.Now() + for { + baseKV.flushPendingWrites() + + // check if all kvs were loaded into "main" cache. + allKVsLoaded := true + for i := 0; i < kvNum; i++ { + _, has := baseKV.read(fmt.Sprintf("key%d", i)) + if !has { + allKVsLoaded = false + break + } + } + if allKVsLoaded { + break + } + if time.Since(testStarted).Seconds() > 20 { + require.Fail(t, "failed after waiting for 20 second") + } + // not yet, keep looping. + } +} + +type lruKVTestLogger struct { + logging.Logger + WarnfCallback func(string, ...interface{}) + warnMsgCount int +} + +func (cl *lruKVTestLogger) Warnf(s string, args ...interface{}) { + cl.warnMsgCount++ +} + +func TestLRUKVPendingWritesWarning(t *testing.T) { + partitiontest.PartitionTest(t) + + var baseKV lruKV + pendingWritesBuffer := 50 + pendingWritesThreshold := 40 + log := &lruKVTestLogger{Logger: logging.TestingLog(t)} + baseKV.init(log, pendingWritesBuffer, pendingWritesThreshold) + for j := 0; j < 50; j++ { + for i := 0; i < j; i++ { + kvValue := fmt.Sprintf("kv %d value", i) + kv := persistedKVData{ + value: &kvValue, + round: basics.Round(i), + } + baseKV.writePending(kv, fmt.Sprintf("key%d", i)) + } + baseKV.flushPendingWrites() + if j >= pendingWritesThreshold { + // expect a warning in the log + require.Equal(t, 1+j-pendingWritesThreshold, log.warnMsgCount) + } + } +} + +func TestLRUKVOmittedPendingWrites(t *testing.T) { + partitiontest.PartitionTest(t) + + var baseKV lruKV + pendingWritesBuffer := 50 + pendingWritesThreshold := 40 + log := &lruKVTestLogger{Logger: logging.TestingLog(t)} + baseKV.init(log, pendingWritesBuffer, pendingWritesThreshold) + + for i := 0; i < pendingWritesBuffer*2; i++ { + kvValue := fmt.Sprintf("kv %d value", i) + kv := persistedKVData{ + value: &kvValue, + round: basics.Round(i), + } + baseKV.writePending(kv, fmt.Sprintf("key%d", i)) + } + + baseKV.flushPendingWrites() + + // verify that all these kvs are truly there. + for i := 0; i < pendingWritesBuffer; i++ { + kv, has := baseKV.read(fmt.Sprintf("key%d", i)) + require.True(t, has) + require.Equal(t, basics.Round(i), kv.round) + require.Equal(t, fmt.Sprintf("kv %d value", i), *(kv.value)) + } + + // verify expected missing entries + for i := pendingWritesBuffer; i < pendingWritesBuffer*2; i++ { + kv, has := baseKV.read(fmt.Sprintf("key%d", i)) + require.False(t, has) + require.Equal(t, persistedKVData{}, kv) + } +} + +func BenchmarkLRUKVWrite(b *testing.B) { + numTestKV := 5000 + // there are 2500 kvs that overlap + fillerKVs := generatePersistedKVData(0, 97500) + kvs := generatePersistedKVData(97500-numTestKV/2, 97500+numTestKV/2) + + benchLruWriteKVs(b, fillerKVs, kvs) +} + +func benchLruWriteKVs(b *testing.B, fillerKVs []cachedKVData, kvs []cachedKVData) { + b.ResetTimer() + b.StopTimer() + var baseKV lruKV + // setting up the baseKV with a predefined cache size + baseKV.init(logging.TestingLog(b), baseKVPendingBufferSize, baseKVPendingWarnThreshold) + for i := 0; i < b.N; i++ { + baseKV = fillLRUKV(baseKV, fillerKVs) + + b.StartTimer() + fillLRUKV(baseKV, kvs) + b.StopTimer() + baseKV.prune(0) + } +} + +func fillLRUKV(baseKV lruKV, fillerKVs []cachedKVData) lruKV { + for _, entry := range fillerKVs { + baseKV.write(entry.persistedKVData, entry.key) + } + return baseKV +} + +func generatePersistedKVData(startRound, endRound int) []cachedKVData { + kvs := make([]cachedKVData, endRound-startRound) + for i := startRound; i < endRound; i++ { + kvValue := fmt.Sprintf("kv %d value", i) + + kvs[i-startRound] = cachedKVData{ + persistedKVData: persistedKVData{ + value: &kvValue, + round: basics.Round(i + startRound), + }, + key: fmt.Sprintf("key%d", i), + } + } + return kvs +} diff --git a/ledger/persistedkvs.go b/ledger/persistedkvs.go new file mode 100644 index 0000000000..34f3c36ecb --- /dev/null +++ b/ledger/persistedkvs.go @@ -0,0 +1,143 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +// persistedKVDataList represents a doubly linked list. +// must initiate with newPersistedKVList. +type persistedKVDataList struct { + root persistedKVDataListNode // sentinel list element, only &root, root.prev, and root.next are used + freeList *persistedKVDataListNode // preallocated nodes location +} + +type persistedKVDataListNode struct { + // Next and previous pointers in the doubly-linked list of elements. + // To simplify the implementation, internally a list l is implemented + // as a ring, such that &l.root is both the next element of the last + // list element (l.Back()) and the previous element of the first list + // element (l.Front()). + next, prev *persistedKVDataListNode + + Value *cachedKVData +} + +func newPersistedKVList() *persistedKVDataList { + l := new(persistedKVDataList) + l.root.next = &l.root + l.root.prev = &l.root + // used as a helper but does not store value + l.freeList = new(persistedKVDataListNode) + + return l +} + +func (l *persistedKVDataList) insertNodeToFreeList(otherNode *persistedKVDataListNode) { + otherNode.next = l.freeList.next + otherNode.prev = nil + otherNode.Value = nil + + l.freeList.next = otherNode +} + +func (l *persistedKVDataList) getNewNode() *persistedKVDataListNode { + if l.freeList.next == nil { + return new(persistedKVDataListNode) + } + newNode := l.freeList.next + l.freeList.next = newNode.next + + return newNode +} + +func (l *persistedKVDataList) allocateFreeNodes(numAllocs int) *persistedKVDataList { + if l.freeList == nil { + return l + } + for i := 0; i < numAllocs; i++ { + l.insertNodeToFreeList(new(persistedKVDataListNode)) + } + + return l +} + +// Back returns the last element of list l or nil if the list is empty. +func (l *persistedKVDataList) back() *persistedKVDataListNode { + isEmpty := func(list *persistedKVDataList) bool { + // assumes we are inserting correctly to the list - using pushFront. + return list.root.next == &list.root + } + if isEmpty(l) { + return nil + } + return l.root.prev +} + +// remove removes e from l if e is an element of list l. +// It returns the element value e.Value. +// The element must not be nil. +func (l *persistedKVDataList) remove(e *persistedKVDataListNode) { + e.prev.next = e.next + e.next.prev = e.prev + e.next = nil // avoid memory leaks + e.prev = nil // avoid memory leaks + + l.insertNodeToFreeList(e) +} + +// pushFront inserts a new element e with value v at the front of list l and returns e. +func (l *persistedKVDataList) pushFront(v *cachedKVData) *persistedKVDataListNode { + newNode := l.getNewNode() + newNode.Value = v + return l.insertValue(newNode, &l.root) +} + +// insertValue inserts e after at, increments l.len, and returns e. +func (l *persistedKVDataList) insertValue(newNode *persistedKVDataListNode, at *persistedKVDataListNode) *persistedKVDataListNode { + n := at.next + at.next = newNode + newNode.prev = at + newNode.next = n + n.prev = newNode + + return newNode +} + +// moveToFront moves element e to the front of list l. +// If e is not an element of l, the list is not modified. +// The element must not be nil. +func (l *persistedKVDataList) moveToFront(e *persistedKVDataListNode) { + if l.root.next == e { + return + } + l.move(e, &l.root) +} + +// move moves e to next to at and returns e. +func (l *persistedKVDataList) move(e, at *persistedKVDataListNode) *persistedKVDataListNode { + if e == at { + return e + } + e.prev.next = e.next + e.next.prev = e.prev + + n := at.next + at.next = e + e.prev = at + e.next = n + n.prev = e + + return e +} diff --git a/ledger/persistedkvs_test.go b/ledger/persistedkvs_test.go new file mode 100644 index 0000000000..eb5ed9dff7 --- /dev/null +++ b/ledger/persistedkvs_test.go @@ -0,0 +1,175 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "testing" + + "github.com/algorand/go-algorand/test/partitiontest" +) + +func (l *persistedKVDataList) getRoot() dataListNode { + return &l.root +} + +func (l *persistedKVDataListNode) getNext() dataListNode { + // get rid of returning nil wrapped into an interface to let i = x.getNext(); i != nil work. + if l.next == nil { + return nil + } + return l.next +} + +func (l *persistedKVDataListNode) getPrev() dataListNode { + if l.prev == nil { + return nil + } + return l.prev +} + +// inspect that the list seems like the array +func checkListPointersBD(t *testing.T, l *persistedKVDataList, es []*persistedKVDataListNode) { + es2 := make([]dataListNode, len(es)) + for i, el := range es { + es2[i] = el + } + + checkListPointers(t, l, es2) +} + +func TestRemoveFromListBD(t *testing.T) { + partitiontest.PartitionTest(t) + l := newPersistedKVList() + e1 := l.pushFront(&cachedKVData{key: "key1"}) + e2 := l.pushFront(&cachedKVData{key: "key2"}) + e3 := l.pushFront(&cachedKVData{key: "key3"}) + checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e2, e1}) + + l.remove(e2) + checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e1}) + l.remove(e3) + checkListPointersBD(t, l, []*persistedKVDataListNode{e1}) +} + +func TestAddingNewNodeWithAllocatedFreeListBD(t *testing.T) { + partitiontest.PartitionTest(t) + l := newPersistedKVList().allocateFreeNodes(10) + checkListPointersBD(t, l, []*persistedKVDataListNode{}) + if countListSize(l.freeList) != 10 { + t.Errorf("free list did not allocate nodes") + return + } + // test elements + e1 := l.pushFront(&cachedKVData{key: "key1"}) + checkListPointersBD(t, l, []*persistedKVDataListNode{e1}) + + if countListSize(l.freeList) != 9 { + t.Errorf("free list did not provide a node on new list entry") + return + } +} + +func TestMultielementListPositioningBD(t *testing.T) { + partitiontest.PartitionTest(t) + l := newPersistedKVList() + checkListPointersBD(t, l, []*persistedKVDataListNode{}) + // test elements + e2 := l.pushFront(&cachedKVData{key: "key1"}) + e1 := l.pushFront(&cachedKVData{key: "key2"}) + e3 := l.pushFront(&cachedKVData{key: "key3"}) + e4 := l.pushFront(&cachedKVData{key: "key4"}) + e5 := l.pushFront(&cachedKVData{key: "key5"}) + + checkListPointersBD(t, l, []*persistedKVDataListNode{e5, e4, e3, e1, e2}) + + l.move(e4, e1) + checkListPointersBD(t, l, []*persistedKVDataListNode{e5, e3, e1, e4, e2}) + + l.remove(e5) + checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e1, e4, e2}) + + l.move(e1, e4) // swap in middle + checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e4, e1, e2}) + + l.moveToFront(e4) + checkListPointersBD(t, l, []*persistedKVDataListNode{e4, e3, e1, e2}) + + l.remove(e2) + checkListPointersBD(t, l, []*persistedKVDataListNode{e4, e3, e1}) + + l.moveToFront(e3) // move from middle + checkListPointersBD(t, l, []*persistedKVDataListNode{e3, e4, e1}) + + l.moveToFront(e1) // move from end + checkListPointersBD(t, l, []*persistedKVDataListNode{e1, e3, e4}) + + l.moveToFront(e1) // no movement + checkListPointersBD(t, l, []*persistedKVDataListNode{e1, e3, e4}) + + e2 = l.pushFront(&cachedKVData{key: "key2"}) + checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1, e3, e4}) + + l.remove(e3) // removing from middle + checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1, e4}) + + l.remove(e4) // removing from end + checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1}) + + l.move(e2, e1) // swapping between two elements + checkListPointersBD(t, l, []*persistedKVDataListNode{e1, e2}) + + l.remove(e1) // removing front + checkListPointersBD(t, l, []*persistedKVDataListNode{e2}) + + l.move(e2, l.back()) // swapping element with itself. + checkListPointersBD(t, l, []*persistedKVDataListNode{e2}) + + l.remove(e2) // remove last one + checkListPointersBD(t, l, []*persistedKVDataListNode{}) +} + +func TestSingleElementListPositioningBD(t *testing.T) { + partitiontest.PartitionTest(t) + l := newPersistedKVList() + checkListPointersBD(t, l, []*persistedKVDataListNode{}) + e := l.pushFront(&cachedKVData{key: "key1"}) + checkListPointersBD(t, l, []*persistedKVDataListNode{e}) + l.moveToFront(e) + checkListPointersBD(t, l, []*persistedKVDataListNode{e}) + l.remove(e) + checkListPointersBD(t, l, []*persistedKVDataListNode{}) +} + +func TestRemovedNodeShouldBeMovedToFreeListBD(t *testing.T) { + partitiontest.PartitionTest(t) + l := newPersistedKVList() + e1 := l.pushFront(&cachedKVData{key: "key1"}) + e2 := l.pushFront(&cachedKVData{key: "key2"}) + + checkListPointersBD(t, l, []*persistedKVDataListNode{e2, e1}) + + e := l.back() + l.remove(e) + + for i := l.freeList.next; i != nil; i = i.next { + if i == e { + // stopping the tst with good results: + return + } + } + t.Error("expected the removed node to appear at the freelist") +} diff --git a/ledger/persistedresources_list.go b/ledger/persistedresources_list.go index 57b0cdc44c..baa7ac351a 100644 --- a/ledger/persistedresources_list.go +++ b/ledger/persistedresources_list.go @@ -17,7 +17,7 @@ package ledger // persistedResourcesDataList represents a doubly linked list. -// must initiate with newPersistedAccountList. +// must initiate with newPersistedResourcesList. type persistedResourcesDataList struct { root persistedResourcesDataListNode // sentinel list element, only &root, root.prev, and root.next are used freeList *persistedResourcesDataListNode // preallocated nodes location diff --git a/ledger/tracker.go b/ledger/tracker.go index b51fc666e6..4d0e7582df 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -248,6 +248,7 @@ type deferredCommitContext struct { updatedPersistedAccounts []persistedAccountData updatedPersistedResources map[basics.Address][]persistedResourcesData + updatedPersistedKVs map[string]persistedKVData compactOnlineAccountDeltas compactOnlineAccountDeltas updatedPersistedOnlineAccounts []persistedOnlineAccountData