Skip to content

Commit

Permalink
reverse sync slow but working
Browse files Browse the repository at this point in the history
  • Loading branch information
Giulio2002 committed Dec 7, 2021
1 parent 3f34dee commit 5ca7263
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 23 deletions.
1 change: 1 addition & 0 deletions cmd/lol.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Smoke-test the Engine API: POST a hard-coded engine_executePayloadV1 request
# (block 0xd1efb7 with an empty transaction list) to a node listening on
# localhost:8545.
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"engine_executePayloadV1","params":[{"parentHash":"0x49996e2ca45c30dd2c4692afe2053b6ece6b22c78b2d95877144708e64d1eb13","coinbase":"0xa94f5374fce5edbc8e2a8697c15331677e6ebf0b","stateRoot":"0xca3149fa9e37db08d1cd49c9061db1002ef1cd58db2210f2115c8c989b2bdf45","receiptRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","random":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0xd1efb7","gasLimit":"0x1c9c380","gasUsed":"0x0","timestamp":"0x5","extraData":"0x","baseFeePerGas":"0x7","blockHash":"0xf9218048a47240d048dff4a22513cda5968b3c460d82e873273139b8973426f4","transactions":[]}],"id":67}' http://localhost:8545
11 changes: 9 additions & 2 deletions cmd/sentry/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,8 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
Number: number,
})
}

if segments, penalty, err := cs.Hd.SplitIntoSegments(csHeaders); err == nil {
if penalty == headerdownload.NoPenalty {
if penalty == headerdownload.NoPenalty && !cs.Hd.IsBackwards {
var canRequestMore bool
for _, segment := range segments {
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
Expand All @@ -549,6 +548,14 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
}
cs.Penalize(ctx, penalties)
}
} else if penalty == headerdownload.NoPenalty && cs.Hd.IsBackwards {
for _, segment := range segments {
// Invalid segment length, so ignore.
if len(segment) != 192 {
continue
}
cs.Hd.AppendSegmentPOS(segment)
}
} else {
outreq := proto_sentry.PenalizePeerRequest{
PeerId: peerID,
Expand Down
96 changes: 95 additions & 1 deletion eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func SpawnStageHeaders(
}

if isTrans {
return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test)
return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
} else {
return HeadersForward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
}
Expand All @@ -121,6 +121,7 @@ func HeadersDownward(
cfg HeadersCfg,
initialCycle bool,
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
useExternalTx bool,
) error {
*cfg.waitingPosHeaders = true
// Waiting for the beacon chain
Expand All @@ -142,6 +143,99 @@ func HeadersDownward(
if parent != nil && parent.Hash() == header.ParentHash {
return s.Update(tx, header.Number.Uint64())
}
cfg.hd.IsBackwards = true
cfg.hd.CurrentNumber = header.Number.Uint64() - 1
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
cfg.hd.SetProcessed(header.Number.Uint64())
cfg.hd.SetExpectedHash(header.ParentHash)
cfg.hd.SetFetching(true)
defer cfg.hd.SetFetching(false)
headerProgress := cfg.hd.Progress()
logPrefix := s.LogPrefix()
// Check if this is called straight after the unwinds, which means we need to create new canonical markings
hash, err := rawdb.ReadCanonicalHash(tx, headerProgress)
if err != nil {
return err
}
logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()
if hash == (common.Hash{}) {
headHash := rawdb.ReadHeadHeaderHash(tx)
if err = fixCanonicalChain(logPrefix, logEvery, headerProgress, headHash, tx, cfg.blockReader); err != nil {
return err
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}

// Allow other stages to run 1 cycle if no network available
if initialCycle && cfg.noP2PDiscovery {
return nil
}

log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", headerProgress)

cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

var sentToPeer bool
stopped := false
prevProgress := header.Number.Uint64()
for !stopped {
currentTime := uint64(time.Now().Unix())
req, penalties := cfg.hd.RequestMoreHeadersForPOS(currentTime)
if req != nil {
_, sentToPeer = cfg.headerReqSend(ctx, req)
if sentToPeer {
cfg.hd.SentRequest(req, currentTime, 5 /* timeout */)
log.Trace("Sent request", "height", req.Number)
}
}
cfg.penalize(ctx, penalties)
maxRequests := 64 // Limit number of requests sent per round to let some headers to be inserted into the database
for req != nil && sentToPeer && maxRequests > 0 {
req, penalties = cfg.hd.RequestMoreHeadersForPOS(currentTime)
if req != nil {
_, sentToPeer = cfg.headerReqSend(ctx, req)
if sentToPeer {
cfg.hd.SentRequest(req, currentTime, 5 /*timeout */)
log.Trace("Sent request", "height", req.Number)
}
}
cfg.penalize(ctx, penalties)
maxRequests--
}

// Load headers into the database
var inSync bool
if inSync, err = cfg.hd.InsertHeadersBackwards(tx, logPrefix, logEvery.C); err != nil {
return err
}

announces := cfg.hd.GrabAnnounces()
if len(announces) > 0 {
cfg.announceNewHashes(ctx, announces)
}
if inSync { // We do not break unless there best header changed
break
}
timer := time.NewTimer(1 * time.Second)
select {
case <-ctx.Done():
stopped = true
case <-logEvery.C:
log.Info("Wrote Block Headers backwards", "from", header.Number.Uint64(),
"now", cfg.hd.CurrentNumber, "blk/sec", float64(prevProgress-cfg.hd.POSProress())/float64(logInterval/time.Second))
prevProgress = cfg.hd.POSProress()
}
timer.Stop()
}
cfg.hd.IsBackwards = true
// Downward sync if we need to process more (TODO)
return s.Update(tx, header.Number.Uint64())
}
Expand Down
1 change: 0 additions & 1 deletion ethdb/privateapi/ethbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type
if header.Hash() != blockHash {
return nil, fmt.Errorf("invalid hash for payload. got: %s, wanted: %s", common.Bytes2Hex(blockHash[:]), common.Bytes2Hex(header.Hash().Bytes()))
}
log.Info("Received Payload from beacon-chain", "hash", blockHash)
// Send the block over
s.numberSent = req.BlockNumber
s.reverseDownloadCh <- header
Expand Down
39 changes: 20 additions & 19 deletions params/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,25 +70,26 @@ var (
var (
// MainnetChainConfig is the chain parameters to run a node on the main network.
MainnetChainConfig = &ChainConfig{
ChainName: MainnetChainName,
ChainID: big.NewInt(1),
Consensus: EtHashConsensus,
HomesteadBlock: big.NewInt(1_150_000),
DAOForkBlock: big.NewInt(1_920_000),
DAOForkSupport: true,
EIP150Block: big.NewInt(2_463_000),
EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
EIP155Block: big.NewInt(2_675_000),
EIP158Block: big.NewInt(2_675_000),
ByzantiumBlock: big.NewInt(4_370_000),
ConstantinopleBlock: big.NewInt(7_280_000),
PetersburgBlock: big.NewInt(7_280_000),
IstanbulBlock: big.NewInt(9_069_000),
MuirGlacierBlock: big.NewInt(9_200_000),
BerlinBlock: big.NewInt(12_244_000),
LondonBlock: big.NewInt(12_965_000),
ArrowGlacierBlock: big.NewInt(13_773_000),
Ethash: new(EthashConfig),
ChainName: MainnetChainName,
ChainID: big.NewInt(1),
Consensus: EtHashConsensus,
HomesteadBlock: big.NewInt(1_150_000),
DAOForkBlock: big.NewInt(1_920_000),
DAOForkSupport: true,
EIP150Block: big.NewInt(2_463_000),
EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
EIP155Block: big.NewInt(2_675_000),
EIP158Block: big.NewInt(2_675_000),
ByzantiumBlock: big.NewInt(4_370_000),
ConstantinopleBlock: big.NewInt(7_280_000),
PetersburgBlock: big.NewInt(7_280_000),
IstanbulBlock: big.NewInt(9_069_000),
MuirGlacierBlock: big.NewInt(9_200_000),
BerlinBlock: big.NewInt(12_244_000),
LondonBlock: big.NewInt(12_965_000),
TerminalTotalDifficulty: big.NewInt(0),
ArrowGlacierBlock: big.NewInt(13_773_000),
Ethash: new(EthashConfig),
}

// RopstenChainConfig contains the chain parameters to run a node on the Ropsten test network.
Expand Down
94 changes: 94 additions & 0 deletions turbo/stages/headerdownload/header_algos.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,37 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime uint64) (*HeaderRequest
return nil, penalties
}

// RequestMoreHeadersForPOS builds the next reverse (proof-of-stake) header
// request, if any anchor is due for (re-)requesting. It returns the request
// to send (nil when nothing is ready yet) together with any peer penalties
// collected while pruning abandoned anchors.
func (hd *HeaderDownload) RequestMoreHeadersForPOS(currentTime uint64) (*HeaderRequest, []PenaltyItem) {
	hd.lock.Lock()
	defer hd.lock.Unlock()
	var penalties []PenaltyItem
	if hd.anchorQueue.Len() == 0 {
		log.Trace("Empty anchor queue")
		return nil, penalties
	}

	for hd.anchorQueue.Len() > 0 {
		anchor := (*hd.anchorQueue)[0]
		if _, ok := hd.anchors[anchor.parentHash]; !ok {
			// Anchor disappeared or unavailable, pop from the queue and move on
			heap.Remove(hd.anchorQueue, 0)
			continue
		}
		if anchor.timestamp > currentTime {
			// Anchor not ready for re-request yet
			return nil, penalties
		}
		if anchor.timeouts < 10 {
			// Request a batch of 192 headers ending at CurrentNumber, walking
			// the chain downwards; advance the cursor for the next batch.
			req := &HeaderRequest{Hash: common.Hash{}, Number: hd.CurrentNumber, Length: 192, Skip: 0, Reverse: true}
			hd.CurrentNumber -= 192
			return req, penalties
		}
		// Ancestors of this anchor seem to be unavailable, invalidate and move on
		hd.invalidateAnchor(anchor)
		penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID})
		heap.Remove(hd.anchorQueue, 0)
	}
	return nil, penalties
}

func (hd *HeaderDownload) SentRequest(req *HeaderRequest, currentTime, timeout uint64) {
hd.lock.Lock()
defer hd.lock.Unlock()
Expand Down Expand Up @@ -722,6 +753,62 @@ func (hd *HeaderDownload) InsertHeaders(hf func(header *types.Header, hash commo
return hd.highestInDb >= hd.preverifiedHeight && hd.topSeenHeight > 0 && hd.highestInDb >= hd.topSeenHeight, nil
}

// InsertHeadersBackwards links buffered proof-of-stake headers (hd.PosHeaders)
// onto the chain being synced backwards: starting from lastProcessedPayload-1
// it consumes consecutive parents whose hash matches expectedHash, moving
// lastProcessedPayload and expectedHash down the chain as it goes.
// tx, logPrefix and logChannel are currently unused: header persistence is
// still commented out below (TODO).
// It currently always returns (false, nil) on success — in-sync detection is
// not implemented yet.
func (hd *HeaderDownload) InsertHeadersBackwards(tx kv.RwTx, logPrefix string, logChannel <-chan time.Time) (bool, error) {
	hd.lock.Lock()
	defer hd.lock.Unlock()
	// Highest headers first, so the next parent to link is always at the front.
	sort.Slice(hd.PosHeaders, func(i, j int) bool {
		return hd.PosHeaders[i].Number.Uint64() > hd.PosHeaders[j].Number.Uint64()
	})
	for len(hd.PosHeaders) > 0 {
		header := hd.PosHeaders[0]
		if header.Number.Uint64() >= hd.lastProcessedPayload {
			// Already processed (or a duplicate) — drop it.
			hd.PosHeaders = hd.PosHeaders[1:]
			continue
		}
		if header.Number.Uint64() != hd.lastProcessedPayload-1 {
			// Gap: the direct parent has not arrived yet.
			break
		}
		// This is the parent of the last processed block — verify the link.
		if hash := header.Hash(); hash != hd.expectedHash {
			return false, fmt.Errorf("invalid header %d: hash %x, expected %x", header.Number.Uint64(), hash, hd.expectedHash)
		}
		// TODO: persist the header once backwards insertion is wired up.
		/*rawdb.WriteHeader(tx, &header)
		if err := rawdb.WriteCanonicalHash(tx, header.Hash(), header.Number.Uint64()); err != nil {
			return false, err
		}*/
		hd.lastProcessedPayload--
		hd.expectedHash = header.ParentHash
		// NOTE(review): this resets the request cursor to just above the new
		// tip, overriding decrements made by RequestMoreHeadersForPOS — confirm
		// that is intended.
		hd.CurrentNumber = hd.lastProcessedPayload + 1
		hd.PosHeaders = hd.PosHeaders[1:]
	}
	return false, nil
}

// SetExpectedHash records the hash that the next backwards-synced header must
// have, i.e. the parent hash of the last processed payload.
func (hd *HeaderDownload) SetExpectedHash(hash common.Hash) {
	hd.lock.Lock()
	hd.expectedHash = hash
	hd.lock.Unlock()
}

// POSProress returns the block number of the lowest payload linked so far
// during backwards (proof-of-stake) sync.
// NOTE: the name carries a typo ("Proress" instead of "Progress"); it is kept
// as-is because callers already reference it.
// Read-only accessor, so a read lock suffices (matches RequestChaining).
func (hd *HeaderDownload) POSProress() uint64 {
	hd.lock.RLock()
	defer hd.lock.RUnlock()
	return hd.lastProcessedPayload
}

// AppendSegmentPOS buffers headers received during backwards (POS) sync into
// hd.PosHeaders, to be linked and consumed later by InsertHeadersBackwards.
func (hd *HeaderDownload) AppendSegmentPOS(segment ChainSegment) {
	hd.lock.Lock()
	defer hd.lock.Unlock()
	for _, segmentFragment := range segment {
		// ignore if too low or too high
		// NOTE(review): the condition below skips only headers with
		// lastProcessedPayload <= number < CurrentNumber. SetProcessed keeps
		// CurrentNumber at lastProcessedPayload-1 (and requests only lower
		// it), in which case this can never hold and every header is
		// appended — contradicting the comment above. Confirm whether the
		// comparison operators (or the &&) are inverted.
		if segmentFragment.Header.Number.Uint64() < hd.CurrentNumber &&
			segmentFragment.Header.Number.Uint64() >= hd.lastProcessedPayload {
			continue
		}
		hd.PosHeaders = append(hd.PosHeaders, *segmentFragment.Header)
	}
}

// GrabAnnounces - returns all available announces and forget them
func (hd *HeaderDownload) GrabAnnounces() []Announce {
hd.lock.Lock()
Expand Down Expand Up @@ -1025,6 +1112,13 @@ func (hd *HeaderDownload) SetFetching(fetching bool) {
hd.fetching = fetching
}

// SetProcessed initialises the backwards-sync cursors: lastProcessedPayload is
// the height of the payload received from the beacon chain, and CurrentNumber
// (the top of the next header request) starts one block below it.
// NOTE(review): underflows when lastProcessed is 0 — callers pass a payload's
// block number, so presumably always > 0; confirm.
func (hd *HeaderDownload) SetProcessed(lastProcessed uint64) {
	hd.lock.Lock()
	hd.lastProcessedPayload = lastProcessed
	hd.CurrentNumber = lastProcessed - 1
	hd.lock.Unlock()
}

func (hd *HeaderDownload) RequestChaining() bool {
hd.lock.RLock()
defer hd.lock.RUnlock()
Expand Down
6 changes: 6 additions & 0 deletions turbo/stages/headerdownload/header_data_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ type HeaderDownload struct {
topSeenHeight uint64
requestChaining bool // Whether the downloader is allowed to issue more requests when previous responses created or moved an anchor
fetching bool // Set when the stage that is actively fetching the headers is in progress
// proof-of-stake
lastProcessedPayload uint64
expectedHash common.Hash
CurrentNumber uint64
IsBackwards bool
PosHeaders []types.Header
}

// HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header
Expand Down

0 comments on commit 5ca7263

Please sign in to comment.