diff --git a/dht.go b/dht.go index 64b5d4911..689de6240 100644 --- a/dht.go +++ b/dht.go @@ -363,10 +363,16 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb return err } + pingFnc := func(ctx context.Context, p peer.ID) error { + _, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated + return err + } + r, err := rtrefresh.NewRtRefreshManager( dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh, keyGenFnc, queryFnc, + pingFnc, cfg.RoutingTable.RefreshQueryTimeout, cfg.RoutingTable.RefreshInterval, maxLastSuccessfulOutboundThreshold, diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go index cb0eefc25..ff8f27e8d 100644 --- a/rtrefresh/rt_refresh_manager.go +++ b/rtrefresh/rt_refresh_manager.go @@ -41,6 +41,7 @@ type RtRefreshManager struct { enableAutoRefresh bool // should run periodic refreshes ? refreshKeyGenFnc func(cpl uint) (string, error) // generate the key for the query to refresh this cpl refreshQueryFnc func(ctx context.Context, key string) error // query to run for a refresh. + refreshPingFnc func(ctx context.Context, p peer.ID) error // request to check liveness of remote peer refreshQueryTimeout time.Duration // timeout for one refresh query // interval between two periodic refreshes. @@ -57,6 +58,7 @@ type RtRefreshManager struct { func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool, refreshKeyGenFnc func(cpl uint) (string, error), refreshQueryFnc func(ctx context.Context, key string) error, + refreshPingFnc func(ctx context.Context, p peer.ID) error, refreshQueryTimeout time.Duration, refreshInterval time.Duration, successfulOutboundQueryGracePeriod time.Duration, @@ -73,6 +75,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool enableAutoRefresh: autoRefresh, refreshKeyGenFnc: refreshKeyGenFnc, refreshQueryFnc: refreshQueryFnc, + refreshPingFnc: refreshPingFnc, refreshQueryTimeout: refreshQueryTimeout, refreshInterval: refreshInterval, @@ -178,12 +181,21 @@ func (r *RtRefreshManager) loop() { wg.Add(1) go func(ps kbucket.PeerInfo) { defer wg.Done() + livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout) + defer cancel() + if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil { + logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err) + r.rt.RemovePeer(ps.Id) + return + } + + if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil { logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err) r.rt.RemovePeer(ps.Id) + return } - cancel() }(ps) } }