From 034ea804f84e1bfd6b5dbc230f6df1b02ec1b754 Mon Sep 17 00:00:00 2001 From: Tsachi Herman Date: Thu, 17 Sep 2020 13:22:17 -0400 Subject: [PATCH 01/11] Add a cache for the roundCowBase --- ledger/eval.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/ledger/eval.go b/ledger/eval.go index 092e9b3646..de51adac48 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -61,14 +61,32 @@ type roundCowBase struct { // The current protocol consensus params. proto config.ConsensusParams + + // The accounts that we're already accessed during this round evaluation. This is a caching + // buffer used to avoid looking up the same account data more than once during a single evaluator + // execution. The AccountData is always an historical one, then therefore won't be changing. + // The underlying (accountupdates) infrastucture may provide additional cross-round caching which + // are beyond the scope of this cache. + // The account data store here is always the account data without the rewards. + accounts map[basics.Address]basics.AccountData } func (x *roundCowBase) getCreator(cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) { return x.l.GetCreatorForRound(x.rnd, cidx, ctype) } +// lookup returns the non-rewarded account data for the provided account address. It uses the internal per-round cache +// first, and if it cannot find it there, it would defer to the underlaying implementation. +// note that errors in accounts data retrivals are not cached as these typically cause the transaction evaluation to fail. func (x *roundCowBase) lookup(addr basics.Address) (basics.AccountData, error) { - return x.l.LookupWithoutRewards(x.rnd, addr) + if accountData, found := x.accounts[addr]; found { + return accountData, nil + } + accountData, err := x.l.LookupWithoutRewards(x.rnd, addr) + if err == nil { + x.accounts[addr] = accountData + } + return accountData, err } func (x *roundCowBase) isDup(firstValid, lastValid basics.Round, txid transactions.Txid, txl txlease) (bool, error) { @@ -211,8 +229,9 @@ func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, paysetHin // the block at this round below, so underflow will be caught. // If we are not validating, we must have previously checked // an agreement.Certificate attesting that hdr is valid. - rnd: hdr.Round - 1, - proto: proto, + rnd: hdr.Round - 1, + proto: proto, + accounts: make(map[basics.Address]basics.AccountData), } eval := &BlockEvaluator{ From 86b05829c3e8af33ab5d20a4ccbd161ec18a7457 Mon Sep 17 00:00:00 2001 From: Tsachi Herman Date: Thu, 24 Sep 2020 15:53:30 -0400 Subject: [PATCH 02/11] Add lock --- ledger/eval.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/ledger/eval.go b/ledger/eval.go index 6a91472bf5..affbe52ace 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "github.com/algorand/go-deadlock" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" @@ -73,6 +74,9 @@ type roundCowBase struct { // are beyond the scope of this cache. // The account data store here is always the account data without the rewards. accounts map[basics.Address]basics.AccountData + + // accountsMu is the accounts read-write mutex, used to syncronize the access ot the accounts map. + accountsMu deadlock.RWMutex } func (x *roundCowBase) getCreator(cidx basics.CreatableIndex, ctype basics.CreatableType) (basics.Address, bool, error) { @@ -83,12 +87,18 @@ func (x *roundCowBase) getCreator(cidx basics.CreatableIndex, ctype basics.Creat // first, and if it cannot find it there, it would defer to the underlaying implementation. // note that errors in accounts data retrivals are not cached as these typically cause the transaction evaluation to fail. func (x *roundCowBase) lookup(addr basics.Address) (basics.AccountData, error) { + x.accountsMu.RLock() if accountData, found := x.accounts[addr]; found { + x.accountsMu.RUnlock() return accountData, nil } + x.accountsMu.RUnlock() + accountData, err := x.l.LookupWithoutRewards(x.rnd, addr) if err == nil { + x.accountsMu.Lock() x.accounts[addr] = accountData + x.accountsMu.Unlock() } return accountData, err } @@ -310,8 +320,8 @@ func startEvaluator(l ledgerForEvaluator, hdr bookkeeping.BlockHeader, paysetHin } poolAddr := eval.prevHeader.RewardsPool - // get the reward pool account data without any rewards - incentivePoolData, err := l.LookupWithoutRewards(eval.prevHeader.Round, poolAddr) + // get the reward pool account data without any rewards; we use the roundCowBase so the account data would get cached. + incentivePoolData, err := base.lookup(poolAddr) if err != nil { return nil, err } From 071fe3049e1ae3e22ff515cc1d13e096872e7146 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Fri, 8 Jan 2021 11:35:35 -0500 Subject: [PATCH 03/11] prefetch accounts at start of eval --- ledger/eval.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/ledger/eval.go b/ledger/eval.go index a7f17d696a..d4cfd88a39 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -991,6 +991,7 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali if err != nil { return StateDelta{}, err } + go prefetchThread(ctx, eval.state.lookupParent, blk.Payset) // Next, transactions paysetgroups, err := blk.DecodePaysetGroups() @@ -1063,6 +1064,38 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali return eval.state.mods, nil } +func prefetchThread(ctx context.Context, state roundCowParent, payset []transactions.SignedTxnInBlock) { + maybelookup := func(addr basics.Address) { + if addr.IsZero() { + return + } + state.lookup(addr) + } + bail := 10 + for _, stxn := range payset { + state.lookup(stxn.Txn.Sender) + maybelookup(stxn.Txn.Receiver) + maybelookup(stxn.Txn.CloseRemainderTo) + maybelookup(stxn.Txn.AssetSender) + maybelookup(stxn.Txn.AssetReceiver) + maybelookup(stxn.Txn.AssetCloseTo) + maybelookup(stxn.Txn.FreezeAccount) + for _, xa := range stxn.Txn.Accounts { + maybelookup(xa) + } + if bail == 0 { + bail = 10 + select { + case <-ctx.Done(): + return + default: + } + } else { + bail-- + } + } +} + // Validate uses the ledger to validate block blk as a candidate next block. // It returns an error if blk is not the expected next block, or if blk is // not a valid block (e.g., it has duplicate transactions, overspends some From 5a93b3bc2d03e7d463e386ea9bbad5700db0afde Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Tue, 12 Jan 2021 15:40:50 -0500 Subject: [PATCH 04/11] better context --- ledger/eval.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ledger/eval.go b/ledger/eval.go index d4cfd88a39..3fe2c017c9 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -991,7 +991,12 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali if err != nil { return StateDelta{}, err } - go prefetchThread(ctx, eval.state.lookupParent, blk.Payset) + + validationCtx, validationCancel := context.WithCancel(ctx) + defer validationCancel() + + // If validationCtx or underlying ctx are Done, end prefetch + go prefetchThread(validationCtx, eval.state.lookupParent, blk.Payset) // Next, transactions paysetgroups, err := blk.DecodePaysetGroups() @@ -999,8 +1004,6 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali return StateDelta{}, err } var txvalidator evalTxValidator - validationCtx, validationCancel := context.WithCancel(ctx) - defer validationCancel() if validate { _, ok := config.Consensus[blk.CurrentProtocol] if !ok { From 08a83ba17ccaa61f8cfecaa39ad094baa4fb66ac Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Thu, 14 Jan 2021 15:18:06 -0500 Subject: [PATCH 05/11] fix usage of amended roundCowBase --- ledger/appcow.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ledger/appcow.go b/ledger/appcow.go index 6468ed0c1b..fd18470859 100644 --- a/ledger/appcow.go +++ b/ledger/appcow.go @@ -391,9 +391,10 @@ func (cb *roundCowState) DelKey(addr basics.Address, aidx basics.AppIndex, globa // MakeDebugBalances creates a ledger suitable for dryrun and debugger func MakeDebugBalances(l ledgerForCowBase, round basics.Round, proto protocol.ConsensusVersion, prevTimestamp int64) apply.Balances { base := &roundCowBase{ - l: l, - rnd: round - 1, - proto: config.Consensus[proto], + l: l, + rnd: round - 1, + proto: config.Consensus[proto], + accounts: make(map[basics.Address]basics.AccountData), } hdr := bookkeeping.BlockHeader{ From b4cada26d4e5ea304477903996acc42a53f9a0da Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Fri, 15 Jan 2021 09:24:34 -0500 Subject: [PATCH 06/11] add non-prefetch option to eval(); mutex around deadlock.Opts.Disable --- ledger/archival_test.go | 6 ++++++ ledger/eval.go | 30 +++++++++++++----------------- ledger/eval_test.go | 2 +- ledger/ledger.go | 4 ++-- ledger/ledger_perf_test.go | 2 +- 5 files changed, 23 insertions(+), 21 deletions(-) diff --git a/ledger/archival_test.go b/ledger/archival_test.go index 0f50c2bdbd..caa348d307 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -27,6 +27,7 @@ import ( "path/filepath" "reflect" "runtime" + "sync" "testing" "github.com/stretchr/testify/require" @@ -677,10 +678,15 @@ func makeSignedTxnInBlock(tx transactions.Transaction) transactions.SignedTxnInB func TestArchivalFromNonArchival(t *testing.T) { // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there + var optlock sync.Mutex // deadlock.Opts.Disable itself becomes a race! + optlock.Lock() deadlockDisable := deadlock.Opts.Disable deadlock.Opts.Disable = true + optlock.Unlock() defer func() { + optlock.Lock() deadlock.Opts.Disable = deadlockDisable + optlock.Unlock() }() dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir") require.NoError(t, err) diff --git a/ledger/eval.go b/ledger/eval.go index 80a56f3ab8..3db7cde2fe 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -1087,10 +1087,10 @@ func (validator *evalTxValidator) run() { // used by Ledger.Validate() Ledger.AddBlock() Ledger.trackerEvalVerified()(accountUpdates.loadFromDisk()) // -// Validate: eval(ctx, l, blk, true, txcache, executionPool) -// AddBlock: eval(context.Background(), l, blk, false, txcache, nil) -// tracker: eval(context.Background(), l, blk, false, txcache, nil) -func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool) (StateDelta, error) { +// Validate: eval(ctx, l, blk, true, txcache, executionPool, true) +// AddBlock: eval(context.Background(), l, blk, false, txcache, nil, true) +// tracker: eval(context.Background(), l, blk, false, txcache, nil, false) +func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, validate bool, txcache verify.VerifiedTransactionCache, executionPool execpool.BacklogPool, usePrefetch bool) (StateDelta, error) { eval, err := startEvaluator(l, blk.BlockHeader, len(blk.Payset), validate, false) if err != nil { return StateDelta{}, err @@ -1100,7 +1100,9 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali defer validationCancel() // If validationCtx or underlying ctx are Done, end prefetch - go prefetchThread(validationCtx, eval.state.lookupParent, blk.Payset) + if usePrefetch { + go prefetchThread(validationCtx, eval.state.lookupParent, blk.Payset) + } // Next, transactions paysetgroups, err := blk.DecodePaysetGroups() @@ -1178,8 +1180,12 @@ func prefetchThread(ctx context.Context, state roundCowParent, payset []transact } state.lookup(addr) } - bail := 10 for _, stxn := range payset { + select { + case <-ctx.Done(): + return + default: + } state.lookup(stxn.Txn.Sender) maybelookup(stxn.Txn.Receiver) maybelookup(stxn.Txn.CloseRemainderTo) @@ -1190,16 +1196,6 @@ func prefetchThread(ctx context.Context, state roundCowParent, payset []transact for _, xa := range stxn.Txn.Accounts { maybelookup(xa) } - if bail == 0 { - bail = 10 - select { - case <-ctx.Done(): - return - default: - } - } else { - bail-- - } } } @@ -1208,7 +1204,7 @@ func prefetchThread(ctx context.Context, state roundCowParent, payset []transact // not a valid block (e.g., it has duplicate transactions, overspends some // account, etc). func (l *Ledger) Validate(ctx context.Context, blk bookkeeping.Block, executionPool execpool.BacklogPool) (*ValidatedBlock, error) { - delta, err := eval(ctx, l, blk, true, l.verifiedTxnCache, executionPool) + delta, err := eval(ctx, l, blk, true, l.verifiedTxnCache, executionPool, true) if err != nil { return nil, err } diff --git a/ledger/eval_test.go b/ledger/eval_test.go index ee07377891..85d6a847df 100644 --- a/ledger/eval_test.go +++ b/ledger/eval_test.go @@ -556,7 +556,7 @@ func benchmarkBlockEvaluator(b *testing.B, inMem bool, withCrypto bool) { if withCrypto { _, err = l2.Validate(context.Background(), validatedBlock.blk, backlogPool) } else { - _, err = eval(context.Background(), l2, validatedBlock.blk, false, nil, nil) + _, err = eval(context.Background(), l2, validatedBlock.blk, false, nil, nil, true) } require.NoError(b, err) } diff --git a/ledger/ledger.go b/ledger/ledger.go index e60beb2eb8..473faeb9ae 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -544,7 +544,7 @@ func (l *Ledger) BlockCert(rnd basics.Round) (blk bookkeeping.Block, cert agreem func (l *Ledger) AddBlock(blk bookkeeping.Block, cert agreement.Certificate) error { // passing nil as the executionPool is ok since we've asking the evaluator to skip verification. - updates, err := eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil) + updates, err := eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil, true) if err != nil { return err } @@ -646,7 +646,7 @@ func (l *Ledger) trackerLog() logging.Logger { // evaluator to shortcut the "main" ledger ( i.e. this struct ) and avoid taking the trackers lock a second time. func (l *Ledger) trackerEvalVerified(blk bookkeeping.Block, accUpdatesLedger ledgerForEvaluator) (StateDelta, error) { // passing nil as the executionPool is ok since we've asking the evaluator to skip verification. - return eval(context.Background(), accUpdatesLedger, blk, false, l.verifiedTxnCache, nil) + return eval(context.Background(), accUpdatesLedger, blk, false, l.verifiedTxnCache, nil, false) } // IsWritingCatchpointFile returns true when a catchpoint file is being generated. The function is used by the catchup service diff --git a/ledger/ledger_perf_test.go b/ledger/ledger_perf_test.go index b1b2244b40..cea9aea430 100644 --- a/ledger/ledger_perf_test.go +++ b/ledger/ledger_perf_test.go @@ -319,7 +319,7 @@ func benchmarkFullBlocks(params testParams, b *testing.B) { vc := verify.GetMockedCache(true) b.ResetTimer() for _, blk := range blocks { - _, err = eval(context.Background(), l1, blk, true, vc, nil) + _, err = eval(context.Background(), l1, blk, true, vc, nil, true) require.NoError(b, err) err = l1.AddBlock(blk, cert) require.NoError(b, err) From ffc43a18e1536c8ab290e3332f24fd6b194a7061 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Fri, 22 Jan 2021 11:31:18 -0500 Subject: [PATCH 07/11] fmt can't disable deadlock.Opts.Disable because `go test -race` objects! --- ledger/archival_test.go | 11 ----------- ledger/catchpointwriter.go | 14 +++++++------- 2 files changed, 7 insertions(+), 18 deletions(-) diff --git a/ledger/archival_test.go b/ledger/archival_test.go index a52fc84c58..5948d7171a 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -27,7 +27,6 @@ import ( "path/filepath" "reflect" "runtime" - "sync" "testing" "github.com/stretchr/testify/require" @@ -679,16 +678,6 @@ func makeSignedTxnInBlock(tx transactions.Transaction) transactions.SignedTxnInB func TestArchivalFromNonArchival(t *testing.T) { // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there - var optlock sync.Mutex // deadlock.Opts.Disable itself becomes a race! - optlock.Lock() - deadlockDisable := deadlock.Opts.Disable - deadlock.Opts.Disable = true - optlock.Unlock() - defer func() { - optlock.Lock() - deadlock.Opts.Disable = deadlockDisable - optlock.Unlock() - }() dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir") require.NoError(t, err) dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) diff --git a/ledger/catchpointwriter.go b/ledger/catchpointwriter.go index 12b590d755..f919fa6952 100644 --- a/ledger/catchpointwriter.go +++ b/ledger/catchpointwriter.go @@ -81,14 +81,14 @@ type encodedBalanceRecord struct { type CatchpointFileHeader struct { _struct struct{} `codec:",omitempty,omitemptyarray"` - Version uint64 `codec:"version"` - BalancesRound basics.Round `codec:"balancesRound"` - BlocksRound basics.Round `codec:"blocksRound"` + Version uint64 `codec:"version"` + BalancesRound basics.Round `codec:"balancesRound"` + BlocksRound basics.Round `codec:"blocksRound"` Totals ledgercore.AccountTotals `codec:"accountTotals"` - TotalAccounts uint64 `codec:"accountsCount"` - TotalChunks uint64 `codec:"chunksCount"` - Catchpoint string `codec:"catchpoint"` - BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"` + TotalAccounts uint64 `codec:"accountsCount"` + TotalChunks uint64 `codec:"chunksCount"` + Catchpoint string `codec:"catchpoint"` + BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"` } type catchpointFileBalancesChunk struct { From 77054ac97a3a06cd6002c49562418281f10c9593 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Fri, 22 Jan 2021 15:56:27 -0500 Subject: [PATCH 08/11] This reverts commit ffc43a18e1536c8ab290e3332f24fd6b194a7061. --- ledger/archival_test.go | 11 +++++++++++ ledger/catchpointwriter.go | 14 +++++++------- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/ledger/archival_test.go b/ledger/archival_test.go index 5948d7171a..a52fc84c58 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -27,6 +27,7 @@ import ( "path/filepath" "reflect" "runtime" + "sync" "testing" "github.com/stretchr/testify/require" @@ -678,6 +679,16 @@ func makeSignedTxnInBlock(tx transactions.Transaction) transactions.SignedTxnInB func TestArchivalFromNonArchival(t *testing.T) { // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there + var optlock sync.Mutex // deadlock.Opts.Disable itself becomes a race! + optlock.Lock() + deadlockDisable := deadlock.Opts.Disable + deadlock.Opts.Disable = true + optlock.Unlock() + defer func() { + optlock.Lock() + deadlock.Opts.Disable = deadlockDisable + optlock.Unlock() + }() dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir") require.NoError(t, err) dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) diff --git a/ledger/catchpointwriter.go b/ledger/catchpointwriter.go index f919fa6952..12b590d755 100644 --- a/ledger/catchpointwriter.go +++ b/ledger/catchpointwriter.go @@ -81,14 +81,14 @@ type encodedBalanceRecord struct { type CatchpointFileHeader struct { _struct struct{} `codec:",omitempty,omitemptyarray"` - Version uint64 `codec:"version"` - BalancesRound basics.Round `codec:"balancesRound"` - BlocksRound basics.Round `codec:"blocksRound"` + Version uint64 `codec:"version"` + BalancesRound basics.Round `codec:"balancesRound"` + BlocksRound basics.Round `codec:"blocksRound"` Totals ledgercore.AccountTotals `codec:"accountTotals"` - TotalAccounts uint64 `codec:"accountsCount"` - TotalChunks uint64 `codec:"chunksCount"` - Catchpoint string `codec:"catchpoint"` - BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"` + TotalAccounts uint64 `codec:"accountsCount"` + TotalChunks uint64 `codec:"chunksCount"` + Catchpoint string `codec:"catchpoint"` + BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"` } type catchpointFileBalancesChunk struct { From 2ece8116e79c6fcb592e3bad5ba53825b05e53cb Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Mon, 25 Jan 2021 09:39:00 -0500 Subject: [PATCH 09/11] TestArchivalFromNonArchival is incompatible with `go test -race` --- ledger/archival_norace_test.go | 142 +++++++++++++++++++++++++++++++++ ledger/archival_test.go | 102 ----------------------- 2 files changed, 142 insertions(+), 102 deletions(-) create mode 100644 ledger/archival_norace_test.go diff --git a/ledger/archival_norace_test.go b/ledger/archival_norace_test.go new file mode 100644 index 0000000000..d9272e4517 --- /dev/null +++ b/ledger/archival_norace_test.go @@ -0,0 +1,142 @@ +// +build !race +// Copyright (C) 2019-2021 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "context" + "crypto/rand" + "database/sql" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-deadlock" + + "github.com/algorand/go-algorand/agreement" + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/crypto" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/protocol" +) + +// This test cannot be run with `go test -race` because other threads +// calling .Unlock() on a deadlock Mutex are incompatible with this +// setting at the top of the test to disable deadlock detection, and +// deadlock detection must be off or this test will be too slow and +// timeout on some test servers. +func TestArchivalFromNonArchival(t *testing.T) { + // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there + deadlockDisable := deadlock.Opts.Disable + deadlock.Opts.Disable = true + defer func() { + deadlock.Opts.Disable = deadlockDisable + }() + dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir") + require.NoError(t, err) + dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) + dbPrefix := filepath.Join(dbTempDir, dbName) + defer os.RemoveAll(dbTempDir) + + genesisInitState := getInitState() + + genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{} + genesisInitState.Block.CurrentProtocol = protocol.ConsensusFuture + genesisInitState.GenesisHash = crypto.Digest{1} + genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{1} + + balanceRecords := []basics.BalanceRecord{} + + for i := 0; i < 50; i++ { + addr := basics.Address{} + _, err = rand.Read(addr[:]) + require.NoError(t, err) + br := basics.BalanceRecord{AccountData: basics.MakeAccountData(basics.Offline, basics.MicroAlgos{Raw: 1234567890}), Addr: addr} + genesisInitState.Accounts[addr] = br.AccountData + balanceRecords = append(balanceRecords, br) + } + + const inMem = false // use persistent storage + cfg := config.GetDefaultLocal() + cfg.Archival = false + + log := logging.TestingLog(t) + l, err := OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) + require.NoError(t, err) + blk := genesisInitState.Block + + const maxBlocks = 2000 + for i := 0; i < maxBlocks; i++ { + blk.BlockHeader.Round++ + blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000) + blk.Payset = transactions.Payset{} + + for j := 0; j < 5; j++ { + x := (j + i) % len(balanceRecords) + creatorEncoded := balanceRecords[x].Addr.String() + tx, err := makeUnsignedAssetCreateTx(blk.BlockHeader.Round-1, blk.BlockHeader.Round+3, 100, false, creatorEncoded, creatorEncoded, creatorEncoded, creatorEncoded, "m", "m", "", nil) + require.NoError(t, err) + tx.Sender = balanceRecords[x].Addr + stxnib := makeSignedTxnInBlock(tx) + blk.Payset = append(blk.Payset, stxnib) + blk.BlockHeader.TxnCounter++ + } + + err := l.AddBlock(blk, agreement.Certificate{}) + require.NoError(t, err) + } + l.WaitForCommit(blk.Round()) + + var latest, earliest basics.Round + err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + latest, err = blockLatest(tx) + require.NoError(t, err) + + earliest, err = blockEarliest(tx) + require.NoError(t, err) + return err + }) + require.NoError(t, err) + require.Equal(t, basics.Round(maxBlocks), latest) + require.True(t, basics.Round(0) < earliest, fmt.Sprintf("%d < %d", basics.Round(0), earliest)) + + // close and reopen the same DB, ensure the DB truncated + l.Close() + + cfg.Archival = true + l, err = OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) + require.NoError(t, err) + defer l.Close() + + err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + latest, err = blockLatest(tx) + require.NoError(t, err) + + earliest, err = blockEarliest(tx) + require.NoError(t, err) + return err + }) + require.NoError(t, err) + require.Equal(t, basics.Round(0), earliest) + require.Equal(t, basics.Round(0), latest) +} diff --git a/ledger/archival_test.go b/ledger/archival_test.go index a52fc84c58..3cb11b02bd 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -27,7 +27,6 @@ import ( "path/filepath" "reflect" "runtime" - "sync" "testing" "github.com/stretchr/testify/require" @@ -677,107 +676,6 @@ func makeSignedTxnInBlock(tx transactions.Transaction) transactions.SignedTxnInB } } -func TestArchivalFromNonArchival(t *testing.T) { - // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there - var optlock sync.Mutex // deadlock.Opts.Disable itself becomes a race! - optlock.Lock() - deadlockDisable := deadlock.Opts.Disable - deadlock.Opts.Disable = true - optlock.Unlock() - defer func() { - optlock.Lock() - deadlock.Opts.Disable = deadlockDisable - optlock.Unlock() - }() - dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir") - require.NoError(t, err) - dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) - dbPrefix := filepath.Join(dbTempDir, dbName) - defer os.RemoveAll(dbTempDir) - - genesisInitState := getInitState() - - genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{} - genesisInitState.Block.CurrentProtocol = protocol.ConsensusFuture - genesisInitState.GenesisHash = crypto.Digest{1} - genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{1} - - balanceRecords := []basics.BalanceRecord{} - - for i := 0; i < 50; i++ { - addr := basics.Address{} - _, err = rand.Read(addr[:]) - require.NoError(t, err) - br := basics.BalanceRecord{AccountData: basics.MakeAccountData(basics.Offline, basics.MicroAlgos{Raw: 1234567890}), Addr: addr} - genesisInitState.Accounts[addr] = br.AccountData - balanceRecords = append(balanceRecords, br) - } - - const inMem = false // use persistent storage - cfg := config.GetDefaultLocal() - cfg.Archival = false - - log := logging.TestingLog(t) - l, err := OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) - require.NoError(t, err) - blk := genesisInitState.Block - - const maxBlocks = 2000 - for i := 0; i < maxBlocks; i++ { - blk.BlockHeader.Round++ - blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000) - blk.Payset = transactions.Payset{} - - for j := 0; j < 5; j++ { - x := (j + i) % len(balanceRecords) - creatorEncoded := balanceRecords[x].Addr.String() - tx, err := makeUnsignedAssetCreateTx(blk.BlockHeader.Round-1, blk.BlockHeader.Round+3, 100, false, creatorEncoded, creatorEncoded, creatorEncoded, creatorEncoded, "m", "m", "", nil) - require.NoError(t, err) - tx.Sender = balanceRecords[x].Addr - stxnib := makeSignedTxnInBlock(tx) - blk.Payset = append(blk.Payset, stxnib) - blk.BlockHeader.TxnCounter++ - } - - err := l.AddBlock(blk, agreement.Certificate{}) - require.NoError(t, err) - } - l.WaitForCommit(blk.Round()) - - var latest, earliest basics.Round - err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - latest, err = blockLatest(tx) - require.NoError(t, err) - - earliest, err = blockEarliest(tx) - require.NoError(t, err) - return err - }) - require.NoError(t, err) - require.Equal(t, basics.Round(maxBlocks), latest) - require.True(t, basics.Round(0) < earliest, fmt.Sprintf("%d < %d", basics.Round(0), earliest)) - - // close and reopen the same DB, ensure the DB truncated - l.Close() - - cfg.Archival = true - l, err = OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) - require.NoError(t, err) - defer l.Close() - - err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - latest, err = blockLatest(tx) - require.NoError(t, err) - - earliest, err = blockEarliest(tx) - require.NoError(t, err) - return err - }) - require.NoError(t, err) - require.Equal(t, basics.Round(0), earliest) - require.Equal(t, basics.Round(0), latest) -} - func checkTrackers(t *testing.T, wl *wrappedLedger, rnd basics.Round) (basics.Round, error) { minMinSave := rnd var minSave basics.Round From 2816c78805b6a3574f4aa96f0414d5bb90939640 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Mon, 25 Jan 2021 09:48:53 -0500 Subject: [PATCH 10/11] fmt --- ledger/archival_norace_test.go | 3 ++- ledger/catchpointwriter.go | 14 +++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/ledger/archival_norace_test.go b/ledger/archival_norace_test.go index d9272e4517..31796d8c84 100644 --- a/ledger/archival_norace_test.go +++ b/ledger/archival_norace_test.go @@ -1,4 +1,3 @@ -// +build !race // Copyright (C) 2019-2021 Algorand, Inc. // This file is part of go-algorand // @@ -14,6 +13,8 @@ // // You should have received a copy of the GNU Affero General Public License // along with go-algorand. If not, see . +// +// +build !race package ledger diff --git a/ledger/catchpointwriter.go b/ledger/catchpointwriter.go index 12b590d755..f919fa6952 100644 --- a/ledger/catchpointwriter.go +++ b/ledger/catchpointwriter.go @@ -81,14 +81,14 @@ type encodedBalanceRecord struct { type CatchpointFileHeader struct { _struct struct{} `codec:",omitempty,omitemptyarray"` - Version uint64 `codec:"version"` - BalancesRound basics.Round `codec:"balancesRound"` - BlocksRound basics.Round `codec:"blocksRound"` + Version uint64 `codec:"version"` + BalancesRound basics.Round `codec:"balancesRound"` + BlocksRound basics.Round `codec:"blocksRound"` Totals ledgercore.AccountTotals `codec:"accountTotals"` - TotalAccounts uint64 `codec:"accountsCount"` - TotalChunks uint64 `codec:"chunksCount"` - Catchpoint string `codec:"catchpoint"` - BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"` + TotalAccounts uint64 `codec:"accountsCount"` + TotalChunks uint64 `codec:"chunksCount"` + Catchpoint string `codec:"catchpoint"` + BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"` } type catchpointFileBalancesChunk struct { From 74643f0c018f6657d44cc363ef2c38e0bb10ee53 Mon Sep 17 00:00:00 2001 From: Brian Olson Date: Tue, 26 Jan 2021 15:33:23 -0500 Subject: [PATCH 11/11] wait for prefetchThread to complete before returning from eval() --- ledger/archival_norace_test.go | 143 --------------------------------- ledger/archival_test.go | 96 ++++++++++++++++++++++ ledger/eval.go | 14 +++- 3 files changed, 107 insertions(+), 146 deletions(-) delete mode 100644 ledger/archival_norace_test.go diff --git a/ledger/archival_norace_test.go b/ledger/archival_norace_test.go deleted file mode 100644 index 31796d8c84..0000000000 --- a/ledger/archival_norace_test.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright (C) 2019-2021 Algorand, Inc. -// This file is part of go-algorand -// -// go-algorand is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// go-algorand is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with go-algorand. If not, see . -// -// +build !race - -package ledger - -import ( - "context" - "crypto/rand" - "database/sql" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/algorand/go-deadlock" - - "github.com/algorand/go-algorand/agreement" - "github.com/algorand/go-algorand/config" - "github.com/algorand/go-algorand/crypto" - "github.com/algorand/go-algorand/data/basics" - "github.com/algorand/go-algorand/data/transactions" - "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/protocol" -) - -// This test cannot be run with `go test -race` because other threads -// calling .Unlock() on a deadlock Mutex are incompatible with this -// setting at the top of the test to disable deadlock detection, and -// deadlock detection must be off or this test will be too slow and -// timeout on some test servers. -func TestArchivalFromNonArchival(t *testing.T) { - // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there - deadlockDisable := deadlock.Opts.Disable - deadlock.Opts.Disable = true - defer func() { - deadlock.Opts.Disable = deadlockDisable - }() - dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir") - require.NoError(t, err) - dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) - dbPrefix := filepath.Join(dbTempDir, dbName) - defer os.RemoveAll(dbTempDir) - - genesisInitState := getInitState() - - genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{} - genesisInitState.Block.CurrentProtocol = protocol.ConsensusFuture - genesisInitState.GenesisHash = crypto.Digest{1} - genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{1} - - balanceRecords := []basics.BalanceRecord{} - - for i := 0; i < 50; i++ { - addr := basics.Address{} - _, err = rand.Read(addr[:]) - require.NoError(t, err) - br := basics.BalanceRecord{AccountData: basics.MakeAccountData(basics.Offline, basics.MicroAlgos{Raw: 1234567890}), Addr: addr} - genesisInitState.Accounts[addr] = br.AccountData - balanceRecords = append(balanceRecords, br) - } - - const inMem = false // use persistent storage - cfg := config.GetDefaultLocal() - cfg.Archival = false - - log := logging.TestingLog(t) - l, err := OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) - require.NoError(t, err) - blk := genesisInitState.Block - - const maxBlocks = 2000 - for i := 0; i < maxBlocks; i++ { - blk.BlockHeader.Round++ - blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000) - blk.Payset = transactions.Payset{} - - for j := 0; j < 5; j++ { - x := (j + i) % len(balanceRecords) - creatorEncoded := balanceRecords[x].Addr.String() - tx, err := makeUnsignedAssetCreateTx(blk.BlockHeader.Round-1, blk.BlockHeader.Round+3, 100, false, creatorEncoded, creatorEncoded, creatorEncoded, creatorEncoded, "m", "m", "", nil) - require.NoError(t, err) - tx.Sender = balanceRecords[x].Addr - stxnib := makeSignedTxnInBlock(tx) - blk.Payset = append(blk.Payset, stxnib) - blk.BlockHeader.TxnCounter++ - } - - err := l.AddBlock(blk, agreement.Certificate{}) - require.NoError(t, err) - } - l.WaitForCommit(blk.Round()) - - var latest, earliest basics.Round - err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - latest, err = blockLatest(tx) - require.NoError(t, err) - - earliest, err = blockEarliest(tx) - require.NoError(t, err) - return err - }) - require.NoError(t, err) - require.Equal(t, basics.Round(maxBlocks), latest) - require.True(t, basics.Round(0) < earliest, fmt.Sprintf("%d < %d", basics.Round(0), earliest)) - - // close and reopen the same DB, ensure the DB truncated - l.Close() - - cfg.Archival = true - l, err = OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) - require.NoError(t, err) - defer l.Close() - - err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { - latest, err = blockLatest(tx) - require.NoError(t, err) - - earliest, err = blockEarliest(tx) - require.NoError(t, err) - return err - }) - require.NoError(t, err) - require.Equal(t, basics.Round(0), earliest) - require.Equal(t, basics.Round(0), latest) -} diff --git a/ledger/archival_test.go b/ledger/archival_test.go index 3cb11b02bd..efd68c4032 100644 --- a/ledger/archival_test.go +++ b/ledger/archival_test.go @@ -676,6 +676,102 @@ func makeSignedTxnInBlock(tx transactions.Transaction) transactions.SignedTxnInB } } +func TestArchivalFromNonArchival(t *testing.T) { + // Start in non-archival mode, add 2K blocks, restart in archival mode ensure only genesis block is there + deadlockDisable := deadlock.Opts.Disable + deadlock.Opts.Disable = true + defer func() { + deadlock.Opts.Disable = deadlockDisable + }() + dbTempDir, err := ioutil.TempDir(os.TempDir(), "testdir") + require.NoError(t, err) + dbName := fmt.Sprintf("%s.%d", t.Name(), crypto.RandUint64()) + dbPrefix := filepath.Join(dbTempDir, dbName) + defer os.RemoveAll(dbTempDir) + + genesisInitState := getInitState() + + genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{} + genesisInitState.Block.CurrentProtocol = protocol.ConsensusFuture + genesisInitState.GenesisHash = crypto.Digest{1} + genesisInitState.Block.BlockHeader.GenesisHash = crypto.Digest{1} + + balanceRecords := []basics.BalanceRecord{} + + for i := 0; i < 50; i++ { + addr := basics.Address{} + _, err = rand.Read(addr[:]) + require.NoError(t, err) + br := basics.BalanceRecord{AccountData: basics.MakeAccountData(basics.Offline, basics.MicroAlgos{Raw: 1234567890}), Addr: addr} + genesisInitState.Accounts[addr] = br.AccountData + balanceRecords = append(balanceRecords, br) + } + + const inMem = false // use persistent storage + cfg := config.GetDefaultLocal() + cfg.Archival = false + + log := logging.TestingLog(t) + l, err := OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) + require.NoError(t, err) + blk := genesisInitState.Block + + const maxBlocks = 2000 + for i := 0; i < maxBlocks; i++ { + blk.BlockHeader.Round++ + blk.BlockHeader.TimeStamp += int64(crypto.RandUint64() % 100 * 1000) + blk.Payset = transactions.Payset{} + + for j := 0; j < 5; j++ { + x := (j + i) % len(balanceRecords) + creatorEncoded := balanceRecords[x].Addr.String() + tx, err := makeUnsignedAssetCreateTx(blk.BlockHeader.Round-1, blk.BlockHeader.Round+3, 100, false, creatorEncoded, creatorEncoded, creatorEncoded, creatorEncoded, "m", "m", "", nil) + require.NoError(t, err) + tx.Sender = balanceRecords[x].Addr + stxnib := makeSignedTxnInBlock(tx) + blk.Payset = append(blk.Payset, stxnib) + blk.BlockHeader.TxnCounter++ + } + + err := l.AddBlock(blk, agreement.Certificate{}) + require.NoError(t, err) + } + l.WaitForCommit(blk.Round()) + + var latest, earliest basics.Round + err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + latest, err = blockLatest(tx) + require.NoError(t, err) + + earliest, err = blockEarliest(tx) + require.NoError(t, err) + return err + }) + require.NoError(t, err) + require.Equal(t, basics.Round(maxBlocks), latest) + require.True(t, basics.Round(0) < earliest, fmt.Sprintf("%d < %d", basics.Round(0), earliest)) + + // close and reopen the same DB, ensure the DB truncated + l.Close() + + cfg.Archival = true + l, err = OpenLedger(log, dbPrefix, inMem, genesisInitState, cfg) + require.NoError(t, err) + defer l.Close() + + err = l.blockDBs.Rdb.Atomic(func(ctx context.Context, tx *sql.Tx) error { + latest, err = blockLatest(tx) + require.NoError(t, err) + + earliest, err = blockEarliest(tx) + require.NoError(t, err) + return err + }) + require.NoError(t, err) + require.Equal(t, basics.Round(0), earliest) + require.Equal(t, basics.Round(0), latest) +} + func checkTrackers(t *testing.T, wl *wrappedLedger, rnd basics.Round) (basics.Round, error) { minMinSave := rnd var minSave basics.Round diff --git a/ledger/eval.go b/ledger/eval.go index e7a32e0b10..59c88ceaa4 100644 --- a/ledger/eval.go +++ b/ledger/eval.go @@ -20,6 +20,8 @@ import ( "context" "errors" "fmt" + "sync" + "github.com/algorand/go-deadlock" "github.com/algorand/go-algorand/config" @@ -1128,11 +1130,16 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali } validationCtx, validationCancel := context.WithCancel(ctx) - defer validationCancel() + var wg sync.WaitGroup + defer func() { + validationCancel() + wg.Wait() + }() // If validationCtx or underlying ctx are Done, end prefetch if usePrefetch { - go prefetchThread(validationCtx, eval.state.lookupParent, blk.Payset) + wg.Add(1) + go prefetchThread(validationCtx, eval.state.lookupParent, blk.Payset, &wg) } // Next, transactions @@ -1204,7 +1211,8 @@ func eval(ctx context.Context, l ledgerForEvaluator, blk bookkeeping.Block, vali return eval.state.deltas(), nil } -func prefetchThread(ctx context.Context, state roundCowParent, payset []transactions.SignedTxnInBlock) { +func prefetchThread(ctx context.Context, state roundCowParent, payset []transactions.SignedTxnInBlock, wg *sync.WaitGroup) { + defer wg.Done() maybelookup := func(addr basics.Address) { if addr.IsZero() { return