Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

R4R: add stopMtx for FlushStop and OnStop #50

Merged
merged 1 commit into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion p2p/conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"net"
"reflect"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -89,6 +90,10 @@ type MConnection struct {
quitSendRoutine chan struct{}
doneSendRoutine chan struct{}

// used to ensure FlushStop and OnStop
// are safe to call concurrently.
stopMtx sync.Mutex

flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled.
pingTimer *cmn.RepeatTimer // send pings periodically

Expand Down Expand Up @@ -210,8 +215,17 @@ func (c *MConnection) OnStart() error {
// It additionally ensures that all successful
// .Send() calls will get flushed before closing
// the connection.
// NOTE: it is not safe to call this method more than once.
func (c *MConnection) FlushStop() {
c.stopMtx.Lock()
defer c.stopMtx.Unlock()

select {
case <-c.quitSendRoutine:
// already quit via OnStop
return
default:
}

c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
Expand Down Expand Up @@ -247,6 +261,9 @@ func (c *MConnection) FlushStop() {

// OnStop implements BaseService
func (c *MConnection) OnStop() {
c.stopMtx.Lock()
defer c.stopMtx.Unlock()

select {
case <-c.quitSendRoutine:
// already quit via FlushStop
Expand Down
75 changes: 75 additions & 0 deletions p2p/pex/pex_reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,81 @@ func TestPEXReactorCrawlStatus(t *testing.T) {
// TODO: test
}

// connect a peer to a seed, wait a bit, then stop it.
// this should give it time to request addrs and for the seed
// to call FlushStop, and allows us to test calling Stop concurrently
// with FlushStop. Before a fix, this non-deterministically reproduced
// https://github.com/tendermint/tendermint/issues/3231.
func TestPEXReactorSeedModeFlushStop(t *testing.T) {
N := 2
switches := make([]*p2p.Switch, N)

// directory to store address books
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck

books := make([]*addrBook, N)
logger := log.TestingLogger()

// create switches
for i := 0; i < N; i++ {
switches[i] = p2p.MakeSwitch(cfg, i, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false)
books[i].SetLogger(logger.With("pex", i))
sw.SetAddrBook(books[i])

sw.SetLogger(logger.With("pex", i))

config := &PEXReactorConfig{}
if i == 0 {
// first one is a seed node
config = &PEXReactorConfig{SeedMode: true}
}
r := NewPEXReactor(books[i], config)
r.SetLogger(logger.With("pex", i))
r.SetEnsurePeersPeriod(250 * time.Millisecond)
sw.AddReactor("pex", r)

return sw
})
}

for _, sw := range switches {
err := sw.Start() // start switch and reactors
require.Nil(t, err)
}

reactor := switches[0].Reactors()["pex"].(*PEXReactor)
peerID := switches[1].NodeInfo().ID()

err = switches[1].DialPeerWithAddress(switches[0].NodeInfo().NetAddress(), false)
assert.NoError(t, err)

// sleep up to a second while waiting for the peer to send us a message.
// this isn't perfect since it's possible the peer sends us a msg and we FlushStop
// before this loop catches it. but non-deterministically it works pretty well.
for i := 0; i < 1000; i++ {
v := reactor.lastReceivedRequests.Get(string(peerID))
if v != nil {
break
}
time.Sleep(time.Millisecond)
}

// by now the FlushStop should have happened. Try stopping the peer.
// it should be safe to do this.
peers := switches[0].Peers().List()
for _, peer := range peers {
peer.Stop()
}

// stop the switches
for _, s := range switches {
s.Stop()
}
}

func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
peer := p2p.CreateRandomPeer(false)

Expand Down