-
Notifications
You must be signed in to change notification settings - Fork 493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ledger: fix LookupLatest when the ledger advances #3769
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -886,7 +886,7 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account | |
var resourceDbRound basics.Round | ||
withRewards := true | ||
|
||
foundAccount := false | ||
var foundAccount bool | ||
var ad ledgercore.AccountData | ||
|
||
var foundResources map[basics.CreatableIndex]basics.Round | ||
|
@@ -917,7 +917,8 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account | |
return true | ||
} | ||
// not possible to know how many resources rows to look for: totals conceal possibly overlapping assets/apps | ||
if (ad.TotalAssetParams != 0 && ad.TotalAssets != 0) || | ||
// but asset params also assume asset holding | ||
if (ad.TotalAssetParams != 0 && ad.TotalAssets != 0 && ad.TotalAssetParams != ad.TotalAssets) || | ||
(ad.TotalAppParams != 0 && ad.TotalAppLocalStates != 0) { | ||
return false | ||
} | ||
|
@@ -929,6 +930,8 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account | |
needToFind += uint64(ad.TotalAssets) // look for N asset holdings | ||
} else if ad.TotalAssets == 0 { // not a holder of assets | ||
needToFind += uint64(ad.TotalAssetParams) // look for N asset params | ||
} else if ad.TotalAssetParams == ad.TotalAssets { | ||
needToFind += uint64(ad.TotalAssetParams) | ||
} else { | ||
return false | ||
} | ||
|
@@ -957,6 +960,7 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account | |
return basics.AccountData{}, basics.Round(0), basics.MicroAlgos{}, fmt.Errorf("offset != len(au.deltas): %w", ErrLookupLatestResources) | ||
} | ||
ad = ledgercore.AccountData{} | ||
foundAccount = false | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for finding this... |
||
foundResources = make(map[basics.CreatableIndex]basics.Round) | ||
resourceCount = 0 | ||
|
||
|
@@ -977,17 +981,16 @@ func (au *accountUpdates) lookupLatest(addr basics.Address) (data basics.Account | |
} | ||
|
||
// check if we've had this address modified in the past rounds. ( i.e. if it's in the deltas ) | ||
macct, indeltas := au.accounts[addr] | ||
if indeltas { | ||
if macct, has := au.accounts[addr]; has { | ||
// This is the most recent round, so we can | ||
// use a cache of the most recent account state. | ||
ad = macct.data | ||
foundAccount = true | ||
} else if macct, has := au.baseAccounts.read(addr); has && macct.round == currentDbRound { | ||
} else if pad, has := au.baseAccounts.read(addr); has && pad.round == currentDbRound { | ||
// we don't technically need this, since it's already in the baseAccounts, however, writing this over | ||
// would ensure that we promote this field. | ||
au.baseAccounts.writePending(macct) | ||
ad = macct.accountData.GetLedgerCoreAccountData() | ||
au.baseAccounts.writePending(pad) | ||
ad = pad.accountData.GetLedgerCoreAccountData() | ||
foundAccount = true | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2348,3 +2348,205 @@ func TestAcctUpdatesLookupRetry(t *testing.T) { | |
require.GreaterOrEqualf(t, uint64(validThrough), uint64(rnd), "validThrough: %v rnd :%v", validThrough, rnd) | ||
}) | ||
} | ||
|
||
// TestAcctUpdatesLookupLatestCacheRetry simulates a situation when base account and resources are in a cache but | ||
// account updates advances while calling lookupLatest | ||
// The idea of the test: | ||
// - create some base accounts and an account with resources | ||
// - set that account to be in the caches | ||
// - force cached round to be one less than the real DB round | ||
// - call lookupLatest, ensure it blocks | ||
// - advance lookupLatest and check the content is actual | ||
func TestAcctUpdatesLookupLatestCacheRetry(t *testing.T) { | ||
partitiontest.PartitionTest(t) | ||
|
||
accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)} | ||
pooldata := basics.AccountData{} | ||
pooldata.MicroAlgos.Raw = 100 * 1000 * 1000 * 1000 * 1000 | ||
pooldata.Status = basics.NotParticipating | ||
accts[0][testPoolAddr] = pooldata | ||
|
||
sinkdata := basics.AccountData{} | ||
sinkdata.MicroAlgos.Raw = 1000 * 1000 * 1000 * 1000 | ||
sinkdata.Status = basics.NotParticipating | ||
accts[0][testSinkAddr] = sinkdata | ||
|
||
testProtocolVersion := protocol.ConsensusVersion("test-protocol-TestAcctUpdatesLookupLatestCacheRetry") | ||
protoParams := config.Consensus[protocol.ConsensusCurrentVersion] | ||
protoParams.MaxBalLookback = 2 | ||
protoParams.SeedLookback = 1 | ||
protoParams.SeedRefreshInterval = 1 | ||
config.Consensus[testProtocolVersion] = protoParams | ||
defer func() { | ||
delete(config.Consensus, testProtocolVersion) | ||
}() | ||
|
||
ml := makeMockLedgerForTracker(t, true, 1, testProtocolVersion, accts) | ||
defer ml.Close() | ||
|
||
conf := config.GetDefaultLocal() | ||
au := newAcctUpdates(t, ml, conf, ".") | ||
defer au.close() | ||
|
||
var addr1 basics.Address | ||
for addr := range accts[0] { | ||
if addr != testSinkAddr && addr != testPoolAddr { | ||
addr1 = addr | ||
break | ||
} | ||
} | ||
|
||
aidx1 := basics.AssetIndex(1) | ||
aidx2 := basics.AssetIndex(2) | ||
knownCreatables := make(map[basics.CreatableIndex]bool) | ||
|
||
commitSync := func(rnd basics.Round) { | ||
_, maxLookback := au.committedUpTo(rnd) | ||
dcc := &deferredCommitContext{ | ||
deferredCommitRange: deferredCommitRange{ | ||
lookback: maxLookback, | ||
}, | ||
} | ||
cdr := &dcc.deferredCommitRange | ||
cdr = au.produceCommittingTask(rnd, ml.trackers.dbRound, cdr) | ||
if cdr != nil { | ||
func() { | ||
dcc.deferredCommitRange = *cdr | ||
ml.trackers.accountsWriting.Add(1) | ||
defer ml.trackers.accountsWriting.Done() | ||
|
||
// do not take any locks since all operations are synchronous | ||
newBase := basics.Round(dcc.offset) + dcc.oldBase | ||
dcc.newBase = newBase | ||
|
||
err := au.prepareCommit(dcc) | ||
require.NoError(t, err) | ||
err = ml.trackers.dbs.Wdb.Atomic(func(ctx context.Context, tx *sql.Tx) (err error) { | ||
err = au.commitRound(ctx, tx, dcc) | ||
if err != nil { | ||
return err | ||
} | ||
err = updateAccountsRound(tx, newBase) | ||
return err | ||
}) | ||
require.NoError(t, err) | ||
ml.trackers.dbRound = newBase | ||
au.postCommit(ml.trackers.ctx, dcc) | ||
au.postCommitUnlocked(ml.trackers.ctx, dcc) | ||
}() | ||
|
||
} | ||
} | ||
|
||
newBlock := func(au *accountUpdates, rnd basics.Round, base map[basics.Address]basics.AccountData, updates ledgercore.AccountDeltas) { | ||
rewardLevel := uint64(0) | ||
prevTotals, err := au.Totals(basics.Round(rnd - 1)) | ||
require.NoError(t, err) | ||
|
||
newTotals := ledgertesting.CalculateNewRoundAccountTotals(t, updates, rewardLevel, protoParams, base, prevTotals) | ||
|
||
blk := bookkeeping.Block{ | ||
BlockHeader: bookkeeping.BlockHeader{ | ||
Round: basics.Round(rnd), | ||
}, | ||
} | ||
blk.RewardsLevel = rewardLevel | ||
blk.CurrentProtocol = testProtocolVersion | ||
delta := ledgercore.MakeStateDelta(&blk.BlockHeader, 0, updates.Len(), 0) | ||
delta.Accts.MergeAccounts(updates) | ||
delta.Creatables = creatablesFromUpdates(base, updates, knownCreatables) | ||
delta.Totals = newTotals | ||
|
||
au.newBlock(blk, delta) | ||
} | ||
|
||
// the test 1 requires 2 blocks with different resource state, au requires MaxBalLookback block to start persisting | ||
for i := basics.Round(1); i <= basics.Round(protoParams.MaxBalLookback+2); i++ { | ||
var updates ledgercore.AccountDeltas | ||
|
||
// add data | ||
if i == 1 { | ||
updates.Upsert(addr1, ledgercore.AccountData{AccountBaseData: ledgercore.AccountBaseData{MicroAlgos: basics.MicroAlgos{Raw: 1000000}, TotalAssetParams: 1, TotalAssets: 2}}) | ||
updates.UpsertAssetResource(addr1, aidx1, ledgercore.AssetParamsDelta{Params: &basics.AssetParams{Total: 100}}, ledgercore.AssetHoldingDelta{Holding: &basics.AssetHolding{Amount: 100}}) | ||
updates.UpsertAssetResource(addr1, aidx2, ledgercore.AssetParamsDelta{}, ledgercore.AssetHoldingDelta{Holding: &basics.AssetHolding{Amount: 200}}) | ||
} | ||
|
||
base := accts[i-1] | ||
newAccts := applyPartialDeltas(base, updates) | ||
accts = append(accts, newAccts) | ||
|
||
// prepare block | ||
newBlock(au, i, base, updates) | ||
|
||
// commit changes synchroniously | ||
commitSync(i) | ||
} | ||
|
||
// ensure rounds | ||
rnd := au.latest() | ||
require.Equal(t, basics.Round(protoParams.MaxBalLookback+2), rnd) | ||
require.Equal(t, basics.Round(2), au.cachedDBRound) | ||
oldCachedDBRound := au.cachedDBRound | ||
|
||
// simulate the following state | ||
// 1. addr1 and in baseAccounts and its round is less than addr1's data in baseResources | ||
// 2. au.cachedDBRound is less than actual DB round | ||
delete(au.accounts, addr1) | ||
au.cachedDBRound-- | ||
|
||
pad, ok := au.baseAccounts.read(addr1) | ||
require.True(t, ok) | ||
pad.round = au.cachedDBRound | ||
au.baseAccounts.write(pad) | ||
|
||
prd, ok := au.baseResources.read(addr1, basics.CreatableIndex(aidx1)) | ||
require.True(t, ok) | ||
prd.round = oldCachedDBRound | ||
au.baseResources.write(prd, addr1) | ||
prd, ok = au.baseResources.read(addr1, basics.CreatableIndex(aidx2)) | ||
require.True(t, ok) | ||
prd.round = oldCachedDBRound | ||
au.baseResources.write(prd, addr1) | ||
|
||
var ad basics.AccountData | ||
var err error | ||
|
||
// lookupLatest blocks on waiting new round. There is no reliable way to say it is blocked, | ||
// so run it in a goroutine and query it to ensure it is blocked. | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
done := make(chan struct{}) | ||
go func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had another implementation with timer a bit simpler but this looks a bit more robust. Any suggestions on how to check lookupLatest has been blocked at There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you'd have to change accountsReadCond to use an interface and create a mock (or wrapped) sync.Cond? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about adding some code into |
||
ad, _, _, err = au.lookupLatest(addr1) | ||
close(done) | ||
wg.Done() | ||
}() | ||
|
||
// wait to ensure lookupLatest is stuck | ||
maxIterations := 10 | ||
i := 0 | ||
for i < maxIterations { | ||
select { | ||
case <-done: | ||
require.Fail(t, "lookupLatest returns without waiting for new block") | ||
default: | ||
i++ | ||
time.Sleep(10 * time.Millisecond) | ||
} | ||
} | ||
|
||
// give it a new block and restore the original cachedDBRound value | ||
au.accountsMu.Lock() | ||
au.cachedDBRound = oldCachedDBRound | ||
au.accountsMu.Unlock() | ||
newBlock(au, rnd+1, accts[rnd], ledgercore.AccountDeltas{}) | ||
commitSync(rnd + 1) | ||
|
||
wg.Wait() | ||
|
||
require.NoError(t, err) | ||
require.Equal(t, uint64(1000000), ad.MicroAlgos.Raw) | ||
require.Equal(t, uint64(100), ad.AssetParams[aidx1].Total) | ||
require.Equal(t, uint64(100), ad.Assets[aidx1].Amount) | ||
require.Equal(t, uint64(200), ad.Assets[aidx2].Amount) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The two are equivilent..