From c720cb4bb51ede585c4917accbc5443eafdef6bc Mon Sep 17 00:00:00 2001
From: Pavel Zbitskiy
Date: Mon, 14 Mar 2022 18:00:21 -0400
Subject: [PATCH] ledger: fix LookupLatest when the ledger advances

---
 ledger/acctupdates.go      |  17 ++--
 ledger/acctupdates_test.go | 202 +++++++++++++++++++++++++++++++++++++
 2 files changed, 212 insertions(+), 7 deletions(-)

diff --git a/ledger/acctupdates.go b/ledger/acctupdates.go
index 693328181b..c6ff9cc846 100644
--- a/ledger/acctupdates.go
+++ b/ledger/acctupdates.go
@@ -886,7 +886,7 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
     var resourceDbRound basics.Round
     withRewards := true
 
-    foundAccount := false
+    var foundAccount bool
     var ad ledgercore.AccountData
     var foundResources map[basics.CreatableIndex]basics.Round
 
@@ -917,7 +917,8 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
             return true
         }
         // not possible to know how many resources rows to look for: totals conceal possibly overlapping assets/apps
-        if (ad.TotalAssetParams != 0 && ad.TotalAssets != 0) ||
+        // but asset params also imply an asset holding (the creator holds its own asset)
+        if (ad.TotalAssetParams != 0 && ad.TotalAssets != 0 && ad.TotalAssetParams != ad.TotalAssets) ||
             (ad.TotalAppParams != 0 && ad.TotalAppLocalStates != 0) {
             return false
         }
@@ -929,6 +930,8 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
             needToFind += uint64(ad.TotalAssets) // look for N asset holdings
         } else if ad.TotalAssets == 0 { // not a holder of assets
             needToFind += uint64(ad.TotalAssetParams) // look for N asset params
+        } else if ad.TotalAssetParams == ad.TotalAssets {
+            needToFind += uint64(ad.TotalAssetParams) // look for N rows carrying both params and a holding
         } else {
             return false
         }
@@ -957,6 +960,7 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
             return basics.AccountData{}, basics.Round(0), basics.MicroAlgos{}, fmt.Errorf("offset != len(au.deltas): %w", ErrLookupLatestResources)
         }
         ad = ledgercore.AccountData{}
+        foundAccount = false
         foundResources = make(map[basics.CreatableIndex]basics.Round)
         resourceCount = 0
 
@@ -977,17 +981,16 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
         }
 
         // check if we've had this address modified in the past rounds. ( i.e. if it's in the deltas )
-        macct, indeltas := au.accounts[addr]
-        if indeltas {
+        if macct, has := au.accounts[addr]; has {
             // This is the most recent round, so we can
             // use a cache of the most recent account state.
             ad = macct.data
             foundAccount = true
-        } else if macct, has := au.baseAccounts.read(addr); has && macct.round == currentDbRound {
+        } else if pad, has := au.baseAccounts.read(addr); has && pad.round == currentDbRound {
             // we don't technically need this, since it's already in the baseAccounts, however, writing this over
             // would ensure that we promote this field.
-            au.baseAccounts.writePending(macct)
-            ad = macct.accountData.GetLedgerCoreAccountData()
+            au.baseAccounts.writePending(pad)
+            ad = pad.accountData.GetLedgerCoreAccountData()
             foundAccount = true
         }
 
diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go
index fa0400bb4e..e2c577ee2f 100644
--- a/ledger/acctupdates_test.go
+++ b/ledger/acctupdates_test.go
@@ -2348,3 +2348,205 @@ func TestAcctUpdatesLookupRetry(t *testing.T) {
         require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd)
     })
 }
+
+// TestAcctUpdatesLookupLatestCacheRetry simulates a situation where the base account and resources are cached
+// but the account updates tracker advances while lookupLatest is running.
+// The idea of the test:
+// - create some base accounts and an account with resources
+// - make sure that account is in the caches
+// - force the cached round to be one less than the real DB round
+// - call lookupLatest and ensure it blocks
+// - let lookupLatest finish and check the returned data is correct
+func TestAcctUpdatesLookupLatestCacheRetry(t *testing.T) {
+    partitiontest.PartitionTest(t)
+
+    accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
+    pooldata := basics.AccountData{}
+    pooldata.MicroAlgos.Raw = 100 * 1000 * 1000 * 1000 * 1000
+    pooldata.Status = basics.NotParticipating
+    accts[0][testPoolAddr] = pooldata
+
+    sinkdata := basics.AccountData{}
+    sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
+    sinkdata.Status = basics.NotParticipating
+    accts[0][testSinkAddr] = sinkdata
+
+    testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupLatestCacheRetry")
+    protoParams := config.Consensus[protocol.ConsensusCurrentVersion]
+    protoParams.MaxBalLookback = 2
+    protoParams.SeedLookback = 1
+    protoParams.SeedRefreshInterval = 1
+    config.Consensus[testProtocolVersion] = protoParams
+    defer func() {
+        delete(config.Consensus, testProtocolVersion)
+    }()
+
+    ml := makeMockLedgerForTracker(t, true, 1, testProtocolVersion, accts)
+    defer ml.Close()
+
+    conf := config.GetDefaultLocal()
+    au := newAcctUpdates(t, ml, conf, ".")
+    defer au.close()
+
+    var addr1 basics.Address
+    for addr := range accts[0] {
+        if addr != testSinkAddr && addr != testPoolAddr {
+            addr1 = addr
+            break
+        }
+    }
+
+    aidx1 := basics.AssetIndex(1)
+    aidx2 := basics.AssetIndex(2)
+    knownCreatables := make(map[basics.CreatableIndex]bool)
+
+    commitSync := func(rnd basics.Round) {
+        _, maxLookback := au.committedUpTo(rnd)
+        dcc := &deferredCommitContext{
+            deferredCommitRange: deferredCommitRange{
+                lookback: maxLookback,
+            },
+        }
+        cdr := &dcc.deferredCommitRange
+        cdr = au.produceCommittingTask(rnd, ml.trackers.dbRound, cdr)
+        if cdr != nil {
+            func() {
+                dcc.deferredCommitRange = *cdr
+                ml.trackers.accountsWriting.Add(1)
+                defer ml.trackers.accountsWriting.Done()
+
+                // do not take any locks since all operations are synchronous
+                newBase := basics.Round(dcc.offset) + dcc.oldBase
+                dcc.newBase = newBase
+
+                err := au.prepareCommit(dcc)
+                require.NoError(t, err)
+                err = ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
+                    err = au.commitRound(ctx, tx, dcc)
+                    if err != nil {
+                        return err
+                    }
+                    err = updateAccountsRound(tx, newBase)
+                    return err
+                })
+                require.NoError(t, err)
+                ml.trackers.dbRound = newBase
+                au.postCommit(ml.trackers.ctx, dcc)
+                au.postCommitUnlocked(ml.trackers.ctx, dcc)
+            }()
+
+        }
+    }
+
+    newBlock := func(au *accountUpdates, rnd basics.Round, base map[basics.Address]basics.AccountData, updates ledgercore.AccountDeltas) {
+        rewardLevel := uint64(0)
+        prevTotals, err := au.Totals(basics.Round(rnd - 1))
+        require.NoError(t, err)
+
+        newTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardLevel, protoParams, base, prevTotals)
+
+        blk := bookkeeping.Block{
+            BlockHeader: bookkeeping.BlockHeader{
+                Round: basics.Round(rnd),
+            },
+        }
+        blk.RewardsLevel = rewardLevel
+        blk.CurrentProtocol = testProtocolVersion
+        delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0)
+        delta.Accts.MergeAccounts(updates)
+        delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
+        delta.Totals = newTotals
+
+        au.newBlock(blk, delta)
+    }
+
+    // the test requires 2 blocks with different resource state; au needs MaxBalLookback blocks before it starts persisting
+    for i := basics.Round(1); i <= basics.Round(protoParams.MaxBalLookback+2); i++ {
+        var updates ledgercore.AccountDeltas
+
+        // add data
+        if i == 1 {
+            updates.Upsert(addr1, ledgercore.AccountData{AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 1000000}, TotalAssetParams: 1, TotalAssets: 2}})
+            updates.UpsertAssetResource(addr1, aidx1, ledgercore.AssetParamsDelta{Params: &basics.AssetParams{Total: 100}}, ledgercore.AssetHoldingDelta{Holding: &basics.AssetHolding{Amount: 100}})
+            updates.UpsertAssetResource(addr1, aidx2, ledgercore.AssetParamsDelta{}, ledgercore.AssetHoldingDelta{Holding: &basics.AssetHolding{Amount: 200}})
+        }
+
+        base := accts[i-1]
+        newAccts := applyPartialDeltas(base, updates)
+        accts = append(accts, newAccts)
+
+        // prepare block
+        newBlock(au, i, base, updates)
+
+        // commit changes synchronously
+        commitSync(i)
+    }
+
+    // ensure rounds are where we expect them
+    rnd := au.latest()
+    require.Equal(t, basics.Round(protoParams.MaxBalLookback+2), rnd)
+    require.Equal(t, basics.Round(2), au.cachedDBRound)
+    oldCachedDBRound := au.cachedDBRound
+
+    // simulate the following state:
+    // 1. addr1 is in baseAccounts, and its round is older than the rounds of addr1's entries in baseResources
+    // 2. au.cachedDBRound is less than the actual DB round
+    delete(au.accounts, addr1)
+    au.cachedDBRound--
+
+    pad, ok := au.baseAccounts.read(addr1)
+    require.True(t, ok)
+    pad.round = au.cachedDBRound
+    au.baseAccounts.write(pad)
+
+    prd, ok := au.baseResources.read(addr1, basics.CreatableIndex(aidx1))
+    require.True(t, ok)
+    prd.round = oldCachedDBRound
+    au.baseResources.write(prd, addr1)
+    prd, ok = au.baseResources.read(addr1, basics.CreatableIndex(aidx2))
+    require.True(t, ok)
+    prd.round = oldCachedDBRound
+    au.baseResources.write(prd, addr1)
+
+    var ad basics.AccountData
+    var err error
+
+    // lookupLatest blocks while waiting for a new round. There is no reliable way to tell that it is blocked,
+    // so run it in a goroutine and poll it to make sure it does not return early.
+    var wg sync.WaitGroup
+    wg.Add(1)
+    done := make(chan struct{})
+    go func() {
+        ad, _, _, err = au.lookupLatest(addr1)
+        close(done)
+        wg.Done()
+    }()
+
+    // poll for a while to ensure lookupLatest is stuck
+    maxIterations := 10
+    i := 0
+    for i < maxIterations {
+        select {
+        case <-done:
+            require.Fail(t, "lookupLatest returns without waiting for new block")
+        default:
+            i++
+            time.Sleep(10 * time.Millisecond)
+        }
+    }
+
+    // give it a new block and restore the original cachedDBRound value
+    au.accountsMu.Lock()
+    au.cachedDBRound = oldCachedDBRound
+    au.accountsMu.Unlock()
+    newBlock(au, rnd+1, accts[rnd], ledgercore.AccountDeltas{})
+    commitSync(rnd + 1)
+
+    wg.Wait()
+
+    require.NoError(t, err)
+    require.Equal(t, uint64(1000000), ad.MicroAlgos.Raw)
+    require.Equal(t, uint64(100), ad.AssetParams[aidx1].Total)
+    require.Equal(t, uint64(100), ad.Assets[aidx1].Amount)
+    require.Equal(t, uint64(200), ad.Assets[aidx2].Amount)
+}
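
For reviewers following the counting change in lookupLatest: the acctupdates.go hunks let the fast path handle an account whose TotalAssetParams equals its TotalAssets, on the assumption that a creator also holds each asset it created, so params and holding share one resource row. The sketch below is not the ledger code itself; the totals struct and resourceRowsNeeded are illustrative names that only mirror the counters the patch touches.

package main

import "fmt"

// totals mirrors the ledgercore.AccountData counters used by the patched condition.
type totals struct {
    TotalAssetParams    uint64
    TotalAssets         uint64
    TotalAppParams      uint64
    TotalAppLocalStates uint64
}

// resourceRowsNeeded reports how many resource rows a lookup would have to
// collect, and false when the totals conceal possibly overlapping assets/apps.
func resourceRowsNeeded(t totals) (uint64, bool) {
    var need uint64

    switch {
    case t.TotalAssetParams == 0: // not a creator of assets: holdings only
        need += t.TotalAssets
    case t.TotalAssets == 0: // not a holder of assets: params only
        need += t.TotalAssetParams
    case t.TotalAssetParams == t.TotalAssets: // the new case: each created asset is also held,
        need += t.TotalAssetParams // so params and holding land in the same row
    default:
        return 0, false // unknown overlap between created and held assets
    }

    switch {
    case t.TotalAppParams == 0:
        need += t.TotalAppLocalStates
    case t.TotalAppLocalStates == 0:
        need += t.TotalAppParams
    default:
        return 0, false // apps keep the original give-up behaviour
    }

    return need, true
}

func main() {
    // The account built by the new test: 1 asset params entry, 2 holdings -> cannot be counted.
    fmt.Println(resourceRowsNeeded(totals{TotalAssetParams: 1, TotalAssets: 2}))
    // Equal totals become countable with the patch: 2 rows.
    fmt.Println(resourceRowsNeeded(totals{TotalAssetParams: 2, TotalAssets: 2}))
}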
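The new test detects blocking by polling a done channel rather than by inspecting internal state, which keeps it independent of how lookupLatest waits. A minimal, self-contained sketch of the same pattern (blockingCall and release are made-up names for illustration, not part of the ledger code):

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    release := make(chan struct{}) // closed when the blocked call may proceed
    done := make(chan struct{})    // closed once the call returns
    var wg sync.WaitGroup

    // blockingCall stands in for lookupLatest waiting on a newer database round.
    blockingCall := func() {
        <-release
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        blockingCall()
        close(done)
    }()

    // Poll a few times: if done closes early, the call did not block as expected.
    for i := 0; i < 10; i++ {
        select {
        case <-done:
            panic("call returned without waiting")
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }

    close(release) // the equivalent of delivering the new block
    wg.Wait()
    fmt.Println("call unblocked as expected")
}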