Skip to content

Commit

Permalink
Merge pull request #2 from libp2p/feat/accept-queue
Browse files Browse the repository at this point in the history
keep accepting and negotiating connections until we have 16 ready and queued up
  • Loading branch information
Stebalien authored Jan 22, 2018
2 parents cf38072 + cec0d9f commit 6b92f28
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 19 deletions.
33 changes: 15 additions & 18 deletions p2p/net/upgrader/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ type listener struct {
incoming chan transport.Conn
err error

ticket chan struct{}
// Used for backpressure
threshold *threshold

// Canceling this context isn't sufficient to tear down the listener.
// Call close.
ctx context.Context
cancel func()
}
Expand Down Expand Up @@ -60,14 +63,7 @@ func (l *listener) handleIncoming() {
}()

var catcher tec.TempErrCatcher
for {

select {
case <-l.ticket:
case <-l.ctx.Done():
return
}

for l.ctx.Err() == nil {
maconn, err := l.Listener.Accept()
if err != nil {
if catcher.IsTemporary(err) {
Expand Down Expand Up @@ -103,6 +99,9 @@ func (l *listener) handleIncoming() {

log.Debugf("listener %s accepted connection: %s", l, conn)

l.threshold.Acquire()
defer l.threshold.Release()

select {
case l.incoming <- conn:
case <-ctx.Done():
Expand All @@ -117,21 +116,19 @@ func (l *listener) handleIncoming() {
conn.Close()
}
}()

// The go routine above calls Release when the context is
// canceled so there's no need to wait on it here.
l.threshold.Wait()
}
}

// Accept accepts a connection.
func (l *listener) Accept() (transport.Conn, error) {
for {
select {
case l.ticket <- struct{}{}:
case c, ok := <-l.incoming:
if !ok {
return nil, l.err
}
return c, nil
}
if c, ok := <-l.incoming; ok {
return c, nil
}
return nil, l.err
}

func (l *listener) String() string {
Expand Down
50 changes: 50 additions & 0 deletions p2p/net/upgrader/threshold.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package stream

import (
"sync"
)

func newThreshold(cutoff int) *threshold {
t := &threshold{
threshold: cutoff,
}
t.cond.L = &t.mu
return t
}

type threshold struct {
mu sync.Mutex
cond sync.Cond

count int
threshold int
}

// Acquire increments the counter. It will not block.
func (t *threshold) Acquire() {
t.mu.Lock()
t.count++
t.mu.Unlock()
}

// Release decrements the counter.
func (t *threshold) Release() {
t.mu.Lock()
if t.count == 0 {
panic("negative count")
}
if t.threshold == t.count {
t.cond.Broadcast()
}
t.count--
t.mu.Unlock()
}

// Wait waits for the counter to drop below the threshold
func (t *threshold) Wait() {
t.mu.Lock()
for t.count > t.threshold {
t.cond.Wait()
}
t.mu.Unlock()
}
5 changes: 4 additions & 1 deletion p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
// without specifying a peer ID.
var ErrNilPeer = errors.New("nil peer")

// AcceptQueueLength is the number of connections to fully setup before not accepting any new connections
var AcceptQueueLength = 16

// Upgrader is a multistream upgrader that can upgrade an underlying connection
// to a full transport connection (secure and multiplexed).
type Upgrader struct {
Expand All @@ -35,7 +38,7 @@ func (u *Upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t
Listener: list,
upgrader: u,
transport: t,
ticket: make(chan struct{}),
threshold: newThreshold(AcceptQueueLength),
incoming: make(chan transport.Conn),
cancel: cancel,
ctx: ctx,
Expand Down

0 comments on commit 6b92f28

Please sign in to comment.