Skip to content

Commit

Permalink
feat: ping peers on routing table refresh
Browse files Browse the repository at this point in the history
We have seen in the past that there are peers in the IPFS DHT that let you connect to them but then refuse to speak any protocol. This was mainly due to the resource manager killing the connection if limits were exceeded. We have seen that such peers are pushed to the edge of the DHT, meaning they already get pruned from the lower buckets. However, they won't get pruned from the higher ones, because we only try to connect to them and do not speak any protocol on that connection.

This change adds a ping message to the liveness check on routing table refreshes.
  • Loading branch information
dennis-tra committed Feb 3, 2023
1 parent dae5a9a commit 758f56d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
5 changes: 5 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,10 +365,15 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
return err
}

pingFnc := func(ctx context.Context, p peer.ID) error {
return dht.protoMessenger.Ping(ctx, p)
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
pingFnc,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
Expand Down
16 changes: 14 additions & 2 deletions rtrefresh/rt_refresh_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ type RtRefreshManager struct {
enableAutoRefresh bool // should run periodic refreshes ?
refreshKeyGenFnc func(cpl uint) (string, error) // generate the key for the query to refresh this cpl
refreshQueryFnc func(ctx context.Context, key string) error // query to run for a refresh.
refreshQueryTimeout time.Duration // timeout for one refresh query
refreshPingFnc func(ctx context.Context, p peer.ID) error
refreshQueryTimeout time.Duration // timeout for one refresh query

// interval between two periodic refreshes.
// also, a cpl won't be refreshed if the time since it was last refreshed
Expand All @@ -57,6 +58,7 @@ type RtRefreshManager struct {
func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
refreshKeyGenFnc func(cpl uint) (string, error),
refreshQueryFnc func(ctx context.Context, key string) error,
refreshPingFnc func(ctx context.Context, p peer.ID) error,
refreshQueryTimeout time.Duration,
refreshInterval time.Duration,
successfulOutboundQueryGracePeriod time.Duration,
Expand All @@ -73,6 +75,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool
enableAutoRefresh: autoRefresh,
refreshKeyGenFnc: refreshKeyGenFnc,
refreshQueryFnc: refreshQueryFnc,
refreshPingFnc: refreshPingFnc,

refreshQueryTimeout: refreshQueryTimeout,
refreshInterval: refreshInterval,
Expand Down Expand Up @@ -178,12 +181,21 @@ func (r *RtRefreshManager) loop() {
wg.Add(1)
go func(ps kbucket.PeerInfo) {
defer wg.Done()

livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
defer cancel()

if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
return
}

if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil {
logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
r.rt.RemovePeer(ps.Id)
return
}
cancel()
}(ps)
}
}
Expand Down

0 comments on commit 758f56d

Please sign in to comment.