Skip to content

Commit

Permalink
les: address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Jun 1, 2020
1 parent 81b721a commit 814302c
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 83 deletions.
2 changes: 1 addition & 1 deletion les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T
if checkpoint != nil {
height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
}
handler.fetcher = newLightFetcher(mclock.System{}, backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise)
handler.fetcher = newLightFetcher(backend.blockchain, backend.engine, backend.peers, handler.ulc, backend.chainDb, backend.reqDist, handler.synchronise)
handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer)
handler.backend.peers.subscribe((*downloaderPeerNotify)(handler))
return handler
Expand Down
150 changes: 97 additions & 53 deletions les/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,27 @@
package les

import (
"github.com/ethereum/go-ethereum/common/prque"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
)

const (
blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests

outOrderThreshold = 3 // The maximum limit the server is allowed to send out_of_order announces
timeoutThreshold = 5 // The maximum limit the server is allowed for timeout requests
expirationRate = time.Hour // The linear expiration rate of the _bad_ behaviors statistic
blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests
trustedItemsThreshold = 64 // The maximum queued trusted announcements
)

// announce represents an new block announcement from the les server.
Expand Down Expand Up @@ -69,17 +65,49 @@ type response struct {

// fetcherPeer holds the fetcher-specific information for each active peer
type fetcherPeer struct {
latest *announceData // The latest announcement sent from the peer
timeout utils.LinearExpiredValue // The counter of all timeout requests made
outOrder utils.LinearExpiredValue // The counter of all out-of-oreder announces
latest *announceData // The latest announcement sent from the peer
trustedMap map[common.Hash]uint64 // Trusted announces map
trustedQueue *prque.Prque // Trusted announces queue
}

// addTrustedAnno enqueues an new trusted announcement. If the queued
// announces overflow, evict from the oldest.
func (fp *fetcherPeer) addTrustedAnno(number uint64, hash common.Hash) {
// Short circuit if the announce already exists. In normal case it should
// never happen since only monotonic announce is accepted. But the adversary
// may feed us fake announces with higher td but same hash. In this case,
// ignore the announce anyway.
if _, exist := fp.trustedMap[hash]; exist {
return
}
fp.trustedMap[hash] = number
fp.trustedQueue.Push(hash, -int64(number))

// Evict oldest if the announces are oversized.
for fp.trustedQueue.Size() > trustedItemsThreshold {
item, _ := fp.trustedQueue.Pop()
delete(fp.trustedMap, item.(common.Hash))
}
}

// forwardTrustedAnno removes all announces from the map with a number lower than
// the provided threshold.
func (fp *fetcherPeer) forwardTrustedAnno(number uint64) {
for !fp.trustedQueue.Empty() {
item, priority := fp.trustedQueue.Pop()
if uint64(-priority) > number {
fp.trustedQueue.Push(item, priority)
return
}
delete(fp.trustedMap, item.(common.Hash))
}
}

// lightFetcher implements retrieval of newly announced headers. It reuses
// the eth.BlockFetcher as the underlying fetcher but adding more additional
// rules: e.g. evict "timeout" peers.
type lightFetcher struct {
// Various handlers
clock mclock.Clock
ulc *ulc
chaindb ethdb.Database
reqDist *requestDistributor
Expand All @@ -106,10 +134,11 @@ type lightFetcher struct {
// Test fields or hooks
noAnnounce bool
newHeadHook func(*types.Header)
newAnnounce func(*serverPeer, *announceData)
}

// newLightFetcher creates a light fetcher instance.
func newLightFetcher(clock mclock.Clock, chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher {
func newLightFetcher(chain *light.LightChain, engine consensus.Engine, peers *serverPeerSet, ulc *ulc, chaindb ethdb.Database, reqDist *requestDistributor, syncFn func(p *serverPeer)) *lightFetcher {
// Construct the fetcher by offering all necessary APIs
validator := func(header *types.Header) error {
// Disable seal verification explicitly if we are running in ulc mode.
Expand All @@ -126,7 +155,6 @@ func newLightFetcher(clock mclock.Clock, chain *light.LightChain, engine consens
return chain.InsertHeaderChain(headers, checkFreq)
}
f := &lightFetcher{
clock: clock,
ulc: ulc,
peerset: peers,
chaindb: chaindb,
Expand Down Expand Up @@ -163,8 +191,8 @@ func (f *lightFetcher) registerPeer(p *serverPeer) {
defer f.plock.Unlock()

f.peers[p.ID()] = &fetcherPeer{
timeout: utils.LinearExpiredValue{Offset: uint64(f.clock.Now() / mclock.AbsTime(expirationRate))},
outOrder: utils.LinearExpiredValue{Offset: uint64(f.clock.Now() / mclock.AbsTime(expirationRate))},
trustedMap: make(map[common.Hash]uint64),
trustedQueue: prque.New(nil),
}
}

Expand All @@ -184,6 +212,19 @@ func (f *lightFetcher) peer(id enode.ID) *fetcherPeer {
return f.peers[id]
}

// forEachPeer iterates the fetcher peerset, abort the iteration if the
// callback returns false.
func (f *lightFetcher) forEachPeer(check func(id enode.ID, p *fetcherPeer) bool) {
f.plock.RLock()
defer f.plock.RUnlock()

for id, peer := range f.peers {
if !check(id, peer) {
return
}
}
}

// mainloop is the main event loop of the light fetcher, which is responsible for
// - announcement maintenance(ulc)
// If we are running in ultra light client mode, then all announcements from
Expand All @@ -204,12 +245,10 @@ func (f *lightFetcher) mainloop() {
syncInterval = uint64(1) // Interval used to trigger a light resync.
syncing bool // Indicator whether the client is syncing

ulc = f.ulc != nil
headCh = make(chan core.ChainHeadEvent, 100)
trusted = make(map[common.Hash][]enode.ID)
trustedNumber = make(map[common.Hash]uint64)
fetching = make(map[uint64]*request)
requestTimer = time.NewTimer(0)
ulc = f.ulc != nil
headCh = make(chan core.ChainHeadEvent, 100)
fetching = make(map[uint64]*request)
requestTimer = time.NewTimer(0)

// Local status
localHead = f.chain.CurrentHeader()
Expand All @@ -226,8 +265,22 @@ func (f *lightFetcher) mainloop() {
// trustedHeader returns an indicator whether the header is regarded as
// trusted. If we are running in the ulc mode, only when we receive enough
// same announcement from trusted server, the header will be trusted.
trustedHeader := func(hash common.Hash) bool {
return 100*len(trusted[hash])/len(f.ulc.keys) >= f.ulc.fraction
trustedHeader := func(hash common.Hash, number uint64) (bool, []enode.ID) {
var (
agreed []enode.ID
trusted bool
)
f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
if n := p.trustedMap[hash]; n == number {
agreed = append(agreed, id)
if 100*len(agreed)/len(f.ulc.keys) >= f.ulc.fraction {
trusted = true
return false // abort iteration
}
}
return true
})
return trusted, agreed
}
for {
select {
Expand All @@ -243,11 +296,8 @@ func (f *lightFetcher) mainloop() {
// Announced tds should be strictly monotonic, drop the peer if
// there are too many out-of-order announces accumulated.
if peer.latest != nil && data.Td.Cmp(peer.latest.Td) <= 0 {
f.peerset.unregister(peerid.String())
log.Debug("Non-monotonic td", "peer", peerid, "current", data.Td, "previous", peer.latest.Td)

if count := peer.outOrder.Add(1, uint64(f.clock.Now()/mclock.AbsTime(expirationRate))); count > outOrderThreshold {
f.peerset.unregister(peerid.String())
}
continue
}
peer.latest = data
Expand All @@ -271,27 +321,27 @@ func (f *lightFetcher) mainloop() {
log.Debug("Trigger light sync", "peer", peerid, "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
continue
}
f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid, data.Hash), nil)
f.fetcher.Notify(peerid.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(peerid), nil)
log.Debug("Trigger header retrieval", "peer", peerid, "number", data.Number, "hash", data.Hash)
}
// Keep collecting announces from trusted server even we are syncing.
if ulc && anno.trust {
number, hash := data.Number, data.Hash
trusted[hash], trustedNumber[hash] = append(trusted[hash], peerid), data.Number
peer.addTrustedAnno(data.Number, data.Hash)

// Notify underlying fetcher to retrieve header or trigger a resync if
// we have receive enough announcements from trusted server.
if trustedHeader(hash) && !syncing {
if number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
trusted, agreed := trustedHeader(data.Hash, data.Number)
if trusted && !syncing {
if data.Number > localHead.Number.Uint64()+syncInterval || data.ReorgDepth > 0 {
syncing = true
if !f.requestResync(peerid) {
syncing = false
}
log.Debug("Trigger trusted light sync", "local", localHead.Number, "localhash", localHead.Hash(), "remote", data.Number, "remotehash", data.Hash)
continue
}
p := trusted[hash][rand.Intn(len(trusted[hash]))]
f.fetcher.Notify(p.String(), hash, number, time.Now(), f.requestHeaderByHash(p, hash), nil)
p := agreed[rand.Intn(len(agreed))]
f.fetcher.Notify(p.String(), data.Hash, data.Number, time.Now(), f.requestHeaderByHash(p), nil)
log.Debug("Trigger trusted header retrieval", "number", data.Number, "hash", data.Hash)
}
}
Expand All @@ -306,15 +356,8 @@ func (f *lightFetcher) mainloop() {
for reqid, request := range fetching {
if time.Since(request.sendAt) > blockDelayTimeout-gatherSlack {
delete(fetching, reqid)
f.peerset.unregister(request.peerid.String())
log.Debug("request timeout", "peer", request.peerid, "reqid", reqid)

peer := f.peer(request.peerid)
if peer == nil {
continue
}
if count := peer.timeout.Add(1, uint64(f.clock.Now()/mclock.AbsTime(expirationRate))); count > timeoutThreshold {
f.peerset.unregister(request.peerid.String())
}
}
}
f.rescheduleTimer(fetching, requestTimer)
Expand All @@ -339,12 +382,10 @@ func (f *lightFetcher) mainloop() {

// Clean stale announcements from trusted server.
if ulc {
for h, n := range trustedNumber {
if n <= number {
delete(trustedNumber, h)
delete(trusted, h)
}
}
f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
p.forwardTrustedAnno(number)
return true
})
}
if f.newHeadHook != nil {
f.newHeadHook(localHead)
Expand All @@ -359,12 +400,12 @@ func (f *lightFetcher) mainloop() {
ancestor := rawdb.FindCommonAncestor(f.chaindb, origin, head)
var untrusted []common.Hash
for head.Number.Cmp(ancestor.Number) > 0 {
hash := head.Hash()
if trustedHeader(hash) {
hash, number := head.Hash(), head.Number.Uint64()
if trusted, _ := trustedHeader(hash, number); trusted {
break
}
untrusted = append(untrusted, hash)
head = f.chain.GetHeader(head.ParentHash, head.Number.Uint64()-1)
head = f.chain.GetHeader(head.ParentHash, number-1)
}
if len(untrusted) > 0 {
for i, j := 0, len(untrusted)-1; i < j; i, j = i+1, j-1 {
Expand All @@ -388,6 +429,9 @@ func (f *lightFetcher) mainloop() {

// announce processes a new announcement message received from a peer.
func (f *lightFetcher) announce(p *serverPeer, head *announceData) {
if f.newAnnounce != nil {
f.newAnnounce(p, head)
}
if f.noAnnounce {
return
}
Expand All @@ -412,7 +456,7 @@ func (f *lightFetcher) trackRequest(peerid enode.ID, reqid uint64) {
// Note, we rely on the underlying eth/fetcher to retrieve and validate the
// response, so that we have to obey the rule of eth/fetcher which only accepts
// the response from given peer.
func (f *lightFetcher) requestHeaderByHash(peerid enode.ID, hash common.Hash) func(common.Hash) error {
func (f *lightFetcher) requestHeaderByHash(peerid enode.ID) func(common.Hash) error {
return func(hash common.Hash) error {
req := &distReq{
getCost: func(dp distPeer) uint64 { return dp.(*serverPeer).getRequestCost(GetBlockHeadersMsg, 1) },
Expand Down
25 changes: 16 additions & 9 deletions les/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func testGappedAnnouncements(t *testing.T, protocol int) {

// Create connected peer pair.
c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync.
p1, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler)
peer, _, err := newTestPeerPair("peer", protocol, s.handler, c.handler)
if err != nil {
t.Fatalf("Failed to create peer pair %v", err)
}
Expand All @@ -121,30 +121,38 @@ func testGappedAnnouncements(t *testing.T, protocol int) {

// Sign the announcement if necessary.
announce := announceData{hash, number, td, 0, nil}
if p1.cpeer.announceType == announceTypeSigned {
if peer.cpeer.announceType == announceTypeSigned {
announce.sign(s.handler.server.privateKey)
}
p1.cpeer.sendAnnounce(announce)
peer.cpeer.sendAnnounce(announce)

<-done // Wait syncing
verifyChainHeight(t, c.handler.fetcher, 4)

// Send a reorged announcement
var newAnno = make(chan struct{}, 1)
c.handler.fetcher.noAnnounce = true
c.handler.fetcher.newAnnounce = func(*serverPeer, *announceData) {
newAnno <- struct{}{}
}
blocks, _ := core.GenerateChain(rawdb.ReadChainConfig(s.db, s.backend.Blockchain().Genesis().Hash()), s.backend.Blockchain().GetBlockByNumber(3),
ethash.NewFaker(), s.db, 2, func(i int, gen *core.BlockGen) {
gen.OffsetTime(-9) // higher block difficulty
})
s.backend.Blockchain().InsertChain(blocks)
<-newAnno
c.handler.fetcher.noAnnounce = false
c.handler.fetcher.newAnnounce = nil

latest = blocks[len(blocks)-1].Header()
hash, number = latest.Hash(), latest.Number.Uint64()
td = rawdb.ReadTd(s.db, hash, number)

announce = announceData{hash, number, td, 1, nil}
if p1.cpeer.announceType == announceTypeSigned {
if peer.cpeer.announceType == announceTypeSigned {
announce.sign(s.handler.server.privateKey)
}
p1.cpeer.sendAnnounce(announce)
peer.cpeer.sendAnnounce(announce)

<-done // Wait syncing
verifyChainHeight(t, c.handler.fetcher, 5)
Expand Down Expand Up @@ -219,8 +227,7 @@ func testTrustedAnnouncement(t *testing.T, protocol int) {
}
verifyChainHeight(t, c.handler.fetcher, expected)
}
check([]uint64{1}, 1, func() { <-newHead }) // Sequential announcements
check([]uint64{4}, 4, func() { <-newHead }) // ULC-style light syncing, rollback untrusted headers
check([]uint64{6, 8}, 8, func() { <-newHead }) // ULC-style light syncing, keep the later trusted announces.
check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain.
check([]uint64{1}, 1, func() { <-newHead }) // Sequential announcements
check([]uint64{4}, 4, func() { <-newHead }) // ULC-style light syncing, rollback untrusted headers
check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain.
}
Loading

0 comments on commit 814302c

Please sign in to comment.