Skip to content

Commit

Permalink
ledger: fix LookupLatest when the ledger advances (#3769)
Browse files Browse the repository at this point in the history
## Summary

LookupLatest combines base account and resources data, and in some cases it cannot
determine reliably how many resources account has, therefore it reads data directly from the DB.
The DB might advance causing cachedDBRound (and base account data) and  resourceDbRound
to be out of sync, and a retry needed.
If on retry baseAccount data is outdated, foundAccount flag incorrectly contains a value from
a previous iteration causing completion by checkDone because of empty ad (ledgercore.AccountData).
Having ad and foundAccount synchronized on retry eliminates the problem.

In addition checkDone improved for scenarios when accounts have only own assets.

## Test Plan

Added new test
  • Loading branch information
algorandskiy authored Mar 15, 2022
1 parent 5978c3d commit 14544e2
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 7 deletions.
17 changes: 10 additions & 7 deletions ledger/acctupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
var resourceDbRound basics.Round
withRewards := true

foundAccount := false
var foundAccount bool
var ad ledgercore.AccountData

var foundResources map[basics.CreatableIndex]basics.Round
Expand Down Expand Up @@ -917,7 +917,8 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
return true
}
// not possible to know how many resources rows to look for: totals conceal possibly overlapping assets/apps
if (ad.TotalAssetParams != 0 && ad.TotalAssets != 0) ||
// but asset params also assume asset holding
if (ad.TotalAssetParams != 0 && ad.TotalAssets != 0 && ad.TotalAssetParams != ad.TotalAssets) ||
(ad.TotalAppParams != 0 && ad.TotalAppLocalStates != 0) {
return false
}
Expand All @@ -929,6 +930,8 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
needToFind += uint64(ad.TotalAssets) // look for N asset holdings
} else if ad.TotalAssets == 0 { // not a holder of assets
needToFind += uint64(ad.TotalAssetParams) // look for N asset params
} else if ad.TotalAssetParams == ad.TotalAssets {
needToFind += uint64(ad.TotalAssetParams)
} else {
return false
}
Expand Down Expand Up @@ -957,6 +960,7 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
return basics.AccountData{}, basics.Round(0), basics.MicroAlgos{}, fmt.Errorf("offset != len(au.deltas): %w", ErrLookupLatestResources)
}
ad = ledgercore.AccountData{}
foundAccount = false
foundResources = make(map[basics.CreatableIndex]basics.Round)
resourceCount = 0

Expand All @@ -977,17 +981,16 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account
}

// check if we've had this address modified in the past rounds. ( i.e. if it's in the deltas )
macct, indeltas := au.accounts[addr]
if indeltas {
if macct, has := au.accounts[addr]; has {
// This is the most recent round, so we can
// use a cache of the most recent account state.
ad = macct.data
foundAccount = true
} else if macct, has := au.baseAccounts.read(addr); has && macct.round == currentDbRound {
} else if pad, has := au.baseAccounts.read(addr); has && pad.round == currentDbRound {
// we don't technically need this, since it's already in the baseAccounts, however, writing this over
// would ensure that we promote this field.
au.baseAccounts.writePending(macct)
ad = macct.accountData.GetLedgerCoreAccountData()
au.baseAccounts.writePending(pad)
ad = pad.accountData.GetLedgerCoreAccountData()
foundAccount = true
}

Expand Down
202 changes: 202 additions & 0 deletions ledger/acctupdates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2348,3 +2348,205 @@ func TestAcctUpdatesLookupRetry(t *testing.T) {
require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd)
})
}

// TestAcctUpdatesLookupLatestCacheRetry simulates a situation when base account and resources are in a cache but
// account updates advances while calling lookupLatest
// The idea of the test:
// - create some base accounts and an account with resources
// - set that account to be in the caches
// - force cached round to be one less than the real DB round
// - call lookupLatest, ensure it blocks
// - advance lookupLatest and check the content is actual
func TestAcctUpdatesLookupLatestCacheRetry(t *testing.T) {
partitiontest.PartitionTest(t)

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
pooldata := basics.AccountData{}
pooldata.MicroAlgos.Raw = 100 * 1000 * 1000 * 1000 * 1000
pooldata.Status = basics.NotParticipating
accts[0][testPoolAddr] = pooldata

sinkdata := basics.AccountData{}
sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000
sinkdata.Status = basics.NotParticipating
accts[0][testSinkAddr] = sinkdata

testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupLatestCacheRetry")
protoParams := config.Consensus[protocol.ConsensusCurrentVersion]
protoParams.MaxBalLookback = 2
protoParams.SeedLookback = 1
protoParams.SeedRefreshInterval = 1
config.Consensus[testProtocolVersion] = protoParams
defer func() {
delete(config.Consensus, testProtocolVersion)
}()

ml := makeMockLedgerForTracker(t, true, 1, testProtocolVersion, accts)
defer ml.Close()

conf := config.GetDefaultLocal()
au := newAcctUpdates(t, ml, conf, ".")
defer au.close()

var addr1 basics.Address
for addr := range accts[0] {
if addr != testSinkAddr && addr != testPoolAddr {
addr1 = addr
break
}
}

aidx1 := basics.AssetIndex(1)
aidx2 := basics.AssetIndex(2)
knownCreatables := make(map[basics.CreatableIndex]bool)

commitSync := func(rnd basics.Round) {
_, maxLookback := au.committedUpTo(rnd)
dcc := &deferredCommitContext{
deferredCommitRange: deferredCommitRange{
lookback: maxLookback,
},
}
cdr := &dcc.deferredCommitRange
cdr = au.produceCommittingTask(rnd, ml.trackers.dbRound, cdr)
if cdr != nil {
func() {
dcc.deferredCommitRange = *cdr
ml.trackers.accountsWriting.Add(1)
defer ml.trackers.accountsWriting.Done()

// do not take any locks since all operations are synchronous
newBase := basics.Round(dcc.offset) + dcc.oldBase
dcc.newBase = newBase

err := au.prepareCommit(dcc)
require.NoError(t, err)
err = ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) {
err = au.commitRound(ctx, tx, dcc)
if err != nil {
return err
}
err = updateAccountsRound(tx, newBase)
return err
})
require.NoError(t, err)
ml.trackers.dbRound = newBase
au.postCommit(ml.trackers.ctx, dcc)
au.postCommitUnlocked(ml.trackers.ctx, dcc)
}()

}
}

newBlock := func(au *accountUpdates, rnd basics.Round, base map[basics.Address]basics.AccountData, updates ledgercore.AccountDeltas) {
rewardLevel := uint64(0)
prevTotals, err := au.Totals(basics.Round(rnd - 1))
require.NoError(t, err)

newTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardLevel, protoParams, base, prevTotals)

blk := bookkeeping.Block{
BlockHeader: bookkeeping.BlockHeader{
Round: basics.Round(rnd),
},
}
blk.RewardsLevel = rewardLevel
blk.CurrentProtocol = testProtocolVersion
delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0)
delta.Accts.MergeAccounts(updates)
delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables)
delta.Totals = newTotals

au.newBlock(blk, delta)
}

// the test 1 requires 2 blocks with different resource state, au requires MaxBalLookback block to start persisting
for i := basics.Round(1); i <= basics.Round(protoParams.MaxBalLookback+2); i++ {
var updates ledgercore.AccountDeltas

// add data
if i == 1 {
updates.Upsert(addr1, ledgercore.AccountData{AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 1000000}, TotalAssetParams: 1, TotalAssets: 2}})
updates.UpsertAssetResource(addr1, aidx1, ledgercore.AssetParamsDelta{Params: &basics.AssetParams{Total: 100}}, ledgercore.AssetHoldingDelta{Holding: &basics.AssetHolding{Amount: 100}})
updates.UpsertAssetResource(addr1, aidx2, ledgercore.AssetParamsDelta{}, ledgercore.AssetHoldingDelta{Holding: &basics.AssetHolding{Amount: 200}})
}

base := accts[i-1]
newAccts := applyPartialDeltas(base, updates)
accts = append(accts, newAccts)

// prepare block
newBlock(au, i, base, updates)

// commit changes synchroniously
commitSync(i)
}

// ensure rounds
rnd := au.latest()
require.Equal(t, basics.Round(protoParams.MaxBalLookback+2), rnd)
require.Equal(t, basics.Round(2), au.cachedDBRound)
oldCachedDBRound := au.cachedDBRound

// simulate the following state
// 1. addr1 and in baseAccounts and its round is less than addr1's data in baseResources
// 2. au.cachedDBRound is less than actual DB round
delete(au.accounts, addr1)
au.cachedDBRound--

pad, ok := au.baseAccounts.read(addr1)
require.True(t, ok)
pad.round = au.cachedDBRound
au.baseAccounts.write(pad)

prd, ok := au.baseResources.read(addr1, basics.CreatableIndex(aidx1))
require.True(t, ok)
prd.round = oldCachedDBRound
au.baseResources.write(prd, addr1)
prd, ok = au.baseResources.read(addr1, basics.CreatableIndex(aidx2))
require.True(t, ok)
prd.round = oldCachedDBRound
au.baseResources.write(prd, addr1)

var ad basics.AccountData
var err error

// lookupLatest blocks on waiting new round. There is no reliable way to say it is blocked,
// so run it in a goroutine and query it to ensure it is blocked.
var wg sync.WaitGroup
wg.Add(1)
done := make(chan struct{})
go func() {
ad, _, _, err = au.lookupLatest(addr1)
close(done)
wg.Done()
}()

// wait to ensure lookupLatest is stuck
maxIterations := 10
i := 0
for i < maxIterations {
select {
case <-done:
require.Fail(t, "lookupLatest returns without waiting for new block")
default:
i++
time.Sleep(10 * time.Millisecond)
}
}

// give it a new block and restore the original cachedDBRound value
au.accountsMu.Lock()
au.cachedDBRound = oldCachedDBRound
au.accountsMu.Unlock()
newBlock(au, rnd+1, accts[rnd], ledgercore.AccountDeltas{})
commitSync(rnd + 1)

wg.Wait()

require.NoError(t, err)
require.Equal(t, uint64(1000000), ad.MicroAlgos.Raw)
require.Equal(t, uint64(100), ad.AssetParams[aidx1].Total)
require.Equal(t, uint64(100), ad.Assets[aidx1].Amount)
require.Equal(t, uint64(200), ad.Assets[aidx2].Amount)
}

0 comments on commit 14544e2

Please sign in to comment.