Skip to content

Commit

Permalink
Eval prefetch (#1816)
Browse files Browse the repository at this point in the history
Improve speed of BlockEvaluator eval() by pre-fetching account data, decreasing latency at various points during block accounting.

Co-authored-by: Tsachi Herman <[email protected]>
  • Loading branch information
brianolson and tsachiherman authored Jan 27, 2021
1 parent a7652e0 commit e54bf93
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 26 deletions.
7 changes: 4 additions & 3 deletions ledger/appcow.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ func (cb *roundCowState) DelKey(addr basics.Address, aidx basics.AppIndex, globa
// MakeDebugBalances creates a ledger suitable for dryrun and debugger
func MakeDebugBalances(l ledgerForCowBase, round basics.Round, proto protocol.ConsensusVersion, prevTimestamp int64) apply.Balances {
base := &roundCowBase{
l: l,
rnd: round - 1,
proto: config.Consensus[proto],
l: l,
rnd: round - 1,
proto: config.Consensus[proto],
accounts: make(map[basics.Address]basics.AccountData),
}

hdr := bookkeeping.BlockHeader{
Expand Down
14 changes: 7 additions & 7 deletions ledger/catchpointwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ type encodedBalanceRecord struct {
type CatchpointFileHeader struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
}

type catchpointFileBalancesChunk struct {
Expand Down
92 changes: 80 additions & 12 deletions ledger/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/algorand/go-deadlock"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
Expand Down Expand Up @@ -62,15 +65,41 @@ type roundCowBase struct {

// The current protocol consensus params.
proto config.ConsensusParams

// The accounts that we're already accessed during this round evaluation. This is a caching
// buffer used to avoid looking up the same account data more than once during a single evaluator
// execution. The AccountData is always an historical one, then therefore won't be changing.
// The underlying (accountupdates) infrastucture may provide additional cross-round caching which
// are beyond the scope of this cache.
// The account data store here is always the account data without the rewards.
accounts map[basics.Address]basics.AccountData

// accountsMu is the accounts read-write mutex, used to syncronize the access ot the accounts map.
accountsMu deadlock.RWMutex
}

func (x *roundCowBase) getCreator(cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) {
return x.l.GetCreatorForRound(x.rnd, cidx, ctype)
}

func (x *roundCowBase) lookup(addr basics.Address) (acctData basics.AccountData, err error) {
acctData, _, err = x.l.LookupWithoutRewards(x.rnd, addr)
return acctData, err
// lookup returns the non-rewarded account data for the provided account address. It uses the internal per-round cache
// first, and if it cannot find it there, it would defer to the underlaying implementation.
// note that errors in accounts data retrivals are not cached as these typically cause the transaction evaluation to fail.
func (x *roundCowBase) lookup(addr basics.Address) (basics.AccountData, error) {
x.accountsMu.RLock()
if accountData, found := x.accounts[addr]; found {
x.accountsMu.RUnlock()
return accountData, nil
}
x.accountsMu.RUnlock()

accountData, _, err := x.l.LookupWithoutRewards(x.rnd, addr)
if err == nil {
x.accountsMu.Lock()
x.accounts[addr] = accountData
x.accountsMu.Unlock()
}
return accountData, err
}

func (x *roundCowBase) checkDup(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {
Expand Down Expand Up @@ -362,8 +391,9 @@ func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, paysetHin
// the block at this round below, so underflow will be caught.
// If we are not validating, we must have previously checked
// an agreement.Certificate attesting that hdr is valid.
rnd: hdr.Round - 1,
proto: proto,
rnd: hdr.Round - 1,
proto: proto,
accounts: make(map[basics.Address]basics.AccountData),
}

eval := &BlockEvaluator{
Expand Down Expand Up @@ -1090,23 +1120,34 @@ func (validator *evalTxValidator) run() {

// used by Ledger.Validate() Ledger.AddBlock() Ledger.trackerEvalVerified()(accountUpdates.loadFromDisk())
//
// Validate: eval(ctx, l, blk, true, txcache, executionPool)
// AddBlock: eval(context.Background(), l, blk, false, txcache, nil)
// tracker: eval(context.Background(), l, blk, false, txcache, nil)
func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool) (ledgercore.StateDelta, error) {
// Validate: eval(ctx, l, blk, true, txcache, executionPool, true)
// AddBlock: eval(context.Background(), l, blk, false, txcache, nil, true)
// tracker: eval(context.Background(), l, blk, false, txcache, nil, false)
func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool, usePrefetch bool) (ledgercore.StateDelta, error) {
eval, err := startEvaluator(l, blk.BlockHeader, len(blk.Payset), validate, false)
if err != nil {
return ledgercore.StateDelta{}, err
}

validationCtx, validationCancel := context.WithCancel(ctx)
var wg sync.WaitGroup
defer func() {
validationCancel()
wg.Wait()
}()

// If validationCtx or underlying ctx are Done, end prefetch
if usePrefetch {
wg.Add(1)
go prefetchThread(validationCtx, eval.state.lookupParent, blk.Payset, &wg)
}

// Next, transactions
paysetgroups, err := blk.DecodePaysetGroups()
if err != nil {
return ledgercore.StateDelta{}, err
}
var txvalidator evalTxValidator
validationCtx, validationCancel := context.WithCancel(ctx)
defer validationCancel()
if validate {
_, ok := config.Consensus[blk.CurrentProtocol]
if !ok {
Expand Down Expand Up @@ -1170,12 +1211,39 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali
return eval.state.deltas(), nil
}

func prefetchThread(ctx context.Context, state roundCowParent, payset []transactions.SignedTxnInBlock, wg *sync.WaitGroup) {
defer wg.Done()
maybelookup := func(addr basics.Address) {
if addr.IsZero() {
return
}
state.lookup(addr)
}
for _, stxn := range payset {
select {
case <-ctx.Done():
return
default:
}
state.lookup(stxn.Txn.Sender)
maybelookup(stxn.Txn.Receiver)
maybelookup(stxn.Txn.CloseRemainderTo)
maybelookup(stxn.Txn.AssetSender)
maybelookup(stxn.Txn.AssetReceiver)
maybelookup(stxn.Txn.AssetCloseTo)
maybelookup(stxn.Txn.FreezeAccount)
for _, xa := range stxn.Txn.Accounts {
maybelookup(xa)
}
}
}

// Validate uses the ledger to validate block blk as a candidate next block.
// It returns an error if blk is not the expected next block, or if blk is
// not a valid block (e.g., it has duplicate transactions, overspends some
// account, etc).
func (l *Ledger) Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ValidatedBlock, error) {
delta, err := eval(ctx, l, blk, true, l.verifiedTxnCache, executionPool)
delta, err := eval(ctx, l, blk, true, l.verifiedTxnCache, executionPool, true)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion ledger/eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func benchmarkBlockEvaluator(b *testing.B, inMem bool, withCrypto bool) {
if withCrypto {
_, err = l2.Validate(context.Background(), validatedBlock.blk, backlogPool)
} else {
_, err = eval(context.Background(), l2, validatedBlock.blk, false, nil, nil)
_, err = eval(context.Background(), l2, validatedBlock.blk, false, nil, nil, true)
}
require.NoError(b, err)
}
Expand Down
4 changes: 2 additions & 2 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (l *Ledger) BlockCert(rnd basics.Round) (blk bookkeeping.Block, cert agreem
func (l *Ledger) AddBlock(blk bookkeeping.Block, cert agreement.Certificate) error {
// passing nil as the executionPool is ok since we've asking the evaluator to skip verification.

updates, err := eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil)
updates, err := eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -647,7 +647,7 @@ func (l *Ledger) trackerLog() logging.Logger {
// evaluator to shortcut the "main" ledger ( i.e. this struct ) and avoid taking the trackers lock a second time.
func (l *Ledger) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger ledgerForEvaluator) (ledgercore.StateDelta, error) {
// passing nil as the executionPool is ok since we've asking the evaluator to skip verification.
return eval(context.Background(), accUpdatesLedger, blk, false, l.verifiedTxnCache, nil)
return eval(context.Background(), accUpdatesLedger, blk, false, l.verifiedTxnCache, nil, false)
}

// IsWritingCatchpointFile returns true when a catchpoint file is being generated. The function is used by the catchup service
Expand Down
2 changes: 1 addition & 1 deletion ledger/ledger_perf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func benchmarkFullBlocks(params testParams, b *testing.B) {
vc := verify.GetMockedCache(true)
b.ResetTimer()
for _, blk := range blocks {
_, err = eval(context.Background(), l1, blk, true, vc, nil)
_, err = eval(context.Background(), l1, blk, true, vc, nil, true)
require.NoError(b, err)
err = l1.AddBlock(blk, cert)
require.NoError(b, err)
Expand Down

0 comments on commit e54bf93

Please sign in to comment.