Skip to content

Commit

Permalink
Merge pull request #5671 from cce/dynamic-lambda-vote-validatedAt
Browse files Browse the repository at this point in the history
  • Loading branch information
cce authored Aug 16, 2023
2 parents 69633c7 + e178503 commit b161d7d
Show file tree
Hide file tree
Showing 12 changed files with 197 additions and 48 deletions.
5 changes: 5 additions & 0 deletions agreement/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ func (d *demux) next(s *Service, deadline Deadline, fastDeadline time.Duration,
e = e.(messageEvent).AttachValidatedAt(s.Clock.Since())
case payloadPresent, votePresent:
e = e.(messageEvent).AttachReceivedAt(s.Clock.Since())
case voteVerified:
// if this is a proposal vote (step 0), record the validatedAt time on the vote
if e.(messageEvent).Input.UnauthenticatedVote.R.Step == 0 {
e = e.(messageEvent).AttachValidatedAt(s.Clock.Since())
}
}
}()

Expand Down
39 changes: 38 additions & 1 deletion agreement/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ const (
// readPinned is sent to the proposalStore to read the pinned value, if it exists.
readPinned

// readLowestVote is sent to the proposalPeriodMachine to read the
// proposal-vote with the lowest credential.
readLowestVote

/*
* The following are event types that replace queries, and may warrant
* a revision to make them more state-machine-esque.
Expand Down Expand Up @@ -407,6 +411,34 @@ func (e newRoundEvent) ComparableStr() string {
return e.String()
}

type readLowestEvent struct {
// T currently only supports readLowestVote
T eventType

// Round and Period are the round and period for which to query
// the lowest-credential vote, value or payload. This type of event
// is only sent for pipelining, which only makes sense for period
// 0, but the Period is here anyway to route to the appropriate
// proposalMachinePeriod.
Round round
Period period

// Vote holds the lowest-credential vote.
Vote vote
}

func (e readLowestEvent) t() eventType {
return e.T
}

func (e readLowestEvent) String() string {
return fmt.Sprintf("%s: %d %d", e.t().String(), e.Round, e.Period)
}

func (e readLowestEvent) ComparableStr() string {
return e.String()
}

type newPeriodEvent struct {
// Period holds the latest period relevant to the proposalRoundMachine.
Period period
Expand Down Expand Up @@ -942,7 +974,12 @@ func (e checkpointEvent) AttachConsensusVersion(v ConsensusVersionView) external
}

func (e messageEvent) AttachValidatedAt(d time.Duration) messageEvent {
e.Input.Proposal.validatedAt = d
switch e.T {
case payloadVerified:
e.Input.Proposal.validatedAt = d
case voteVerified:
e.Input.Vote.validatedAt = d
}
return e
}

Expand Down
25 changes: 13 additions & 12 deletions agreement/eventtype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions agreement/msgp_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 28 additions & 16 deletions agreement/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ type player struct {
// must be verified after some vote has been verified.
Pending proposalTable

payloadArrivals []time.Duration
// the history of arrival times of the lowest credential from previous
// ronuds, used for calculating dynamic lambda.
lowestCredentialArrivals []time.Duration
}

func (p *player) T() stateMachineTag {
Expand Down Expand Up @@ -275,19 +277,29 @@ func (p *player) handleCheckpointEvent(r routerHandle, e checkpointEvent) []acti
}}
}

func (p *player) handleWinningPayloadArrival(payload proposal, ver protocol.ConsensusVersion) {
// ignoring validatedAt
p.payloadArrivals = append(p.payloadArrivals, payload.receivedAt)
p.resizePayloadArrivals(ver)
// updateDynamicLambdaTimings is called at the end of a successful uninterrupted round (just after ensureAction
// is generated) to collect round timings for updating dynamic lambda.
func (p *player) updateDynamicLambdaTimings(r routerHandle, ver protocol.ConsensusVersion) {
// only append to lowestCredentialArrivals if this was a successful round completing in period 0.
if p.Period != 0 {
return
}
// look up the validatedAt time of the winning proposal-vote
re := readLowestEvent{T: readLowestVote, Round: p.Round, Period: p.Period}
re = r.dispatch(*p, re, proposalMachineRound, p.Round, p.Period, 0).(readLowestEvent)
if re.Vote.validatedAt != 0 {
p.lowestCredentialArrivals = append(p.lowestCredentialArrivals, re.Vote.validatedAt)
}
p.resizeLowestCredentialArrivals(ver)
}

func (p *player) resizePayloadArrivals(ver protocol.ConsensusVersion) {
func (p *player) resizeLowestCredentialArrivals(ver protocol.ConsensusVersion) {
proto := config.Consensus[ver]
if len(p.payloadArrivals) > proto.DynamicFilterPayloadArriavalHistory {
p.payloadArrivals = p.payloadArrivals[len(p.payloadArrivals)-proto.DynamicFilterPayloadArriavalHistory:]
if len(p.lowestCredentialArrivals) > proto.DynamicFilterPayloadArriavalHistory {
p.lowestCredentialArrivals = p.lowestCredentialArrivals[len(p.lowestCredentialArrivals)-proto.DynamicFilterPayloadArriavalHistory:]
}
for len(p.payloadArrivals) < proto.DynamicFilterPayloadArriavalHistory {
p.payloadArrivals = append([]time.Duration{FilterTimeout(0, ver)}, p.payloadArrivals...)
for len(p.lowestCredentialArrivals) < proto.DynamicFilterPayloadArriavalHistory {
p.lowestCredentialArrivals = append([]time.Duration{FilterTimeout(0, ver)}, p.lowestCredentialArrivals...)
}
}

Expand All @@ -307,12 +319,12 @@ func (p *player) calculateFilterTimeout(ver protocol.ConsensusVersion, tracer *t
if proto.DynamicFilterPayloadArriavalHistory <= 0 {
// we don't keep any history, use the default
dynamicDelay = defaultDelay
} else if proto.DynamicFilterPayloadArriavalHistory > len(p.payloadArrivals) {
} else if proto.DynamicFilterPayloadArriavalHistory > len(p.lowestCredentialArrivals) {
// not enough samples, use the default
dynamicDelay = defaultDelay
} else {
sortedArrivals := make([]time.Duration, len(p.payloadArrivals))
copy(sortedArrivals[:], p.payloadArrivals[:])
sortedArrivals := make([]time.Duration, len(p.lowestCredentialArrivals))
copy(sortedArrivals[:], p.lowestCredentialArrivals[:])
sort.Slice(sortedArrivals, func(i, j int) bool { return sortedArrivals[i] < sortedArrivals[j] })
dynamicDelay = sortedArrivals[proto.DynamicFilterPayloadArriavalHistory-1]
}
Expand Down Expand Up @@ -348,7 +360,7 @@ func (p *player) handleThresholdEvent(r routerHandle, e thresholdEvent) []action
cert := Certificate(e.Bundle)
a0 := ensureAction{Payload: res.Payload, Certificate: cert}
actions = append(actions, a0)
p.handleWinningPayloadArrival(res.Payload, e.Proto)
p.updateDynamicLambdaTimings(r, e.Proto)
as := p.enterRound(r, e, p.Round+1)
return append(actions, as...)
}
Expand Down Expand Up @@ -405,7 +417,7 @@ func (p *player) enterPeriod(r routerHandle, source thresholdEvent, target perio
if target != 0 {
// We entered a non-0 period, we should reset the filter timeout
// calculation mechanism.
p.payloadArrivals = make([]time.Duration, 0)
p.lowestCredentialArrivals = make([]time.Duration, 0)
}
p.Deadline.Duration = p.calculateFilterTimeout(source.Proto, r.t)
p.Deadline.Type = timers.Filter
Expand Down Expand Up @@ -678,7 +690,7 @@ func (p *player) handleMessageEvent(r routerHandle, e messageEvent) (actions []a
cert := Certificate(freshestRes.Event.Bundle)
a0 := ensureAction{Payload: e.Input.Proposal, Certificate: cert}
actions = append(actions, a0)
p.handleWinningPayloadArrival(e.Input.Proposal, e.Proto.Version)
p.updateDynamicLambdaTimings(r, e.Proto.Version)
as := p.enterRound(r, delegatedE, cert.Round+1)
return append(actions, as...)
}
Expand Down
102 changes: 86 additions & 16 deletions agreement/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3235,19 +3235,20 @@ func TestPlayerAlwaysResynchsPinnedValue(t *testing.T) {
}

// test that ReceivedAt and ValidateAt timing information are retained in proposalStore
// when the payloadPresent and payloadVerified events are processed, and that both timings
// when the payloadPresent, payloadVerified, and voteVerified events are processed, and that all timings
// are available when the ensureAction is called for the block.
func TestPlayerRetainsReceivedValidatedAt(t *testing.T) {
partitiontest.PartitionTest(t)

const r = round(20239)
const p = period(1001)
const p = period(0)
pWhite, pM, helper := setupP(t, r-1, p, soft)
pP, pV := helper.MakeRandomProposalPayload(t, r-1)

// send voteVerified message
vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
inMsg := messageEvent{T: voteVerified, Input: message{Vote: vVote, UnauthenticatedVote: vVote.u()}}
inMsg = inMsg.AttachValidatedAt(501 * time.Millisecond)
err, panicErr := pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)
Expand All @@ -3260,61 +3261,130 @@ func TestPlayerRetainsReceivedValidatedAt(t *testing.T) {
require.NoError(t, err)
require.NoError(t, panicErr)

assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, m)
assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, m, protocol.ConsensusFuture)

// assert lowest vote validateAt time was recorded into payloadArrivals
historyLen := config.Consensus[protocol.ConsensusFuture].DynamicFilterPayloadArriavalHistory
require.NotZero(t, historyLen)
require.Len(t, pWhite.lowestCredentialArrivals, historyLen)
require.Equal(t, 501*time.Millisecond, pWhite.lowestCredentialArrivals[historyLen-1])
}

// test that ReceivedAt and ValidateAt timing information are retained in proposalStore
// when the payloadPresent (as part of the CompoundMessage encoding used by PP messages)
// and payloadVerified events are processed, and that both timings
// when the payloadPresent (as part of the CompoundMessage encoding used by PP messages),
// payloadVerified, and voteVerified events are processed, and that all timings
// are available when the ensureAction is called for the block.
func TestPlayerRetainsReceivedValidatedAtPP(t *testing.T) {
partitiontest.PartitionTest(t)

const r = round(20239)
const p = period(1001)
const p = period(0)
pWhite, pM, helper := setupP(t, r-1, p, soft)
pP, pV := helper.MakeRandomProposalPayload(t, r-1)

// create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
voteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
proposalMsg := message{UnauthenticatedProposal: pP.u()}
compoundMsg := messageEvent{T: votePresent, Input: voteMsg,
compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
inMsg := compoundMsg.AttachReceivedAt(time.Second) // call AttachReceivedAt like demux would
err, panicErr := pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)

// make sure vote verify requests
verifyEvent := ev(cryptoAction{T: verifyVote, M: voteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")

// send voteVerified
verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
inMsg = inMsg.AttachValidatedAt(502 * time.Millisecond)
err, panicErr = pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)

assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg, protocol.ConsensusFuture)

// assert lowest vote validateAt time was recorded into payloadArrivals
historyLen := config.Consensus[protocol.ConsensusFuture].DynamicFilterPayloadArriavalHistory
require.NotZero(t, historyLen)
require.Len(t, pWhite.lowestCredentialArrivals, historyLen)
require.Equal(t, 502*time.Millisecond, pWhite.lowestCredentialArrivals[historyLen-1])
}

// test that ReceivedAt and ValidateAt timing information are retained in proposalStore
// when the voteVerified event comes in first (as part of the AV message before PP),
// then the payloadPresent (as part of the CompoundMessage encoding used by PP messages)
// and payloadVerified events are processed, and that all timings
// are available when the ensureAction is called for the block.
func TestPlayerRetainsReceivedValidatedAtAVPP(t *testing.T) {
partitiontest.PartitionTest(t)

const r = round(20239)
const p = period(0)
pWhite, pM, helper := setupP(t, r-1, p, soft)
pP, pV := helper.MakeRandomProposalPayload(t, r-1)

// send votePresent message (mimicking the first AV message validating)
vVote := helper.MakeVerifiedVote(t, 0, r-1, p, propose, *pV)
unverifiedVoteMsg := message{UnauthenticatedVote: vVote.u()}
inMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg}
err, panicErr := pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)

// make sure vote verify requests
verifyEvent := ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify vote")

// send voteVerified
inMsg = messageEvent{T: voteVerified, Input: voteMsg, TaskIndex: 1}
verifiedVoteMsg := message{Vote: vVote, UnauthenticatedVote: vVote.u()}
inMsg = messageEvent{T: voteVerified, Input: verifiedVoteMsg, TaskIndex: 1}
inMsg = inMsg.AttachValidatedAt(502 * time.Millisecond)
err, panicErr = pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)

assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg)
// create a PP message for an arbitrary proposal/payload similar to setupCompoundMessage
proposalMsg := message{UnauthenticatedProposal: pP.u()}
compoundMsg := messageEvent{T: votePresent, Input: unverifiedVoteMsg,
Tail: &messageEvent{T: payloadPresent, Input: proposalMsg}}
inMsg = compoundMsg.AttachReceivedAt(time.Second) // call AttachReceivedAt like demux would
err, panicErr = pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)

// make sure no second request to verify this vote
verifyEvent = ev(cryptoAction{T: verifyVote, M: unverifiedVoteMsg, Round: r - 1, Period: p, Step: propose, TaskIndex: 1})
require.Equal(t, 1, pM.getTrace().CountEvent(verifyEvent), "Player should not verify second vote")

assertCorrectReceivedAtSet(t, pWhite, pM, helper, r, p, pP, pV, proposalMsg, protocol.ConsensusFuture)

// assert lowest vote validateAt time was recorded into payloadArrivals
historyLen := config.Consensus[protocol.ConsensusFuture].DynamicFilterPayloadArriavalHistory
require.NotZero(t, historyLen)
require.Len(t, pWhite.lowestCredentialArrivals, historyLen)
require.Equal(t, 502*time.Millisecond, pWhite.lowestCredentialArrivals[historyLen-1])
}

func assertCorrectReceivedAtSet(t *testing.T, pWhite *player, pM ioAutomata, helper *voteMakerHelper,
r round, p period, pP *proposal, pV *proposalValue, m message) {
r round, p period, pP *proposal, pV *proposalValue, m message, ver protocol.ConsensusVersion) {
// make sure payload verify request
verifyEvent := ev(cryptoAction{T: verifyPayload, M: m, Round: r - 1, Period: p, Step: propose, TaskIndex: 0})
require.Truef(t, pM.getTrace().Contains(verifyEvent), "Player should verify payload")

// payloadVerified
inMsg := messageEvent{T: payloadVerified, Input: message{Proposal: *pP}, Proto: ConsensusVersionView{Version: protocol.ConsensusCurrentVersion}}
inMsg := messageEvent{T: payloadVerified, Input: message{Proposal: *pP}, Proto: ConsensusVersionView{Version: ver}}
inMsg = inMsg.AttachValidatedAt(2 * time.Second) // call AttachValidatedAt like demux would
err, panicErr := pM.transition(inMsg)
require.NoError(t, err)
require.NoError(t, panicErr)

// gen cert to move into the next round
votes := make([]vote, int(cert.threshold(config.Consensus[protocol.ConsensusCurrentVersion])))
for i := 0; i < int(cert.threshold(config.Consensus[protocol.ConsensusCurrentVersion])); i++ {
votes := make([]vote, int(cert.threshold(config.Consensus[ver])))
for i := 0; i < int(cert.threshold(config.Consensus[ver])); i++ {
votes[i] = helper.MakeVerifiedVote(t, i, r-1, p, cert, *pV)
}
bun := unauthenticatedBundle{
Expand All @@ -3331,7 +3401,7 @@ func assertCorrectReceivedAtSet(t *testing.T, pWhite *player, pM ioAutomata, hel
},
UnauthenticatedBundle: bun,
},
Proto: ConsensusVersionView{Version: protocol.ConsensusCurrentVersion},
Proto: ConsensusVersionView{Version: ver},
}
err, panicErr = pM.transition(inMsg)
require.NoError(t, err)
Expand Down
Loading

0 comments on commit b161d7d

Please sign in to comment.