Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: implement new les fetcher #20692

Merged
merged 15 commits into from
Jul 28, 2020
2 changes: 1 addition & 1 deletion les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T
if checkpoint != nil {
height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
}
handler.fetcher = newLightFetcher(mclock.System{}, backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise)
handler.fetcher = newLightFetcher(backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise)
handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.backend.peers.subscribe((*downloaderPeerNotify)(handler))
return handler
Expand Down
150 changes: 97 additions & 53 deletions les/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,27 @@
package les

import (
"github.com/ethereum/go-ethereum/common/prque"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)

const (
blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests

outOrderThreshold = 3 // The maximum limit the server is allowed to send out_of_order announces
timeoutThreshold = 5 // The maximum limit the server is allowed for timeout requests
expirationRate = time.Hour // The linear expiration rate of the _bad_ behaviors statistic
blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests
trustedItemsThreshold = 64 // The maximum queued trusted announcements
)

// announce represents an new block announcement from the les server.
Expand Down Expand Up @@ -69,17 +65,49 @@ type response struct {

// fetcherPeer holds the fetcher-specific information for each active peer
type fetcherPeer struct {
latest *announceData // The latest announcement sent from the peer
timeout utils.LinearExpiredValue // The counter of all timeout requests made
outOrder utils.LinearExpiredValue // The counter of all out-of-oreder announces
latest *announceData // The latest announcement sent from the peer
trustedMap map[common.Hash]uint64 // Trusted announces map
trustedQueue *prque.Prque // Trusted announces queue
}

// addTrustedAnno enqueues an new trusted announcement. If the queued
// announces overflow, evict from the oldest.
func (fp *fetcherPeer) addTrustedAnno(number uint64, hash common.Hash) {
// Short circuit if the announce already exists. In normal case it should
// never happen since only monotonic announce is accepted. But the adversary
// may feed us fake announces with higher td but same hash. In this case,
// ignore the announce anyway.
if _, exist := fp.trustedMap[hash]; exist {
return
}
fp.trustedMap[hash] = number
fp.trustedQueue.Push(hash, -int64(number))

// Evict oldest if the announces are oversized.
for fp.trustedQueue.Size() > trustedItemsThreshold {
item, _ := fp.trustedQueue.Pop()
delete(fp.trustedMap, item.(common.Hash))
}
}

// forwardTrustedAnno removes all announces from the map with a number lower than
// the provided threshold.
func (fp *fetcherPeer) forwardTrustedAnno(number uint64) {
for !fp.trustedQueue.Empty() {
item, priority := fp.trustedQueue.Pop()
if uint64(-priority) > number {
fp.trustedQueue.Push(item, priority)
return
}
delete(fp.trustedMap, item.(common.Hash))
}
}

// lightFetcher implements retrieval of newly announced headers. It reuses
// the eth.BlockFetcher as the underlying fetcher but adding more additional
// rules: e.g. evict "timeout" peers.
type lightFetcher struct {
// Various handlers
clock mclock.Clock
ulc *ulc
chaindb ethdb.Database
reqDist *requestDistributor
Expand All @@ -106,10 +134,11 @@ type lightFetcher struct {
// Test fields or hooks
noAnnounce bool
newHeadHook func(*types.Header)
newAnnounce func(*serverPeer, *announceData)
}

// newLightFetcher creates a light fetcher instance.
func newLightFetcher(clock mclock.Clock, chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher {
func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher {
// Construct the fetcher by offering all necessary APIs
validator := func(header *types.Header) error {
// Disable seal verification explicitly if we are running in ulc mode.
Expand All @@ -126,7 +155,6 @@ func newLightFetcher(clock mclock.Clock, chain *light.LightChain, engine consens
return chain.InsertHeaderChain(headers, checkFreq)
}
f := &lightFetcher{
clock: clock,
ulc: ulc,
peerset: peers,
chaindb: chaindb,
Expand Down Expand Up @@ -163,8 +191,8 @@ func (f *lightFetcher) registerPeer(p *serverPeer) {
defer f.plock.Unlock()

f.peers[p.ID()] = &fetcherPeer{
timeout: utils.LinearExpiredValue{Offset: uint64(f.clock.Now() / mclock.AbsTime(expirationRate))},
outOrder: utils.LinearExpiredValue{Offset: uint64(f.clock.Now() / mclock.AbsTime(expirationRate))},
trustedMap: make(map[common.Hash]uint64),
trustedQueue: prque.New(nil),
}
}

Expand All @@ -184,6 +212,19 @@ func (f *lightFetcher) peer(id enode.ID) *fetcherPeer {
return f.peers[id]
}

// forEachPeer iterates the fetcher peerset, abort the iteration if the
// callback returns false.
func (f *lightFetcher) forEachPeer(check func(id enode.ID, p *fetcherPeer) bool) {
f.plock.RLock()
defer f.plock.RUnlock()

for id, peer := range f.peers {
if !check(id, peer) {
return
}
}
}

// mainloop is the main event loop of the light fetcher, which is responsible for
// - announcement maintenance(ulc)
// If we are running in ultra light client mode, then all announcements from
Expand All @@ -204,12 +245,10 @@ func (f *lightFetcher) mainloop() {
syncInterval = uint64(1) // Interval used to trigger a light resync.
syncing bool // Indicator whether the client is syncing

ulc = f.ulc != nil
headCh = make(chan core.ChainHeadEvent, 100)
trusted = make(map[common.Hash][]enode.ID)
trustedNumber = make(map[common.Hash]uint64)
fetching = make(map[uint64]*request)
requestTimer = time.NewTimer(0)
ulc = f.ulc != nil
headCh = make(chan core.ChainHeadEvent, 100)
fetching = make(map[uint64]*request)
requestTimer = time.NewTimer(0)

// Local status
localHead = f.chain.CurrentHeader()
Expand All @@ -226,8 +265,22 @@ func (f *lightFetcher) mainloop() {
// trustedHeader returns an indicator whether the header is regarded as
// trusted. If we are running in the ulc mode, only when we receive enough
// same announcement from trusted server, the header will be trusted.
trustedHeader := func(hash common.Hash) bool {
return 100*len(trusted[hash])/len(f.ulc.keys) >= f.ulc.fraction
trustedHeader := func(hash common.Hash, number uint64) (bool, []enode.ID) {
var (
agreed []enode.ID
trusted bool
)
f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
if n := p.trustedMap[hash]; n == number {
agreed = append(agreed, id)
if 100*len(agreed)/len(f.ulc.keys) >= f.ulc.fraction {
trusted = true
return false // abort iteration
}
}
return true
})
return trusted, agreed
}
for {
select {
Expand All @@ -243,11 +296,8 @@ func (f *lightFetcher) mainloop() {
// Announced tds should be strictly monotonic, drop the peer if
// there are too many out-of-order announces accumulated.
if peer.latest != nil && data.Td.Cmp(peer.latest.Td) <= 0 {
f.peerset.unregister(peerid.String())
log.Debug("Non-monotonic td", "peer", peerid, "current", data.Td, "previous", peer.latest.Td)

if count := peer.outOrder.Add(1, uint64(f.clock.Now()/mclock.AbsTime(expirationRate))); count > outOrderThreshold {
f.peerset.unregister(peerid.String())
}
continue
}
peer.latest = data
Expand All @@ -271,27 +321,27 @@ func (f *lightFetcher) mainloop() {
log.Debug("Trigger light sync", "peer", peerid, "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
continue
}
f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid, data.Hash), nil)
f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid), nil)
log.Debug("Trigger header retrieval", "peer", peerid, "number", data.Number, "hash", data.Hash)
}
// Keep collecting announces from trusted server even we are syncing.
if ulc && anno.trust {
number, hash := data.Number, data.Hash
trusted[hash], trustedNumber[hash] = append(trusted[hash], peerid), data.Number
peer.addTrustedAnno(data.Number, data.Hash)

// Notify underlying fetcher to retrieve header or trigger a resync if
// we have receive enough announcements from trusted server.
if trustedHeader(hash) && !syncing {
if number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
trusted, agreed := trustedHeader(data.Hash, data.Number)
if trusted && !syncing {
if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
syncing = true
if !f.requestResync(peerid) {
syncing = false
}
log.Debug("Trigger trusted light sync", "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
continue
}
p := trusted[hash][rand.Intn(len(trusted[hash]))]
f.fetcher.Notify(p.String(), hash, number, time.Now(), f.requestHeaderByHash(p, hash), nil)
p := agreed[rand.Intn(len(agreed))]
f.fetcher.Notify(p.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(p), nil)
log.Debug("Trigger trusted header retrieval", "number", data.Number, "hash", data.Hash)
}
}
Expand All @@ -306,15 +356,8 @@ func (f *lightFetcher) mainloop() {
for reqid, request := range fetching {
if time.Since(request.sendAt) > blockDelayTimeout-gatherSlack {
delete(fetching, reqid)
f.peerset.unregister(request.peerid.String())
log.Debug("request timeout", "peer", request.peerid, "reqid", reqid)

peer := f.peer(request.peerid)
if peer == nil {
continue
}
if count := peer.timeout.Add(1, uint64(f.clock.Now()/mclock.AbsTime(expirationRate))); count > timeoutThreshold {
f.peerset.unregister(request.peerid.String())
}
}
}
f.rescheduleTimer(fetching, requestTimer)
Expand All @@ -339,12 +382,10 @@ func (f *lightFetcher) mainloop() {

// Clean stale announcements from trusted server.
if ulc {
for h, n := range trustedNumber {
if n <= number {
delete(trustedNumber, h)
delete(trusted, h)
}
}
f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
p.forwardTrustedAnno(number)
return true
})
}
if f.newHeadHook != nil {
f.newHeadHook(localHead)
Expand All @@ -359,12 +400,12 @@ func (f *lightFetcher) mainloop() {
ancestor := rawdb.FindCommonAncestor(f.chaindb, origin, head)
var untrusted []common.Hash
for head.Number.Cmp(ancestor.Number) > 0 {
hash := head.Hash()
if trustedHeader(hash) {
hash, number := head.Hash(), head.Number.Uint64()
if trusted, _ := trustedHeader(hash, number); trusted {
break
}
untrusted = append(untrusted, hash)
head = f.chain.GetHeader(head.ParentHash, head.Number.Uint64()-1)
head = f.chain.GetHeader(head.ParentHash, number-1)
}
if len(untrusted) > 0 {
for i, j := 0, len(untrusted)-1; i < j; i, j = i+1, j-1 {
Expand All @@ -388,6 +429,9 @@ func (f *lightFetcher) mainloop() {

// announce processes a new announcement message received from a peer.
func (f *lightFetcher) announce(p *serverPeer, head *announceData) {
if f.newAnnounce != nil {
f.newAnnounce(p, head)
}
if f.noAnnounce {
return
}
Expand All @@ -412,7 +456,7 @@ func (f *lightFetcher) trackRequest(peerid enode.ID, reqid uint64) {
// Note, we rely on the underlying eth/fetcher to retrieve and validate the
// response, so that we have to obey the rule of eth/fetcher which only accepts
// the response from given peer.
func (f *lightFetcher) requestHeaderByHash(peerid enode.ID, hash common.Hash) func(common.Hash) error {
func (f *lightFetcher) requestHeaderByHash(peerid enode.ID) func(common.Hash) error {
return func(hash common.Hash) error {
req := &distReq{
getCost: func(dp distPeer) uint64 { return dp.(*serverPeer).getRequestCost(GetBlockHeadersMsg, 1) },
Expand Down
25 changes: 16 additions & 9 deletions les/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func testGappedAnnouncements(t *testing.T, protocol int) {

// Create connected peer pair.
c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync.
p1, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler)
peer, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler)
if err != nil {
t.Fatalf("Failed to create peer pair %v", err)
}
Expand All @@ -121,30 +121,38 @@ func testGappedAnnouncements(t *testing.T, protocol int) {

// Sign the announcement if necessary.
announce := announceData{hash, number, td, 0, nil}
if p1.cpeer.announceType == announceTypeSigned {
if peer.cpeer.announceType == announceTypeSigned {
announce.sign(s.handler.server.privateKey)
}
p1.cpeer.sendAnnounce(announce)
peer.cpeer.sendAnnounce(announce)

<-done // Wait syncing
verifyChainHeight(t, c.handler.fetcher, 4)

// Send a reorged announcement
var newAnno = make(chan struct{}, 1)
c.handler.fetcher.noAnnounce = true
c.handler.fetcher.newAnnounce = func(*serverPeer, *announceData) {
newAnno <- struct{}{}
}
blocks, _ := core.GenerateChain(rawdb.ReadChainConfig(s.db, s.backend.Blockchain().Genesis().Hash()), s.backend.Blockchain().GetBlockByNumber(3),
ethash.NewFaker(), s.db, 2, func(i int, gen *core.BlockGen) {
gen.OffsetTime(-9) // higher block difficulty
})
s.backend.Blockchain().InsertChain(blocks)
<-newAnno
c.handler.fetcher.noAnnounce = false
c.handler.fetcher.newAnnounce = nil

latest = blocks[len(blocks)-1].Header()
hash, number = latest.Hash(), latest.Number.Uint64()
td = rawdb.ReadTd(s.db, hash, number)

announce = announceData{hash, number, td, 1, nil}
if p1.cpeer.announceType == announceTypeSigned {
if peer.cpeer.announceType == announceTypeSigned {
announce.sign(s.handler.server.privateKey)
}
p1.cpeer.sendAnnounce(announce)
peer.cpeer.sendAnnounce(announce)

<-done // Wait syncing
verifyChainHeight(t, c.handler.fetcher, 5)
Expand Down Expand Up @@ -219,8 +227,7 @@ func testTrustedAnnouncement(t *testing.T, protocol int) {
}
verifyChainHeight(t, c.handler.fetcher, expected)
}
check([]uint64{1}, 1, func() { <-newHead }) // Sequential announcements
check([]uint64{4}, 4, func() { <-newHead }) // ULC-style light syncing, rollback untrusted headers
check([]uint64{6, 8}, 8, func() { <-newHead }) // ULC-style light syncing, keep the later trusted announces.
check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain.
check([]uint64{1}, 1, func() { <-newHead }) // Sequential announcements
check([]uint64{4}, 4, func() { <-newHead }) // ULC-style light syncing, rollback untrusted headers
check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain.
}
Loading