Added reversed for proof-of-stake syncing #3092

Merged 38 commits on Dec 13, 2021 (changes shown from 26 commits)
5ca7263
reverse sync slow but working
Giulio2002 Dec 7, 2021
a30cde0
progress
Giulio2002 Dec 8, 2021
0e08e99
p
Giulio2002 Dec 9, 2021
c1ef4eb
Merge remote-tracking branch 'origin/devel' into optimistic-sync
Giulio2002 Dec 9, 2021
3b133e6
close request
Giulio2002 Dec 9, 2021
7d357ae
backwards refactoring
Giulio2002 Dec 9, 2021
cc2ddcf
cleanup 1
Giulio2002 Dec 9, 2021
9f39c16
added RequestAssembler
Giulio2002 Dec 9, 2021
7c2d6c5
remove trash code and spaghetti
Giulio2002 Dec 9, 2021
3568d3c
efficient
Giulio2002 Dec 9, 2021
6a19eeb
fix
Giulio2002 Dec 9, 2021
aff42e7
refactor
Giulio2002 Dec 9, 2021
d9d6470
tf
Giulio2002 Dec 9, 2021
5a80bdf
refact
Giulio2002 Dec 9, 2021
0ecde3b
final refactoring
Giulio2002 Dec 9, 2021
c8cfb55
headers forward restored
Giulio2002 Dec 9, 2021
456560a
Merge remote-tracking branch 'origin/devel' into reverse-download
Giulio2002 Dec 9, 2021
15e90f8
test fixed
Giulio2002 Dec 9, 2021
ebfd90b
make CI happy
Giulio2002 Dec 10, 2021
eb62c97
resolved
Giulio2002 Dec 10, 2021
8524720
resolved comments
Giulio2002 Dec 11, 2021
344f996
not using insertList anymore
Giulio2002 Dec 11, 2021
226153e
oops
Giulio2002 Dec 11, 2021
3d33cd2
better collectors
Giulio2002 Dec 11, 2021
850ff37
removed debug config
Giulio2002 Dec 11, 2021
c62845c
avoid pointers
Giulio2002 Dec 11, 2021
b0a74d5
added sleep
Giulio2002 Dec 12, 2021
9af084a
use of channels
Giulio2002 Dec 12, 2021
2186e3e
sleeping
Giulio2002 Dec 12, 2021
58a21a7
added logs for ETL
Giulio2002 Dec 12, 2021
daafe78
added more cleanup
Giulio2002 Dec 12, 2021
2cd664d
correct glacier
Giulio2002 Dec 12, 2021
2d52f79
some refactoring
Giulio2002 Dec 12, 2021
0527723
maxRequests
Giulio2002 Dec 12, 2021
7f9e9a8
tweaks
Giulio2002 Dec 12, 2021
c88561e
config.go
Giulio2002 Dec 12, 2021
1f8d95b
config conflicts
Giulio2002 Dec 13, 2021
2e00bfd
renamed functions
Giulio2002 Dec 13, 2021
51 changes: 32 additions & 19 deletions cmd/sentry/download/downloader.go
@@ -353,6 +353,7 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf
1024*1024, /* linkLimit */
engine,
)

if err := hd.RecoverFromDb(db); err != nil {
return nil, fmt.Errorf("recovery from DB failed: %w", err)
}
@@ -470,30 +471,42 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
Number: number,
})
}

if segments, penaltyKind, err := cs.Hd.SplitIntoSegments(csHeaders); err == nil {
if penaltyKind == headerdownload.NoPenalty {
var canRequestMore bool
for _, segment := range segments {
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
if cs.Hd.GetBackwards() {
yperbasis (Member), Dec 12, 2021:

I'm a bit confused about this backwards flag. Firstly, my understanding is that the existing (pre-Merge) downloader downloads headers mostly (besides skeleton requests) backwards, towards the genesis. So "backwards" meaning PoS sync is a bit misleading. Secondly, why is it all or nothing mode? Couldn't exclusively backwards downloading of PoS blocks run simultaneously with the old skeleton+backwards algorithm for pre-Merge blocks? To sum up, to my mind "GetBackwards" is a misleading name.

Giulio2002 (Contributor, Author):

I changed it to POSSync; the flag indicates whether or not we are doing proof-of-stake sync.

yperbasis (Member):

So does it mean that we can't download PoS and PoW headers simultaneously?

Giulio2002 (Contributor, Author):

We can. If we assume that the block supplied by the beacon chain is correct, we sync backwards until we find a block whose parent hash is present in our database; this can be past the terminal block. If that happens, we unwind.
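The backward walk described above can be sketched as follows. This is a simplified, hypothetical model rather than Erigon's actual code: the `header` struct, `fetch` callback, and `known` map are invented stand-ins for the header downloader, peer requests, and the database.

```go
package main

import "fmt"

// header is a minimal stand-in for a block header.
type header struct {
	number     uint64
	hash       string
	parentHash string
}

// backwardSync walks from the beacon-chain head towards genesis, fetching
// each parent, until it reaches a block whose parent hash is already known
// (i.e. present in the local DB). It returns that known ancestor's number;
// if it is below the current head, the caller would unwind to it.
func backwardSync(head header, fetch func(hash string) header, known map[string]uint64) uint64 {
	cur := head
	for {
		if n, ok := known[cur.parentHash]; ok {
			return n // parent already in DB: the backward walk is done
		}
		cur = fetch(cur.parentHash) // download the parent and keep walking
	}
}

func main() {
	// Chain 1->2->3->4 where only blocks 1 and 2 are in the local DB.
	chain := map[string]header{
		"h3": {3, "h3", "h2"},
		"h4": {4, "h4", "h3"},
	}
	known := map[string]uint64{"h1": 1, "h2": 2}
	fetch := func(hash string) header { return chain[hash] }
	stop := backwardSync(header{4, "h4", "h3"}, fetch, known)
	fmt.Println("first known ancestor:", stop) // prints 2
}
```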

yperbasis (Member):

Hmm. What about a node that wants to sync from scratch some time after the Merge? It doesn't have any PoW headers in the DB yet.

Giulio2002 (Contributor, Author):

Okay. I think headersForward means that headers are inserted in "forward" order. If you removed preverified hashes, you would have to do consensus checks in forward order (genesis to head). In our case, however, we process the headers in downward order: we start from the head and go towards the genesis.

Giulio2002 (Contributor, Author):

If you think that headersForward is wrong/misleading, that can be refactored in another PR; I believe it is not in the scope of this one.

yperbasis (Member), Dec 13, 2021:

Hmm. I see the following problem with inserting PoS headers into the DB in the downwards direction. Previously we had a single number for the header stage's progress: the highest header inserted into the DB. Now there are essentially two numbers: the highest PoW header and the lowest PoS header inserted into the DB. So what happens when a node is restarted while there's a gap between the two? It doesn't fit our current notion of a single number describing stage progress.
I'm wondering why we have to insert PoS headers into the DB in the downwards direction anyway. To my mind it's simpler to tweak the current algo: have a long PoS segment growing downwards in memory, and then insert it into the DB when it touches the terminal PoW block (or a pre-verified PoS block after the Merge). @AlexeyAkhunov what do you think?

Giulio2002 (Contributor, Author), Dec 13, 2021:

We need to insert them backwards to check their validity: we need to check that the previous block's parent hash is equal to the current block's hash, and also whether we need to unwind.

yperbasis (Member):

But we can verify parent hashes in memory, before committing to the DB, and grow in memory a long PoS segment from the Beacon chain tip downwards until we touch a PoW block in the DB. That's what the current algo does: it grows segments in memory downwards.
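The reviewer's alternative can be sketched like this. It is an illustrative assumption, not code from this PR: `growSegment` and the tiny `header` struct are invented for the sketch. The point is that parent-hash linkage is verified while the segment grows in memory, and nothing touches the DB until the segment attaches to a block we already have.

```go
package main

import (
	"errors"
	"fmt"
)

// header is a minimal stand-in for a block header.
type header struct {
	number     uint64
	hash       string
	parentHash string
}

// growSegment accumulates headers from the tip downwards in memory,
// checking parent-hash linkage on each step, and returns the whole
// segment once it attaches to a block already in the DB. Nothing is
// persisted until then, so a single progress number stays valid.
func growSegment(tip header, fetch func(string) (header, bool), inDB map[string]bool) ([]header, error) {
	segment := []header{tip}
	cur := tip
	for !inDB[cur.parentHash] {
		parent, ok := fetch(cur.parentHash)
		if !ok {
			return nil, errors.New("missing parent " + cur.parentHash)
		}
		if parent.hash != cur.parentHash { // linkage check before any DB write
			return nil, errors.New("parent hash mismatch at " + parent.hash)
		}
		segment = append(segment, parent)
		cur = parent
	}
	return segment, nil
}

func main() {
	chain := map[string]header{
		"h3": {3, "h3", "h2"},
		"h4": {4, "h4", "h3"},
		"h5": {5, "h5", "h4"},
	}
	fetch := func(h string) (header, bool) { hd, ok := chain[h]; return hd, ok }
	// h2 is the highest block already in the DB; grow 5 -> 4 -> 3 in memory.
	seg, err := growSegment(header{5, "h5", "h4"}, fetch, map[string]bool{"h2": true})
	fmt.Println(len(seg), err) // prints: 3 <nil>
}
```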

tx, err := cs.db.BeginRo(ctx)
defer tx.Rollback()
if err != nil {
return err
}
}

if canRequestMore {
currentTime := uint64(time.Now().Unix())
req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
if req != nil {
if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
log.Trace("Sent request", "height", req.Number)
for _, segment := range segments {
if err := cs.Hd.ProcessSegmentPOS(segment, tx); err != nil {
return err
}
}
} else {
var canRequestMore bool
for _, segment := range segments {
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)

if canRequestMore {
currentTime := uint64(time.Now().Unix())
req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
if req != nil {
if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
log.Trace("Sent request", "height", req.Number)
}
}
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}
}
} else {
103 changes: 96 additions & 7 deletions eth/stagedsync/stage_headers.go
@@ -11,6 +11,7 @@ import (
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
"github.com/ledgerwatch/erigon/common"
@@ -38,6 +39,7 @@ type HeadersCfg struct {
penalize func(context.Context, []headerdownload.PenaltyItem)
batchSize datasize.ByteSize
noP2PDiscovery bool
tmpdir string
reverseDownloadCh chan types.Header
waitingPosHeaders *bool
snapshots *snapshotsync.AllSnapshots
@@ -58,6 +60,7 @@ func StageHeadersCfg(
waitingPosHeaders *bool,
snapshots *snapshotsync.AllSnapshots,
blockReader interfaces.FullBlockReader,
tmpdir string,
) HeadersCfg {
return HeadersCfg{
db: db,
@@ -110,7 +113,7 @@ func SpawnStageHeaders(
}

if isTrans {
return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test)
return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
} else {
return HeadersForward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
}
@@ -125,15 +128,14 @@ func HeadersDownward(
cfg HeadersCfg,
initialCycle bool,
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
useExternalTx bool,
) error {
*cfg.waitingPosHeaders = true
// Waiting for the beacon chain
log.Info("Waiting for payloads...")
header := <-cfg.reverseDownloadCh
*cfg.waitingPosHeaders = false

defer tx.Commit()

headerNumber := header.Number.Uint64()

blockHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
@@ -162,10 +164,97 @@
return err
}
if parent != nil && parent.Hash() == header.ParentHash {
return s.Update(tx, header.Number.Uint64())
if err := s.Update(tx, header.Number.Uint64()); err != nil {
return err
}
// For the sake of simplicity we can just assume it will be valid for now. (TODO: move to execution stage)
cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Valid,
HeadHash: header.Hash(),
}
return tx.Commit()
}
cfg.hd.SetBackwards(true)
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
cfg.hd.SetProcessed(header.Number.Uint64())
cfg.hd.SetExpectedHash(header.ParentHash)
cfg.hd.SetFetching(true)
defer cfg.hd.SetFetching(false)
logPrefix := s.LogPrefix()

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

// Allow other stages to run 1 cycle if no network available
if initialCycle && cfg.noP2PDiscovery {
return nil
}

cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Syncing,
HeadHash: rawdb.ReadHeadBlockHash(tx),
}
// Downward sync if we need to process more (TODO)
return s.Update(tx, header.Number.Uint64())
log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", header.Number.Uint64())

cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

stopped := false
prevProgress := header.Number.Uint64()
go func() {
for {
timer := time.NewTimer(1 * time.Second)
select {
case <-ctx.Done():
stopped = true
case <-logEvery.C:
diff := prevProgress - cfg.hd.Progress()
if cfg.hd.Progress() <= prevProgress {
log.Info("Wrote Block Headers backwards", "from", header.Number.Uint64(),
"now", cfg.hd.Progress(), "blk/sec", float64(diff)/float64(logInterval/time.Second))
prevProgress = cfg.hd.Progress()
}
}
timer.Stop()
}
}()
headerCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
canonicalHeadersCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
cfg.hd.SetHeadersCollector(headerCollector)
cfg.hd.SetCanonicalHashesCollector(canonicalHeadersCollector)
defer headerCollector.Close()
defer canonicalHeadersCollector.Close()
var req headerdownload.HeaderRequest
for !stopped {
sentToPeer := false
for !sentToPeer {
Contributor:

Sorry, it is this loop that will be mostly tight

Contributor:

That is one of the reasons why, in the forward sync, I limited the number of iterations to 64, and also inserted a select clause with a timer, ctx.Done(), and progress printing. I recommend you do the same here.

Giulio2002 (Contributor, Author), Dec 12, 2021:

If I used a 30-second timer I would get 6.8 blk/sec. What I could do instead: when sentToPeer == false, sleep for 30 milliseconds; that way I don't peg the CPU.

req = cfg.hd.RequestMoreHeadersForPOS()
_, sentToPeer = cfg.headerReqSend(ctx, &req)
}
// Load headers into the database
announces := cfg.hd.GrabAnnounces()
if len(announces) > 0 {
cfg.announceNewHashes(ctx, announces)
}
if cfg.hd.Synced() { // We do not break unless the best header changed
break
}
}
if err := headerCollector.Load(tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
return err
}
if err := canonicalHeadersCollector.Load(tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
return err
}
if s.BlockNumber >= cfg.hd.Progress() {
u.UnwindTo(cfg.hd.Progress(), common.Hash{})
} else {
if err := s.Update(tx, header.Number.Uint64()); err != nil {
return err
}
}
return tx.Commit()
}

// HeadersForward progresses Headers stage in the forward direction
@@ -261,7 +350,6 @@ func HeadersForward(
if header.Eip3675 {
return nil
}

if td.Cmp(cfg.terminalTotalDifficulty) > 0 {
return rawdb.MarkTransition(tx, blockNum)
}
@@ -305,6 +393,7 @@ func HeadersForward(
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
cfg.hd.SetBackwards(false)
cfg.hd.SetFetching(true)
defer cfg.hd.SetFetching(false)
headerProgress = cfg.hd.Progress()
1 change: 0 additions & 1 deletion ethdb/privateapi/ethbackend.go
@@ -239,7 +239,6 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type
if header.Hash() != blockHash {
return nil, fmt.Errorf("invalid hash for payload. got: %s, wanted: %s", common.Bytes2Hex(blockHash[:]), common.Bytes2Hex(header.Hash().Bytes()))
}
log.Info("Received Payload from beacon-chain", "hash", blockHash)
// Send the block over
s.numberSent = req.BlockNumber
s.reverseDownloadCh <- header
98 changes: 97 additions & 1 deletion turbo/stages/headerdownload/header_algos.go
@@ -15,6 +15,7 @@ import (
"strings"
"time"

"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
"github.com/ledgerwatch/erigon/common"
@@ -571,6 +572,19 @@ func (hd *HeaderDownload) RequestMoreHeaders(currentTime uint64) (*HeaderRequest
return nil, penalties
}

func (hd *HeaderDownload) RequestMoreHeadersForPOS() HeaderRequest {
hd.lock.RLock()
defer hd.lock.RUnlock()
// Assemble the request
return HeaderRequest{
Hash: common.Hash{},
Number: hd.lastProcessedPayload - 1,
Length: 192,
Skip: 0,
Reverse: true,
}
}

func (hd *HeaderDownload) UpdateRetryTime(req *HeaderRequest, currentTime, timeout uint64) {
hd.lock.Lock()
defer hd.lock.Unlock()
@@ -682,6 +696,48 @@ func (hd *HeaderDownload) InsertHeaders(hf func(header *types.Header, headerRaw
return hd.highestInDb >= hd.preverifiedHeight && hd.topSeenHeight > 0 && hd.highestInDb >= hd.topSeenHeight, nil
}

func (hd *HeaderDownload) SetExpectedHash(hash common.Hash) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.expectedHash = hash
}

func (hd *HeaderDownload) ProcessSegmentPOS(segment ChainSegment, tx kv.Getter) error {
if len(segment) == 0 {
return nil
}
hd.lock.Lock()
defer hd.lock.Unlock()
log.Trace("Collecting...", "from", segment[0].Number, "to", segment[len(segment)-1].Number, "len", len(segment))
for _, segmentFragment := range segment {
header := segmentFragment.Header
// If we found the block number we were missing, we can just dismiss it
if header.Hash() != hd.expectedHash {
return nil
}
currentCanonical, err := rawdb.ReadCanonicalHash(tx, header.Number.Uint64())
if err != nil {
return err
}
if currentCanonical == hd.expectedHash || hd.lastProcessedPayload == 1 {
hd.synced = true
}
data, err := rlp.EncodeToBytes(header)
if err != nil {
log.Crit("Failed to RLP encode header", "err", err)
}
if err := hd.headersCollector.Collect(dbutils.HeaderKey(header.Number.Uint64(), header.Hash()), data); err != nil {
return err
}
if err := hd.canonicalHashesCollector.Collect(dbutils.EncodeBlockNumber(header.Number.Uint64()), header.Hash().Bytes()); err != nil {
return err
}
hd.expectedHash = segmentFragment.Header.ParentHash
hd.lastProcessedPayload = header.Number.Uint64()
}
return nil
}

// GrabAnnounces - returns all available announces and forget them
func (hd *HeaderDownload) GrabAnnounces() []Announce {
hd.lock.Lock()
@@ -694,7 +750,11 @@ func (hd *HeaderDownload) GrabAnnounces() []Announce {
func (hd *HeaderDownload) Progress() uint64 {
hd.lock.RLock()
defer hd.lock.RUnlock()
return hd.highestInDb
if hd.backwards {
return hd.lastProcessedPayload
} else {
return hd.highestInDb
}
}

func (hd *HeaderDownload) HasLink(linkHash common.Hash) bool {
@@ -965,6 +1025,42 @@ func (hd *HeaderDownload) SetFetching(fetching bool) {
hd.fetching = fetching
}

func (hd *HeaderDownload) SetProcessed(lastProcessed uint64) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.lastProcessedPayload = lastProcessed
}

func (hd *HeaderDownload) SetHeadersCollector(collector *etl.Collector) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.headersCollector = collector
}

func (hd *HeaderDownload) SetCanonicalHashesCollector(collector *etl.Collector) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.canonicalHashesCollector = collector
}

func (hd *HeaderDownload) SetBackwards(backwards bool) {
hd.lock.Lock()
defer hd.lock.Unlock()
hd.backwards = backwards
}

func (hd *HeaderDownload) GetBackwards() bool {
hd.lock.RLock()
defer hd.lock.RUnlock()
return hd.backwards
}

func (hd *HeaderDownload) Synced() bool {
hd.lock.RLock()
defer hd.lock.RUnlock()
return hd.synced
}

func (hd *HeaderDownload) RequestChaining() bool {
hd.lock.RLock()
defer hd.lock.RUnlock()
8 changes: 8 additions & 0 deletions turbo/stages/headerdownload/header_data_struct.go
@@ -7,6 +7,7 @@ import (
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus"
"github.com/ledgerwatch/erigon/core/types"
@@ -219,6 +220,13 @@ type HeaderDownload struct {
topSeenHeight uint64
requestChaining bool // Whether the downloader is allowed to issue more requests when previous responses created or moved an anchor
fetching bool // Set when the stage that is actively fetching the headers is in progress
// proof-of-stake
lastProcessedPayload uint64 // The last header number inserted when processing the chain backwards
	expectedHash common.Hash // Parent hash of the last header inserted; kept so that we do not read it from the database over and over
	synced bool // Set when we find a canonical hash during backward sync; in that case our sync process is done
	backwards bool // True if the chain is syncing backwards
headersCollector *etl.Collector // ETL collector for headers
canonicalHashesCollector *etl.Collector // ETL collector for canonical hashes
}

// HeaderRecord encapsulates two forms of the same header - raw RLP encoding (to avoid duplicated decodings and encodings), and parsed value types.Header