Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add start stop e2e test #1705

Merged
merged 21 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,7 @@ func (sb *Backend) Commit(proposal istanbul.Proposal, aggregatedSeal types.Istan
return err
}
}
go sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State)

sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is running this on a goroutine no longer necessary due to the other syncronization added in worker?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if it ever was necessary. All I did was remove the go routine and nothing broke.

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions consensus/istanbul/backend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ func (sb *Backend) newChainHead(newBlock *types.Block) {

sb.logger.Info("Validator Election Results", "address", sb.ValidatorAddress(), "elected", valSetIndex >= 0, "number", newBlock.Number().Uint64())

// We lock here to protect access to announceRunning because
// announceRunning is also accessed in StartAnnouncing and
// StopAnnouncing.
sb.announceMu.Lock()
defer sb.announceMu.Unlock()
if sb.announceRunning {
hbandura marked this conversation as resolved.
Show resolved Hide resolved
sb.logger.Trace("At end of epoch and going to refresh validator peers", "new_block_number", newBlock.Number().Uint64())
if err := sb.RefreshValPeers(); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/peer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func (vph *validatorPeerHandler) startThread() error {
}

vph.threadRunning = true
vph.threadWg.Add(1)
go vph.thread()

return nil
Expand All @@ -72,7 +73,6 @@ func (vph *validatorPeerHandler) stopThread() error {
}

func (vph *validatorPeerHandler) thread() {
vph.threadWg.Add(1)
defer vph.threadWg.Done()

refreshValidatorPeersTicker := time.NewTicker(1 * time.Minute)
Expand Down
20 changes: 18 additions & 2 deletions consensus/istanbul/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ type core struct {
finalCommittedSub *event.TypeMuxSubscription
timeoutSub *event.TypeMuxSubscription

futurePreprepareTimer *time.Timer
resendRoundChangeMessageTimer *time.Timer
futurePreprepareTimer *time.Timer
resendRoundChangeMessageTimer *time.Timer
resendRoundChangeMessageTimerMu sync.Mutex

roundChangeTimer *time.Timer
roundChangeTimerMu sync.RWMutex
Expand All @@ -127,6 +128,7 @@ type core struct {

rsdb RoundStateDB
current RoundState
currentMu sync.RWMutex
handlerWg *sync.WaitGroup

roundChangeSet *roundChangeSet
Expand Down Expand Up @@ -191,6 +193,11 @@ func (c *core) SetAddress(address common.Address) {
}

func (c *core) CurrentView() *istanbul.View {
// CurrentView is called by Prepare which is called by miner.worker the
// main loop, we need to synchronise this access with the write which occurs
// in Stop, which is called from the miner's update loop.
c.currentMu.RLock()
defer c.currentMu.RUnlock()
if c.current == nil {
return nil
}
Expand All @@ -200,6 +207,11 @@ func (c *core) CurrentView() *istanbul.View {
func (c *core) CurrentRoundState() RoundState { return c.current }

func (c *core) ParentCommits() MessageSet {
// ParentCommits is called by Prepare which is called by miner.worker the
// main loop, we need to synchronise this access with the write which
// occurs in Stop, which is called from the miner's update loop.
c.currentMu.RLock()
defer c.currentMu.RUnlock()
if c.current == nil {
return nil
}
Expand Down Expand Up @@ -682,6 +694,8 @@ func (c *core) stopRoundChangeTimer() {
}

func (c *core) stopResendRoundChangeTimer() {
c.resendRoundChangeMessageTimerMu.Lock()
defer c.resendRoundChangeMessageTimerMu.Unlock()
if c.resendRoundChangeMessageTimer != nil {
c.resendRoundChangeMessageTimer.Stop()
c.resendRoundChangeMessageTimer = nil
Expand Down Expand Up @@ -771,6 +785,8 @@ func (c *core) resetResendRoundChangeTimer() {
resendTimeout = maxResendTimeout
}
view := &istanbul.View{Sequence: c.current.Sequence(), Round: c.current.DesiredRound()}
c.resendRoundChangeMessageTimerMu.Lock()
defer c.resendRoundChangeMessageTimerMu.Unlock()
c.resendRoundChangeMessageTimer = time.AfterFunc(resendTimeout, func() {
c.sendEvent(resendRoundChangeEvent{view})
})
Expand Down
6 changes: 4 additions & 2 deletions consensus/istanbul/core/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (c *core) Start() error {
// Tests will handle events itself, so we have to make subscribeEvents()
// be able to call in test.
c.subscribeEvents()

c.handlerWg.Add(1)
go c.handleEvents()

return nil
Expand All @@ -60,6 +62,8 @@ func (c *core) Stop() error {
// Make sure the handler goroutine exits
c.handlerWg.Wait()

c.currentMu.Lock()
defer c.currentMu.Unlock()
c.current = nil
return nil
}
Expand Down Expand Up @@ -95,8 +99,6 @@ func (c *core) handleEvents() {
// Clear state
defer c.handlerWg.Done()

c.handlerWg.Add(1)

for {
logger := c.newLogger("func", "handleEvents")
select {
Expand Down
Loading