From 946b2fdc6cdae57c47a72c19989b50843f70800b Mon Sep 17 00:00:00 2001 From: chengwenxi Date: Thu, 9 Apr 2020 22:39:44 +0800 Subject: [PATCH] reserve IDs in InitPeer instead of AddPeer --- consensus/reactor.go | 2 +- evidence/reactor.go | 2 +- mempool/reactor.go | 11 +++++++--- mempool/reactor_test.go | 18 +++++++++++++++++ node/node.go | 4 ++++ p2p/pex/pex_reactor.go | 2 +- p2p/transport.go | 16 +++++++++++++-- p2p/transport_test.go | 45 +++++++++++++++++++++++++++++++++++++++++ version/version.go | 2 +- 9 files changed, 93 insertions(+), 9 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index d025536ae41..110d70acabc 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -57,7 +57,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options metrics: NopMetrics(), } conR.updateFastSyncingMetric() - conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR) + conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) for _, option := range options { option(conR) diff --git a/evidence/reactor.go b/evidence/reactor.go index 2b83c324555..cf191a13fa8 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor { evR := &EvidenceReactor{ evpool: evpool, } - evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR) + evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR) return evR } diff --git a/mempool/reactor.go b/mempool/reactor.go index e1376b2879e..7385b2405d3 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -48,7 +48,7 @@ type mempoolIDs struct { activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter } -// Reserve searches for the next unused ID and assignes it to the +// Reserve searches for the next unused ID and assigns it to the // peer. func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { ids.mtx.Lock() @@ -111,10 +111,16 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac Mempool: mempool, ids: newMempoolIDs(), } - memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR) + memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) return memR } +// InitPeer implements Reactor by creating a state for the peer. +func (memR *MempoolReactor) InitPeer(peer p2p.Peer) p2p.Peer { + memR.ids.ReserveForPeer(peer) + return peer +} + // SetLogger sets the Logger on the reactor and the underlying Mempool. func (memR *MempoolReactor) SetLogger(l log.Logger) { memR.Logger = l @@ -143,7 +149,6 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *MempoolReactor) AddPeer(peer p2p.Peer) { - memR.ids.ReserveForPeer(peer) go memR.broadcastTxRoutine(peer) } diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index c9cf498098d..796c94bf310 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -224,3 +224,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { ids.ReserveForPeer(peer) }) } + +func TestDontExhaustMaxActiveIDs(t *testing.T) { + config := cfg.TestConfig() + const N = 1 + reactors := makeAndConnectMempoolReactors(config, N) + defer func() { + for _, r := range reactors { + r.Stop() + } + }() + reactor := reactors[0] + + for i := 0; i < maxActiveIDs+1; i++ { + peer := mock.NewPeer(nil) + reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3}) + reactor.AddPeer(peer) + } +} diff --git a/node/node.go b/node/node.go index b40f83d3de6..98de776c06c 100644 --- a/node/node.go +++ b/node/node.go @@ -437,6 +437,10 @@ func NewNode(config *cfg.Config, p2p.MultiplexTransportConnFilters(connFilters...)(transport) + // Limit the number of incoming connections. + max := config.P2P.MaxNumInboundPeers + p2p.MultiplexTransportMaxIncomingConnections(max)(transport) + // Setup Switch. sw := p2p.NewSwitch( config.P2P, diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 55cde5a3504..73c89605b9d 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { lastReceivedRequests: cmn.NewCMap(), crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo), } - r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r) + r.BaseReactor = *p2p.NewBaseReactor("PEX", r) return r } diff --git a/p2p/transport.go b/p2p/transport.go index 06cd3e4a4ba..ae48d7ae0fd 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/pkg/errors" + "golang.org/x/net/netutil" "net" "time" @@ -121,11 +122,18 @@ func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption { return func(mt *MultiplexTransport) { mt.resolver = resolver } } +// MultiplexTransportMaxIncomingConnections sets the maximum number of +// simultaneous connections (incoming). Default: 0 (unlimited) +func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption { + return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n } +} + // MultiplexTransport accepts and dials tcp connections and upgrades them to // multiplexed peers. type MultiplexTransport struct { - netAddr NetAddress - listener net.Listener + netAddr NetAddress + listener net.Listener + maxIncomingConnections int // see MaxIncomingConnections acceptc chan accept closec chan struct{} @@ -239,6 +247,10 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error { return err } + if mt.maxIncomingConnections > 0 { + ln = netutil.LimitListener(ln, mt.maxIncomingConnections) + } + mt.netAddr = addr mt.listener = ln diff --git a/p2p/transport_test.go b/p2p/transport_test.go index 7580f025960..131d0e1bddf 100644 --- a/p2p/transport_test.go +++ b/p2p/transport_test.go @@ -5,6 +5,7 @@ import ( "math/rand" "net" "reflect" + "strings" "testing" "time" @@ -134,6 +135,50 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) { } } +func TestTransportMultiplexMaxIncomingConnections(t *testing.T) { + mt := newMultiplexTransport( + emptyNodeInfo(), + NodeKey{ + PrivKey: ed25519.GenPrivKey(), + }, + ) + id := mt.nodeKey.ID() + + MultiplexTransportMaxIncomingConnections(0)(mt) + + addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0")) + if err != nil { + t.Fatal(err) + } + + if err := mt.Listen(*addr); err != nil { + t.Fatal(err) + } + + errc := make(chan error) + + go func() { + addr := NewNetAddress(id, mt.listener.Addr()) + + _, err := addr.Dial() + if err != nil { + errc <- err + return + } + + close(errc) + }() + + if err := <-errc; err != nil { + t.Errorf("connection failed: %v", err) + } + + _, err = mt.Accept(peerConfig{}) + if err == nil || !strings.Contains(err.Error(), "connection reset by peer") { + t.Errorf("expected connection reset by peer error, got %v", err) + } +} + func TestTransportMultiplexAcceptMultiple(t *testing.T) { mt := testSetupMultiplexTransport(t) laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr()) diff --git a/version/version.go b/version/version.go index 6f9aa532a00..1e1a1f3a0b5 100644 --- a/version/version.go +++ b/version/version.go @@ -18,7 +18,7 @@ const ( // TMCoreSemVer is the current version of Tendermint Core. // It's the Semantic Version of the software. // Must be a string because scripts like dist.sh read this file. - TMCoreSemVer = "0.32.1" + TMCoreSemVer = "0.32.2" // ABCISemVer is the semantic version of the ABCI library ABCISemVer = "0.15.0"