les: add checkpoint challenge for LES #20125

Closed
23 changes: 15 additions & 8 deletions eth/downloader/downloader.go
@@ -314,6 +314,9 @@ func (d *Downloader) UnregisterPeer(id string) error {
}
d.queue.Revoke(id)

if d.isMaster(id) {
d.cancel()
}
return nil
}

@@ -551,6 +554,14 @@ func (d *Downloader) spawnSync(fetchers []func() error) error {
return err
}

// isMaster reports whether the given peer id is the master peer
// used for syncing.
func (d *Downloader) isMaster(id string) bool {
d.cancelLock.RLock()
defer d.cancelLock.RUnlock()
return d.cancelPeer == id
}

// cancel aborts all of the operations and resets the queue. However, cancel does
// not wait for the running download goroutines to finish. This method should be
// used when cancelling the downloads from inside the downloader.
@@ -1275,15 +1286,11 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack)
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
} else {
// The dropPeer function invokes a callback which aborts the sync
// immediately if the unregistered peer is the master peer.
// If the peer is the master one, return a concrete error here.
d.dropPeer(pid)

// If this peer was the master peer, abort sync immediately
d.cancelLock.RLock()
master := pid == d.cancelPeer
d.cancelLock.RUnlock()

if master {
d.cancel()
if d.isMaster(pid) {
Contributor

With this change, you are introducing a gap between checking isMaster and actually doing the cancel. I assume that the cancel now happens later, during UnregisterPeer -- but is it possible that when that happens, the pid is no longer the master?

Member Author

d.cancelPeer is only assigned before syncing starts, in the d.synchronise function.

Although we drop the peer, d.synchronising is still 1, so we don't need to worry about a mismatch.

Member Author

> I assume that the cancel now happens later

Actually, the cancel now happens earlier, in the dropPeer function.

return errTimeout
}
}
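To make the new control flow easier to follow, here is a minimal, self-contained model of the drop-then-check pattern discussed in the thread above. The types and functions are made up for illustration, not the real go-ethereum downloader: dropPeer triggers UnregisterPeer, UnregisterPeer cancels the sync when the dropped peer is the master, and the expiration path only consults isMaster to pick a concrete error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTimeout = errors.New("request timed out")

// downloader is a stripped-down model: cancelPeer is set once when a sync
// starts, and isMaster just compares against it.
type downloader struct {
	cancelLock sync.RWMutex
	cancelPeer string
	cancelled  bool
	dropPeer   func(id string)
}

func (d *downloader) isMaster(id string) bool {
	d.cancelLock.RLock()
	defer d.cancelLock.RUnlock()
	return d.cancelPeer == id
}

// UnregisterPeer mirrors the downloader.go change: cancel the whole sync if
// the peer being removed is the master peer.
func (d *downloader) UnregisterPeer(id string) {
	if d.isMaster(id) {
		d.cancelled = true
	}
}

// onExpired mirrors the new fetchParts/statesync logic: drop first (which may
// cancel the sync via UnregisterPeer), then return a concrete error if the
// expired peer happened to be the master.
func (d *downloader) onExpired(pid string) error {
	d.dropPeer(pid)
	if d.isMaster(pid) {
		return errTimeout
	}
	return nil
}

func main() {
	d := &downloader{cancelPeer: "peer1"}
	d.dropPeer = d.UnregisterPeer

	fmt.Println(d.onExpired("peer2"), d.cancelled) // <nil> false
	fmt.Println(d.onExpired("peer1"), d.cancelled) // request timed out true
}
```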
57 changes: 55 additions & 2 deletions eth/downloader/downloader_test.go
@@ -362,11 +362,20 @@ func (dl *downloadTester) dropPeer(id string) {
dl.downloader.UnregisterPeer(id)
}

// setDelay adds a response delay to the given test peer.
func (dl *downloadTester) setDelay(id string, delay time.Duration) {
dl.lock.Lock()
defer dl.lock.Unlock()

dl.peers[id].delayResponse = delay
}

type downloadTesterPeer struct {
dl *downloadTester
id string
lock sync.RWMutex
chain *testChain
delayResponse time.Duration
missingStates map[common.Hash]bool // State entries that fast sync should not return
}

@@ -384,7 +393,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i
if reverse {
panic("reverse header requests not supported")
}

time.Sleep(dlp.delayResponse)
result := dlp.chain.headersByHash(origin, amount, skip)
go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
return nil
@@ -397,7 +406,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int,
if reverse {
panic("reverse header requests not supported")
}

time.Sleep(dlp.delayResponse)
result := dlp.chain.headersByNumber(origin, amount, skip)
go dlp.dl.downloader.DeliverHeaders(dlp.id, result)
return nil
@@ -407,6 +416,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int,
// peer in the download tester. The returned function can be used to retrieve
// batches of block bodies from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
time.Sleep(dlp.delayResponse)
txs, uncles := dlp.chain.bodies(hashes)
go dlp.dl.downloader.DeliverBodies(dlp.id, txs, uncles)
return nil
@@ -416,6 +426,7 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash) error {
// peer in the download tester. The returned function can be used to retrieve
// batches of block receipts from the particularly requested peer.
func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash) error {
time.Sleep(dlp.delayResponse)
receipts := dlp.chain.receipts(hashes)
go dlp.dl.downloader.DeliverReceipts(dlp.id, receipts)
return nil
@@ -428,6 +439,7 @@ func (dlp *downloadTesterPeer) RequestNodeData(hashes []common.Hash) error {
dlp.dl.lock.RLock()
defer dlp.dl.lock.RUnlock()

time.Sleep(dlp.delayResponse)
results := make([][]byte, 0, len(hashes))
for _, hash := range hashes {
if data, err := dlp.dl.peerDb.Get(hash.Bytes()); err == nil {
@@ -1656,3 +1668,44 @@ func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) {
assertOwnChain(t, tester, chain.len())
}
}

func TestCancelMasterPeer62(t *testing.T) { testCancelMasterPeer(t, 62, FullSync) }
func TestCancelMasterPeer63Full(t *testing.T) { testCancelMasterPeer(t, 63, FullSync) }
func TestCancelMasterPeer63Fast(t *testing.T) { testCancelMasterPeer(t, 63, FastSync) }
func TestCancelMasterPeer64Full(t *testing.T) { testCancelMasterPeer(t, 64, FullSync) }
func TestCancelMasterPeer64Fast(t *testing.T) { testCancelMasterPeer(t, 64, FastSync) }
func TestCancelMasterPeer64Light(t *testing.T) { testCancelMasterPeer(t, 64, LightSync) }

func testCancelMasterPeer(t *testing.T, protocol int, mode SyncMode) {
t.Parallel()

// Create a new tester
tester := newTester()
defer tester.terminate()

// Attempt to sync with the two peers
chain1 := testChainBase.shorten(MaxHeaderFetch + 64)
tester.newPeer("peer1", protocol, chain1) // Mark peer1 as the master peer

// Add some response delay so that we have enough time to unregister the master
// peer before the sync finishes.
tester.setDelay("peer1", 300*time.Millisecond)
chain2 := testChainBase.shorten(MaxHeaderFetch)
tester.newPeer("peer2", protocol, chain2) // Mark peer2 as the auxiliary peer

var errCh = make(chan error, 1)
go func() {
errCh <- tester.sync("peer1", nil, mode)
}()
time.Sleep(300 * time.Millisecond) // Ensure syncing has started
tester.downloader.UnregisterPeer("peer1") // Unregister the master peer, which should abort sync

select {
case err := <-errCh:
if err != errCanceled {
t.Fatalf("error mismatch, want %v, got %v", errCanceled, err)
}
case <-time.After(time.Second):
t.Fatalf("timeout")
}
}
12 changes: 4 additions & 8 deletions eth/downloader/statesync.go
@@ -316,15 +316,11 @@ func (s *stateSync) loop() (err error) {
// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
req.peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", req.peer.id)
} else {
// The dropPeer function invokes a callback which aborts the sync
// immediately if the unregistered peer is the master peer.
// If the peer is the master one, return a concrete error here.
s.d.dropPeer(req.peer.id)

// If this peer was the master peer, abort sync immediately
s.d.cancelLock.RLock()
master := req.peer.id == s.d.cancelPeer
s.d.cancelLock.RUnlock()

if master {
s.d.cancel()
if s.d.isMaster(req.peer.id) {
Contributor

Same question as above

return errTimeout
}
}
18 changes: 9 additions & 9 deletions les/benchmark.go
@@ -74,9 +74,9 @@ func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {

func (b *benchmarkBlockHeaders) request(peer *peer, index int) error {
if b.byHash {
return peer.RequestHeadersByHash(0, 0, b.hashes[index], b.amount, b.skip, b.reverse)
return peer.RequestHeadersByHash(0, b.hashes[index], b.amount, b.skip, b.reverse)
} else {
return peer.RequestHeadersByNumber(0, 0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
return peer.RequestHeadersByNumber(0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
}
}

Expand All @@ -97,9 +97,9 @@ func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {

func (b *benchmarkBodiesOrReceipts) request(peer *peer, index int) error {
if b.receipts {
return peer.RequestReceipts(0, 0, []common.Hash{b.hashes[index]})
return peer.RequestReceipts(0, []common.Hash{b.hashes[index]})
} else {
return peer.RequestBodies(0, 0, []common.Hash{b.hashes[index]})
return peer.RequestBodies(0, []common.Hash{b.hashes[index]})
}
}

Expand All @@ -118,9 +118,9 @@ func (b *benchmarkProofsOrCode) request(peer *peer, index int) error {
key := make([]byte, 32)
rand.Read(key)
if b.code {
return peer.RequestCode(0, 0, []CodeReq{{BHash: b.headHash, AccKey: key}})
return peer.RequestCode(0, []CodeReq{{BHash: b.headHash, AccKey: key}})
} else {
return peer.RequestProofs(0, 0, []ProofReq{{BHash: b.headHash, Key: key}})
return peer.RequestProofs(0, []ProofReq{{BHash: b.headHash, Key: key}})
}
}

@@ -163,7 +163,7 @@ func (b *benchmarkHelperTrie) request(peer *peer, index int) error {
}
}

return peer.RequestHelperTrieProofs(0, 0, reqs)
return peer.RequestHelperTrieProofs(0, reqs)
}

// benchmarkTxSend implements requestBenchmark
@@ -191,7 +191,7 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error {

func (b *benchmarkTxSend) request(peer *peer, index int) error {
enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
return peer.SendTxs(0, 0, enc)
return peer.SendTxs(0, enc)
}

// benchmarkTxStatus implements requestBenchmark
@@ -204,7 +204,7 @@ func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
func (b *benchmarkTxStatus) request(peer *peer, index int) error {
var hash common.Hash
rand.Read(hash[:])
return peer.RequestTxStatus(0, 0, []common.Hash{hash})
return peer.RequestTxStatus(0, []common.Hash{hash})
}

// benchmarkSetup stores measurement data for a single benchmark type
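The benchmark call sites above simply track an les peer API change in this PR: the Request*/Send* methods no longer take an explicit flow-control cost argument. Where flow control applies, the caller queues the cost itself before firing the request, as the client_handler.go hunks below show. Here is a minimal sketch of that call shape, using made-up types and an illustrative cost formula (not the real les flow-control code):

```go
package main

import "fmt"

// fcServer models the flow-control bookkeeping on the caller side.
type fcServer struct{ queued map[uint64]uint64 }

func (f *fcServer) QueuedRequest(reqID, cost uint64) { f.queued[reqID] = cost }

type peer struct{ fc *fcServer }

// GetRequestCost is a stand-in cost function (base + per-item), not the real formula.
func (p *peer) GetRequestCost(amount int) uint64 { return 100 + 10*uint64(amount) }

// RequestHeadersByNumber follows the new call shape: no explicit cost argument.
func (p *peer) RequestHeadersByNumber(reqID, origin uint64, amount int) error {
	fmt.Printf("req %d: %d headers from #%d (cost %d already queued)\n",
		reqID, amount, origin, p.fc.queued[reqID])
	return nil
}

func main() {
	p := &peer{fc: &fcServer{queued: make(map[uint64]uint64)}}
	reqID, amount := uint64(1), 16

	// The caller queues the flow-control cost itself, then fires the request.
	p.fc.QueuedRequest(reqID, p.GetRequestCost(amount))
	p.RequestHeadersByNumber(reqID, 4096, amount)
}
```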
85 changes: 61 additions & 24 deletions les/client_handler.go
@@ -31,23 +31,30 @@ import (
"github.com/ethereum/go-ethereum/params"
)

var checkpointChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the checkpoint progress challenge

// clientHandler is responsible for receiving and processing all incoming server
// responses.
type clientHandler struct {
ulc *ulc
clock mclock.Clock
checkpoint *params.TrustedCheckpoint
fetcher *lightFetcher
downloader *downloader.Downloader
backend *LightEthereum

closeCh chan struct{}
wg sync.WaitGroup // WaitGroup used to track all connected peers.
syncDone func() // Test hooks when syncing is done.
closeCh chan struct{}
wg sync.WaitGroup // WaitGroup used to track all connected peers.

// Testing fields or hooks
ignoreCheckpoint bool // Whether to ignore the received checkpoint
syncDone func() // Test hook invoked when syncing is done.
}

func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.TrustedCheckpoint, backend *LightEthereum) *clientHandler {
handler := &clientHandler{
checkpoint: checkpoint,
clock: mclock.System{},
backend: backend,
closeCh: make(chan struct{}),
}
@@ -61,7 +68,7 @@ func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.T
}
var height uint64
if checkpoint != nil {
height = (checkpoint.SectionIndex+1)*params.CHTFrequency - 1
height = (checkpoint.SectionIndex+1)*backend.iConfig.ChtSize - 1
}
handler.fetcher = newLightFetcher(handler)
handler.downloader = downloader.New(height, backend.chainDb, nil, backend.eventMux, nil, backend.blockchain, handler.removePeer)
@@ -89,6 +96,7 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter)
}
h.wg.Add(1)
defer h.wg.Done()

err := h.handle(peer)
h.backend.serverPool.disconnect(peer.poolEntry)
return err
@@ -131,6 +139,26 @@ func (h *clientHandler) handle(p *peer) error {
if p.poolEntry != nil {
h.backend.serverPool.registered(p.poolEntry)
}
// If we have a trusted CHT, reject all peers below that (avoid light sync eclipse)
if h.checkpoint != nil {
// Request the peer's checkpoint header for chain height/weight validation
wrapPeer := &peerConnection{handler: h, peer: p}
if err := wrapPeer.RequestHeadersByNumber((h.checkpoint.SectionIndex+1)*h.backend.iConfig.ChtSize-1, 1, 0, false); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.syncDrop = h.clock.AfterFunc(checkpointChallengeTimeout, func() {
p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name())
h.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.syncDrop != nil {
p.syncDrop.Stop()
p.syncDrop = nil
}
}()
}
// Spawn a main loop to handle all incoming messages.
for {
if err := h.handleMsg(p); err != nil {
@@ -199,6 +227,27 @@ func (h *clientHandler) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)

// If no headers were received, but we're expecting a checkpoint header,
// drop the unsynced server.
if len(resp.Headers) == 0 && p.syncDrop != nil {
p.syncDrop.Stop()
p.syncDrop = nil
return errResp(ErrUselessPeer, "msg %v: %v", msg, err)
}
// If we are still waiting for the checkpoint response.
if len(resp.Headers) == 1 && p.syncDrop != nil {
if !h.ignoreCheckpoint && resp.Headers[0].Number.Uint64() == (h.checkpoint.SectionIndex+1)*h.backend.iConfig.ChtSize-1 {
// First stop timer anyway.
p.syncDrop.Stop()
p.syncDrop = nil
if resp.Headers[0].Hash() != h.checkpoint.SectionHead {
return errResp(ErrUselessPeer, "msg %v: %v", msg, err)
}
return nil
}
}
// Deliver the response headers to the concrete requester.
if h.fetcher.requestedID(resp.ReqID) {
h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
@@ -339,19 +388,13 @@ func (pc *peerConnection) Head() (common.Hash, *big.Int) {

func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
getCost: func(dp distPeer) uint64 { return dp.(*peer).GetRequestCost(GetBlockHeadersMsg, amount) },
canSend: func(dp distPeer) bool { return dp.(*peer) == pc.peer },
request: func(dp distPeer) func() {
reqID := genReqID()
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
peer.fcServer.QueuedRequest(reqID, peer.GetRequestCost(GetBlockHeadersMsg, amount))
return func() { peer.RequestHeadersByHash(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
@@ -363,19 +406,13 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, s

func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
getCost: func(dp distPeer) uint64 { return dp.(*peer).GetRequestCost(GetBlockHeadersMsg, amount) },
canSend: func(dp distPeer) bool { return dp.(*peer) == pc.peer },
request: func(dp distPeer) func() {
reqID := genReqID()
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
peer.fcServer.QueuedRequest(reqID, peer.GetRequestCost(GetBlockHeadersMsg, amount))
return func() { peer.RequestHeadersByNumber(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
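Putting the client_handler.go pieces together, the checkpoint challenge works as follows: on connect the client requests the header at the checkpoint height ((SectionIndex+1)*ChtSize - 1), arms a 15-second timer, and drops the server if it replies with no header, the wrong hash, or not at all. Below is a minimal, self-contained sketch of that flow; the types, the chtSize constant (32768), and the sample values are illustrative assumptions, not the real les code.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// checkpoint and header are simplified stand-ins for the real types.
type checkpoint struct {
	SectionIndex uint64
	SectionHead  string // stand-in for the section head hash
}

type header struct {
	Number uint64
	Hash   string
}

var errUselessPeer = errors.New("useless peer")

const chtSize = 32768 // assumed section size; e.g. section 255 -> header 8388607

func challengeHeight(c checkpoint) uint64 { return (c.SectionIndex+1)*chtSize - 1 }

// verify is the response-side half of the challenge.
func verify(c checkpoint, headers []header) error {
	if len(headers) != 1 || headers[0].Number != challengeHeight(c) {
		return errUselessPeer // no header, or not the one we asked for
	}
	if headers[0].Hash != c.SectionHead {
		return errUselessPeer // server is on a different, non-checkpointed chain
	}
	return nil
}

func main() {
	c := checkpoint{SectionIndex: 255, SectionHead: "0xabc"}
	fmt.Println(challengeHeight(c)) // 8388607

	// Simulate the server's reply arriving before the challenge timeout.
	reply := make(chan []header, 1)
	go func() { reply <- []header{{Number: challengeHeight(c), Hash: "0xabc"}} }()

	select {
	case headers := <-reply:
		fmt.Println(verify(c, headers)) // <nil>
	case <-time.After(15 * time.Second): // checkpointChallengeTimeout in the PR
		fmt.Println("checkpoint challenge timed out, dropping peer")
	}
}
```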
2 changes: 1 addition & 1 deletion les/fetcher.go
@@ -557,7 +557,7 @@ func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bes
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
return func() { p.RequestHeadersByHash(reqID, bestHash, int(bestAmount), 0, true) }
},
}
}