diff --git a/agreement/abstractions.go b/agreement/abstractions.go index 8b1b3c875a..c36c4903e7 100644 --- a/agreement/abstractions.go +++ b/agreement/abstractions.go @@ -26,7 +26,6 @@ import ( "github.com/algorand/go-algorand/data/basics" "github.com/algorand/go-algorand/data/bookkeeping" "github.com/algorand/go-algorand/data/committee" - "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/protocol" ) @@ -87,7 +86,7 @@ type BlockFactory interface { // lose liveness. AssembleBlock(basics.Round) (ValidatedBlock, error) - OnNewSpeculativeBlock(block bookkeeping.Block, delta ledgercore.StateDelta) + OnNewSpeculativeBlock(block bookkeeping.Block) } // A Ledger represents the sequence of Entries agreed upon by the protocol. diff --git a/agreement/actions.go b/agreement/actions.go index cd6170815c..6b33234ce3 100644 --- a/agreement/actions.go +++ b/agreement/actions.go @@ -312,10 +312,11 @@ type pseudonodeAction struct { // assemble, repropose, attest, speculativeAssembly T actionType - Round round - Period period - Step step - Proposal proposalValue + Round round + Period period + Step step + Proposal proposalValue + ValidatedBlock ValidatedBlock } func (a pseudonodeAction) t() actionType { @@ -347,13 +348,7 @@ func (a pseudonodeAction) do(ctx context.Context, s *Service) { s.log.Errorf("pseudonode.MakeProposals call failed %v", err) } case speculativeAssembly: - events, err := s.loopback.StartSpeculativeBlockAssembly(ctx, a.Round, a.Period) - switch err { - case nil: - s.demux.prioritize(events) - default: - s.log.Errorf("pseudonode.StartSpeculativeBlockAssembly call failed %v", err) - } + s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock) case repropose: logEvent := logspec.AgreementEvent{ diff --git a/agreement/agreementtest/simulate_test.go b/agreement/agreementtest/simulate_test.go index 173f91fad8..d742d89a0e 100644 --- a/agreement/agreementtest/simulate_test.go +++ b/agreement/agreementtest/simulate_test.go @@ -96,6 +96,8 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil } +func (f testBlockFactory) OnNewSpeculativeBlock(blk bookkeeping.Block) {} + // If we try to read from high rounds, we panic and do not emit an error to find bugs during testing. type testLedger struct { mu deadlock.Mutex diff --git a/agreement/common_test.go b/agreement/common_test.go index fa43d10912..4998d386a7 100644 --- a/agreement/common_test.go +++ b/agreement/common_test.go @@ -183,6 +183,10 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (ValidatedBlock, error) return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil } +func (f testBlockFactory) OnNewSpeculativeBlock(block bookkeeping.Block) { + return +} + // If we try to read from high rounds, we panic and do not emit an error to find bugs during testing. type testLedger struct { mu deadlock.Mutex diff --git a/agreement/demux_test.go b/agreement/demux_test.go index 7d7bbc71bd..6cfdfe114d 100644 --- a/agreement/demux_test.go +++ b/agreement/demux_test.go @@ -37,6 +37,7 @@ import ( ) const fastTimeoutChTime = 2 +const speculativeAsmTime = 1 type demuxTester struct { *testing.T @@ -675,7 +676,7 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool { close(s.quit) } - e, ok := dmx.next(s, time.Second, fastTimeoutChTime, 300) + e, ok := dmx.next(s, time.Second, fastTimeoutChTime, speculativeAsmTime, 300) if !assert.Equal(t, testcase.ok, ok) { return false diff --git a/agreement/events.go b/agreement/events.go index 3591687516..7e20447cfc 100644 --- a/agreement/events.go +++ b/agreement/events.go @@ -199,6 +199,14 @@ const ( // readPinned is sent to the proposalStore to read the pinned value, if it exists. readPinned + // readLowestValue is sent to the proposalPeriodMachine to read the + // proposal-vote with the lowest credential. + readLowestValue + + // readLowestPayload is sent to the proposalStore to read the payload + // corresponding to the lowest-credential proposal-vote, if it exists. + readLowestPayload + /* * The following are event types that replace queries, and may warrant * a revision to make them more state-machine-esque. @@ -405,6 +413,38 @@ func (e newRoundEvent) ComparableStr() string { return e.String() } +type readLowestEvent struct { + // T is either readLowestValue or readLowestPayload + T eventType + + // Round and Period are the round and period for which to query + // the lowest-credential value and payload. This type of event + // is only sent for pipelining, which only makes sense for period + // 0, but the Period is here anyway to route to the appropriate + // proposalMachinePeriod. + Round round + Period period + + // Proposal holds the lowest-credential value. + Proposal proposalValue + // Payload holds the payload, if one exists (which is the case if PayloadOK is set). + Payload proposal + // PayloadOK is set if and only if a payload was received for the lowest-credential value. + PayloadOK bool +} + +func (e readLowestEvent) t() eventType { + return e.T +} + +func (e readLowestEvent) String() string { + return fmt.Sprintf("%v: %.5v", e.t().String(), e.Proposal.BlockDigest.String()) +} + +func (e readLowestEvent) ComparableStr() string { + return e.String() +} + type newPeriodEvent struct { // Period holds the latest period relevant to the proposalRoundMachine. Period period @@ -741,7 +781,7 @@ func zeroEvent(t eventType) event { return messageEvent{} case roundInterruption: return roundInterruptionEvent{} - case timeout, fastTimeout: + case timeout, fastTimeout, speculationTimeout: return timeoutEvent{} case newRound: return newRoundEvent{} diff --git a/agreement/eventtype_string.go b/agreement/eventtype_string.go index 6944ce5862..ba8cd99960 100644 --- a/agreement/eventtype_string.go +++ b/agreement/eventtype_string.go @@ -38,21 +38,23 @@ func _() { _ = x[newPeriod-27] _ = x[readStaging-28] _ = x[readPinned-29] - _ = x[voteFilterRequest-30] - _ = x[voteFilteredStep-31] - _ = x[nextThresholdStatusRequest-32] - _ = x[nextThresholdStatus-33] - _ = x[freshestBundleRequest-34] - _ = x[freshestBundle-35] - _ = x[dumpVotesRequest-36] - _ = x[dumpVotes-37] - _ = x[wrappedAction-38] - _ = x[checkpointReached-39] + _ = x[readLowestValue-30] + _ = x[readLowestPayload-31] + _ = x[voteFilterRequest-32] + _ = x[voteFilteredStep-33] + _ = x[nextThresholdStatusRequest-34] + _ = x[nextThresholdStatus-35] + _ = x[freshestBundleRequest-36] + _ = x[freshestBundle-37] + _ = x[dumpVotesRequest-38] + _ = x[dumpVotes-39] + _ = x[wrappedAction-40] + _ = x[checkpointReached-41] } -const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutspeculationTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedvoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached" +const _eventType_name = "nonevotePresentpayloadPresentbundlePresentvoteVerifiedpayloadVerifiedbundleVerifiedroundInterruptiontimeoutfastTimeoutspeculationTimeoutsoftThresholdcertThresholdnextThresholdproposalCommittableproposalAcceptedvoteFilteredvoteMalformedbundleFilteredbundleMalformedpayloadRejectedpayloadMalformedpayloadPipelinedpayloadAcceptedproposalFrozenvoteAcceptednewRoundnewPeriodreadStagingreadPinnedreadLowestValuereadLowestPayloadvoteFilterRequestvoteFilteredStepnextThresholdStatusRequestnextThresholdStatusfreshestBundleRequestfreshestBundledumpVotesRequestdumpVoteswrappedActioncheckpointReached" -var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 136, 149, 162, 175, 194, 210, 222, 235, 249, 264, 279, 295, 311, 326, 340, 352, 360, 369, 380, 390, 407, 423, 449, 468, 489, 503, 519, 528, 541, 558} +var _eventType_index = [...]uint16{0, 4, 15, 29, 42, 54, 69, 83, 100, 107, 118, 136, 149, 162, 175, 194, 210, 222, 235, 249, 264, 279, 295, 311, 326, 340, 352, 360, 369, 380, 390, 405, 422, 439, 455, 481, 500, 521, 535, 551, 560, 573, 590} func (i eventType) String() string { if i < 0 || i >= eventType(len(_eventType_index)-1) { diff --git a/agreement/fuzzer/ledger_test.go b/agreement/fuzzer/ledger_test.go index 2ae73dd694..ab3bb4c5b4 100644 --- a/agreement/fuzzer/ledger_test.go +++ b/agreement/fuzzer/ledger_test.go @@ -112,6 +112,8 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil } +func (f testBlockFactory) OnNewSpeculativeBlock(bookkeeping.Block) {} + type testLedgerSyncFunc func(l *testLedger, r basics.Round, c agreement.Certificate) bool // If we try to read from high rounds, we panic and do not emit an error to find bugs during testing. diff --git a/agreement/player.go b/agreement/player.go index ef4e127733..cee7ff0a15 100644 --- a/agreement/player.go +++ b/agreement/player.go @@ -126,12 +126,21 @@ func (p *player) handle(r routerHandle, e event) []action { } func (p *player) handleSpeculationTimeout(r routerHandle, e timeoutEvent) []action { - // start speculative block assembly if e.Proto.Err != nil { r.t.log.Errorf("failed to read protocol version for speculationTimeout event (proto %v): %v", e.Proto.Version, e.Proto.Err) return nil } - return p.startSpeculativeBlockAsm(r) + + // get the best proposal we have + re := readLowestEvent{T: readLowestPayload, Round: p.Round} + re = r.dispatch(*p, re, proposalMachineRound, p.Round, 0, 0).(readLowestEvent) + + // if we have its payload and its been validated already, start speculating + // on top of it + if re.PayloadOK && re.Payload.ve != nil { + return p.startSpeculativeBlockAsm(r, re.Payload.ve) + } + return nil } func (p *player) handleFastTimeout(r routerHandle, e timeoutEvent) []action { @@ -228,8 +237,9 @@ func (p *player) issueNextVote(r routerHandle) []action { return actions } -func (p *player) startSpeculativeBlockAsm(r routerHandle) (actions []action) { - return append(actions, pseudonodeAction{T: speculativeAssembly, Round: p.Round, Period: p.Period}) +func (p *player) startSpeculativeBlockAsm(r routerHandle, ve ValidatedBlock) (actions []action) { + // TODO(yossi) extend the action to carry the speculative block payload + return append(actions, pseudonodeAction{T: speculativeAssembly, Round: p.Round, Period: p.Period, ValidatedBlock: ve}) } func (p *player) issueFastVote(r routerHandle) (actions []action) { diff --git a/agreement/proposal.go b/agreement/proposal.go index 232fa6f5f4..75918c11a9 100644 --- a/agreement/proposal.go +++ b/agreement/proposal.go @@ -79,7 +79,7 @@ func (p unauthenticatedProposal) value() proposalValue { } } -// A proposal is an Block along with everything needed to validate it. +// A proposal is a Block along with everything needed to validate it. type proposal struct { unauthenticatedProposal diff --git a/agreement/proposalStore.go b/agreement/proposalStore.go index 973f909d03..ccfadc33c8 100644 --- a/agreement/proposalStore.go +++ b/agreement/proposalStore.go @@ -337,6 +337,15 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event { se.Committable = ea.Assembled se.Payload = ea.Payload return se + case readLowestPayload: + re := e.(readLowestEvent) + re.T = readLowestValue + re = r.dispatch(p, re, proposalMachinePeriod, re.Round, re.Period, 0).(readLowestEvent) + re.T = readLowestPayload + ea := store.Assemblers[re.Proposal] + re.PayloadOK = ea.Assembled + re.Payload = ea.Payload + return re case readPinned: se := e.(pinnedValueEvent) ea := store.Assemblers[store.Pinned] // If pinned is bottom, assembled/payloadOK = false, payload = bottom diff --git a/agreement/proposalTracker.go b/agreement/proposalTracker.go index c76c5c9fda..04a5dd5fca 100644 --- a/agreement/proposalTracker.go +++ b/agreement/proposalTracker.go @@ -163,6 +163,11 @@ func (t *proposalTracker) handle(r routerHandle, p player, e event) event { t.Freezer = t.Freezer.freeze() return e + case readLowestValue: + e := e.(readLowestEvent) + e.Proposal = t.Freezer.Lowest.R.Proposal + return e + case softThreshold, certThreshold: e := e.(thresholdEvent) t.Staging = e.Proposal diff --git a/agreement/pseudonode.go b/agreement/pseudonode.go index 3862d859db..deca7fd062 100644 --- a/agreement/pseudonode.go +++ b/agreement/pseudonode.go @@ -60,7 +60,8 @@ type pseudonode interface { // It returns an error if the pseudonode is unable to perform this. MakeProposals(ctx context.Context, r round, p period) (<-chan externalEvent, error) - StartSpeculativeBlockAssembly(ctx context.Context, r round, p period) (<-chan externalEvent, error) + // TODO(yossi) pass in context all the way to the pool, so we can cancel spec asm if needed + StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock) // MakeVotes returns a vote for a given proposal in some round, period, and step. // @@ -187,10 +188,9 @@ func (n asyncPseudonode) MakeProposals(ctx context.Context, r round, p period) ( } } -func (n asyncPseudonode) StartSpeculativeBlockAssembly(ctx context.Context, r round, p period) (<-chan externalEvent, error) { - return nil, nil - // n.validator. - // go n.factory.OnNewSpeculativeBlock(block, delta) +func (n asyncPseudonode) StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock) { + // TODO(yossi) change to use ve directly, to avoid recomputing statedeltas + go n.factory.OnNewSpeculativeBlock(ve.Block()) } func (n asyncPseudonode) MakeVotes(ctx context.Context, r round, p period, s step, prop proposalValue, persistStateDone chan error) (chan externalEvent, error) { diff --git a/ledger/speculative.go b/ledger/speculative.go index d7b0ec40e3..c15d4671ab 100644 --- a/ledger/speculative.go +++ b/ledger/speculative.go @@ -86,30 +86,36 @@ type validatedBlockAsLFE struct { vb *ledgercore.ValidatedBlock } -// makedBlockAsLFE constructs a new BlockAsLFE from a Block. -func MakeBlockAsLFE(blk bookkeeping.Block, l LedgerForEvaluator) (*validatedBlockAsLFE, ledgercore.StateDelta, error) { +// makeValidatedBlockAsLFE constructs a new validatedBlockAsLFE from a ValidatedBlock. +func MakeValidatedBlockAsLFE(vb *ledgercore.ValidatedBlock, l LedgerForEvaluator) (*validatedBlockAsLFE, error) { latestRound := l.Latest() - if blk.Round().SubSaturate(1) != latestRound { - return nil, ledgercore.StateDelta{}, fmt.Errorf("MakeBlockAsLFE: Ledger round %d mismatches next block round %d", latestRound, blk.Round()) + if vb.Block().Round().SubSaturate(1) != latestRound { + return nil, fmt.Errorf("MakeBlockAsLFE: Ledger round %d mismatches next block round %d", latestRound, vb.Block().Round()) } hdr, err := l.BlockHdr(latestRound) if err != nil { - return nil, ledgercore.StateDelta{}, err + return nil, err } - if blk.Branch != hdr.Hash() { - return nil, ledgercore.StateDelta{}, fmt.Errorf("MakeBlockAsLFE: Ledger latest block hash %x mismatches block's prev hash %x", hdr.Hash(), blk.Branch) + if vb.Block().Branch != hdr.Hash() { + return nil, fmt.Errorf("MakeBlockAsLFE: Ledger latest block hash %x mismatches block's prev hash %x", hdr.Hash(), vb.Block().Branch) } + return &validatedBlockAsLFE{ + l: l, + vb: vb, + }, nil +} + +// makeBlockAsLFE constructs a new validatedBlockAsLFE from a Block. +func MakeBlockAsLFE(blk bookkeeping.Block, l LedgerForEvaluator) (*validatedBlockAsLFE, ledgercore.StateDelta, error) { state, err := internal.Eval(context.Background(), l, blk, false, l.VerifiedTransactionCache(), nil) if err != nil { return nil, ledgercore.StateDelta{}, fmt.Errorf("error computing deltas for block %d round %d: %v", blk.Hash(), blk.Round(), err) } vb := ledgercore.MakeValidatedBlock(blk, state) - return &validatedBlockAsLFE{ - l: l, - vb: &vb, - }, state, nil + lfe, err := MakeValidatedBlockAsLFE(&vb, l) + return lfe, vb.Delta(), nil } // Block implements the ledgerForEvaluator interface.