Skip to content

Commit

Permalink
les: add checkpoint challenge
Browse files Browse the repository at this point in the history
  • Loading branch information
rjl493456442 committed Sep 25, 2019
1 parent 52a8f7d commit f086977
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 86 deletions.
18 changes: 9 additions & 9 deletions les/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func (b *benchmarkBlockHeaders) init(h *serverHandler, count int) error {

func (b *benchmarkBlockHeaders) request(peer *peer, index int) error {
if b.byHash {
return peer.RequestHeadersByHash(0, 0, b.hashes[index], b.amount, b.skip, b.reverse)
return peer.RequestHeadersByHash(0, b.hashes[index], b.amount, b.skip, b.reverse)
} else {
return peer.RequestHeadersByNumber(0, 0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
return peer.RequestHeadersByNumber(0, uint64(b.offset+rand.Int63n(b.randMax)), b.amount, b.skip, b.reverse)
}
}

Expand All @@ -97,9 +97,9 @@ func (b *benchmarkBodiesOrReceipts) init(h *serverHandler, count int) error {

func (b *benchmarkBodiesOrReceipts) request(peer *peer, index int) error {
if b.receipts {
return peer.RequestReceipts(0, 0, []common.Hash{b.hashes[index]})
return peer.RequestReceipts(0, []common.Hash{b.hashes[index]})
} else {
return peer.RequestBodies(0, 0, []common.Hash{b.hashes[index]})
return peer.RequestBodies(0, []common.Hash{b.hashes[index]})
}
}

Expand All @@ -118,9 +118,9 @@ func (b *benchmarkProofsOrCode) request(peer *peer, index int) error {
key := make([]byte, 32)
rand.Read(key)
if b.code {
return peer.RequestCode(0, 0, []CodeReq{{BHash: b.headHash, AccKey: key}})
return peer.RequestCode(0, []CodeReq{{BHash: b.headHash, AccKey: key}})
} else {
return peer.RequestProofs(0, 0, []ProofReq{{BHash: b.headHash, Key: key}})
return peer.RequestProofs(0, []ProofReq{{BHash: b.headHash, Key: key}})
}
}

Expand Down Expand Up @@ -163,7 +163,7 @@ func (b *benchmarkHelperTrie) request(peer *peer, index int) error {
}
}

return peer.RequestHelperTrieProofs(0, 0, reqs)
return peer.RequestHelperTrieProofs(0, reqs)
}

// benchmarkTxSend implements requestBenchmark
Expand Down Expand Up @@ -191,7 +191,7 @@ func (b *benchmarkTxSend) init(h *serverHandler, count int) error {

func (b *benchmarkTxSend) request(peer *peer, index int) error {
enc, _ := rlp.EncodeToBytes(types.Transactions{b.txs[index]})
return peer.SendTxs(0, 0, enc)
return peer.SendTxs(0, enc)
}

// benchmarkTxStatus implements requestBenchmark
Expand All @@ -204,7 +204,7 @@ func (b *benchmarkTxStatus) init(h *serverHandler, count int) error {
func (b *benchmarkTxStatus) request(peer *peer, index int) error {
var hash common.Hash
rand.Read(hash[:])
return peer.RequestTxStatus(0, 0, []common.Hash{hash})
return peer.RequestTxStatus(0, []common.Hash{hash})
}

// benchmarkSetup stores measurement data for a single benchmark type
Expand Down
85 changes: 62 additions & 23 deletions les/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,30 @@ import (
"github.com/ethereum/go-ethereum/params"
)

var checkpointChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the checkpoint progress challenge

// clientHandler is responsible for receiving and processing all incoming server
// responses.
type clientHandler struct {
ulc *ulc
clock mclock.Clock
checkpoint *params.TrustedCheckpoint
fetcher *lightFetcher
downloader *downloader.Downloader
backend *LightEthereum

closeCh chan struct{}
wg sync.WaitGroup // WaitGroup used to track all connected peers.
syncDone func() // Test hooks when syncing is done.
closeCh chan struct{}
wg sync.WaitGroup // WaitGroup used to track all connected peers.

// Testing fields or hooks
ignoreHeaders bool // Indicator whether ignore received headers
syncDone func() // Test hooks when syncing is done.
}

func newClientHandler(ulcServers []string, ulcFraction int, checkpoint *params.TrustedCheckpoint, backend *LightEthereum) *clientHandler {
handler := &clientHandler{
checkpoint: checkpoint,
clock: mclock.System{},
backend: backend,
closeCh: make(chan struct{}),
}
Expand Down Expand Up @@ -89,6 +96,7 @@ func (h *clientHandler) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter)
}
h.wg.Add(1)
defer h.wg.Done()

err := h.handle(peer)
h.backend.serverPool.disconnect(peer.poolEntry)
return err
Expand Down Expand Up @@ -131,6 +139,26 @@ func (h *clientHandler) handle(p *peer) error {
if p.poolEntry != nil {
h.backend.serverPool.registered(p.poolEntry)
}
// If we have a trusted CHT, reject all peers below that (avoid light sync eclipse)
if h.checkpoint != nil {
// Request the peer's checkpoint header for chain height/weight validation
wrapPeer := &peerConnection{handler: h, peer: p}
if err := wrapPeer.RequestHeadersByNumber((h.checkpoint.SectionIndex+1)*h.backend.iConfig.ChtSize-1, 1, 0, false); err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
p.syncDrop = h.clock.AfterFunc(checkpointChallengeTimeout, func() {
p.Log().Warn("Checkpoint challenge timed out, dropping", "addr", p.RemoteAddr(), "type", p.Name())
h.removePeer(p.id)
})
// Make sure it's cleaned up if the peer dies off
defer func() {
if p.syncDrop != nil {
p.syncDrop.Stop()
p.syncDrop = nil
}
}()
}
// Spawn a main loop to handle all incoming messages.
for {
if err := h.handleMsg(p); err != nil {
Expand Down Expand Up @@ -199,6 +227,29 @@ func (h *clientHandler) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)

// If we are still waiting the checkpoint response.
if !h.ignoreHeaders && p.syncDrop != nil {
// First stop timer anyway.
p.syncDrop.Stop()
p.syncDrop = nil
// If no headers were received or more headers received than we expect,
// reject the server directly.
//
// Two cases here:
// (1) The server is not synced, so no checkpoint header to response
// (2) The server sends us useless headers which we don't explicitly
// request.
if len(resp.Headers) != 1 {
return errResp(ErrUselessPeer, "msg %v: %v", msg, err)
} else {
header := resp.Headers[0]
if header.Hash() != h.checkpoint.SectionHead {
return errResp(ErrUselessPeer, "msg %v: %v", msg, err)
}
}
}
// Deliver response header to concrete requester.
if h.fetcher.requestedID(resp.ReqID) {
h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
} else {
Expand Down Expand Up @@ -339,19 +390,13 @@ func (pc *peerConnection) Head() (common.Hash, *big.Int) {

func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
getCost: func(dp distPeer) uint64 { return dp.(*peer).GetRequestCost(GetBlockHeadersMsg, amount) },
canSend: func(dp distPeer) bool { return dp.(*peer) == pc.peer },
request: func(dp distPeer) func() {
reqID := genReqID()
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) }
peer.fcServer.QueuedRequest(reqID, peer.GetRequestCost(GetBlockHeadersMsg, amount))
return func() { peer.RequestHeadersByHash(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
Expand All @@ -363,19 +408,13 @@ func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, s

func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(GetBlockHeadersMsg, amount)
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pc.peer
},
getCost: func(dp distPeer) uint64 { return dp.(*peer).GetRequestCost(GetBlockHeadersMsg, amount) },
canSend: func(dp distPeer) bool { return dp.(*peer) == pc.peer },
request: func(dp distPeer) func() {
reqID := genReqID()
peer := dp.(*peer)
cost := peer.GetRequestCost(GetBlockHeadersMsg, amount)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) }
peer.fcServer.QueuedRequest(reqID, peer.GetRequestCost(GetBlockHeadersMsg, amount))
return func() { peer.RequestHeadersByNumber(reqID, origin, amount, skip, reverse) }
},
}
_, ok := <-pc.handler.backend.reqDist.queue(rq)
Expand Down
2 changes: 1 addition & 1 deletion les/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bes
time.Sleep(hardRequestTimeout)
f.timeoutChn <- reqID
}()
return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) }
return func() { p.RequestHeadersByHash(reqID, bestHash, int(bestAmount), 0, true) }
},
}
}
Expand Down
97 changes: 74 additions & 23 deletions les/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ func testGetBlockHeaders(t *testing.T, protocol int) {
// Send the hash request and verify the response
reqID++

cost := server.peer.peer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount))
sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, cost, tt.query)
sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, tt.query)
if err := expectResponse(server.peer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
Expand Down Expand Up @@ -246,8 +245,7 @@ func testGetBlockBodies(t *testing.T, protocol int) {
reqID++

// Send the hash request and verify the response
cost := server.peer.peer.GetRequestCost(GetBlockBodiesMsg, len(hashes))
sendRequest(server.peer.app, GetBlockBodiesMsg, reqID, cost, hashes)
sendRequest(server.peer.app, GetBlockBodiesMsg, reqID, hashes)
if err := expectResponse(server.peer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil {
t.Errorf("test %d: bodies mismatch: %v", i, err)
}
Expand Down Expand Up @@ -278,8 +276,7 @@ func testGetCode(t *testing.T, protocol int) {
}
}

cost := server.peer.peer.GetRequestCost(GetCodeMsg, len(codereqs))
sendRequest(server.peer.app, GetCodeMsg, 42, cost, codereqs)
sendRequest(server.peer.app, GetCodeMsg, 42, codereqs)
if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, codes); err != nil {
t.Errorf("codes mismatch: %v", err)
}
Expand All @@ -299,8 +296,7 @@ func testGetStaleCode(t *testing.T, protocol int) {
BHash: bc.GetHeaderByNumber(number).Hash(),
AccKey: crypto.Keccak256(testContractAddr[:]),
}
cost := server.peer.peer.GetRequestCost(GetCodeMsg, 1)
sendRequest(server.peer.app, GetCodeMsg, 42, cost, []*CodeReq{req})
sendRequest(server.peer.app, GetCodeMsg, 42, []*CodeReq{req})
if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, expected); err != nil {
t.Errorf("codes mismatch: %v", err)
}
Expand Down Expand Up @@ -331,8 +327,7 @@ func testGetReceipt(t *testing.T, protocol int) {
receipts = append(receipts, rawdb.ReadRawReceipts(server.db, block.Hash(), block.NumberU64()))
}
// Send the hash request and verify the response
cost := server.peer.peer.GetRequestCost(GetReceiptsMsg, len(hashes))
sendRequest(server.peer.app, GetReceiptsMsg, 42, cost, hashes)
sendRequest(server.peer.app, GetReceiptsMsg, 42, hashes)
if err := expectResponse(server.peer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
Expand Down Expand Up @@ -367,8 +362,7 @@ func testGetProofs(t *testing.T, protocol int) {
}
}
// Send the proof request and verify the response
cost := server.peer.peer.GetRequestCost(GetProofsV2Msg, len(proofreqs))
sendRequest(server.peer.app, GetProofsV2Msg, 42, cost, proofreqs)
sendRequest(server.peer.app, GetProofsV2Msg, 42, proofreqs)
if err := expectResponse(server.peer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
Expand All @@ -392,8 +386,7 @@ func testGetStaleProof(t *testing.T, protocol int) {
BHash: header.Hash(),
Key: account,
}
cost := server.peer.peer.GetRequestCost(GetProofsV2Msg, 1)
sendRequest(server.peer.app, GetProofsV2Msg, 42, cost, []*ProofReq{req})
sendRequest(server.peer.app, GetProofsV2Msg, 42, []*ProofReq{req})

var expected []rlp.RawValue
if wantOK {
Expand Down Expand Up @@ -453,8 +446,7 @@ func testGetCHTProofs(t *testing.T, protocol int) {
AuxReq: auxHeader,
}}
// Send the proof request and verify the response
cost := server.peer.peer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2))
sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2)
sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requestsV2)
if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
Expand Down Expand Up @@ -502,8 +494,7 @@ func testGetBloombitsProofs(t *testing.T, protocol int) {
trie.Prove(key, 0, &proofs.Proofs)

// Send the proof request and verify the response
cost := server.peer.peer.GetRequestCost(GetHelperTrieProofsMsg, len(requests))
sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, cost, requests)
sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requests)
if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil {
t.Errorf("bit %d: proofs mismatch: %v", bit, err)
}
Expand All @@ -525,11 +516,9 @@ func testTransactionStatus(t *testing.T, protocol int) {
test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) {
reqID++
if send {
cost := server.peer.peer.GetRequestCost(SendTxV2Msg, 1)
sendRequest(server.peer.app, SendTxV2Msg, reqID, cost, types.Transactions{tx})
sendRequest(server.peer.app, SendTxV2Msg, reqID, types.Transactions{tx})
} else {
cost := server.peer.peer.GetRequestCost(GetTxStatusMsg, 1)
sendRequest(server.peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()})
sendRequest(server.peer.app, GetTxStatusMsg, reqID, []common.Hash{tx.Hash()})
}
if err := expectResponse(server.peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil {
t.Errorf("transaction status mismatch")
Expand Down Expand Up @@ -620,7 +609,7 @@ func TestStopResumeLes3(t *testing.T) {
header := server.handler.blockchain.CurrentHeader()
req := func() {
reqID++
sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, testCost, &getBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1})
sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, &getBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1})
}
for i := 1; i <= 5; i++ {
// send requests while we still have enough buffer and expect a response
Expand Down Expand Up @@ -651,3 +640,65 @@ func TestStopResumeLes3(t *testing.T) {
}
}
}

func TestCheckpointChallengeLes2(t *testing.T) { testCheckpointChallenge(t, 2) }
func TestCheckpointChallengeLes3(t *testing.T) { testCheckpointChallenge(t, 3) }

func testCheckpointChallenge(t *testing.T, protocol int) {
config := light.TestServerIndexerConfig

waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
for {
cs, _, _ := cIndexer.Sections()
bts, _, _ := btIndexer.Sections()
if cs >= 1 && bts >= 1 {
break
}
time.Sleep(10 * time.Millisecond)
}
}
// Generate 512+4 blocks (totally 1 CHT sections)
server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false)
defer tearDown()

s, _, head := server.chtIndexer.Sections()
cp := &params.TrustedCheckpoint{
SectionIndex: 0,
SectionHead: head,
CHTRoot: light.GetChtRoot(server.db, s-1, head),
BloomRoot: light.GetBloomTrieRoot(server.db, s-1, head),
}
// Register the assembled checkpoint as hardcoded one.
client.handler.clock = &mclock.Simulated{}
client.handler.checkpoint = cp
client.handler.backend.blockchain.AddTrustedCheckpoint(cp)

// Create connected peer pair.
_, err1, _, err2 := newTestPeerPair("peer", protocol, server.handler, client.handler)
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
t.Fatalf("peer 1 handshake error: %v", err)
case err := <-err2:
t.Fatalf("peer 2 handshake error: %v", err)
}
client.handler.clock.(*mclock.Simulated).Run(checkpointChallengeTimeout)
if client.handler.backend.peers.Len() != 1 {
t.Fatalf("Should pass checkpoint challenge")
}

client.handler.ignoreHeaders = true // Explicitly ignore all received headers, trigger timer
// Create connected peer pair.
_, err1, _, err2 = newTestPeerPair("peer2", protocol, server.handler, client.handler)
select {
case <-time.After(time.Millisecond * 100):
case err := <-err1:
t.Fatalf("peer 1 handshake error: %v", err)
case err := <-err2:
t.Fatalf("peer 2 handshake error: %v", err)
}
client.handler.clock.(*mclock.Simulated).Run(checkpointChallengeTimeout)
if client.handler.backend.peers.Len() != 1 {
t.Fatalf("Shouldn't pass checkpoint challenge")
}
}
Loading

0 comments on commit f086977

Please sign in to comment.