From 8144830e92cbfa44afd620c93dc59fe63680840b Mon Sep 17 00:00:00 2001 From: zsfelfoldi Date: Wed, 7 Feb 2018 16:30:50 +0100 Subject: [PATCH 1/5] p2p/discv5: add query delay, fix node address update logic, retry refresh if empty --- p2p/discv5/net.go | 49 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index f9baf126f12f..9518022694e7 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -569,7 +569,7 @@ loop: net.ping(n, n.addr()) return n.pingEcho }, func(n *Node, topic Topic) []byte { - if n.state == known { + if n.canQuery() { return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration } else { if n.state == unknown { @@ -633,15 +633,20 @@ loop: } net.refreshResp <- refreshDone case <-refreshDone: - log.Trace("<-net.refreshDone") - refreshDone = nil - list := searchReqWhenRefreshDone - searchReqWhenRefreshDone = nil - go func() { - for _, req := range list { - net.topicSearchReq <- req - } - }() + log.Trace("<-net.refreshDone", "table size", net.tab.count) + if net.tab.count != 0 { + refreshDone = nil + list := searchReqWhenRefreshDone + searchReqWhenRefreshDone = nil + go func() { + for _, req := range list { + net.topicSearchReq <- req + } + }() + } else { + refreshDone = make(chan struct{}) + net.refresh(refreshDone) + } } } log.Trace("loop stopped") @@ -751,7 +756,15 @@ func (net *Network) internNodeFromNeighbours(sender *net.UDPAddr, rn rpcNode) (n return n, err } if !n.IP.Equal(rn.IP) || n.UDP != rn.UDP || n.TCP != rn.TCP { - err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n) + if n.state == known { + // reject address change if node is known by us + err = fmt.Errorf("metadata mismatch: got %v, want %v", rn, n) + } else { + // accept otherwise; this will be handled nicer with signed ENRs + n.IP = rn.IP + n.UDP = rn.UDP + n.TCP = rn.TCP + } } return n, err } @@ -773,6 +786,11 @@ type nodeNetGuts struct { deferredQueries []*findnodeQuery // queries that can't be sent yet pendingNeighbours *findnodeQuery // current query, waiting for reply queryTimeouts int + canQueryAfter mclock.AbsTime // cannot query if zero +} + +func (n *nodeNetGuts) canQuery() bool { + return n.canQueryAfter != 0 && mclock.Now() > n.canQueryAfter } func (n *nodeNetGuts) deferQuery(q *findnodeQuery) { @@ -796,7 +814,7 @@ func (q *findnodeQuery) start(net *Network) bool { q.reply <- closest.entries return true } - if q.remote.state.canQuery && q.remote.pendingNeighbours == nil { + if q.remote.canQuery() && q.remote.pendingNeighbours == nil { net.conn.sendFindnodeHash(q.remote, q.target) net.timedEvent(respTimeout, q.remote, neighboursTimeout) q.remote.pendingNeighbours = q @@ -1068,6 +1086,13 @@ func (net *Network) checkPacket(n *Node, ev nodeEvent, pkt *ingressPacket) error func (net *Network) transition(n *Node, next *nodeState) { if n.state != next { + if next.canQuery { + if !n.state.canQuery { + n.canQueryAfter = mclock.Now()+mclock.AbsTime(time.Second) + } + } else { + n.canQueryAfter = 0 + } n.state = next if next.enter != nil { next.enter(net, n) From f303cc54676b240f65fa04b4efc3216547db19a9 Mon Sep 17 00:00:00 2001 From: zsfelfoldi Date: Wed, 7 Feb 2018 16:42:15 +0100 Subject: [PATCH 2/5] p2p/discv5: remove unnecessary ping before topic query --- p2p/discv5/net.go | 7 ++----- p2p/discv5/ticket.go | 4 ++-- p2p/discv5/udp.go | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index 9518022694e7..b16c065b1759 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -565,10 +565,7 @@ loop: if lookupChn := searchInfo[res.target.topic].lookupChn; lookupChn != nil { lookupChn <- net.ticketStore.radius[res.target.topic].converged } - net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node) []byte { - net.ping(n, n.addr()) - return n.pingEcho - }, func(n *Node, topic Topic) []byte { + net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node, topic Topic) []byte { if n.canQuery() { return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration } else { @@ -1088,7 +1085,7 @@ func (net *Network) transition(n *Node, next *nodeState) { if n.state != next { if next.canQuery { if !n.state.canQuery { - n.canQueryAfter = mclock.Now()+mclock.AbsTime(time.Second) + n.canQueryAfter = mclock.Now()+mclock.AbsTime(queryDelay) } } else { n.canQueryAfter = 0 diff --git a/p2p/discv5/ticket.go b/p2p/discv5/ticket.go index 37ce8d23cbc5..b3d1ac4bafbb 100644 --- a/p2p/discv5/ticket.go +++ b/p2p/discv5/ticket.go @@ -494,13 +494,13 @@ func (s *ticketStore) registerLookupDone(lookup lookupInfo, nodes []*Node, ping } } -func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, ping func(n *Node) []byte, query func(n *Node, topic Topic) []byte) { +func (s *ticketStore) searchLookupDone(lookup lookupInfo, nodes []*Node, query func(n *Node, topic Topic) []byte) { now := mclock.Now() for i, n := range nodes { if i == 0 || (binary.BigEndian.Uint64(n.sha[:8])^binary.BigEndian.Uint64(lookup.target[:8])) < s.radius[lookup.topic].minRadius { if lookup.radiusLookup { if lastReq, ok := s.nodeLastReq[n]; !ok || time.Duration(now-lastReq.time) > radiusTC { - s.nodeLastReq[n] = reqInfo{pingHash: ping(n), lookup: lookup, time: now} + s.nodeLastReq[n] = reqInfo{pingHash: nil, lookup: lookup, time: now} } } // else { if s.canQueryTopic(n, lookup.topic) { diff --git a/p2p/discv5/udp.go b/p2p/discv5/udp.go index 5437718173d0..fead1b615918 100644 --- a/p2p/discv5/udp.go +++ b/p2p/discv5/udp.go @@ -49,7 +49,7 @@ var ( // Timeouts const ( respTimeout = 500 * time.Millisecond - sendTimeout = 500 * time.Millisecond + queryDelay = 1000 * time.Millisecond expiration = 20 * time.Second ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP From 19de3bc61fe5ce437b7623bdbf28f551cd7bb504 Mon Sep 17 00:00:00 2001 From: zsfelfoldi Date: Thu, 8 Feb 2018 13:15:06 +0100 Subject: [PATCH 3/5] p2p/discv5: do not filter local address from topicNodes --- p2p/discv5/udp.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/p2p/discv5/udp.go b/p2p/discv5/udp.go index fead1b615918..43f54060512e 100644 --- a/p2p/discv5/udp.go +++ b/p2p/discv5/udp.go @@ -318,20 +318,20 @@ func (t *udp) sendTopicRegister(remote *Node, topics []Topic, idx int, pong []by func (t *udp) sendTopicNodes(remote *Node, queryHash common.Hash, nodes []*Node) { p := topicNodes{Echo: queryHash} - if len(nodes) == 0 { - t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p) - return - } - for i, result := range nodes { - if netutil.CheckRelayIP(remote.IP, result.IP) != nil { - continue + var sent bool + for _, result := range nodes { + if result.IP.Equal(t.net.tab.self.IP) || netutil.CheckRelayIP(remote.IP, result.IP) == nil { + p.Nodes = append(p.Nodes, nodeToRPC(result)) } - p.Nodes = append(p.Nodes, nodeToRPC(result)) - if len(p.Nodes) == maxTopicNodes || i == len(nodes)-1 { + if len(p.Nodes) == maxTopicNodes { t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p) p.Nodes = p.Nodes[:0] + sent = true } } + if !sent || len(p.Nodes) > 0 { + t.sendPacket(remote.ID, remote.addr(), byte(topicNodesPacket), p) + } } func (t *udp) sendPacket(toid NodeID, toaddr *net.UDPAddr, ptype byte, req interface{}) (hash []byte, err error) { From 05328a40a467579906279ba12adea95e0c2976b0 Mon Sep 17 00:00:00 2001 From: zsfelfoldi Date: Thu, 8 Feb 2018 13:47:33 +0100 Subject: [PATCH 4/5] p2p/discv5: remove canQuery() --- p2p/discv5/net.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/p2p/discv5/net.go b/p2p/discv5/net.go index b16c065b1759..52c677b623b3 100644 --- a/p2p/discv5/net.go +++ b/p2p/discv5/net.go @@ -566,7 +566,7 @@ loop: lookupChn <- net.ticketStore.radius[res.target.topic].converged } net.ticketStore.searchLookupDone(res.target, res.nodes, func(n *Node, topic Topic) []byte { - if n.canQuery() { + if n.state != nil && n.state.canQuery { return net.conn.send(n, topicQueryPacket, topicQuery{Topic: topic}) // TODO: set expiration } else { if n.state == unknown { @@ -783,11 +783,6 @@ type nodeNetGuts struct { deferredQueries []*findnodeQuery // queries that can't be sent yet pendingNeighbours *findnodeQuery // current query, waiting for reply queryTimeouts int - canQueryAfter mclock.AbsTime // cannot query if zero -} - -func (n *nodeNetGuts) canQuery() bool { - return n.canQueryAfter != 0 && mclock.Now() > n.canQueryAfter } func (n *nodeNetGuts) deferQuery(q *findnodeQuery) { @@ -811,7 +806,7 @@ func (q *findnodeQuery) start(net *Network) bool { q.reply <- closest.entries return true } - if q.remote.canQuery() && q.remote.pendingNeighbours == nil { + if q.remote.state.canQuery && q.remote.pendingNeighbours == nil { net.conn.sendFindnodeHash(q.remote, q.target) net.timedEvent(respTimeout, q.remote, neighboursTimeout) q.remote.pendingNeighbours = q @@ -1083,13 +1078,6 @@ func (net *Network) checkPacket(n *Node, ev nodeEvent, pkt *ingressPacket) error func (net *Network) transition(n *Node, next *nodeState) { if n.state != next { - if next.canQuery { - if !n.state.canQuery { - n.canQueryAfter = mclock.Now()+mclock.AbsTime(queryDelay) - } - } else { - n.canQueryAfter = 0 - } n.state = next if next.enter != nil { next.enter(net, n) From ea20c71d8b91f1bbe3979b95e06681ccd36c73f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 8 Feb 2018 18:48:49 +0200 Subject: [PATCH 5/5] p2p/discv5: gofmt --- p2p/discv5/udp.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/discv5/udp.go b/p2p/discv5/udp.go index 43f54060512e..6ce72d2c152b 100644 --- a/p2p/discv5/udp.go +++ b/p2p/discv5/udp.go @@ -49,7 +49,7 @@ var ( // Timeouts const ( respTimeout = 500 * time.Millisecond - queryDelay = 1000 * time.Millisecond + queryDelay = 1000 * time.Millisecond expiration = 20 * time.Second ntpFailureThreshold = 32 // Continuous timeouts after which to check NTP