Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] rate limit dials #228

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
peer "github.com/libp2p/go-libp2p-peer"
)

var dhtReadMessageTimeout = time.Minute
// If a peer takes more than 10 seconds to respond, we should simply move on.
var dhtReadMessageTimeout = 10 * time.Second
var ErrReadTimeout = fmt.Errorf("timed out reading response")

type bufferedWriteCloser interface {
Expand Down
70 changes: 50 additions & 20 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
todoctr "github.com/ipfs/go-todocounter"
process "github.com/jbenet/goprocess"
ctxproc "github.com/jbenet/goprocess/context"
kb "github.com/libp2p/go-libp2p-kbucket"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pset "github.com/libp2p/go-libp2p-peer/peerset"
Expand Down Expand Up @@ -72,6 +73,7 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e

type dhtQueryRunner struct {
query *dhtQuery // query to run
kbKey kb.ID // key in the kbucket space
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
peersQueried *pset.PeerSet // peers successfully connected to and queried
peersToQuery *queue.ChanQueue // peers remaining to be queried
Expand All @@ -80,8 +82,9 @@ type dhtQueryRunner struct {
result *dhtQueryResult // query result
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error

rateLimit chan struct{} // processing semaphore
log logging.EventLogger
queryLimit chan struct{} // processing semaphore
dialLimit chan struct{} // processing semaphore
log logging.EventLogger

runCtx context.Context

Expand All @@ -94,11 +97,13 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
ctx := ctxproc.OnClosingContext(proc)
return &dhtQueryRunner{
query: q,
kbKey: kb.ConvertKey(string(q.key)),
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key))),
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: pset.New(),
peersQueried: pset.New(),
rateLimit: make(chan struct{}, q.concurrency),
queryLimit: make(chan struct{}, q.concurrency),
dialLimit: make(chan struct{}, q.concurrency*5),
proc: proc,
}
}
Expand All @@ -113,8 +118,11 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
}

// setup concurrency rate limiting
for i := 0; i < r.query.concurrency; i++ {
r.rateLimit <- struct{}{}
for len(r.queryLimit) < cap(r.queryLimit) {
r.queryLimit <- struct{}{}
}
for len(r.dialLimit) < cap(r.dialLimit) {
r.dialLimit <- struct{}{}
}

// add all the peers we got first.
Expand Down Expand Up @@ -200,12 +208,11 @@ func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
case <-r.proc.Closing():
return

case <-r.rateLimit:
case <-r.queryLimit:
select {
case p, more := <-r.peersToQuery.DeqChan:
if !more {
// Put this back so we can finish any outstanding queries.
r.rateLimit <- struct{}{}
r.queryLimit <- struct{}{}
return // channel closed.
}

Expand All @@ -229,13 +236,6 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// create a context from our proc.
ctx := ctxproc.OnClosingContext(proc)

// make sure we do this when we exit
defer func() {
// signal we're done processing peer p
r.peersRemaining.Decrement(1)
r.rateLimit <- struct{}{}
}()

// make sure we're connected to the peer.
// FIXME abstract away into the network layer
// Note: Failure to connect in this block will cause the function to
Expand All @@ -247,13 +247,18 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
Type: notif.DialingPeer,
ID: p,
})

// while we dial, we do not take up a rate limit. this is to allow
// forward progress during potentially very high latency dials.
r.rateLimit <- struct{}{}
r.queryLimit <- struct{}{}
<-r.dialLimit

pi := pstore.PeerInfo{ID: p}

if err := r.query.dht.host.Connect(ctx, pi); err != nil {
err := r.query.dht.host.Connect(ctx, pi)
r.dialLimit <- struct{}{}

if err != nil {
log.Debugf("Error connecting: %s", err)

notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Expand All @@ -265,13 +270,29 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
<-r.rateLimit // need to grab it again, as we deferred.

// We're dropping this peer.
r.peersRemaining.Decrement(1)
return
}
<-r.rateLimit // need to grab it again, as we deferred.
log.Debugf("connected. dial success.")

// Put the peer back in the queue. There's no point in *jumping*
// the queue as we may have better peers to query.
select {
case r.peersToQuery.EnqChan <- p:
case <-r.proc.Closing():
}
return
}

// make sure we do this when we exit
defer func() {
// signal we're done processing peer p
r.peersRemaining.Decrement(1)
r.queryLimit <- struct{}{}
}()

// finally, run the query against this peer
res, err := r.query.qfunc(ctx, p)

Expand All @@ -293,6 +314,10 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {

} else if len(res.closerPeers) > 0 {
log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))

// Sort by distance first so we don't dequeue farther peers ahead of nearer ones.

peerIDs := make([]peer.ID, 0, len(res.closerPeers))
for _, next := range res.closerPeers {
if next.ID == r.query.dht.self { // don't add self.
log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
Expand All @@ -301,8 +326,13 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {

// add their addresses to the dialer's peerstore
r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
r.addPeerToQuery(next.ID)
log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
peerIDs = append(peerIDs, next.ID)
}

peerIDs = kb.SortClosestPeers(peerIDs, r.kbKey)
for _, next := range peerIDs {
r.addPeerToQuery(next)
}
} else {
log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
Expand Down