Skip to content

Commit

Permalink
cache proposal verify result (ethereum#784)
Browse files Browse the repository at this point in the history
* cache verify result

* removed new lint

* fixed a comment

* fix bug

* created the proposalverificationstatus cache on demand

* added more log statements

* addressed PR comments and added test cases

* fixed flaky preprepare tests
  • Loading branch information
kevjue authored Jan 7, 2020
1 parent a9c4647 commit fe80c83
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 22 deletions.
8 changes: 4 additions & 4 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,9 @@ func (sb *Backend) Gossip(destAddresses []common.Address, payload []byte, ethMsg
}

if len(peers) > 0 {
for addr, p := range peers {
for nodeID, p := range peers {
if !ignoreCache {
ms, ok := sb.recentMessages.Get(addr)
ms, ok := sb.recentMessages.Get(nodeID)
var m *lru.ARCCache
if ok {
m, _ = ms.(*lru.ARCCache)
Expand All @@ -351,9 +351,9 @@ func (sb *Backend) Gossip(destAddresses []common.Address, payload []byte, ethMsg
}

m.Add(hash, true)
sb.recentMessages.Add(addr, m)
sb.recentMessages.Add(nodeID, m)
}
sb.logger.Trace("Sending istanbul message to peer", "msg_code", ethMsgCode, "address", addr)
sb.logger.Trace("Sending istanbul message to peer", "msg_code", ethMsgCode, "nodeID", nodeID)

go p.Send(ethMsgCode, payload)
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (sb *Backend) HandleMsg(addr common.Address, msg p2p.Msg, peer consensus.Pe
sb.coreMu.Lock()
defer sb.coreMu.Unlock()

sb.logger.Trace("HandleMsg called", "address", addr, "msg", msg, "peer.Node()", peer.Node())
sb.logger.Trace("HandleMsg called", "address", addr, "ethMsg", msg, "peer.Node()", peer.Node())

if sb.isIstanbulMsg(msg) {
if (!sb.coreStarted && !sb.config.Proxy) && (msg.Code == istanbulConsensusMsg) {
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/pos.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (sb *Backend) distributeEpochPaymentsAndRewards(header *types.Header, state
func (sb *Backend) updateValidatorScores(header *types.Header, state *state.StateDB, valSet []istanbul.Validator) ([]*big.Int, error) {
epoch := istanbul.GetEpochNumber(header.Number.Uint64(), sb.EpochSize())
logger := sb.logger.New("func", "Backend.updateValidatorScores", "blocknum", header.Number.Uint64(), "epoch", epoch, "epochsize", sb.EpochSize(), "window", sb.LookbackWindow())
sb.logger.Trace("Updating validator scores")
logger.Trace("Updating validator scores")

// The denominator is the (last block - first block + 1) of the val score tally window
denominator := istanbul.GetValScoreTallyLastBlockNumber(epoch, sb.EpochSize()) - istanbul.GetValScoreTallyFirstBlockNumber(epoch, sb.EpochSize(), sb.LookbackWindow()) + 1
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/core/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *core) handleCheckedCommitForCurrentSequence(msg *istanbul.Message, comm
}
numberOfCommits := c.current.Commits().Size()
minQuorumSize := c.current.ValidatorSet().MinQuorumSize()
logger.Trace("Accepted commit", "Number of commits", numberOfCommits)
logger.Trace("Accepted commit for current sequence", "Number of commits", numberOfCommits)

// Commit the proposal once we have enough COMMIT messages and we are not in the Committed state.
//
Expand Down
21 changes: 21 additions & 0 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -556,3 +557,23 @@ func (c *core) Sequence() *big.Int {
}
return c.current.Sequence()
}

func (c *core) verifyProposal(proposal istanbul.Proposal) (time.Duration, error) {
logger := c.newLogger("func", "verifyProposal", "proposal", proposal.Hash())
if verificationStatus, isCached := c.current.GetProposalVerificationStatus(proposal.Hash()); isCached {
logger.Trace("verification status cache hit", "verificationStatus", verificationStatus)
return 0, verificationStatus
} else {
logger.Trace("verification status cache miss")

duration, err := c.backend.Verify(proposal)
logger.Trace("proposal verify return values", "duration", duration, "err", err)

// Don't cache the verification status if it's a future block
if err != consensus.ErrFutureBlock {
c.current.SetProposalVerificationStatus(proposal.Hash(), err)
}

return duration, err
}
}
92 changes: 92 additions & 0 deletions consensus/istanbul/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package core

import (
"errors"
"math/big"
"reflect"
"testing"
"time"

"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/core/types"
elog "github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -59,6 +61,8 @@ func newTestProposal() istanbul.Proposal {
return makeBlock(1)
}

var InvalidProposalError = errors.New("invalid proposal")

func TestNewRequest(t *testing.T) {
testLogger.SetHandler(elog.StdoutHandler)

Expand Down Expand Up @@ -92,3 +96,91 @@ func TestNewRequest(t *testing.T) {
}
}
}

func TestVerifyProposal(t *testing.T) {
// Check that it should not be in the cache
sys := NewTestSystemWithBackend(1, 0)

close := sys.Run(true)
defer close()

backendCore := sys.backends[0].engine.(*core)
backend := backendCore.backend.(*testSystemBackend)

testCases := []struct {
name string
proposal istanbul.Proposal
verifyImpl func(proposal istanbul.Proposal) (time.Duration, error)
expectedErr error
expectedDuration time.Duration
}{
// Test case with valid proposal
{
"Valid proposal",
newTestProposalWithNum(1),
backend.verifyWithSuccess,
nil,
0,
},

// Test case with invalid proposal
{
"Invalid proposal",
newTestProposalWithNum(2),
backend.verifyWithFailure,
InvalidProposalError,
0,
},

// Test case with future proposal
{
"Future proposal",
newTestProposalWithNum(3),
backend.verifyWithFutureProposal,
consensus.ErrFutureBlock,
5,
},
}

for _, testCase := range testCases {
t.Run(testCase.name,
func(t *testing.T) {
// Inject in the verification function implementation
backend.setVerifyImpl(testCase.verifyImpl)

// Verify a cache miss
_, isCached := backendCore.current.GetProposalVerificationStatus(testCase.proposal.Hash())
if isCached {
t.Errorf("Should of had a cache miss")
}

// Do a verification with success
_, err := backendCore.verifyProposal(testCase.proposal)
if err != testCase.expectedErr {
t.Errorf("Unexpected return status on first verifyProposal call. Want: %v, Actual: %v", testCase.expectedErr, err)
}

// The cache entry for this proposal should be created, if it wasn't the future proposal case
err, isCached = backendCore.current.GetProposalVerificationStatus(testCase.proposal.Hash())
if testCase.name != "Future proposal" {
if !isCached {
t.Errorf("Should of had a cache hit")
}

if err != testCase.expectedErr {
t.Errorf("Unexpected cached proposal verification status. Want: %v, actual: %v", testCase.expectedErr, err)
}
} else { // testCase.name == "Future proposal"
if isCached {
t.Errorf("Should of had a cache miss for the future proposal test case")
}
}

// Call verify proposal again to check for the cached verifcation result and duration
duration, err := backendCore.verifyProposal(testCase.proposal)
if duration != testCase.expectedDuration || err != testCase.expectedErr {
t.Errorf("Unexpected return status on second verifyProposal call. Want: err - %v, duration - %v; Actual: err - %v, duration - %v", testCase.expectedErr, testCase.expectedDuration, err, duration)
}
})
}
}
2 changes: 1 addition & 1 deletion consensus/istanbul/core/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (c *core) verifyPreparedCertificate(preparedCertificate istanbul.PreparedCe
logger := c.newLogger("func", "verifyPreparedCertificate", "proposal_number", preparedCertificate.Proposal.Number(), "proposal_hash", preparedCertificate.Proposal.Hash().String())

// Validate the attached proposal
if _, err := c.backend.Verify(preparedCertificate.Proposal); err != nil {
if _, err := c.verifyProposal(preparedCertificate.Proposal); err != nil {
return errInvalidPreparedCertificateProposal
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/istanbul/core/preprepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *core) sendPreprepare(request *istanbul.Request, roundChangeCertificate

func (c *core) handlePreprepare(msg *istanbul.Message) error {
logger := c.newLogger("func", "handlePreprepare", "tag", "handleMsg", "from", msg.Address)
logger.Trace("Got pre-prepare message", "msg", msg)
logger.Trace("Got pre-prepare message", "IstMsg", msg)

// Decode PRE-PREPARE
var preprepare *istanbul.Preprepare
Expand Down Expand Up @@ -117,7 +117,7 @@ func (c *core) handlePreprepare(msg *istanbul.Message) error {
}

// Verify the proposal we received
if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {
if duration, err := c.verifyProposal(preprepare.Proposal); err != nil {
logger.Warn("Failed to verify proposal", "err", err, "duration", duration)
// if it's a future block, we will handle it again after the duration
if err == consensus.ErrFutureBlock {
Expand Down
4 changes: 3 additions & 1 deletion consensus/istanbul/core/preprepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestHandlePreprepare(t *testing.T) {
t.Run(test.name, func(t *testing.T) {

sys := test.system()
sys.Run(false)
closer := sys.Run(false)

v0 := sys.backends[0]
r0 := v0.engine.(*core)
Expand Down Expand Up @@ -409,6 +409,8 @@ func TestHandlePreprepare(t *testing.T) {
}
}
}

closer()
})
}
}
33 changes: 33 additions & 0 deletions consensus/istanbul/core/roundstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type RoundState interface {
AddPrepare(msg *istanbul.Message) error
AddParentCommit(msg *istanbul.Message) error
SetPendingRequest(pendingRequest *istanbul.Request) error
SetProposalVerificationStatus(proposalHash common.Hash, verificationStatus error)

// view functions
DesiredRound() *big.Int
Expand All @@ -68,6 +69,7 @@ type RoundState interface {
Sequence() *big.Int
View() *istanbul.View
PreparedCertificate() istanbul.PreparedCertificate
GetProposalVerificationStatus(proposalHash common.Hash) (verificationStatus error, isCached bool)
}

// RoundState stores the consensus state
Expand All @@ -89,6 +91,12 @@ type roundStateImpl struct {
pendingRequest *istanbul.Request
preparedCertificate istanbul.PreparedCertificate

// Verification status for proposals seen in this view
// Note that this field will not get RLP enoded and persisted, since it contains an error type,
// which doesn't have a native RLP encoding. Also, this is a cache, so it's not necessary for it
// to be persisted.
proposalVerificationStatus map[common.Hash]error

mu *sync.RWMutex
logger log.Logger
}
Expand Down Expand Up @@ -259,6 +267,7 @@ func (s *roundStateImpl) StartNewSequence(nextSequence *big.Int, validatorSet is
s.preparedCertificate = istanbul.EmptyPreparedCertificate()
s.pendingRequest = nil
s.parentCommits = parentCommits
s.proposalVerificationStatus = nil

logger.Debug("Starting new sequence", "next_sequence", nextSequence, "next_proposer", nextProposer.Address().Hex())
return nil
Expand Down Expand Up @@ -378,6 +387,30 @@ func (s *roundStateImpl) PreparedCertificate() istanbul.PreparedCertificate {
return s.preparedCertificate
}

func (s *roundStateImpl) SetProposalVerificationStatus(proposalHash common.Hash, verificationStatus error) {
s.mu.Lock()
defer s.mu.Unlock()

if s.proposalVerificationStatus == nil {
s.proposalVerificationStatus = make(map[common.Hash]error)
}

s.proposalVerificationStatus[proposalHash] = verificationStatus
}

func (s *roundStateImpl) GetProposalVerificationStatus(proposalHash common.Hash) (verificationStatus error, isCached bool) {
s.mu.RLock()
defer s.mu.RUnlock()

verificationStatus, isCached = nil, false

if s.proposalVerificationStatus != nil {
verificationStatus, isCached = s.proposalVerificationStatus[proposalHash]
}

return
}

func (s *roundStateImpl) newLogger(ctx ...interface{}) log.Logger {
logger := s.logger.New(ctx...)
return logger.New("cur_seq", s.sequence, "cur_round", s.round, "state", s.state)
Expand Down
31 changes: 20 additions & 11 deletions consensus/istanbul/core/roundstate_save_decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type rsSaveDecorator struct {
db RoundStateDB
}

func (rsp *rsSaveDecorator) persitOnNoError(err error) error {
func (rsp *rsSaveDecorator) persistOnNoError(err error) error {
if err != nil {
return err
}
Expand All @@ -47,34 +47,38 @@ func (rsp *rsSaveDecorator) persitOnNoError(err error) error {

// mutation functions
func (rsp *rsSaveDecorator) StartNewRound(nextRound *big.Int, validatorSet istanbul.ValidatorSet, nextProposer istanbul.Validator) error {
return rsp.persitOnNoError(rsp.rs.StartNewRound(nextRound, validatorSet, nextProposer))
return rsp.persistOnNoError(rsp.rs.StartNewRound(nextRound, validatorSet, nextProposer))
}
func (rsp *rsSaveDecorator) StartNewSequence(nextSequence *big.Int, validatorSet istanbul.ValidatorSet, nextProposer istanbul.Validator, parentCommits MessageSet) error {
return rsp.persitOnNoError(rsp.rs.StartNewSequence(nextSequence, validatorSet, nextProposer, parentCommits))
return rsp.persistOnNoError(rsp.rs.StartNewSequence(nextSequence, validatorSet, nextProposer, parentCommits))
}
func (rsp *rsSaveDecorator) TransitionToPreprepared(preprepare *istanbul.Preprepare) error {
return rsp.persitOnNoError(rsp.rs.TransitionToPreprepared(preprepare))
return rsp.persistOnNoError(rsp.rs.TransitionToPreprepared(preprepare))
}
func (rsp *rsSaveDecorator) TransitionToWaitingForNewRound(r *big.Int, nextProposer istanbul.Validator) error {
return rsp.persitOnNoError(rsp.rs.TransitionToWaitingForNewRound(r, nextProposer))
return rsp.persistOnNoError(rsp.rs.TransitionToWaitingForNewRound(r, nextProposer))
}
func (rsp *rsSaveDecorator) TransitionToCommitted() error {
return rsp.persitOnNoError(rsp.rs.TransitionToCommitted())
return rsp.persistOnNoError(rsp.rs.TransitionToCommitted())
}
func (rsp *rsSaveDecorator) TransitionToPrepared(quorumSize int) error {
return rsp.persitOnNoError(rsp.rs.TransitionToPrepared(quorumSize))
return rsp.persistOnNoError(rsp.rs.TransitionToPrepared(quorumSize))
}
func (rsp *rsSaveDecorator) AddCommit(msg *istanbul.Message) error {
return rsp.persitOnNoError(rsp.rs.AddCommit(msg))
return rsp.persistOnNoError(rsp.rs.AddCommit(msg))
}
func (rsp *rsSaveDecorator) AddPrepare(msg *istanbul.Message) error {
return rsp.persitOnNoError(rsp.rs.AddPrepare(msg))
return rsp.persistOnNoError(rsp.rs.AddPrepare(msg))
}
func (rsp *rsSaveDecorator) AddParentCommit(msg *istanbul.Message) error {
return rsp.persitOnNoError(rsp.rs.AddParentCommit(msg))
return rsp.persistOnNoError(rsp.rs.AddParentCommit(msg))
}
func (rsp *rsSaveDecorator) SetPendingRequest(pendingRequest *istanbul.Request) error {
return rsp.persitOnNoError(rsp.rs.SetPendingRequest(pendingRequest))
return rsp.persistOnNoError(rsp.rs.SetPendingRequest(pendingRequest))
}
func (rsp *rsSaveDecorator) SetProposalVerificationStatus(proposalHash common.Hash, verificationStatus error) {
// Don't persist on proposal verification status change, since it's just a cache
rsp.rs.SetProposalVerificationStatus(proposalHash, verificationStatus)
}

// DesiredRound implements RoundState.DesiredRound
Expand Down Expand Up @@ -127,6 +131,11 @@ func (rsp *rsSaveDecorator) GetValidatorByAddress(address common.Address) istanb
return rsp.rs.GetValidatorByAddress(address)
}

// GetValidatorByAddress implements RoundState.GetProposalVerificationStatus
func (rsp *rsSaveDecorator) GetProposalVerificationStatus(proposalHash common.Hash) (verificationStatus error, isChecked bool) {
return rsp.rs.GetProposalVerificationStatus(proposalHash)
}

// IsProposer implements RoundState.IsProposer
func (rsp *rsSaveDecorator) IsProposer(address common.Address) bool { return rsp.rs.IsProposer(address) }

Expand Down
Loading

0 comments on commit fe80c83

Please sign in to comment.