Skip to content

Commit

Permalink
checkpoint: adding readlowest to agreement
Browse files Browse the repository at this point in the history
  • Loading branch information
yossigi committed Aug 2, 2022
1 parent 3687960 commit 63c03b9
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 48 deletions.
3 changes: 1 addition & 2 deletions agreement/abstractions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/data/committee"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/protocol"
)

Expand Down Expand Up @@ -87,7 +86,7 @@ type BlockFactory interface {
// lose liveness.
AssembleBlock(basics.Round) (ValidatedBlock, error)

OnNewSpeculativeBlock(block bookkeeping.Block, delta ledgercore.StateDelta)
OnNewSpeculativeBlock(block bookkeeping.Block)
}

// A Ledger represents the sequence of Entries agreed upon by the protocol.
Expand Down
17 changes: 6 additions & 11 deletions agreement/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,11 @@ type pseudonodeAction struct {
// assemble, repropose, attest, speculativeAssembly
T actionType

Round round
Period period
Step step
Proposal proposalValue
Round round
Period period
Step step
Proposal proposalValue
ValidatedBlock ValidatedBlock
}

func (a pseudonodeAction) t() actionType {
Expand Down Expand Up @@ -347,13 +348,7 @@ func (a pseudonodeAction) do(ctx context.Context, s *Service) {
s.log.Errorf("pseudonode.MakeProposals call failed %v", err)
}
case speculativeAssembly:
events, err := s.loopback.StartSpeculativeBlockAssembly(ctx, a.Round, a.Period)
switch err {
case nil:
s.demux.prioritize(events)
default:
s.log.Errorf("pseudonode.StartSpeculativeBlockAssembly call failed %v", err)
}
s.loopback.StartSpeculativeBlockAssembly(ctx, a.ValidatedBlock)

case repropose:
logEvent := logspec.AgreementEvent{
Expand Down
2 changes: 2 additions & 0 deletions agreement/agreementtest/simulate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}

func (f testBlockFactory) OnNewSpeculativeBlock(blk bookkeeping.Block) {}

// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
type testLedger struct {
mu deadlock.Mutex
Expand Down
4 changes: 4 additions & 0 deletions agreement/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,10 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (ValidatedBlock, error)
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}

func (f testBlockFactory) OnNewSpeculativeBlock(block bookkeeping.Block) {
return
}

// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
type testLedger struct {
mu deadlock.Mutex
Expand Down
3 changes: 2 additions & 1 deletion agreement/demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
)

const fastTimeoutChTime = 2
const speculativeAsmTime = 1

type demuxTester struct {
*testing.T
Expand Down Expand Up @@ -675,7 +676,7 @@ func (t *demuxTester) TestUsecase(testcase demuxTestUsecase) bool {
close(s.quit)
}

e, ok := dmx.next(s, time.Second, fastTimeoutChTime, 300)
e, ok := dmx.next(s, time.Second, fastTimeoutChTime, speculativeAsmTime, 300)

if !assert.Equal(t, testcase.ok, ok) {
return false
Expand Down
42 changes: 41 additions & 1 deletion agreement/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ const (
// readPinned is sent to the proposalStore to read the pinned value, if it exists.
readPinned

// readLowestValue is sent to the proposalPeriodMachine to read the
// proposal-vote with the lowest credential.
readLowestValue

// readLowestPayload is sent to the proposalStore to read the payload
// corresponding to the lowest-credential proposal-vote, if it exists.
readLowestPayload

/*
* The following are event types that replace queries, and may warrant
* a revision to make them more state-machine-esque.
Expand Down Expand Up @@ -405,6 +413,38 @@ func (e newRoundEvent) ComparableStr() string {
return e.String()
}

type readLowestEvent struct {
// T is either readLowestValue or readLowestPayload
T eventType

// Round and Period are the round and period for which to query
// the lowest-credential value and payload. This type of event
// is only sent for pipelining, which only makes sense for period
// 0, but the Period is here anyway to route to the appropriate
// proposalMachinePeriod.
Round round
Period period

// Proposal holds the lowest-credential value.
Proposal proposalValue
// Payload holds the payload, if one exists (which is the case if PayloadOK is set).
Payload proposal
// PayloadOK is set if and only if a payload was received for the lowest-credential value.
PayloadOK bool
}

func (e readLowestEvent) t() eventType {
return e.T
}

func (e readLowestEvent) String() string {
return fmt.Sprintf("%v: %.5v", e.t().String(), e.Proposal.BlockDigest.String())
}

func (e readLowestEvent) ComparableStr() string {
return e.String()
}

type newPeriodEvent struct {
// Period holds the latest period relevant to the proposalRoundMachine.
Period period
Expand Down Expand Up @@ -741,7 +781,7 @@ func zeroEvent(t eventType) event {
return messageEvent{}
case roundInterruption:
return roundInterruptionEvent{}
case timeout, fastTimeout:
case timeout, fastTimeout, speculationTimeout:
return timeoutEvent{}
case newRound:
return newRoundEvent{}
Expand Down
26 changes: 14 additions & 12 deletions agreement/eventtype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions agreement/fuzzer/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ func (f testBlockFactory) AssembleBlock(r basics.Round) (agreement.ValidatedBloc
return testValidatedBlock{Inside: bookkeeping.Block{BlockHeader: bookkeeping.BlockHeader{Round: r}}}, nil
}

func (f testBlockFactory) OnNewSpeculativeBlock(bookkeeping.Block) {}

type testLedgerSyncFunc func(l *testLedger, r basics.Round, c agreement.Certificate) bool

// If we try to read from high rounds, we panic and do not emit an error to find bugs during testing.
Expand Down
18 changes: 14 additions & 4 deletions agreement/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,21 @@ func (p *player) handle(r routerHandle, e event) []action {
}

func (p *player) handleSpeculationTimeout(r routerHandle, e timeoutEvent) []action {
// start speculative block assembly
if e.Proto.Err != nil {
r.t.log.Errorf("failed to read protocol version for speculationTimeout event (proto %v): %v", e.Proto.Version, e.Proto.Err)
return nil
}
return p.startSpeculativeBlockAsm(r)

// get the best proposal we have
re := readLowestEvent{T: readLowestPayload, Round: p.Round}
re = r.dispatch(*p, re, proposalMachineRound, p.Round, 0, 0).(readLowestEvent)

// if we have its payload and its been validated already, start speculating
// on top of it
if re.PayloadOK && re.Payload.ve != nil {
return p.startSpeculativeBlockAsm(r, re.Payload.ve)
}
return nil
}

func (p *player) handleFastTimeout(r routerHandle, e timeoutEvent) []action {
Expand Down Expand Up @@ -228,8 +237,9 @@ func (p *player) issueNextVote(r routerHandle) []action {
return actions
}

func (p *player) startSpeculativeBlockAsm(r routerHandle) (actions []action) {
return append(actions, pseudonodeAction{T: speculativeAssembly, Round: p.Round, Period: p.Period})
func (p *player) startSpeculativeBlockAsm(r routerHandle, ve ValidatedBlock) (actions []action) {
// TODO(yossi) extend the action to carry the speculative block payload
return append(actions, pseudonodeAction{T: speculativeAssembly, Round: p.Round, Period: p.Period, ValidatedBlock: ve})
}

func (p *player) issueFastVote(r routerHandle) (actions []action) {
Expand Down
2 changes: 1 addition & 1 deletion agreement/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (p unauthenticatedProposal) value() proposalValue {
}
}

// A proposal is an Block along with everything needed to validate it.
// A proposal is a Block along with everything needed to validate it.
type proposal struct {
unauthenticatedProposal

Expand Down
9 changes: 9 additions & 0 deletions agreement/proposalStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,15 @@ func (store *proposalStore) handle(r routerHandle, p player, e event) event {
se.Committable = ea.Assembled
se.Payload = ea.Payload
return se
case readLowestPayload:
re := e.(readLowestEvent)
re.T = readLowestValue
re = r.dispatch(p, re, proposalMachinePeriod, re.Round, re.Period, 0).(readLowestEvent)
re.T = readLowestPayload
ea := store.Assemblers[re.Proposal]
re.PayloadOK = ea.Assembled
re.Payload = ea.Payload
return re
case readPinned:
se := e.(pinnedValueEvent)
ea := store.Assemblers[store.Pinned] // If pinned is bottom, assembled/payloadOK = false, payload = bottom
Expand Down
5 changes: 5 additions & 0 deletions agreement/proposalTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ func (t *proposalTracker) handle(r routerHandle, p player, e event) event {
t.Freezer = t.Freezer.freeze()
return e

case readLowestValue:
e := e.(readLowestEvent)
e.Proposal = t.Freezer.Lowest.R.Proposal
return e

case softThreshold, certThreshold:
e := e.(thresholdEvent)
t.Staging = e.Proposal
Expand Down
10 changes: 5 additions & 5 deletions agreement/pseudonode.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type pseudonode interface {
// It returns an error if the pseudonode is unable to perform this.
MakeProposals(ctx context.Context, r round, p period) (<-chan externalEvent, error)

StartSpeculativeBlockAssembly(ctx context.Context, r round, p period) (<-chan externalEvent, error)
// TODO(yossi) pass in context all the way to the pool, so we can cancel spec asm if needed
StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock)

// MakeVotes returns a vote for a given proposal in some round, period, and step.
//
Expand Down Expand Up @@ -187,10 +188,9 @@ func (n asyncPseudonode) MakeProposals(ctx context.Context, r round, p period) (
}
}

func (n asyncPseudonode) StartSpeculativeBlockAssembly(ctx context.Context, r round, p period) (<-chan externalEvent, error) {
return nil, nil
// n.validator.
// go n.factory.OnNewSpeculativeBlock(block, delta)
func (n asyncPseudonode) StartSpeculativeBlockAssembly(ctx context.Context, ve ValidatedBlock) {
// TODO(yossi) change to use ve directly, to avoid recomputing statedeltas
go n.factory.OnNewSpeculativeBlock(ve.Block())
}

func (n asyncPseudonode) MakeVotes(ctx context.Context, r round, p period, s step, prop proposalValue, persistStateDone chan error) (chan externalEvent, error) {
Expand Down
28 changes: 17 additions & 11 deletions ledger/speculative.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,36 @@ type validatedBlockAsLFE struct {
vb *ledgercore.ValidatedBlock
}

// makedBlockAsLFE constructs a new BlockAsLFE from a Block.
func MakeBlockAsLFE(blk bookkeeping.Block, l LedgerForEvaluator) (*validatedBlockAsLFE, ledgercore.StateDelta, error) {
// makeValidatedBlockAsLFE constructs a new validatedBlockAsLFE from a ValidatedBlock.
func MakeValidatedBlockAsLFE(vb *ledgercore.ValidatedBlock, l LedgerForEvaluator) (*validatedBlockAsLFE, error) {
latestRound := l.Latest()
if blk.Round().SubSaturate(1) != latestRound {
return nil, ledgercore.StateDelta{}, fmt.Errorf("MakeBlockAsLFE: Ledger round %d mismatches next block round %d", latestRound, blk.Round())
if vb.Block().Round().SubSaturate(1) != latestRound {
return nil, fmt.Errorf("MakeBlockAsLFE: Ledger round %d mismatches next block round %d", latestRound, vb.Block().Round())
}
hdr, err := l.BlockHdr(latestRound)
if err != nil {
return nil, ledgercore.StateDelta{}, err
return nil, err
}
if blk.Branch != hdr.Hash() {
return nil, ledgercore.StateDelta{}, fmt.Errorf("MakeBlockAsLFE: Ledger latest block hash %x mismatches block's prev hash %x", hdr.Hash(), blk.Branch)
if vb.Block().Branch != hdr.Hash() {
return nil, fmt.Errorf("MakeBlockAsLFE: Ledger latest block hash %x mismatches block's prev hash %x", hdr.Hash(), vb.Block().Branch)
}

return &validatedBlockAsLFE{
l: l,
vb: vb,
}, nil
}

// makeBlockAsLFE constructs a new validatedBlockAsLFE from a Block.
func MakeBlockAsLFE(blk bookkeeping.Block, l LedgerForEvaluator) (*validatedBlockAsLFE, ledgercore.StateDelta, error) {
state, err := internal.Eval(context.Background(), l, blk, false, l.VerifiedTransactionCache(), nil)
if err != nil {
return nil, ledgercore.StateDelta{}, fmt.Errorf("error computing deltas for block %d round %d: %v", blk.Hash(), blk.Round(), err)
}

vb := ledgercore.MakeValidatedBlock(blk, state)
return &validatedBlockAsLFE{
l: l,
vb: &vb,
}, state, nil
lfe, err := MakeValidatedBlockAsLFE(&vb, l)
return lfe, vb.Delta(), nil
}

// Block implements the ledgerForEvaluator interface.
Expand Down

0 comments on commit 63c03b9

Please sign in to comment.