From 1ab3f810773e12a70ef904a6c9a8df4e5d8701ed Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 4 May 2019 22:53:57 +0300 Subject: [PATCH] put a hard limit on the number of active hop streams --- .../internal/circuitv1-deprecated/relay.go | 66 +++++++------------ 1 file changed, 23 insertions(+), 43 deletions(-) diff --git a/p2p/protocol/internal/circuitv1-deprecated/relay.go b/p2p/protocol/internal/circuitv1-deprecated/relay.go index 2a5f740210..3fe6e666ef 100644 --- a/p2p/protocol/internal/circuitv1-deprecated/relay.go +++ b/p2p/protocol/internal/circuitv1-deprecated/relay.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "sync" + "sync/atomic" "time" pb "github.com/libp2p/go-libp2p-circuit/pb" @@ -29,6 +30,9 @@ var ( RelayAcceptTimeout = 10 * time.Second HopConnectTimeout = 30 * time.Second StopHandshakeTimeout = 1 * time.Minute + + HopStreamBuffer = 4096 + HopStreamLimit = 1 << 19 // 512K hops for 1M goroutines ) // Relay is the relay transport and service. @@ -47,9 +51,9 @@ type Relay struct { relays map[peer.ID]struct{} mx sync.Mutex - liveHops map[peer.ID]map[peer.ID]int - lhCount uint64 - lhLk sync.Mutex + // atomic counters + sCount int32 + lhCount int32 } // RelayOpts are options for configuring the relay transport. @@ -88,7 +92,6 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. self: h.ID(), incoming: make(chan *Conn), relays: make(map[peer.ID]struct{}), - liveHops: make(map[peer.ID]map[peer.ID]int), } for _, opt := range opts { @@ -114,48 +117,15 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts .. } func (r *Relay) addLiveHop(from, to peer.ID) { - r.lhLk.Lock() - defer r.lhLk.Unlock() - - trg, ok := r.liveHops[from] - if !ok { - trg = make(map[peer.ID]int) - r.liveHops[from] = trg - } - trg[to]++ - r.lhCount++ + atomic.AddInt32(&r.lhCount, 1) } func (r *Relay) rmLiveHop(from, to peer.ID) { - r.lhLk.Lock() - defer r.lhLk.Unlock() - - trg, ok := r.liveHops[from] - if !ok { - return - } - var count int - if count, ok = trg[to]; !ok { - return - } - count-- - - r.lhCount-- - if count <= 0 { - delete(trg, to) - if len(trg) == 0 { - delete(r.liveHops, from) - } - } else { - trg[to] = count - } + atomic.AddInt32(&r.lhCount, -1) } -func (r *Relay) GetActiveHops() uint64 { - r.lhLk.Lock() - defer r.lhLk.Unlock() - - return r.lhCount +func (r *Relay) GetActiveHops() int32 { + return atomic.LoadInt32(&r.lhCount) } func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) { @@ -283,6 +253,16 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { return } + sCount := atomic.AddInt32(&r.sCount, 1) + lhCount := atomic.LoadInt32(&r.lhCount) + defer atomic.AddInt32(&r.sCount, -1) + + if (sCount + lhCount) > int32(HopStreamLimit) { + log.Warning("hop stream limit exceeded; resetting stream") + s.Reset() + return + } + src, err := peerToPeerInfo(msg.GetSrcPeer()) if err != nil { r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID) @@ -389,7 +369,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { go func() { defer r.rmLiveHop(src.ID, dst.ID) - buf := pool.Get(4096) + buf := pool.Get(HopStreamBuffer) defer pool.Put(buf) count, err := io.CopyBuffer(s, bs, buf) @@ -406,7 +386,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) { }() go func() { - buf := pool.Get(4096) + buf := pool.Get(HopStreamBuffer) defer pool.Put(buf) count, err := io.CopyBuffer(bs, s, buf)