Added reversed for proof-of-stake syncing #3092
Changes from 26 commits
@@ -11,6 +11,7 @@ import (
    "github.com/c2h5oh/datasize"
    "github.com/holiman/uint256"
    libcommon "github.com/ledgerwatch/erigon-lib/common"
    "github.com/ledgerwatch/erigon-lib/etl"
    "github.com/ledgerwatch/erigon-lib/kv"
    "github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
    "github.com/ledgerwatch/erigon/common"
@@ -38,6 +39,7 @@ type HeadersCfg struct {
    penalize          func(context.Context, []headerdownload.PenaltyItem)
    batchSize         datasize.ByteSize
    noP2PDiscovery    bool
    tmpdir            string
    reverseDownloadCh chan types.Header
    waitingPosHeaders *bool
    snapshots         *snapshotsync.AllSnapshots
@@ -58,6 +60,7 @@ func StageHeadersCfg(
    waitingPosHeaders *bool,
    snapshots *snapshotsync.AllSnapshots,
    blockReader interfaces.FullBlockReader,
    tmpdir string,
) HeadersCfg {
    return HeadersCfg{
        db: db,
@@ -110,7 +113,7 @@ func SpawnStageHeaders(
    }

    if isTrans {
        return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test)
        return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
    } else {
        return HeadersForward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
    }
@@ -125,15 +128,14 @@ func HeadersDownward(
    cfg HeadersCfg,
    initialCycle bool,
    test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
    useExternalTx bool,
) error {
    *cfg.waitingPosHeaders = true
    // Waiting for the beacon chain
    log.Info("Waiting for payloads...")
    header := <-cfg.reverseDownloadCh
    *cfg.waitingPosHeaders = false

    defer tx.Commit()

    headerNumber := header.Number.Uint64()

    blockHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
@@ -162,10 +164,97 @@ func HeadersDownward(
        return err
    }
    if parent != nil && parent.Hash() == header.ParentHash {
        return s.Update(tx, header.Number.Uint64())
        if err := s.Update(tx, header.Number.Uint64()); err != nil {
            return err
        }
        // For the sake of simplicity we can just assume it will be valid for now. (TODO: move to execution stage)
        cfg.statusCh <- privateapi.ExecutionStatus{
            Status:   privateapi.Valid,
            HeadHash: header.Hash(),
        }
        return tx.Commit()
    }
    cfg.hd.SetBackwards(true)
    if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
        return err
    }
    cfg.hd.SetProcessed(header.Number.Uint64())
    cfg.hd.SetExpectedHash(header.ParentHash)
    cfg.hd.SetFetching(true)
    defer cfg.hd.SetFetching(false)
    logPrefix := s.LogPrefix()

    logEvery := time.NewTicker(logInterval)
    defer logEvery.Stop()

    // Allow other stages to run 1 cycle if no network available
    if initialCycle && cfg.noP2PDiscovery {
        return nil
    }

    cfg.statusCh <- privateapi.ExecutionStatus{
        Status:   privateapi.Syncing,
        HeadHash: rawdb.ReadHeadBlockHash(tx),
    }
    // Downward sync if we need to process more (TODO)
    return s.Update(tx, header.Number.Uint64())
    log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", header.Number.Uint64())

    cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

    stopped := false
    prevProgress := header.Number.Uint64()
    go func() {
        for {
            timer := time.NewTimer(1 * time.Second)
            select {
            case <-ctx.Done():
                stopped = true
            case <-logEvery.C:
                diff := prevProgress - cfg.hd.Progress()
                if cfg.hd.Progress() <= prevProgress {
                    log.Info("Wrote Block Headers backwards", "from", header.Number.Uint64(),
                        "now", cfg.hd.Progress(), "blk/sec", float64(diff)/float64(logInterval/time.Second))
                    prevProgress = cfg.hd.Progress()
                }
            }
            timer.Stop()
        }
    }()
    headerCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
    canonicalHeadersCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
    cfg.hd.SetHeadersCollector(headerCollector)
    cfg.hd.SetCanonicalHashesCollector(canonicalHeadersCollector)
    defer headerCollector.Close()
    defer canonicalHeadersCollector.Close()
    var req headerdownload.HeaderRequest
    for !stopped {
        sentToPeer := false
        for !sentToPeer {
            req = cfg.hd.RequestMoreHeadersForPOS()
            _, sentToPeer = cfg.headerReqSend(ctx, &req)
        }
        // Load headers into the database
        announces := cfg.hd.GrabAnnounces()
        if len(announces) > 0 {
            cfg.announceNewHashes(ctx, announces)
        }
        if cfg.hd.Synced() { // We do not break unless there best header changed
            break
        }
    }
    if err := headerCollector.Load(tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
        return err
    }
    if err := canonicalHeadersCollector.Load(tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
        return err
    }
    if s.BlockNumber >= cfg.hd.Progress() {
        u.UnwindTo(cfg.hd.Progress(), common.Hash{})
    } else {
        if err := s.Update(tx, header.Number.Uint64()); err != nil {
            return err
        }
    }
    return tx.Commit()
}

// HeadersForward progresses Headers stage in the forward direction

Inline review comments on the inner request loop (for !sentToPeer) above:

"Sorry, it is this loop that will be mostly tight."

"That is one of the reasons in the forward sync I limited the number of iterations to 64, and also inserted ..."

"If I used a 30-second timer I would get 6.8 blk/sec; what I could do is ..."
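The concern in the comments above is that the inner request loop can spin tightly while no peer accepts the request. Below is a minimal, self-contained sketch of one way to bound such a loop with an attempt cap and a short backoff; sendWithBackoff and trySend are hypothetical stand-ins, not functions from this PR or from Erigon.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // trySend is a hypothetical stand-in for sending a header request to a peer.
    // It reports whether any peer accepted the request.
    func trySend(req int) bool {
        // Pretend roughly every third attempt succeeds.
        return req%3 == 0
    }

    // sendWithBackoff retries a request until a peer accepts it, but never spins
    // tightly: it caps the number of attempts and sleeps briefly between them.
    func sendWithBackoff(ctx context.Context, req int, maxAttempts int, backoff time.Duration) error {
        for attempt := 1; attempt <= maxAttempts; attempt++ {
            if trySend(req + attempt) {
                return nil
            }
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(backoff): // yield instead of busy-looping
            }
        }
        return errors.New("no peer accepted the request")
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        if err := sendWithBackoff(ctx, 1, 64, 50*time.Millisecond); err != nil {
            fmt.Println("send failed:", err)
            return
        }
        fmt.Println("request accepted by a peer")
    }

The 64-attempt cap simply echoes the limit mentioned for the forward sync; the PR itself does not use this helper.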
@@ -261,7 +350,6 @@ func HeadersForward(
    if header.Eip3675 {
        return nil
    }

    if td.Cmp(cfg.terminalTotalDifficulty) > 0 {
        return rawdb.MarkTransition(tx, blockNum)
    }
@@ -305,6 +393,7 @@ func HeadersForward(
    if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
        return err
    }
    cfg.hd.SetBackwards(false)
    cfg.hd.SetFetching(true)
    defer cfg.hd.SetFetching(false)
    headerProgress = cfg.hd.Progress()
Review discussion on the backwards flag:

Reviewer: I'm a bit confused about this backwards flag. Firstly, my understanding is that the existing (pre-Merge) downloader downloads headers mostly (besides skeleton requests) backwards, towards the genesis. So "backwards" meaning PoS sync is a bit misleading. Secondly, why is it an all-or-nothing mode? Couldn't exclusively backwards downloading of PoS blocks run simultaneously with the old skeleton+backwards algorithm for pre-Merge blocks? To sum up, to my mind "GetBackwards" is a misleading name.

Author: I changed it to POSSync; the flag indicates whether we are doing sync for proof-of-stake or not.

Reviewer: So does it mean that we can't download PoS and PoW headers simultaneously?

Author: We can. If we assume that the block supplied by the beacon chain is correct, we sync backwards until we find a block whose parent hash is present in our database; this can be past the terminal block. If that happens, we unwind.
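For illustration, a minimal, self-contained sketch of the backward walk described above: follow parent hashes from the tip supplied by the beacon chain down to a block already present locally, then decide whether an unwind is needed. The header type, the fetch callback and the map standing in for the database are hypothetical stand-ins, not Erigon's API.

    package main

    import "fmt"

    // header is a simplified stand-in for a block header: a number and two hashes.
    type header struct {
        number     uint64
        hash       string
        parentHash string
    }

    // walkBackwards starts from the tip supplied by the beacon chain and follows
    // parent hashes downwards until it reaches a header already present in the
    // local database (here: a map of hash -> number). If that known header is
    // behind our current head, the caller should unwind to it before connecting
    // the new chain.
    func walkBackwards(tip header, fetch func(hash string) (header, bool), known map[string]uint64, currentHead uint64) (connectedAt uint64, needUnwind bool, ok bool) {
        h := tip
        for {
            if num, found := known[h.parentHash]; found {
                // Connected to a block we already have; unwind if it is behind our head.
                return num, num < currentHead, true
            }
            parent, found := fetch(h.parentHash)
            if !found {
                return 0, false, false // peers could not supply the parent yet
            }
            h = parent
        }
    }

    func main() {
        // Headers we can "download" from peers, keyed by hash (hypothetical data).
        remote := map[string]header{
            "0xc": {number: 12, hash: "0xc", parentHash: "0xb"},
            "0xb": {number: 11, hash: "0xb", parentHash: "0xa"},
        }
        fetch := func(hash string) (header, bool) { h, ok := remote[hash]; return h, ok }

        known := map[string]uint64{"0xa": 10} // block 10 is already in our database
        tip := header{number: 13, hash: "0xd", parentHash: "0xc"}

        if at, needUnwind, ok := walkBackwards(tip, fetch, known, 12); ok {
            fmt.Println("connected at block", at, "unwind needed:", needUnwind)
        }
    }

In the PR itself the connection check and the unwind decision are spread across the header downloader and the stage (u.UnwindTo in the hunk above); the sketch only compresses that idea into one function.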
Reviewer: Hmm. What about a node that wants to sync from scratch some time after the Merge? It doesn't have any PoW headers in the DB yet.

Author: Okay, I think that HeadersForward means that headers are inserted in "forward" order; if you were to remove the pre-verified hashes, you would have to do the consensus check in forward order (genesis to head). However, in our case we process the headers in downward order: we start from the head and go to the genesis.

Author: If you think that HeadersForward is wrong/misleading, that can be refactored in another PR; it is not in the scope of this one, I believe.
Hmm. I see the following problem with inserting PoS headers into the DB in the downwards direction. Previously we had a single number for header stage's progress: the highest header inserted into the DB. Now there are essentially two numbers: the highest PoW header & the lowest PoS header that are inserted into the DB. So what happens when a node is restarted while there's a gap between the two? It doesn't fit into our current notion of a single number describing stage progress.
I'm wondering why do we have to insert PoS headers into the DB in the downwards direction anyway? To my mind it's simpler to tweak the current algo, have a long PoS segment growing downwards in memory, and then insert it into the DB when it touches the terminal PoW block (or a pre-verified PoS block after the Merge). @AlexeyAkhunov what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to insert them backwards to check their validity, we need to check if previous block parenthash is equal to current block hash and also if we need to unwind
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we can verify parent hashes in memory, before committing to the DB, and grow in memory a long PoS segment from the Beacon chain tip downwards until we touch a PoW block in the DB. That's what the current algo does: it grows segments in memory downwards.