diff --git a/agreement/persistence.go b/agreement/persistence.go index d92abbd1e6..aef2a0f609 100644 --- a/agreement/persistence.go +++ b/agreement/persistence.go @@ -125,6 +125,7 @@ func restore(log logging.Logger, crash db.Accessor) (raw []byte, err error) { // the above call was completed sucecssfully, which means that we've just created the table ( which wasn't there ! ). // in that case, the table is guaranteed to be empty, and therefore we can return right here. log.Infof("restore (agreement): crash state table initialized") + noCrashState = true // this is a normal case (we don't have crash state) err = errNoCrashStateAvailable return } diff --git a/catchup/service.go b/catchup/service.go index 4005316d54..f022a3dcc8 100644 --- a/catchup/service.go +++ b/catchup/service.go @@ -345,6 +345,12 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, // if the context expired, just exit. return false } + if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound { + // the block was added to the ledger from elsewhere after fetching it here + // only the agreement could have added this block into the ledger, catchup is complete + s.log.Infof("fetchAndWrite(%d): after fetching the block, it is already in the ledger. The catchup is complete", r) + return false + } s.log.Warnf("fetchAndWrite(%d): failed to validate block : %v", r, err) return false } diff --git a/config/consensus.go b/config/consensus.go index 1f553d20e5..1bd7bd774e 100644 --- a/config/consensus.go +++ b/config/consensus.go @@ -394,6 +394,9 @@ type ConsensusParams struct { // MaxProposedExpiredOnlineAccounts is the maximum number of online accounts, which need // to be taken offline, that would be proposed to be taken offline. MaxProposedExpiredOnlineAccounts int + + // When rewards rate changes, use the new value immediately. + RewardsCalculationFix bool } // PaysetCommitType enumerates possible ways for the block header to commit to @@ -1054,6 +1057,8 @@ func initConsensusProtocols() { vFuture.MaxProposedExpiredOnlineAccounts = 32 + vFuture.RewardsCalculationFix = true + Consensus[protocol.ConsensusFuture] = vFuture } diff --git a/daemon/algod/api/server/v2/test/handlers_test.go b/daemon/algod/api/server/v2/test/handlers_test.go index 0b416eaa5d..9af675fa91 100644 --- a/daemon/algod/api/server/v2/test/handlers_test.go +++ b/daemon/algod/api/server/v2/test/handlers_test.go @@ -177,7 +177,7 @@ func TestGetBlockJsonEncoding(t *testing.T) { Round: l.Latest() + 1, Branch: genBlk.Hash(), TimeStamp: 0, - RewardsState: genBlk.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits), + RewardsState: genBlk.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits, logging.Base()), UpgradeState: genBlk.UpgradeState, } diff --git a/data/bookkeeping/block.go b/data/bookkeeping/block.go index f75c08386a..29a3f274d5 100644 --- a/data/bookkeeping/block.go +++ b/data/bookkeeping/block.go @@ -285,17 +285,17 @@ func (block *Block) Seed() committee.Seed { // NextRewardsState computes the RewardsState of the subsequent round // given the subsequent consensus parameters, along with the incentive pool // balance and the total reward units in the system as of the current round. -func (s RewardsState) NextRewardsState(nextRound basics.Round, nextProto config.ConsensusParams, incentivePoolBalance basics.MicroAlgos, totalRewardUnits uint64) (res RewardsState) { +func (s RewardsState) NextRewardsState(nextRound basics.Round, nextProto config.ConsensusParams, incentivePoolBalance basics.MicroAlgos, totalRewardUnits uint64, log logging.Logger) (res RewardsState) { res = s - if nextRound == s.RewardsRecalculationRound { + if nextRound == res.RewardsRecalculationRound { maxSpentOver := nextProto.MinBalance overflowed := false if nextProto.PendingResidueRewards { - maxSpentOver, overflowed = basics.OAdd(maxSpentOver, s.RewardsResidue) + maxSpentOver, overflowed = basics.OAdd(maxSpentOver, res.RewardsResidue) if overflowed { - logging.Base().Errorf("overflowed when trying to accumulate MinBalance(%d) and RewardsResidue(%d) for round %d (state %+v)", nextProto.MinBalance, s.RewardsResidue, nextRound, s) + log.Errorf("overflowed when trying to accumulate MinBalance(%d) and RewardsResidue(%d) for round %d (state %+v)", nextProto.MinBalance, res.RewardsResidue, nextRound, s) // this should never happen, but if it does, adjust the maxSpentOver so that we will have no rewards. maxSpentOver = incentivePoolBalance.Raw } @@ -304,7 +304,7 @@ func (s RewardsState) NextRewardsState(nextRound basics.Round, nextProto config. // it is time to refresh the rewards rate newRate, overflowed := basics.OSub(incentivePoolBalance.Raw, maxSpentOver) if overflowed { - logging.Base().Errorf("overflowed when trying to refresh RewardsRate for round %v (state %+v)", nextRound, s) + log.Errorf("overflowed when trying to refresh RewardsRate for round %v (state %+v)", nextRound, s) newRate = 0 } @@ -317,14 +317,21 @@ func (s RewardsState) NextRewardsState(nextRound basics.Round, nextProto config. return } + var rewardsRate uint64 + if nextProto.RewardsCalculationFix { + rewardsRate = res.RewardsRate + } else { + rewardsRate = s.RewardsRate + } + var ot basics.OverflowTracker - rewardsWithResidue := ot.Add(s.RewardsRate, s.RewardsResidue) - nextRewardLevel := ot.Add(s.RewardsLevel, rewardsWithResidue/totalRewardUnits) + rewardsWithResidue := ot.Add(rewardsRate, res.RewardsResidue) + nextRewardLevel := ot.Add(res.RewardsLevel, rewardsWithResidue/totalRewardUnits) nextResidue := rewardsWithResidue % totalRewardUnits if ot.Overflowed { - logging.Base().Errorf("could not compute next reward level (current level %v, adding %v MicroAlgos in total, number of reward units %v) using old level", - s.RewardsLevel, s.RewardsRate, totalRewardUnits) + log.Errorf("could not compute next reward level (current level %v, adding %v MicroAlgos in total, number of reward units %v) using old level", + res.RewardsLevel, rewardsRate, totalRewardUnits) return } diff --git a/data/bookkeeping/block_test.go b/data/bookkeeping/block_test.go index 078a0b2532..2956edb128 100644 --- a/data/bookkeeping/block_test.go +++ b/data/bookkeeping/block_test.go @@ -17,15 +17,19 @@ package bookkeeping import ( + "bytes" + "math" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/crypto" "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/transactions" + "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -248,20 +252,30 @@ func TestTime(t *testing.T) { func TestRewardsLevel(t *testing.T) { partitiontest.PartitionTest(t) + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + proto := config.Consensus[protocol.ConsensusCurrentVersion] var prev Block prev.RewardsLevel = 1 prev.RewardsRate = 10 rewardUnits := uint64(10) - state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits) + state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits, log) require.Equal(t, uint64(2), state.RewardsLevel) require.Equal(t, uint64(0), state.RewardsResidue) + + assert.Zero(t, buf.Len()) } func TestRewardsLevelWithResidue(t *testing.T) { partitiontest.PartitionTest(t) + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + proto := config.Consensus[protocol.ConsensusCurrentVersion] var prev Block @@ -270,14 +284,20 @@ func TestRewardsLevelWithResidue(t *testing.T) { prev.RewardsRate = 1 rewardUnits := uint64(10) - state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits) + state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits, log) require.Equal(t, uint64(11), state.RewardsLevel) require.Equal(t, uint64(0), state.RewardsResidue) + + assert.Zero(t, buf.Len()) } func TestRewardsLevelNoUnits(t *testing.T) { partitiontest.PartitionTest(t) + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + proto := config.Consensus[protocol.ConsensusCurrentVersion] var prev Block @@ -285,14 +305,20 @@ func TestRewardsLevelNoUnits(t *testing.T) { prev.RewardsResidue = 2 rewardUnits := uint64(0) - state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits) + state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits, log) require.Equal(t, prev.RewardsLevel, state.RewardsLevel) require.Equal(t, prev.RewardsResidue, state.RewardsResidue) + + assert.Zero(t, buf.Len()) } func TestTinyLevel(t *testing.T) { partitiontest.PartitionTest(t) + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + proto := config.Consensus[protocol.ConsensusCurrentVersion] var prev Block @@ -300,13 +326,19 @@ func TestTinyLevel(t *testing.T) { prev.RewardsRate = 10 * unitsInAlgos algosInSystem := uint64(1000 * 1000 * 1000) rewardUnits := algosInSystem * unitsInAlgos / proto.RewardUnit - state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits) + state := prev.NextRewardsState(prev.Round()+1, proto, basics.MicroAlgos{}, rewardUnits, log) require.True(t, state.RewardsLevel > 0 || state.RewardsResidue > 0) + + assert.Zero(t, buf.Len()) } func TestRewardsRate(t *testing.T) { partitiontest.PartitionTest(t) + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + var prev Block prev.RewardsLevel = 1 prev.RewardsRate = 10 @@ -318,14 +350,20 @@ func TestRewardsRate(t *testing.T) { incentivePoolBalance := basics.MicroAlgos{Raw: 1000 * uint64(proto.RewardsRateRefreshInterval)} // make sure that RewardsRate stays the same - state := prev.NextRewardsState(prev.Round()+1, proto, incentivePoolBalance, 0) + state := prev.NextRewardsState(prev.Round()+1, proto, incentivePoolBalance, 0, log) require.Equal(t, prev.RewardsRate, state.RewardsRate) require.Equal(t, prev.BlockHeader.RewardsRecalculationRound, state.RewardsRecalculationRound) + + assert.Zero(t, buf.Len()) } func TestRewardsRateRefresh(t *testing.T) { partitiontest.PartitionTest(t) + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + var prev Block prev.RewardsLevel = 1 prev.RewardsRate = 10 @@ -337,9 +375,11 @@ func TestRewardsRateRefresh(t *testing.T) { incentivePoolBalance := basics.MicroAlgos{Raw: 1000 * uint64(proto.RewardsRateRefreshInterval)} // make sure that RewardsRate was recomputed nextRound := prev.Round() + 1 - state := prev.NextRewardsState(nextRound, proto, incentivePoolBalance, 0) + state := prev.NextRewardsState(nextRound, proto, incentivePoolBalance, 0, log) require.Equal(t, (incentivePoolBalance.Raw-proto.MinBalance)/uint64(proto.RewardsRateRefreshInterval), state.RewardsRate) require.Equal(t, nextRound+basics.Round(proto.RewardsRateRefreshInterval), state.RewardsRecalculationRound) + + assert.Zero(t, buf.Len()) } func TestEncodeDecodeSignedTxn(t *testing.T) { @@ -412,8 +452,13 @@ func TestInitialRewardsRateCalculation(t *testing.T) { partitiontest.PartitionTest(t) consensusParams := config.Consensus[protocol.ConsensusCurrentVersion] + consensusParams.RewardsCalculationFix = false runTest := func() bool { + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + incentivePoolBalance := uint64(125000000000000) totalRewardUnits := uint64(10000000000) require.GreaterOrEqual(t, incentivePoolBalance, consensusParams.MinBalance) @@ -429,7 +474,7 @@ func TestInitialRewardsRateCalculation(t *testing.T) { curRewardsState.RewardsRate = incentivePoolBalance / uint64(consensusParams.RewardsRateRefreshInterval) } for rnd := 1; rnd < int(consensusParams.RewardsRateRefreshInterval+2); rnd++ { - nextRewardState := curRewardsState.NextRewardsState(basics.Round(rnd), consensusParams, basics.MicroAlgos{Raw: incentivePoolBalance}, totalRewardUnits) + nextRewardState := curRewardsState.NextRewardsState(basics.Round(rnd), consensusParams, basics.MicroAlgos{Raw: incentivePoolBalance}, totalRewardUnits, log) // adjust the incentive pool balance var ot basics.OverflowTracker @@ -450,6 +495,8 @@ func TestInitialRewardsRateCalculation(t *testing.T) { // prepare for the next iteration curRewardsState = nextRewardState } + + assert.Zero(t, buf.Len()) return true } @@ -461,3 +508,264 @@ func TestInitialRewardsRateCalculation(t *testing.T) { consensusParams.InitialRewardsRateCalculation = true require.True(t, runTest()) } + +func performRewardsRateCalculation( + t *testing.T, consensusParams config.ConsensusParams, + curRewardsState RewardsState, + incentivePoolBalance uint64, totalRewardUnits uint64, startingRound uint64, overspends bool, logs bool) { + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + defer func() { + require.Equal(t, logs, buf.Len() != 0) + }() + + require.GreaterOrEqual(t, incentivePoolBalance, consensusParams.MinBalance) + + for rnd := startingRound; rnd < startingRound+uint64(consensusParams.RewardsRateRefreshInterval)*3; rnd++ { + nextRewardState := curRewardsState.NextRewardsState(basics.Round(rnd), consensusParams, basics.MicroAlgos{Raw: incentivePoolBalance}, totalRewardUnits, log) + // adjust the incentive pool balance + var ot basics.OverflowTracker + + // get number of rewards per unit + rewardsPerUnit := ot.Sub(nextRewardState.RewardsLevel, curRewardsState.RewardsLevel) + require.False(t, ot.Overflowed) + + // subtract the total dispersed funds from the pool balance + incentivePoolBalance = ot.Sub(incentivePoolBalance, ot.Mul(totalRewardUnits, rewardsPerUnit)) + if ot.Overflowed { + require.True(t, overspends) + return + } + + if incentivePoolBalance < consensusParams.MinBalance { + require.True(t, overspends) + return + } + + // prepare for the next iteration + curRewardsState = nextRewardState + } + + require.False(t, overspends) +} + +func TestNextRewardsRateWithFix(t *testing.T) { + partitiontest.PartitionTest(t) + + proto, ok := config.Consensus[protocol.ConsensusCurrentVersion] + require.True(t, ok) + proto.RewardsCalculationFix = true + + tests := []struct { + name string + rewardsRate uint64 + rewardsLevel uint64 + rewardsResidue uint64 + rewardsRecalculationRound basics.Round + incentivePoolBalance uint64 + totalRewardUnits uint64 + startingRound uint64 + logs bool + }{ + {"zero_rate", 0, 215332, 0, 18500000, proto.MinBalance, 6756334087, 18063999, false}, + // 3 subtests below use parameters found in the block header `startingRound` - 1. + {"mainnet_0", 24000000, 215332, 545321700, 18500000, 10464550021728, 6756334087, + 18063999, true}, + {"mainnet_1", 24000000, 215332, 521321700, 18500000, 10464550021728, 6756334078, + 18063998, true}, + {"mainnet_2", 24000000, 215332, 425321700, 18500000, 10464550021728, 6756334079, + 18063994, true}, + {"no_residue", 0, 0, 0, 1000000, + proto.MinBalance + 500000000000 /* 5*10^11 */, 1, 1000000, false}, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + curRewardsState := RewardsState{ + RewardsLevel: test.rewardsLevel, + RewardsResidue: test.rewardsResidue, + RewardsRecalculationRound: test.rewardsRecalculationRound, + RewardsRate: test.rewardsRate, + } + + performRewardsRateCalculation( + t, proto, curRewardsState, test.incentivePoolBalance, test.totalRewardUnits, + test.startingRound, false, test.logs) + }) + } +} + +func TestNextRewardsRateFailsWithoutFix(t *testing.T) { + partitiontest.PartitionTest(t) + + proto, ok := config.Consensus[protocol.ConsensusCurrentVersion] + require.True(t, ok) + proto.RewardsCalculationFix = false + + curRewardsState := RewardsState{ + RewardsLevel: 0, + RewardsResidue: 0, + RewardsRecalculationRound: 1000000, + RewardsRate: 0, + } + + performRewardsRateCalculation( + t, proto, curRewardsState, proto.MinBalance+500000000000, + 1, 1000000, true, false) +} + +func TestNextRewardsRateWithFixUsesNewRate(t *testing.T) { + partitiontest.PartitionTest(t) + + proto, ok := config.Consensus[protocol.ConsensusCurrentVersion] + require.True(t, ok) + proto.RewardsCalculationFix = true + proto.MinBalance = 1 + proto.RewardsRateRefreshInterval = 10 + + state := RewardsState{ + RewardsLevel: 4, + RewardsRate: 80, + RewardsResidue: 2, + RewardsRecalculationRound: 100, + } + + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + + newState := state.NextRewardsState( + state.RewardsRecalculationRound, proto, basics.MicroAlgos{Raw: 113}, 10, log) + + expected := RewardsState{ + RewardsLevel: 5, + RewardsRate: 11, + RewardsResidue: 3, + RewardsRecalculationRound: 110, + } + assert.Equal(t, expected, newState) + + assert.Zero(t, buf.Len()) +} + +func TestNextRewardsRateWithFixPoolBalanceInsufficient(t *testing.T) { + partitiontest.PartitionTest(t) + + proto, ok := config.Consensus[protocol.ConsensusCurrentVersion] + require.True(t, ok) + proto.RewardsCalculationFix = true + proto.MinBalance = 10 + + state := RewardsState{ + RewardsLevel: 4, + RewardsRate: 80, + RewardsResidue: 21, + RewardsRecalculationRound: 100, + } + + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + + newState := state.NextRewardsState( + state.RewardsRecalculationRound, proto, basics.MicroAlgos{Raw: 19}, 10, log) + + expected := RewardsState{ + RewardsLevel: 6, + RewardsRate: 0, + RewardsResidue: 1, + RewardsRecalculationRound: 100 + basics.Round(proto.RewardsRateRefreshInterval), + } + assert.Equal(t, expected, newState) + + assert.Contains( + t, string(buf.Bytes()), "overflowed when trying to refresh RewardsRate") +} + +func TestNextRewardsRateWithFixMaxSpentOverOverflow(t *testing.T) { + partitiontest.PartitionTest(t) + + proto, ok := config.Consensus[protocol.ConsensusCurrentVersion] + require.True(t, ok) + proto.RewardsCalculationFix = true + proto.MinBalance = 10 + + state := RewardsState{ + RewardsLevel: 4, + RewardsRate: 80, + RewardsResidue: math.MaxUint64, + RewardsRecalculationRound: 100, + } + + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + + newState := state.NextRewardsState( + state.RewardsRecalculationRound, proto, basics.MicroAlgos{Raw: 9009}, 10, log) + + expected := RewardsState{ + RewardsLevel: 4 + math.MaxUint64/10, + RewardsRate: 0, + RewardsResidue: math.MaxUint64 % 10, + RewardsRecalculationRound: 100 + basics.Round(proto.RewardsRateRefreshInterval), + } + assert.Equal(t, expected, newState) + + assert.Contains( + t, string(buf.Bytes()), + "overflowed when trying to accumulate MinBalance(10) and "+ + "RewardsResidue(18446744073709551615)") +} + +func TestNextRewardsRateWithFixRewardsWithResidueOverflow(t *testing.T) { + partitiontest.PartitionTest(t) + + proto, ok := config.Consensus[protocol.ConsensusCurrentVersion] + require.True(t, ok) + proto.RewardsCalculationFix = true + proto.MinBalance = 10 + + state := RewardsState{ + RewardsLevel: 4, + RewardsRate: 80, + RewardsResidue: math.MaxUint64, + RewardsRecalculationRound: 100, + } + + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + + newState := state.NextRewardsState( + state.RewardsRecalculationRound-1, proto, basics.MicroAlgos{Raw: 0}, 1, log) + assert.Equal(t, state, newState) + + assert.Contains(t, string(buf.Bytes()), "could not compute next reward level") +} + +func TestNextRewardsRateWithFixNextRewardLevelOverflow(t *testing.T) { + partitiontest.PartitionTest(t) + + proto, ok := config.Consensus[protocol.ConsensusCurrentVersion] + require.True(t, ok) + proto.RewardsCalculationFix = true + proto.MinBalance = 10 + + state := RewardsState{ + RewardsLevel: math.MaxUint64, + RewardsRate: 0, + RewardsResidue: 1, + RewardsRecalculationRound: 100, + } + + var buf bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&buf) + + newState := state.NextRewardsState( + state.RewardsRecalculationRound-1, proto, basics.MicroAlgos{Raw: 1000}, 1, log) + assert.Equal(t, state, newState) + + assert.Contains(t, string(buf.Bytes()), "could not compute next reward level") +} diff --git a/data/ledger.go b/data/ledger.go index f5d851082f..8fc03cb6eb 100644 --- a/data/ledger.go +++ b/data/ledger.go @@ -325,14 +325,16 @@ func (l *Ledger) EnsureValidatedBlock(vb *ledgercore.ValidatedBlock, c agreement break } - logfn := logging.Base().Errorf + logfn := l.log.Errorf switch err.(type) { case ledgercore.BlockInLedgerError: - logfn = logging.Base().Debugf + // If the block is already in the ledger (catchup and agreement might be competing), + // reporting this as a debug message is sufficient. + logfn = l.log.Debugf + // Otherwise, the error is because the block is in the future. Error is logged. } - - logfn("could not write block %d to the ledger: %v", round, err) + logfn("data.EnsureValidatedBlock: could not write block %d to the ledger: %v", round, err) } } @@ -353,14 +355,16 @@ func (l *Ledger) EnsureBlock(block *bookkeeping.Block, c agreement.Certificate) switch err.(type) { case protocol.Error: if !protocolErrorLogged { - logging.Base().Errorf("unrecoverable protocol error detected at block %d: %v", round, err) + l.log.Errorf("data.EnsureBlock: unrecoverable protocol error detected at block %d: %v", round, err) protocolErrorLogged = true } case ledgercore.BlockInLedgerError: - logging.Base().Debugf("could not write block %d to the ledger: %v", round, err) - return // this error implies that l.LastRound() >= round + // The block is already in the ledger. Catchup and agreement could be competing + // It is sufficient to report this as a Debug message + l.log.Debugf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err) + return default: - logging.Base().Errorf("could not write block %d to the ledger: %v", round, err) + l.log.Errorf("data.EnsureBlock: could not write block %d to the ledger: %v", round, err) } // If there was an error add a short delay before the next attempt. diff --git a/data/ledger_test.go b/data/ledger_test.go index 29456608b6..dc50147db3 100644 --- a/data/ledger_test.go +++ b/data/ledger_test.go @@ -17,6 +17,9 @@ package data import ( + "context" + "fmt" + "sync" "testing" "github.com/stretchr/testify/require" @@ -32,6 +35,7 @@ import ( "github.com/algorand/go-algorand/logging" "github.com/algorand/go-algorand/protocol" "github.com/algorand/go-algorand/test/partitiontest" + "github.com/algorand/go-algorand/util/execpool" ) var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} @@ -420,3 +424,240 @@ func TestConsensusVersion(t *testing.T) { require.Equal(t, protocol.ConsensusVersion(""), ver) require.Equal(t, ledgercore.ErrNoEntry{Round: basics.Round(blk.BlockHeader.NextProtocolSwitchOn + 1), Latest: basics.Round(blk.BlockHeader.Round), Committed: basics.Round(blk.BlockHeader.Round)}, err) } + +type loggedMessages struct { + logging.Logger + expectedMessages chan string + unexpectedMessages chan string +} + +func (lm loggedMessages) Debug(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Debugf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.expectedMessages <- m +} +func (lm loggedMessages) Info(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Infof(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Warn(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Warnf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Error(args ...interface{}) { + m := fmt.Sprint(args...) + lm.unexpectedMessages <- m +} +func (lm loggedMessages) Errorf(s string, args ...interface{}) { + m := fmt.Sprintf(s, args...) + lm.unexpectedMessages <- m +} + +// TestLedgerErrorValidate creates 3 parallel routines adding blocks to the ledger through different interfaces. +// The purpose here is to simulate the scenario where the catchup and the agreement compete to add blocks to the ledger. +// The error messages reported can be excessive or unnecessary. This test evaluates what messages are generate and at what frequency. +func TestLedgerErrorValidate(t *testing.T) { + partitiontest.PartitionTest(t) + + var testPoolAddr = basics.Address{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + var testSinkAddr = basics.Address{0x2c, 0x2a, 0x6c, 0xe9, 0xa9, 0xa7, 0xc2, 0x8c, 0x22, 0x95, 0xfd, 0x32, 0x4f, 0x77, 0xa5, 0x4, 0x8b, 0x42, 0xc2, 0xb7, 0xa8, 0x54, 0x84, 0xb6, 0x80, 0xb1, 0xe1, 0x3d, 0x59, 0x9b, 0xeb, 0x36} + + proto, _ := config.Consensus[protocol.ConsensusCurrentVersion] + origProto := proto + defer func() { + config.Consensus[protocol.ConsensusCurrentVersion] = origProto + }() + proto.MinBalance = 0 + config.Consensus[protocol.ConsensusCurrentVersion] = proto + + blk := bookkeeping.Block{} + blk.CurrentProtocol = protocol.ConsensusCurrentVersion + blk.RewardsPool = testPoolAddr + blk.FeeSink = testSinkAddr + blk.BlockHeader.GenesisHash = crypto.Hash([]byte(t.Name())) + + accts := make(map[basics.Address]basics.AccountData) + accts[testPoolAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0}) + accts[testSinkAddr] = basics.MakeAccountData(basics.NotParticipating, basics.MicroAlgos{Raw: 0}) + + genesisInitState := ledgercore.InitState{ + Accounts: accts, + Block: blk, + GenesisHash: crypto.Hash([]byte(t.Name())), + } + + expectedMessages := make(chan string, 100) + unexpectedMessages := make(chan string, 100) + + const inMem = true + cfg := config.GetDefaultLocal() + cfg.Archival = true + log := loggedMessages{Logger: logging.TestingLog(t), expectedMessages: expectedMessages, unexpectedMessages: unexpectedMessages} + log.SetLevel(logging.Debug) + realLedger, err := ledger.OpenLedger(log, t.Name(), inMem, genesisInitState, cfg) + require.NoError(t, err, "could not open ledger") + defer realLedger.Close() + + l := Ledger{Ledger: realLedger, log: log} + l.log.SetLevel(logging.Debug) + require.NotNil(t, &l) + + totalsRound, _, err := realLedger.LatestTotals() + require.NoError(t, err) + require.Equal(t, basics.Round(0), totalsRound) + + errChan := make(chan error, 1) + defer close(errChan) + + wg := sync.WaitGroup{} + defer wg.Wait() + + blkChan1 := make(chan bookkeeping.Block, 10) + blkChan2 := make(chan bookkeeping.Block, 10) + blkChan3 := make(chan bookkeeping.Block, 10) + defer close(blkChan1) + defer close(blkChan2) + defer close(blkChan3) + + // Add blocks to the ledger via EnsureValidatedBlock. This calls AddValidatedBlock, which simply + // passes the block to blockQueue. The returned error is handled by EnsureValidatedBlock, which reports + // in the form of logged error message. + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan1 { + i++ + vb, err := validatedBlock(l.Ledger, blk) + if err != nil { + // AddBlock already added the block + // This is okay to ignore. + // This error is generated from ledger.Ledger Validate function, used from: + // - node blockValidatorImpl Validate + // - catchup service s.ledger.Validate (Catchup service returns after the first error) + continue + } + l.EnsureValidatedBlock(vb, agreement.Certificate{}) + } + wg.Done() + }() + + // Add blocks to the ledger via EnsureBlock. This basically calls AddBlock, but handles + // the errors by logging them. Checking the logged messages to verify its behavior. + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan2 { + i++ + l.EnsureBlock(&blk, agreement.Certificate{}) + } + wg.Done() + }() + + // Add blocks directly to the ledger + go func() { + wg.Add(1) + i := 0 + for blk := range blkChan3 { + i++ + err := l.AddBlock(blk, agreement.Certificate{}) + // AddBlock is used in 2 places: + // - data.ledger.EnsureBlock which reports a log message as Error or Debug + // - catchup.service.fetchAndWrite which leads to interrupting catchup or skiping the round + if err != nil { + switch err.(type) { + // The following two cases are okay to ignore, since these are expected and handled + case ledgercore.BlockInLedgerError: + case ledgercore.ErrNonSequentialBlockEval: + continue + default: + // Make sure unexpected error is not obtained here + errChan <- err + } + } + l.WaitForCommit(blk.BlockHeader.Round) + } + wg.Done() + }() + + // flush the messages output during the setup + more := true + for more { + select { + case <-expectedMessages: + case <-unexpectedMessages: + default: + more = false + } + } + + for rnd := basics.Round(1); rnd <= basics.Round(2000); rnd++ { + blk, err := getEmptyBlock(rnd-1, l.Ledger, t.Name(), genesisInitState.Accounts) + require.NoError(t, err) + blkChan3 <- blk + blkChan2 <- blk + blkChan1 <- blk + + more = true + for more { + select { + case err := <-errChan: + require.NoError(t, err) + case <-expectedMessages: + // only debug messages should be reported + case um := <-unexpectedMessages: + require.Empty(t, um, um) + default: + more = false + } + } + } +} + +func validatedBlock(l *ledger.Ledger, blk bookkeeping.Block) (vb *ledgercore.ValidatedBlock, err error) { + backlogPool := execpool.MakeBacklog(nil, 0, execpool.LowPriority, nil) + defer backlogPool.Shutdown() + vb, err = l.Validate(context.Background(), blk, backlogPool) + return +} + +func getEmptyBlock(afterRound basics.Round, l *ledger.Ledger, genesisID string, initAccounts map[basics.Address]basics.AccountData) (blk bookkeeping.Block, err error) { + l.WaitForCommit(afterRound) + + lastBlock, err := l.Block(l.Latest()) + if err != nil { + return + } + + proto := config.Consensus[lastBlock.CurrentProtocol] + blk.BlockHeader = bookkeeping.BlockHeader{ + GenesisID: genesisID, + Round: l.Latest() + 1, + Branch: lastBlock.Hash(), + TimeStamp: 0, + } + + if proto.SupportGenesisHash { + blk.BlockHeader.GenesisHash = crypto.Hash([]byte(genesisID)) + } + + blk.RewardsPool = testPoolAddr + blk.FeeSink = testSinkAddr + blk.CurrentProtocol = lastBlock.CurrentProtocol + + blk.TxnRoot, err = blk.PaysetCommit() + if err != nil { + return + } + return +} diff --git a/ledger/accountdb.go b/ledger/accountdb.go index 5143322652..788ccdc37f 100644 --- a/ledger/accountdb.go +++ b/ledger/accountdb.go @@ -1228,7 +1228,7 @@ func updateAccountsHashRound(tx *sql.Tx, hashRound basics.Round) (err error) { } if aff != 1 { - err = fmt.Errorf("updateAccountsRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff) + err = fmt.Errorf("updateAccountsHashRound(hashbase,%d): expected to update 1 row but got %d", hashRound, aff) return } return diff --git a/ledger/acctupdates_test.go b/ledger/acctupdates_test.go index f1434b02f3..1e77ef49a0 100644 --- a/ledger/acctupdates_test.go +++ b/ledger/acctupdates_test.go @@ -75,12 +75,10 @@ func accumulateTotals(t testing.TB, consensusVersion protocol.ConsensusVersion, return } -func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker { +func makeMockLedgerForTrackerWithLogger(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData, l logging.Logger) *mockLedgerForTracker { dbs, fileName := dbOpenTest(t, inMemory) - dblogger := logging.TestingLog(t) - dblogger.SetLevel(logging.Info) - dbs.Rdb.SetLogger(dblogger) - dbs.Wdb.SetLogger(dblogger) + dbs.Rdb.SetLogger(l) + dbs.Wdb.SetLogger(l) blocks := randomInitChain(consensusVersion, initialBlocksCount) deltas := make([]ledgercore.StateDelta, initialBlocksCount) @@ -92,7 +90,15 @@ func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount in } } consensusParams := config.Consensus[consensusVersion] - return &mockLedgerForTracker{dbs: dbs, log: dblogger, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]} + return &mockLedgerForTracker{dbs: dbs, log: l, filename: fileName, inMemory: inMemory, blocks: blocks, deltas: deltas, consensusParams: consensusParams, accts: accts[0]} + +} + +func makeMockLedgerForTracker(t testing.TB, inMemory bool, initialBlocksCount int, consensusVersion protocol.ConsensusVersion, accts []map[basics.Address]basics.AccountData) *mockLedgerForTracker { + dblogger := logging.TestingLog(t) + dblogger.SetLevel(logging.Info) + + return makeMockLedgerForTrackerWithLogger(t, inMemory, initialBlocksCount, consensusVersion, accts, dblogger) } // fork creates another database which has the same content as the current one. Works only for non-memory databases. diff --git a/ledger/catchpointtracker.go b/ledger/catchpointtracker.go index 58d6e77b7a..087580f42f 100644 --- a/ledger/catchpointtracker.go +++ b/ledger/catchpointtracker.go @@ -272,7 +272,11 @@ func (ct *catchpointTracker) produceCommittingTask(committedRound basics.Round, return nil } - dcr.offset = uint64(newBase - dcr.oldBase) + newOffset := uint64(newBase - dcr.oldBase) + // trackers are not allowed to increase offsets, only descease + if newOffset < dcr.offset { + dcr.offset = newOffset + } // check to see if this is a catchpoint round dcr.isCatchpointRound = ct.isCatchpointRound(dcr.offset, dcr.oldBase, dcr.lookback) diff --git a/ledger/catchpointtracker_test.go b/ledger/catchpointtracker_test.go index b5778691e0..b75d07adae 100644 --- a/ledger/catchpointtracker_test.go +++ b/ledger/catchpointtracker_test.go @@ -114,6 +114,7 @@ func TestGetCatchpointStream(t *testing.T) { // File on disk, and database has the record reader, err := ct.GetCatchpointStream(basics.Round(1)) + require.NoError(t, err) n, err = reader.Read(dataRead) require.NoError(t, err) require.Equal(t, 3, n) @@ -125,13 +126,16 @@ func TestGetCatchpointStream(t *testing.T) { // File deleted, but record in the database err = os.Remove(filepath.Join(temporaryDirectroy, "catchpoints", "2.catchpoint")) + require.NoError(t, err) reader, err = ct.GetCatchpointStream(basics.Round(2)) require.Equal(t, ledgercore.ErrNoEntry{}, err) require.Nil(t, reader) // File on disk, but database lost the record err = ct.accountsq.storeCatchpoint(context.Background(), basics.Round(3), "", "", 0) + require.NoError(t, err) reader, err = ct.GetCatchpointStream(basics.Round(3)) + require.NoError(t, err) n, err = reader.Read(dataRead) require.NoError(t, err) require.Equal(t, 3, n) diff --git a/ledger/internal/eval.go b/ledger/internal/eval.go index 336fcb943b..4bba4a0a4c 100644 --- a/ledger/internal/eval.go +++ b/ledger/internal/eval.go @@ -526,7 +526,7 @@ func StartEvaluator(l LedgerForEvaluator, hdr bookkeeping.BlockHeader, evalOpts if eval.proto.SupportGenesisHash { eval.block.BlockHeader.GenesisHash = eval.genesisHash } - eval.block.BlockHeader.RewardsState = eval.prevHeader.NextRewardsState(hdr.Round, proto, incentivePoolData.MicroAlgos, prevTotals.RewardUnits()) + eval.block.BlockHeader.RewardsState = eval.prevHeader.NextRewardsState(hdr.Round, proto, incentivePoolData.MicroAlgos, prevTotals.RewardUnits(), logging.Base()) } // set the eval state with the current header eval.state = makeRoundCowState(base, eval.block.BlockHeader, proto, eval.prevHeader.TimeStamp, prevTotals, evalOpts.PaysetHint) @@ -538,7 +538,7 @@ func StartEvaluator(l LedgerForEvaluator, hdr bookkeeping.BlockHeader, evalOpts } // Check that the rewards rate, level and residue match expected values - expectedRewardsState := eval.prevHeader.NextRewardsState(hdr.Round, proto, incentivePoolData.MicroAlgos, prevTotals.RewardUnits()) + expectedRewardsState := eval.prevHeader.NextRewardsState(hdr.Round, proto, incentivePoolData.MicroAlgos, prevTotals.RewardUnits(), logging.Base()) if eval.block.RewardsState != expectedRewardsState { return nil, fmt.Errorf("bad rewards state: %+v != %+v", eval.block.RewardsState, expectedRewardsState) } diff --git a/ledger/ledger.go b/ledger/ledger.go index 02d5516afc..cf5476e1be 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -578,6 +578,11 @@ func (l *Ledger) AddBlock(blk bookkeeping.Block, cert agreement.Certificate) err updates, err := internal.Eval(context.Background(), l, blk, false, l.verifiedTxnCache, nil) if err != nil { + if errNSBE, ok := err.(ledgercore.ErrNonSequentialBlockEval); ok && errNSBE.EvaluatorRound <= errNSBE.LatestRound { + return ledgercore.BlockInLedgerError{ + LastRound: errNSBE.EvaluatorRound, + NextRound: errNSBE.LatestRound + 1} + } return err } vb := ledgercore.MakeValidatedBlock(blk, updates) @@ -602,7 +607,7 @@ func (l *Ledger) AddValidatedBlock(vb ledgercore.ValidatedBlock, cert agreement. } l.headerCache.Put(blk.Round(), blk.BlockHeader) l.trackers.newBlock(blk, vb.Delta()) - l.log.Debugf("added blk %d", blk.Round()) + l.log.Debugf("ledger.AddValidatedBlock: added blk %d", blk.Round()) return nil } diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index b289657552..c889997613 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -120,7 +120,7 @@ func makeNewEmptyBlock(t *testing.T, l *Ledger, GenesisID string, initAccounts m Round: l.Latest() + 1, Branch: lastBlock.Hash(), TimeStamp: 0, - RewardsState: lastBlock.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits), + RewardsState: lastBlock.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits, logging.Base()), UpgradeState: lastBlock.UpgradeState, // Seed: does not matter, // UpgradeVote: empty, @@ -219,7 +219,7 @@ func TestLedgerBlockHeaders(t *testing.T) { Round: l.Latest() + 1, Branch: lastBlock.Hash(), TimeStamp: 0, - RewardsState: lastBlock.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits), + RewardsState: lastBlock.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits, logging.Base()), UpgradeState: lastBlock.UpgradeState, // Seed: does not matter, // UpgradeVote: empty, @@ -1216,7 +1216,7 @@ func testLedgerSingleTxApplyData(t *testing.T, version protocol.ConsensusVersion Round: l.Latest() + 1, Branch: lastBlock.Hash(), TimeStamp: 0, - RewardsState: lastBlock.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits), + RewardsState: lastBlock.NextRewardsState(l.Latest()+1, proto, poolBal.MicroAlgos, totalRewardUnits, logging.Base()), UpgradeState: lastBlock.UpgradeState, // Seed: does not matter, // UpgradeVote: empty, diff --git a/ledger/tracker.go b/ledger/tracker.go index b5d608c2b8..57b1e52c4c 100644 --- a/ledger/tracker.go +++ b/ledger/tracker.go @@ -91,6 +91,9 @@ type ledgerTracker interface { // effort, and all the trackers contribute to that effort. All the trackers are being handed a // pointer to the deferredCommitRange, and have the ability to either modify it, or return a // nil. If nil is returned, the commit would be skipped. + // The contract: + // offset must not be greater than the received dcr.offset value of non zero + // oldBase must not be modifed if non zero produceCommittingTask(committedRound basics.Round, dbRound basics.Round, dcr *deferredCommitRange) *deferredCommitRange // prepareCommit, commitRound and postCommit are called when it is time to commit tracker's data. @@ -321,10 +324,18 @@ func (tr *trackerRegistry) scheduleCommit(blockqRound, maxLookback basics.Round) } cdr := &dcc.deferredCommitRange for _, lt := range tr.trackers { + base := cdr.oldBase + offset := cdr.offset cdr = lt.produceCommittingTask(blockqRound, dbRound, cdr) if cdr == nil { break } + if offset > 0 && cdr.offset > offset { + tr.log.Warnf("tracker %T produced offset %d but expected not greater than %d, dbRound %d, latestRound %d", lt, cdr.offset, offset, dbRound, blockqRound) + } + if base > 0 && base != cdr.oldBase { + tr.log.Warnf("tracker %T modified oldBase %d that expected to be %d, dbRound %d, latestRound %d", lt, cdr.oldBase, base, dbRound, blockqRound) + } } if cdr != nil { dcc.deferredCommitRange = *cdr diff --git a/ledger/tracker_test.go b/ledger/tracker_test.go new file mode 100644 index 0000000000..731772a241 --- /dev/null +++ b/ledger/tracker_test.go @@ -0,0 +1,125 @@ +// Copyright (C) 2019-2022 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package ledger + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/algorand/go-algorand/config" + "github.com/algorand/go-algorand/data/basics" + "github.com/algorand/go-algorand/ledger/ledgercore" + ledgertesting "github.com/algorand/go-algorand/ledger/testing" + "github.com/algorand/go-algorand/logging" + "github.com/algorand/go-algorand/protocol" + "github.com/algorand/go-algorand/test/partitiontest" +) + +// TestTrackerScheduleCommit checks catchpointTracker.produceCommittingTask does not increase commit offset relative +// to the value set by accountUpdates +func TestTrackerScheduleCommit(t *testing.T) { + partitiontest.PartitionTest(t) + + a := require.New(t) + + var bufNewLogger bytes.Buffer + log := logging.NewLogger() + log.SetOutput(&bufNewLogger) + + accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(1, true)} + ml := makeMockLedgerForTrackerWithLogger(t, true, 10, protocol.ConsensusCurrentVersion, accts, log) + defer ml.Close() + + conf := config.GetDefaultLocal() + conf.CatchpointTracking = 1 + conf.CatchpointInterval = 10 + + au := &accountUpdates{} + ct := &catchpointTracker{} + au.initialize(conf) + ct.initialize(conf, ".") + + _, err := trackerDBInitialize(ml, false, ".") + a.NoError(err) + + ml.trackers.initialize(ml, []ledgerTracker{au, ct}, conf) + defer ml.trackers.close() + err = ml.trackers.loadFromDisk(ml) + a.NoError(err) + // close commitSyncer goroutine + ml.trackers.ctxCancel() + ml.trackers.ctxCancel = nil + <-ml.trackers.commitSyncerClosed + ml.trackers.commitSyncerClosed = nil + + // simulate situation when au returns smaller offset b/c of consecutive versions + // and ct increses it + // base = 1, offset = 100, lookback = 16 + // lastest = 1000 + // would give a large mostRecentCatchpointRound value => large newBase => larger offset + + expectedOffset := uint64(100) + blockqRound := basics.Round(1000) + lookback := basics.Round(16) + dbRound := basics.Round(1) + + // prepare deltas and versions + au.accountsMu.Lock() + au.deltas = make([]ledgercore.AccountDeltas, int(blockqRound)) + au.deltasAccum = make([]int, int(blockqRound)) + au.versions = make([]protocol.ConsensusVersion, int(blockqRound)) + for i := 0; i <= int(expectedOffset); i++ { + au.versions[i] = protocol.ConsensusCurrentVersion + } + for i := int(expectedOffset) + 1; i < len(au.versions); i++ { + au.versions[i] = protocol.ConsensusFuture + } + au.accountsMu.Unlock() + + // ensure au and ct produce data we expect + dcc := &deferredCommitContext{ + deferredCommitRange: deferredCommitRange{ + lookback: lookback, + }, + } + cdr := &dcc.deferredCommitRange + + cdr = au.produceCommittingTask(blockqRound, dbRound, cdr) + a.NotNil(cdr) + a.Equal(expectedOffset, cdr.offset) + + cdr = ct.produceCommittingTask(blockqRound, dbRound, cdr) + a.NotNil(cdr) + // before the fix + // expectedOffset = uint64(blockqRound - lookback - dbRound) // 983 + a.Equal(expectedOffset, cdr.offset) + + // schedule the commit. au is expected to return offset 100 and + ml.trackers.mu.Lock() + ml.trackers.dbRound = dbRound + ml.trackers.mu.Unlock() + ml.trackers.scheduleCommit(blockqRound, lookback) + + a.Equal(1, len(ml.trackers.deferredCommits)) + // before the fix + // a.Contains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset 983") + a.NotContains(bufNewLogger.String(), "tracker *ledger.catchpointTracker produced offset") + dc := <-ml.trackers.deferredCommits + a.Equal(expectedOffset, dc.offset) +} diff --git a/netdeploy/network.go b/netdeploy/network.go index 4266d38ab6..007d909859 100644 --- a/netdeploy/network.go +++ b/netdeploy/network.go @@ -265,8 +265,8 @@ func (n Network) Start(binDir string, redirectOutput bool) error { var relayAddress string var err error for _, relayDir := range n.cfg.RelayDirs { - nodeFulllPath := n.getNodeFullPath(relayDir) - nc := nodecontrol.MakeNodeController(binDir, nodeFulllPath) + nodeFullPath := n.getNodeFullPath(relayDir) + nc := nodecontrol.MakeNodeController(binDir, nodeFullPath) args := nodecontrol.AlgodStartArgs{ RedirectOutput: redirectOutput, ExitErrorCallback: n.nodeExitCallback, @@ -457,16 +457,16 @@ func (n Network) Delete(binDir string) error { // any of the nodes starts func (n Network) SetConsensus(binDir string, consensus config.ConsensusProtocols) error { for _, relayDir := range n.cfg.RelayDirs { - relayFulllPath := n.getNodeFullPath(relayDir) - nc := nodecontrol.MakeNodeController(binDir, relayFulllPath) + relayFullPath := n.getNodeFullPath(relayDir) + nc := nodecontrol.MakeNodeController(binDir, relayFullPath) err := nc.SetConsensus(consensus) if err != nil { return err } } for _, nodeDir := range n.nodeDirs { - nodeFulllPath := n.getNodeFullPath(nodeDir) - nc := nodecontrol.MakeNodeController(binDir, nodeFulllPath) + nodeFullPath := n.getNodeFullPath(nodeDir) + nc := nodecontrol.MakeNodeController(binDir, nodeFullPath) err := nc.SetConsensus(consensus) if err != nil { return err diff --git a/test/testdata/deployednettemplates/recipes/feature-networks/genesis.json b/test/testdata/deployednettemplates/recipes/feature-networks/genesis.json new file mode 100644 index 0000000000..943e9f5150 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/feature-networks/genesis.json @@ -0,0 +1,29 @@ +{ + "NetworkName": "", + "VersionModifier": "", + "ConsensusProtocol": "future", + "FirstPartKeyRound": 0, + "LastPartKeyRound": 100000000, + "Wallets": [ + { + "Name": "Wallet1-R1", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet2-R2", + "Stake": 25, + "Online": true + }, + { + "Name": "Wallet3-Dispenser", + "Stake": 40, + "Online": false + }, + { + "Name": "Wallet4-NPN1", + "Stake": 10, + "Online": false + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/feature-networks/hosttemplates.json b/test/testdata/deployednettemplates/recipes/feature-networks/hosttemplates.json new file mode 100644 index 0000000000..c8c8642a24 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/feature-networks/hosttemplates.json @@ -0,0 +1,10 @@ +{ + "Hosts": [ + { + "Name": "AWS-US-EAST-2-Large", + "Provider": "AWS", + "Region": "us-east-2", + "BaseConfiguration": "m6i.large" + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/feature-networks/net.json b/test/testdata/deployednettemplates/recipes/feature-networks/net.json new file mode 100644 index 0000000000..cbab021641 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/feature-networks/net.json @@ -0,0 +1,83 @@ +{ + "Hosts": [ + { + "Name": "R1", + "Nodes": [ + { + "Name": "relay1", + "IsRelay": true, + "Wallets": [ + { + "Name": "Wallet1-R1", + "ParticipationOnly": false + }, + { + "Name": "Wallet3-Dispenser", + "ParticipationOnly": false + } + ], + "NetAddress": "{{NetworkPort}}", + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "EnableTelemetry": true, + "TelemetryURI": "telemetry.feature-networks.algodev.network", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": true, + "ConfigJSONOverride": "{ \"DNSBootstrapID\": \".algodev.network\", \"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"EnableAgreementReporting\": true, \"EnableAgreementTimeMetrics\": true, \"EnableAssembleStats\": true, \"EnableProcessBlockStats\": true }" + } + ] + }, + { + "Name": "R2", + "Nodes": [ + { + "Name": "relay2", + "IsRelay": true, + "Wallets": [ + { + "Name": "Wallet2-R2", + "ParticipationOnly": false + } + ], + "NetAddress": "{{NetworkPort}}", + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "EnableTelemetry": true, + "TelemetryURI": "telemetry.feature-networks.algodev.network", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": true, + "ConfigJSONOverride": "{ \"DNSBootstrapID\": \".algodev.network\",\"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"EnableAgreementReporting\": true, \"EnableAgreementTimeMetrics\": true, \"EnableAssembleStats\": true, \"EnableProcessBlockStats\": true }" + } + ] + }, + { + "Name": "NPN1", + "Nodes": [ + { + "Name": "node1", + "IsRelay": false, + "Wallets": [ + { + "Name": "Wallet4-NPN1", + "ParticipationOnly": false + } + ], + "NetAddress": "{{NetworkPort}}", + "APIEndpoint": "{{APIEndpoint}}", + "APIToken": "{{APIToken}}", + "EnableTelemetry": true, + "TelemetryURI": "telemetry.feature-networks.algodev.network", + "EnableMetrics": true, + "MetricsURI": "{{MetricsURI}}", + "EnableService": false, + "EnableBlockStats": true, + "ConfigJSONOverride": "{ \"DNSBootstrapID\": \".algodev.network\",\"DeadlockDetection\": -1, \"PeerPingPeriodSeconds\": 30, \"EnableAgreementReporting\": true, \"EnableAgreementTimeMetrics\": true, \"EnableAssembleStats\": true, \"EnableProcessBlockStats\": true }" + } + ] + } + ] +} diff --git a/test/testdata/deployednettemplates/recipes/feature-networks/recipe.json b/test/testdata/deployednettemplates/recipes/feature-networks/recipe.json new file mode 100644 index 0000000000..587e513f26 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/feature-networks/recipe.json @@ -0,0 +1,7 @@ +{ + "GenesisFile":"genesis.json", + "NetworkFile":"net.json", + "ConfigFile": "../../configs/reference.json", + "HostTemplatesFile": "hosttemplates.json", + "TopologyFile": "topology.json" +} diff --git a/test/testdata/deployednettemplates/recipes/feature-networks/topology.json b/test/testdata/deployednettemplates/recipes/feature-networks/topology.json new file mode 100644 index 0000000000..360c9d0d49 --- /dev/null +++ b/test/testdata/deployednettemplates/recipes/feature-networks/topology.json @@ -0,0 +1,16 @@ +{ + "Hosts": [ + { + "Name": "R1", + "Template": "AWS-US-EAST-2-Large" + }, + { + "Name": "R2", + "Template": "AWS-US-EAST-2-Large" + }, + { + "Name": "NPN1", + "Template": "AWS-US-EAST-2-Large" + } + ] +}