From eb6d2f78c91f152b5f8d6b086e25248f7cb1f091 Mon Sep 17 00:00:00 2001
From: Dennis Trautwein
Date: Fri, 3 Feb 2023 13:12:22 +0100
Subject: [PATCH] feat: ping peers on routing table refresh

We have seen in the past that there are peers in the IPFS DHT that let
you connect to them but then refuse to speak any protocol. This was
mainly due to the resource manager killing the connection when limits
were exceeded. Such peers get pushed to the edge of the DHT - they are
already pruned from the lower buckets. However, they won't get pruned
from the higher buckets, because during a refresh we only connect to
them and never speak any protocol on that connection.

This change adds a ping message to the liveness check on routing table
refreshes.
---
 dht.go                          |  5 +++++
 rtrefresh/rt_refresh_manager.go | 14 +++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/dht.go b/dht.go
index 924c9bb6a..1bb75dfb0 100644
--- a/dht.go
+++ b/dht.go
@@ -365,10 +365,15 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
 		return err
 	}
 
+	pingFnc := func(ctx context.Context, p peer.ID) error {
+		return dht.protoMessenger.Ping(ctx, p)
+	}
+
 	r, err := rtrefresh.NewRtRefreshManager(
 		dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
 		keyGenFnc,
 		queryFnc,
+		pingFnc,
 		cfg.RoutingTable.RefreshQueryTimeout,
 		cfg.RoutingTable.RefreshInterval,
 		maxLastSuccessfulOutboundThreshold,
diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go
index cb0eefc25..ff8f27e8d 100644
--- a/rtrefresh/rt_refresh_manager.go
+++ b/rtrefresh/rt_refresh_manager.go
@@ -41,6 +41,7 @@ type RtRefreshManager struct {
 	enableAutoRefresh   bool                                        // should run periodic refreshes ?
 	refreshKeyGenFnc    func(cpl uint) (string, error)              // generate the key for the query to refresh this cpl
 	refreshQueryFnc     func(ctx context.Context, key string) error // query to run for a refresh.
+	refreshPingFnc      func(ctx context.Context, p peer.ID) error  // request to check liveness of remote peer
 	refreshQueryTimeout time.Duration                               // timeout for one refresh query
 
 	// interval between two periodic refreshes.
@@ -57,6 +58,7 @@
 func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
 	refreshKeyGenFnc func(cpl uint) (string, error),
 	refreshQueryFnc func(ctx context.Context, key string) error,
+	refreshPingFnc func(ctx context.Context, p peer.ID) error,
 	refreshQueryTimeout time.Duration,
 	refreshInterval time.Duration,
 	successfulOutboundQueryGracePeriod time.Duration,
@@ -73,6 +75,7 @@
 		enableAutoRefresh:   autoRefresh,
 		refreshKeyGenFnc:    refreshKeyGenFnc,
 		refreshQueryFnc:     refreshQueryFnc,
+		refreshPingFnc:      refreshPingFnc,
 		refreshQueryTimeout: refreshQueryTimeout,
 
 		refreshInterval: refreshInterval,
@@ -178,12 +181,21 @@ func (r *RtRefreshManager) loop() {
 			wg.Add(1)
 			go func(ps kbucket.PeerInfo) {
 				defer wg.Done()
+
 				livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
+				defer cancel()
+
 				if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
+					logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err)
+					r.rt.RemovePeer(ps.Id)
+					return
+				}
+
+				if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil {
 					logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
 					r.rt.RemovePeer(ps.Id)
+					return
 				}
-				cancel()
 			}(ps)
 		}
 	}
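
Reviewer note (not part of the commit): the sketch below distills the new
connect-then-ping liveness check into one standalone function, to make the
reasoning in the commit message concrete. It is an illustrative sketch only -
the names livecheck, checkLiveness, and pingTimeout are invented here, and
the real logic lives in RtRefreshManager.loop() as patched above.

	package livecheck

	import (
		"context"
		"time"

		"github.com/libp2p/go-libp2p/core/host"
		"github.com/libp2p/go-libp2p/core/peer"
	)

	// pingTimeout is an assumed bound for the whole check; the patch uses
	// the package-level peerPingTimeout constant instead.
	const pingTimeout = 10 * time.Second

	// checkLiveness mirrors the two-step check introduced by this patch: it
	// returns nil only if the peer both accepts a connection and answers a
	// DHT ping. The ping argument would be wired up like pingFnc in dht.go,
	// i.e. backed by protoMessenger.Ping.
	func checkLiveness(ctx context.Context, h host.Host, p peer.ID, ping func(context.Context, peer.ID) error) error {
		ctx, cancel := context.WithTimeout(ctx, pingTimeout)
		defer cancel()

		// Step 1 (pre-existing check): can we connect to the peer at all?
		if err := h.Connect(ctx, peer.AddrInfo{ID: p}); err != nil {
			return err
		}

		// Step 2 (new check): does the peer actually respond on the DHT
		// protocol? A peer whose resource manager is over its limits may
		// accept the connection but kill the stream, and only this step
		// catches that case.
		return ping(ctx, p)
	}

A peer failing either step gets evicted from the routing table. Before this
patch only step 1 ran during a refresh, so a connectable-but-mute peer could
linger in the higher buckets.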