Skip to content

Commit

Permalink
put a hard limit on the number of active hop streams
Browse files Browse the repository at this point in the history
  • Loading branch information
vyzo committed May 7, 2019
1 parent 3499b40 commit 1ab3f81
Showing 1 changed file with 23 additions and 43 deletions.
66 changes: 23 additions & 43 deletions p2p/protocol/internal/circuitv1-deprecated/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"sync"
"sync/atomic"
"time"

pb "github.com/libp2p/go-libp2p-circuit/pb"
Expand All @@ -29,6 +30,9 @@ var (
RelayAcceptTimeout = 10 * time.Second
HopConnectTimeout = 30 * time.Second
StopHandshakeTimeout = 1 * time.Minute

HopStreamBuffer = 4096
HopStreamLimit = 1 << 19 // 512K hops for 1M goroutines
)

// Relay is the relay transport and service.
Expand All @@ -47,9 +51,9 @@ type Relay struct {
relays map[peer.ID]struct{}
mx sync.Mutex

liveHops map[peer.ID]map[peer.ID]int
lhCount uint64
lhLk sync.Mutex
// atomic counters
sCount int32
lhCount int32
}

// RelayOpts are options for configuring the relay transport.
Expand Down Expand Up @@ -88,7 +92,6 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts ..
self: h.ID(),
incoming: make(chan *Conn),
relays: make(map[peer.ID]struct{}),
liveHops: make(map[peer.ID]map[peer.ID]int),
}

for _, opt := range opts {
Expand All @@ -114,48 +117,15 @@ func NewRelay(ctx context.Context, h host.Host, upgrader *tptu.Upgrader, opts ..
}

func (r *Relay) addLiveHop(from, to peer.ID) {
r.lhLk.Lock()
defer r.lhLk.Unlock()

trg, ok := r.liveHops[from]
if !ok {
trg = make(map[peer.ID]int)
r.liveHops[from] = trg
}
trg[to]++
r.lhCount++
atomic.AddInt32(&r.lhCount, 1)
}

func (r *Relay) rmLiveHop(from, to peer.ID) {
r.lhLk.Lock()
defer r.lhLk.Unlock()

trg, ok := r.liveHops[from]
if !ok {
return
}
var count int
if count, ok = trg[to]; !ok {
return
}
count--

r.lhCount--
if count <= 0 {
delete(trg, to)
if len(trg) == 0 {
delete(r.liveHops, from)
}
} else {
trg[to] = count
}
atomic.AddInt32(&r.lhCount, -1)
}

func (r *Relay) GetActiveHops() uint64 {
r.lhLk.Lock()
defer r.lhLk.Unlock()

return r.lhCount
func (r *Relay) GetActiveHops() int32 {
return atomic.LoadInt32(&r.lhCount)
}

func (r *Relay) DialPeer(ctx context.Context, relay pstore.PeerInfo, dest pstore.PeerInfo) (*Conn, error) {
Expand Down Expand Up @@ -283,6 +253,16 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
return
}

sCount := atomic.AddInt32(&r.sCount, 1)
lhCount := atomic.LoadInt32(&r.lhCount)
defer atomic.AddInt32(&r.sCount, -1)

if (sCount + lhCount) > int32(HopStreamLimit) {
log.Warning("hop stream limit exceeded; resetting stream")
s.Reset()
return
}

src, err := peerToPeerInfo(msg.GetSrcPeer())
if err != nil {
r.handleError(s, pb.CircuitRelay_HOP_SRC_MULTIADDR_INVALID)
Expand Down Expand Up @@ -389,7 +369,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
go func() {
defer r.rmLiveHop(src.ID, dst.ID)

buf := pool.Get(4096)
buf := pool.Get(HopStreamBuffer)
defer pool.Put(buf)

count, err := io.CopyBuffer(s, bs, buf)
Expand All @@ -406,7 +386,7 @@ func (r *Relay) handleHopStream(s inet.Stream, msg *pb.CircuitRelay) {
}()

go func() {
buf := pool.Get(4096)
buf := pool.Get(HopStreamBuffer)
defer pool.Put(buf)

count, err := io.CopyBuffer(bs, s, buf)
Expand Down

0 comments on commit 1ab3f81

Please sign in to comment.