Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

R4R: Defend against DoS attacks #110

Merged
merged 1 commit into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool, options
metrics: NopMetrics(),
}
conR.updateFastSyncingMetric()
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

for _, option := range options {
option(conR)
Expand Down
2 changes: 1 addition & 1 deletion evidence/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor {
evR := &EvidenceReactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
}

Expand Down
11 changes: 8 additions & 3 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type mempoolIDs struct {
activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter
}

// Reserve searches for the next unused ID and assignes it to the
// Reserve searches for the next unused ID and assigns it to the
// peer.
func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) {
ids.mtx.Lock()
Expand Down Expand Up @@ -111,10 +111,16 @@ func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReac
Mempool: mempool,
ids: newMempoolIDs(),
}
memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR
}

// InitPeer implements Reactor by creating a state for the peer.
func (memR *MempoolReactor) InitPeer(peer p2p.Peer) p2p.Peer {
memR.ids.ReserveForPeer(peer)
return peer
}

// SetLogger sets the Logger on the reactor and the underlying Mempool.
func (memR *MempoolReactor) SetLogger(l log.Logger) {
memR.Logger = l
Expand Down Expand Up @@ -143,7 +149,6 @@ func (memR *MempoolReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor.
// It starts a broadcast routine ensuring all txs are forwarded to the given peer.
func (memR *MempoolReactor) AddPeer(peer p2p.Peer) {
memR.ids.ReserveForPeer(peer)
go memR.broadcastTxRoutine(peer)
}

Expand Down
18 changes: 18 additions & 0 deletions mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,21 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
ids.ReserveForPeer(peer)
})
}

func TestDontExhaustMaxActiveIDs(t *testing.T) {
config := cfg.TestConfig()
const N = 1
reactors := makeAndConnectMempoolReactors(config, N)
defer func() {
for _, r := range reactors {
r.Stop()
}
}()
reactor := reactors[0]

for i := 0; i < maxActiveIDs+1; i++ {
peer := mock.NewPeer(nil)
reactor.Receive(MempoolChannel, peer, []byte{0x1, 0x2, 0x3})
reactor.AddPeer(peer)
}
}
4 changes: 4 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ func NewNode(config *cfg.Config,

p2p.MultiplexTransportConnFilters(connFilters...)(transport)

// Limit the number of incoming connections.
max := config.P2P.MaxNumInboundPeers
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)

// Setup Switch.
sw := p2p.NewSwitch(
config.P2P,
Expand Down
2 changes: 1 addition & 1 deletion p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
lastReceivedRequests: cmn.NewCMap(),
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
}
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
r.BaseReactor = *p2p.NewBaseReactor("PEX", r)
return r
}

Expand Down
16 changes: 14 additions & 2 deletions p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/pkg/errors"
"golang.org/x/net/netutil"
"net"
"time"

Expand Down Expand Up @@ -121,11 +122,18 @@ func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.resolver = resolver }
}

// MultiplexTransportMaxIncomingConnections sets the maximum number of
// simultaneous connections (incoming). Default: 0 (unlimited)
func MultiplexTransportMaxIncomingConnections(n int) MultiplexTransportOption {
return func(mt *MultiplexTransport) { mt.maxIncomingConnections = n }
}

// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
netAddr NetAddress
listener net.Listener
netAddr NetAddress
listener net.Listener
maxIncomingConnections int // see MaxIncomingConnections

acceptc chan accept
closec chan struct{}
Expand Down Expand Up @@ -239,6 +247,10 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return err
}

if mt.maxIncomingConnections > 0 {
ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
}

mt.netAddr = addr
mt.listener = ln

Expand Down
45 changes: 45 additions & 0 deletions p2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"net"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -134,6 +135,50 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
}
}

func TestTransportMultiplexMaxIncomingConnections(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
id := mt.nodeKey.ID()

MultiplexTransportMaxIncomingConnections(0)(mt)

addr, err := NewNetAddressString(IDAddressString(id, "127.0.0.1:0"))
if err != nil {
t.Fatal(err)
}

if err := mt.Listen(*addr); err != nil {
t.Fatal(err)
}

errc := make(chan error)

go func() {
addr := NewNetAddress(id, mt.listener.Addr())

_, err := addr.Dial()
if err != nil {
errc <- err
return
}

close(errc)
}()

if err := <-errc; err != nil {
t.Errorf("connection failed: %v", err)
}

_, err = mt.Accept(peerConfig{})
if err == nil || !strings.Contains(err.Error(), "connection reset by peer") {
t.Errorf("expected connection reset by peer error, got %v", err)
}
}

func TestTransportMultiplexAcceptMultiple(t *testing.T) {
mt := testSetupMultiplexTransport(t)
laddr := NewNetAddress(mt.nodeKey.ID(), mt.listener.Addr())
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
// TMCoreSemVer is the current version of Tendermint Core.
// It's the Semantic Version of the software.
// Must be a string because scripts like dist.sh read this file.
TMCoreSemVer = "0.32.1"
TMCoreSemVer = "0.32.2"

// ABCISemVer is the semantic version of the ABCI library
ABCISemVer = "0.15.0"
Expand Down