Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eval prefetch #1816

Merged
merged 27 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
034ea80
Add a cache for the roundCowBase
tsachiherman Sep 17, 2020
5f52560
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Sep 19, 2020
0f03560
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Sep 22, 2020
6454304
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Sep 24, 2020
440a37d
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Sep 24, 2020
86b0582
Add lock
tsachiherman Sep 24, 2020
9cd876e
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Oct 1, 2020
1c7ba23
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Oct 13, 2020
a8b26be
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Oct 27, 2020
0df75f1
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Nov 2, 2020
88e345f
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Nov 23, 2020
f87f92f
Merge branch 'master' into tsachi/cacheRoundCowBase
tsachiherman Dec 2, 2020
f7aa6e4
Merge remote-tracking branch 'tsachi/tsachi/cacheRoundCowBase' into e…
brianolson Jan 8, 2021
071fe30
prefetch accounts at start of eval
brianolson Jan 8, 2021
5a93b3b
better context
brianolson Jan 12, 2021
d6446d1
Merge remote-tracking branch 'origin/master' into eval_prefetch
brianolson Jan 14, 2021
78984b5
Merge remote-tracking branch 'origin/master' into eval_prefetch
brianolson Jan 14, 2021
08a83ba
fix usage of amended roundCowBase
brianolson Jan 14, 2021
b4cada2
add non-prefetch option to eval(); mutex around deadlock.Opts.Disable
brianolson Jan 15, 2021
f79b3d7
Merge remote-tracking branch 'origin/master' into eval_prefetch
brianolson Jan 15, 2021
915b606
Merge remote-tracking branch 'origin/master' into eval_prefetch
brianolson Jan 21, 2021
ffc43a1
fmt
brianolson Jan 22, 2021
77054ac
This reverts commit ffc43a18e1536c8ab290e3332f24fd6b194a7061.
brianolson Jan 22, 2021
2ece811
TestArchivalFromNonArchival is incompatible with `go test -race`
brianolson Jan 25, 2021
2816c78
fmt
brianolson Jan 25, 2021
3a3fec8
Merge remote-tracking branch 'origin/master' into eval_prefetch
brianolson Jan 26, 2021
74643f0
wait for prefetchThread to complete before returning from eval()
brianolson Jan 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions ledger/appcow.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ func (cb *roundCowState) DelKey(addr basics.Address, aidx basics.AppIndex, globa
// MakeDebugBalances creates a ledger suitable for dryrun and debugger
func MakeDebugBalances(l ledgerForCowBase, round basics.Round, proto protocol.ConsensusVersion, prevTimestamp int64) apply.Balances {
base := &roundCowBase{
l: l,
rnd: round - 1,
proto: config.Consensus[proto],
l: l,
rnd: round - 1,
proto: config.Consensus[proto],
accounts: make(map[basics.Address]basics.AccountData),
}

hdr := bookkeeping.BlockHeader{
Expand Down
14 changes: 7 additions & 7 deletions ledger/catchpointwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ type encodedBalanceRecord struct {
type CatchpointFileHeader struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
}

type catchpointFileBalancesChunk struct {
Expand Down
92 changes: 80 additions & 12 deletions ledger/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -62,15 +65,41 @@ type roundCowBase struct {

// The current protocol consensus params.
proto config.ConsensusParams

// The accounts that we're already accessed during this round evaluation. This is a caching
// buffer used to avoid looking up the same account data more than once during a single evaluator
// execution. The AccountData is always an historical one, then therefore won't be changing.
// The underlying (accountupdates) infrastucture may provide additional cross-round caching which
// are beyond the scope of this cache.
// The account data store here is always the account data without the rewards.
accounts map[basics.Address]basics.AccountData

// accountsMu is the accounts read-write mutex, used to syncronize the access ot the accounts map.
accountsMu deadlock.RWMutex
}

func (x *roundCowBase) getCreator(cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) {
return x.l.GetCreatorForRound(x.rnd, cidx, ctype)
}

func (x *roundCowBase) lookup(addr basics.Address) (acctData basics.AccountData, err error) {
acctData, _, err = x.l.LookupWithoutRewards(x.rnd, addr)
return acctData, err
// lookup returns the non-rewarded account data for the provided account address. It uses the internal per-round cache
// first, and if it cannot find it there, it would defer to the underlaying implementation.
// note that errors in accounts data retrivals are not cached as these typically cause the transaction evaluation to fail.
func (x *roundCowBase) lookup(addr basics.Address) (basics.AccountData, error) {
x.accountsMu.RLock()
if accountData, found := x.accounts[addr]; found {
x.accountsMu.RUnlock()
return accountData, nil
}
x.accountsMu.RUnlock()

accountData, _, err := x.l.LookupWithoutRewards(x.rnd, addr)
if err == nil {
x.accountsMu.Lock()
x.accounts[addr] = accountData
x.accountsMu.Unlock()
}
return accountData, err
}

func (x *roundCowBase) checkDup(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {
Expand Down Expand Up @@ -362,8 +391,9 @@ func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, paysetHin
// the block at this round below, so underflow will be caught.
// If we are not validating, we must have previously checked
// an agreement.Certificate attesting that hdr is valid.
rnd: hdr.Round - 1,
proto: proto,
rnd: hdr.Round - 1,
proto: proto,
accounts: make(map[basics.Address]basics.AccountData),
}

eval := &BlockEvaluator{
Expand Down Expand Up @@ -1090,23 +1120,34 @@ func (validator *evalTxValidator) run() {

// used by Ledger.Validate() Ledger.AddBlock() Ledger.trackerEvalVerified()(accountUpdates.loadFromDisk())
//
// Validate: eval(ctx, l, blk, true, txcache, executionPool)
// AddBlock: eval(context.Background(), l, blk, false, txcache, nil)
// tracker: eval(context.Background(), l, blk, false, txcache, nil)
func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool) (ledgercore.StateDelta, error) {
// Validate: eval(ctx, l, blk, true, txcache, executionPool, true)
// AddBlock: eval(context.Background(), l, blk, false, txcache, nil, true)
// tracker: eval(context.Background(), l, blk, false, txcache, nil, false)
func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool, usePrefetch bool) (ledgercore.StateDelta, error) {
eval, err := startEvaluator(l, blk.BlockHeader, len(blk.Payset), validate, false)
if err != nil {
return ledgercore.StateDelta{}, err
}

validationCtx, validationCancel := context.WithCancel(ctx)
var wg sync.WaitGroup
defer func() {
validationCancel()
wg.Wait()
}()

// If validationCtx or underlying ctx are Done, end prefetch
if usePrefetch {
wg.Add(1)
go prefetchThread(validationCtx, eval.state.lookupParent, blk.Payset, &wg)
}

// Next, transactions
paysetgroups, err := blk.DecodePaysetGroups()
if err != nil {
return ledgercore.StateDelta{}, err
}
var txvalidator evalTxValidator
validationCtx, validationCancel := context.WithCancel(ctx)
defer validationCancel()
if validate {
_, ok := config.Consensus[blk.CurrentProtocol]
if !ok {
Expand Down Expand Up @@ -1170,12 +1211,39 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali
return eval.state.deltas(), nil
}

func prefetchThread(ctx context.Context, state roundCowParent, payset []transactions.SignedTxnInBlock, wg *sync.WaitGroup) {
defer wg.Done()
maybelookup := func(addr basics.Address) {
if addr.IsZero() {
return
}
state.lookup(addr)
}
for _, stxn := range payset {
select {
case <-ctx.Done():
return
default:
}
state.lookup(stxn.Txn.Sender)
maybelookup(stxn.Txn.Receiver)
maybelookup(stxn.Txn.CloseRemainderTo)
maybelookup(stxn.Txn.AssetSender)
maybelookup(stxn.Txn.AssetReceiver)
maybelookup(stxn.Txn.AssetCloseTo)
maybelookup(stxn.Txn.FreezeAccount)
for _, xa := range stxn.Txn.Accounts {
maybelookup(xa)
}
}
}

// Validate uses the ledger to validate block blk as a candidate next block.
// It returns an error if blk is not the expected next block, or if blk is
// not a valid block (e.g., it has duplicate transactions, overspends some
// account, etc).
func (l *Ledger) Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ValidatedBlock, error) {
delta, err := eval(ctx, l, blk, true, l.verifiedTxnCache, executionPool)
delta, err := eval(ctx, l, blk, true, l.verifiedTxnCache, executionPool, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func benchmarkBlockEvaluator(b *testing.B, inMem bool, withCrypto bool) {
if withCrypto {
_, err = l2.Validate(context.Background(), validatedBlock.blk, backlogPool)
} else {
_, err = eval(context.Background(), l2, validatedBlock.blk, false, nil, nil)
_, err = eval(context.Background(), l2, validatedBlock.blk, false, nil, nil, true)
}
require.NoError(b, err)
}
Expand Down
4 changes: 2 additions & 2 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (l *Ledger) BlockCert(rnd basics.Round) (blk bookkeeping.Block, cert agreem
func (l *Ledger) AddBlock(blk bookkeeping.Block, cert agreement.Certificate) error {
// passing nil as the executionPool is ok since we've asking the evaluator to skip verification.

updates, err := eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil)
updates, err := eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -647,7 +647,7 @@ func (l *Ledger) trackerLog() logging.Logger {
// evaluator to shortcut the "main" ledger ( i.e. this struct ) and avoid taking the trackers lock a second time.
func (l *Ledger) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger ledgerForEvaluator) (ledgercore.StateDelta, error) {
// passing nil as the executionPool is ok since we've asking the evaluator to skip verification.
return eval(context.Background(), accUpdatesLedger, blk, false, l.verifiedTxnCache, nil)
return eval(context.Background(), accUpdatesLedger, blk, false, l.verifiedTxnCache, nil, false)
}

// IsWritingCatchpointFile returns true when a catchpoint file is being generated. The function is used by the catchup service
Expand Down
2 changes: 1 addition & 1 deletion ledger/ledger_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func benchmarkFullBlocks(params testParams, b *testing.B) {
vc := verify.GetMockedCache(true)
b.ResetTimer()
for _, blk := range blocks {
_, err = eval(context.Background(), l1, blk, true, vc, nil)
_, err = eval(context.Background(), l1, blk, true, vc, nil, true)
require.NoError(b, err)
err = l1.AddBlock(blk, cert)
require.NoError(b, err)
Expand Down