Added reverse sync for proof-of-stake syncing (#3092)
* reverse sync slow but working

* progress

* p

* close request

* backwards refactoring

* cleanup 1

* added RequestAssembler

* remove trash code and spaghetti

* efficient

* fix

* refactor

* tf

* refact

* final refactoring

* headers forward restored

* test fixed

* make CI happy

* resolved comments

* not using insertList anymore

* oops

* better collectors

* removed debug config

* avoid pointers

* added sleep

* use of channels

* sleeping

* added logs for ETL

* added more cleanup

* correct glacier

* some refactoring

* maxRequests

* tweaks

* config.go

* config conflicts

* renamed functions
Giulio2002 authored Dec 13, 2021
1 parent 432fd11 commit 23b3c1d
Showing 7 changed files with 268 additions and 31 deletions.
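The change set adds backward ("reverse") header syncing for proof-of-stake: once the beacon chain announces a payload via the Engine API, Erigon downloads headers from that block backwards by parent hash until it links up with a header it already has. A minimal conceptual sketch of that walk, using hypothetical stand-in types rather than Erigon's headerdownload API:

// Conceptual sketch only; Header, known and fetch are hypothetical stand-ins,
// not Erigon's actual types or functions.
package sketch

type Header struct {
	Number     uint64
	Hash       [32]byte
	ParentHash [32]byte
}

// syncBackwards walks from the announced head towards already-known territory,
// collecting headers until the parent of the current header is found locally.
func syncBackwards(head Header, known func([32]byte) bool, fetch func([32]byte) (Header, bool)) []Header {
	collected := []Header{head}
	cur := head
	for !known(cur.ParentHash) {
		parent, ok := fetch(cur.ParentHash) // ask a peer for the parent header
		if !ok {
			break // nobody served it; the real code re-requests with a timeout
		}
		collected = append(collected, parent)
		cur = parent
	}
	return collected
}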
51 changes: 32 additions & 19 deletions cmd/sentry/download/downloader.go
@@ -353,6 +353,7 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf
1024*1024, /* linkLimit */
engine,
)

if err := hd.RecoverFromDb(db); err != nil {
return nil, fmt.Errorf("recovery from DB failed: %w", err)
}
@@ -470,30 +471,42 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
Number: number,
})
}

if segments, penaltyKind, err := cs.Hd.SplitIntoSegments(csHeaders); err == nil {
if penaltyKind == headerdownload.NoPenalty {
var canRequestMore bool
for _, segment := range segments {
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
if cs.Hd.POSSync() {
tx, err := cs.db.BeginRo(ctx)
defer tx.Rollback()
if err != nil {
return err
}
}

if canRequestMore {
currentTime := uint64(time.Now().Unix())
req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
if req != nil {
if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
log.Trace("Sent request", "height", req.Number)
for _, segment := range segments {
if err := cs.Hd.ProcessSegmentPOS(segment, tx); err != nil {
return err
}
}
} else {
var canRequestMore bool
for _, segment := range segments {
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)

if canRequestMore {
currentTime := uint64(time.Now().Unix())
req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
if req != nil {
if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
log.Trace("Sent request", "height", req.Number)
}
}
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}
}
} else {
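In downloader.go above, the header-message handler now branches on cs.Hd.POSSync(): during proof-of-stake sync the incoming segments are handed to ProcessSegmentPOS inside a read-only transaction, while the pre-existing forward path (ProcessSegment plus RequestMoreHeaders) is kept for proof-of-work. A hedged sketch of that dispatch shape, with a hypothetical interface standing in for the real HeaderDownload type:

// Hypothetical interface; the real methods live on headerdownload.HeaderDownload
// and take different arguments (e.g. a kv.Tx and the peer ID).
package sketch

type Segment []byte // stand-in for a batch of headers from one peer message

type headerSync interface {
	POSSync() bool
	ProcessSegmentPOS(s Segment) error           // backward (PoS) path
	ProcessSegment(s Segment) (requestMore bool) // forward (PoW) path
}

// handleSegments mirrors the branching added in blockHeaders: PoS segments must all
// verify against the expected parent-hash chain, PoW segments feed the forward sync
// and may trigger another header request.
func handleSegments(hd headerSync, segments []Segment) (requestMore bool, err error) {
	if hd.POSSync() {
		for _, s := range segments {
			if err := hd.ProcessSegmentPOS(s); err != nil {
				return false, err
			}
		}
		return false, nil
	}
	for _, s := range segments {
		requestMore = requestMore || hd.ProcessSegment(s)
	}
	return requestMore, nil
}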
132 changes: 122 additions & 10 deletions eth/stagedsync/stage_headers.go
@@ -11,6 +11,7 @@ import (
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
"github.com/ledgerwatch/erigon/common"
@@ -38,6 +39,7 @@ type HeadersCfg struct {
penalize func(context.Context, []headerdownload.PenaltyItem)
batchSize datasize.ByteSize
noP2PDiscovery bool
tmpdir string
reverseDownloadCh chan types.Header
waitingPosHeaders *bool
snapshots *snapshotsync.AllSnapshots
@@ -58,6 +60,7 @@ func StageHeadersCfg(
waitingPosHeaders *bool,
snapshots *snapshotsync.AllSnapshots,
blockReader interfaces.FullBlockReader,
tmpdir string,
) HeadersCfg {
return HeadersCfg{
db: db,
@@ -110,30 +113,29 @@ func SpawnStageHeaders(
}

if isTrans {
return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test)
return HeadersPOS(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
} else {
return HeadersForward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
return HeadersPOW(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
}
}

// HeadersDownward progresses Headers stage in the downward direction
func HeadersDownward(
func HeadersPOS(
s *StageState,
u Unwinder,
ctx context.Context,
tx kv.RwTx,
cfg HeadersCfg,
initialCycle bool,
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
useExternalTx bool,
) error {
*cfg.waitingPosHeaders = true
// Waiting for the beacon chain
log.Info("Waiting for payloads...")
header := <-cfg.reverseDownloadCh
*cfg.waitingPosHeaders = false

defer tx.Commit()

headerNumber := header.Number.Uint64()

blockHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
@@ -162,14 +164,124 @@ func HeadersDownward(
return err
}
if parent != nil && parent.Hash() == header.ParentHash {
return s.Update(tx, header.Number.Uint64())
if err := s.Update(tx, header.Number.Uint64()); err != nil {
return err
}
// For the sake of simplicity we can just assume it will be valid for now. (TODO: move to execution stage)
cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Valid,
HeadHash: header.Hash(),
}
return tx.Commit()
}
// Downward sync if we need to process more (TODO)
return s.Update(tx, header.Number.Uint64())
cfg.hd.SetPOSSync(true)
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
cfg.hd.SetProcessed(header.Number.Uint64())
cfg.hd.SetExpectedHash(header.ParentHash)
cfg.hd.SetFetching(true)
logPrefix := s.LogPrefix()

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

// Allow other stages to run 1 cycle if no network available
if initialCycle && cfg.noP2PDiscovery {
return nil
}

cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Syncing,
HeadHash: rawdb.ReadHeadBlockHash(tx),
}
log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", header.Number.Uint64())

cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

stopped := false
prevProgress := header.Number.Uint64()

headerCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
canonicalHeadersCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
cfg.hd.SetHeadersCollector(headerCollector)
cfg.hd.SetCanonicalHashesCollector(canonicalHeadersCollector)
// Cleanup after we finish backward sync
defer func() {
headerCollector.Close()
canonicalHeadersCollector.Close()
cfg.hd.SetHeadersCollector(nil)
cfg.hd.SetCanonicalHashesCollector(nil)
cfg.hd.Unsync()
cfg.hd.SetFetching(false)
}()

var req headerdownload.HeaderRequest
for !stopped {
sentToPeer := false
maxRequests := 4096
for !sentToPeer && !stopped && maxRequests != 0 {
req = cfg.hd.RequestMoreHeadersForPOS()
_, sentToPeer = cfg.headerReqSend(ctx, &req)
maxRequests--
}
// Load headers into the database
announces := cfg.hd.GrabAnnounces()
if len(announces) > 0 {
cfg.announceNewHashes(ctx, announces)
}
if cfg.hd.Synced() { // We do not break unless the best header changed
stopped = true
}
// Sleep and check for logs
timer := time.NewTimer(2 * time.Millisecond)
select {
case <-ctx.Done():
stopped = true
case <-logEvery.C:
diff := prevProgress - cfg.hd.Progress()
if cfg.hd.Progress() <= prevProgress {
log.Info("Wrote Block Headers backwards", "from", header.Number.Uint64(),
"now", cfg.hd.Progress(), "blk/sec", float64(diff)/float64(logInterval/time.Second))
prevProgress = cfg.hd.Progress()
}
case <-timer.C:
log.Trace("RequestQueueTime (header) ticked")
}
// Cleanup timer
timer.Stop()
}
// If the user stopped it, we don't update anything
if !cfg.hd.Synced() {
return nil
}

if err := headerCollector.Load(tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
}); err != nil {
return err
}
if err = canonicalHeadersCollector.Load(tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
}); err != nil {
return err
}
if s.BlockNumber >= cfg.hd.Progress() {
u.UnwindTo(cfg.hd.Progress(), common.Hash{})
} else {
if err := s.Update(tx, header.Number.Uint64()); err != nil {
return err
}
}
return tx.Commit()
}

// HeadersForward progresses Headers stage in the forward direction
func HeadersForward(
func HeadersPOW(
s *StageState,
u Unwinder,
ctx context.Context,
@@ -261,7 +373,6 @@ func HeadersForward(
if header.Eip3675 {
return nil
}
if td.Cmp(cfg.terminalTotalDifficulty) > 0 {
return rawdb.MarkTransition(tx, blockNum)
}
@@ -305,6 +416,7 @@ func HeadersForward(
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
cfg.hd.SetPOSSync(false)
cfg.hd.SetFetching(true)
defer cfg.hd.SetFetching(false)
headerProgress = cfg.hd.Progress()
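HeadersPOS above buffers the backwards-downloaded headers in two etl collectors (one for kv.Headers, one for kv.HeaderCanonical) and only flushes them into the database once the chain has linked up. A simplified stand-in for that collect-then-sorted-load pattern (not the real erigon-lib/etl API):

// Simplified stand-in: headers arrive in descending block order, are buffered keyed
// by big-endian block number, and are written out in ascending key order at the end.
package sketch

import (
	"encoding/binary"
	"sort"
)

type collector struct{ buf map[uint64][]byte }

func newCollector() *collector { return &collector{buf: make(map[uint64][]byte)} }

// Collect overwrites any previous value for the same block, mirroring how a
// re-downloaded header simply replaces the buffered one.
func (c *collector) Collect(blockNum uint64, encodedHeader []byte) {
	c.buf[blockNum] = encodedHeader
}

// Load flushes the buffer in sorted key order, which is why a backward download can
// still populate a forward-ordered table in one pass.
func (c *collector) Load(put func(k, v []byte) error) error {
	nums := make([]uint64, 0, len(c.buf))
	for n := range c.buf {
		nums = append(nums, n)
	}
	sort.Slice(nums, func(i, j int) bool { return nums[i] < nums[j] })
	for _, n := range nums {
		var k [8]byte
		binary.BigEndian.PutUint64(k[:], n)
		if err := put(k[:], c.buf[n]); err != nil {
			return err
		}
	}
	return nil
}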
1 change: 0 additions & 1 deletion ethdb/privateapi/ethbackend.go
@@ -239,7 +239,6 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type
if header.Hash() != blockHash {
return nil, fmt.Errorf("invalid hash for payload. got: %s, wanted: %s", common.Bytes2Hex(blockHash[:]), common.Bytes2Hex(header.Hash().Bytes()))
}
log.Info("Received Payload from beacon-chain", "hash", blockHash)
// Send the block over
s.numberSent = req.BlockNumber
s.reverseDownloadCh <- header
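The ethbackend.go change sits on the producer side of the hand-off: EngineExecutePayloadV1 pushes the payload header into reverseDownloadCh, and HeadersPOS blocks on that channel before starting the backward download. A toy sketch of the hand-off (hypothetical types; the real field names and types differ):

// Toy illustration of the channel hand-off; not Erigon code.
package main

import "fmt"

type header struct{ number uint64 }

func main() {
	reverseDownloadCh := make(chan header, 1)

	// Engine API side: a payload arrives from the beacon chain and is forwarded.
	reverseDownloadCh <- header{number: 13_000_000}

	// Headers stage side: wait for the beacon chain to tell us what to sync to.
	h := <-reverseDownloadCh
	fmt.Println("starting backward header sync from block", h.number)
}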