Added reversed for proof-of-stake syncing #3092

Merged 38 commits on Dec 13, 2021
Commits (38)
5ca7263  reverse sync slow but working (Giulio2002, Dec 7, 2021)
a30cde0  progress (Giulio2002, Dec 8, 2021)
0e08e99  p (Giulio2002, Dec 9, 2021)
c1ef4eb  Merge remote-tracking branch 'origin/devel' into optimistic-sync (Giulio2002, Dec 9, 2021)
3b133e6  close request (Giulio2002, Dec 9, 2021)
7d357ae  backwards refactoring (Giulio2002, Dec 9, 2021)
cc2ddcf  cleanup 1 (Giulio2002, Dec 9, 2021)
9f39c16  added RequestAssembler (Giulio2002, Dec 9, 2021)
7c2d6c5  remove trash code and spaghetti (Giulio2002, Dec 9, 2021)
3568d3c  efficient (Giulio2002, Dec 9, 2021)
6a19eeb  fix (Giulio2002, Dec 9, 2021)
aff42e7  refactor (Giulio2002, Dec 9, 2021)
d9d6470  tf (Giulio2002, Dec 9, 2021)
5a80bdf  refact (Giulio2002, Dec 9, 2021)
0ecde3b  final refactoring (Giulio2002, Dec 9, 2021)
c8cfb55  headers forward restored (Giulio2002, Dec 9, 2021)
456560a  Merge remote-tracking branch 'origin/devel' into reverse-download (Giulio2002, Dec 9, 2021)
15e90f8  test fixed (Giulio2002, Dec 9, 2021)
ebfd90b  make CI happy (Giulio2002, Dec 10, 2021)
eb62c97  resolved (Giulio2002, Dec 10, 2021)
8524720  resolved comments (Giulio2002, Dec 11, 2021)
344f996  not using insertList anymore (Giulio2002, Dec 11, 2021)
226153e  oops (Giulio2002, Dec 11, 2021)
3d33cd2  better collectors (Giulio2002, Dec 11, 2021)
850ff37  removed debug config (Giulio2002, Dec 11, 2021)
c62845c  avoid pointers (Giulio2002, Dec 11, 2021)
b0a74d5  added sleep (Giulio2002, Dec 12, 2021)
9af084a  use of channels (Giulio2002, Dec 12, 2021)
2186e3e  sleeping (Giulio2002, Dec 12, 2021)
58a21a7  added logs for ETL (Giulio2002, Dec 12, 2021)
daafe78  added more cleanup (Giulio2002, Dec 12, 2021)
2cd664d  correct glacier (Giulio2002, Dec 12, 2021)
2d52f79  some refactoring (Giulio2002, Dec 12, 2021)
0527723  maxRequests (Giulio2002, Dec 12, 2021)
7f9e9a8  tweaks (Giulio2002, Dec 12, 2021)
c88561e  config.go (Giulio2002, Dec 12, 2021)
1f8d95b  config conflicts (Giulio2002, Dec 13, 2021)
2e00bfd  renamed functions (Giulio2002, Dec 13, 2021)
cmd/sentry/download/downloader.go (32 additions, 19 deletions)

@@ -353,6 +353,7 @@ func NewControlServer(db kv.RwDB, nodeName string, chainConfig *params.ChainConf
1024*1024, /* linkLimit */
engine,
)

if err := hd.RecoverFromDb(db); err != nil {
return nil, fmt.Errorf("recovery from DB failed: %w", err)
}
@@ -470,30 +471,42 @@ func (cs *ControlServerImpl) blockHeaders(ctx context.Context, pkt eth.BlockHead
Number: number,
})
}

if segments, penaltyKind, err := cs.Hd.SplitIntoSegments(csHeaders); err == nil {
if penaltyKind == headerdownload.NoPenalty {
var canRequestMore bool
for _, segment := range segments {
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
if cs.Hd.POSSync() {
tx, err := cs.db.BeginRo(ctx)
defer tx.Rollback()
if err != nil {
return err
}
}

if canRequestMore {
currentTime := uint64(time.Now().Unix())
req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
if req != nil {
if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
log.Trace("Sent request", "height", req.Number)
for _, segment := range segments {
if err := cs.Hd.ProcessSegmentPOS(segment, tx); err != nil {
return err
}
}
} else {
var canRequestMore bool
for _, segment := range segments {
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, ConvertH256ToPeerID(peerID))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)

if canRequestMore {
currentTime := uint64(time.Now().Unix())
req, penalties := cs.Hd.RequestMoreHeaders(currentTime)
if req != nil {
if _, sentToPeer := cs.SendHeaderRequest(ctx, req); sentToPeer {
// If request was actually sent to a peer, we update retry time to be 5 seconds in the future
cs.Hd.UpdateRetryTime(req, currentTime, 5 /* timeout */)
log.Trace("Sent request", "height", req.Number)
}
}
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}
}
} else {
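For orientation on the hunk above: the downloader now forks on cs.Hd.POSSync(). In proof-of-stake mode each header segment goes through ProcessSegmentPOS against a read-only transaction and any error aborts the batch; otherwise the pre-existing ProcessSegment / RequestMoreHeaders flow runs. Below is a minimal, self-contained sketch of that dispatch shape only; segment, dispatcher, processPOS and processPOW are illustrative names, not erigon's API.

package main

import (
	"errors"
	"fmt"
)

// segment is a stand-in for a batch of downloaded block headers
// (the lowest and highest block number it covers).
type segment struct {
	lowest, highest uint64
}

// dispatcher mirrors the shape of the branch added in blockHeaders:
// one flag decides whether segments feed the proof-of-stake (backward)
// path or the usual proof-of-work (forward) path.
type dispatcher struct {
	posSync bool
}

func (d *dispatcher) handle(segments []segment) error {
	if d.posSync {
		// PoS path: every segment is checked against local state and any
		// error aborts the whole batch.
		for _, s := range segments {
			if err := processPOS(s); err != nil {
				return err
			}
		}
		return nil
	}
	// PoW path: process everything first, then decide whether to ask a
	// peer for more headers.
	canRequestMore := false
	for _, s := range segments {
		canRequestMore = processPOW(s) || canRequestMore
	}
	if canRequestMore {
		fmt.Println("would send another header request to a peer")
	}
	return nil
}

func processPOS(s segment) error {
	if s.highest < s.lowest {
		return errors.New("malformed segment")
	}
	fmt.Printf("PoS: linked headers %d..%d towards the local chain\n", s.lowest, s.highest)
	return nil
}

func processPOW(s segment) bool {
	fmt.Printf("PoW: inserted headers %d..%d\n", s.lowest, s.highest)
	return true // pretend the peer can serve more
}

func main() {
	d := &dispatcher{posSync: true}
	if err := d.handle([]segment{{lowest: 92, highest: 100}, {lowest: 84, highest: 91}}); err != nil {
		fmt.Println("error:", err)
	}
}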
eth/stagedsync/stage_headers.go (122 additions, 10 deletions)

@@ -11,6 +11,7 @@ import (
"github.com/c2h5oh/datasize"
"github.com/holiman/uint256"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/cmd/rpcdaemon/interfaces"
"github.com/ledgerwatch/erigon/common"
@@ -38,6 +39,7 @@ type HeadersCfg struct {
penalize func(context.Context, []headerdownload.PenaltyItem)
batchSize datasize.ByteSize
noP2PDiscovery bool
tmpdir string
reverseDownloadCh chan types.Header
waitingPosHeaders *bool
snapshots *snapshotsync.AllSnapshots
@@ -58,6 +60,7 @@ func StageHeadersCfg(
waitingPosHeaders *bool,
snapshots *snapshotsync.AllSnapshots,
blockReader interfaces.FullBlockReader,
tmpdir string,
) HeadersCfg {
return HeadersCfg{
db: db,
@@ -110,30 +113,29 @@ func SpawnStageHeaders(
}

if isTrans {
return HeadersDownward(s, u, ctx, tx, cfg, initialCycle, test)
return HeadersPOS(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
} else {
return HeadersForward(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
return HeadersPOW(s, u, ctx, tx, cfg, initialCycle, test, useExternalTx)
}
}

// HeadersDownward progresses Headers stage in the downward direction
func HeadersDownward(
func HeadersPOS(
s *StageState,
u Unwinder,
ctx context.Context,
tx kv.RwTx,
cfg HeadersCfg,
initialCycle bool,
test bool, // Set to true in tests, allows the stage to fail rather than wait indefinitely
useExternalTx bool,
) error {
*cfg.waitingPosHeaders = true
// Waiting for the beacon chain
log.Info("Waiting for payloads...")
header := <-cfg.reverseDownloadCh
*cfg.waitingPosHeaders = false

defer tx.Commit()

headerNumber := header.Number.Uint64()

blockHash, err := rawdb.ReadCanonicalHash(tx, headerNumber)
@@ -162,14 +164,124 @@ func HeadersDownward(
return err
}
if parent != nil && parent.Hash() == header.ParentHash {
return s.Update(tx, header.Number.Uint64())
if err := s.Update(tx, header.Number.Uint64()); err != nil {
return err
}
// For the sake of simplicity we can just assume it will be valid for now. (TODO: move to execution stage)
cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Valid,
HeadHash: header.Hash(),
}
return tx.Commit()
}
// Downward sync if we need to process more (TODO)
return s.Update(tx, header.Number.Uint64())
cfg.hd.SetPOSSync(true)
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
cfg.hd.SetProcessed(header.Number.Uint64())
cfg.hd.SetExpectedHash(header.ParentHash)
cfg.hd.SetFetching(true)
logPrefix := s.LogPrefix()

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

// Allow other stages to run 1 cycle if no network available
if initialCycle && cfg.noP2PDiscovery {
return nil
}

cfg.statusCh <- privateapi.ExecutionStatus{
Status: privateapi.Syncing,
HeadHash: rawdb.ReadHeadBlockHash(tx),
}
log.Info(fmt.Sprintf("[%s] Waiting for headers...", logPrefix), "from", header.Number.Uint64())

cfg.hd.SetHeaderReader(&chainReader{config: &cfg.chainConfig, tx: tx, blockReader: cfg.blockReader})

stopped := false
prevProgress := header.Number.Uint64()

headerCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
canonicalHeadersCollector := etl.NewCollector(logPrefix, cfg.tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))
cfg.hd.SetHeadersCollector(headerCollector)
cfg.hd.SetCanonicalHashesCollector(canonicalHeadersCollector)
// Cleanup after we finish backward sync
defer func() {
headerCollector.Close()
canonicalHeadersCollector.Close()
cfg.hd.SetHeadersCollector(nil)
cfg.hd.SetCanonicalHashesCollector(nil)
cfg.hd.Unsync()
cfg.hd.SetFetching(false)
}()

var req headerdownload.HeaderRequest
for !stopped {
sentToPeer := false
maxRequests := 4096
for !sentToPeer && !stopped && maxRequests != 0 {
req = cfg.hd.RequestMoreHeadersForPOS()
_, sentToPeer = cfg.headerReqSend(ctx, &req)
maxRequests--
}
// Load headers into the database
announces := cfg.hd.GrabAnnounces()
if len(announces) > 0 {
cfg.announceNewHashes(ctx, announces)
}
if cfg.hd.Synced() { // We do not break unless there best header changed
stopped = true
}
// Sleep and check for logs
timer := time.NewTimer(2 * time.Millisecond)
select {
case <-ctx.Done():
stopped = true
case <-logEvery.C:
diff := prevProgress - cfg.hd.Progress()
if cfg.hd.Progress() <= prevProgress {
log.Info("Wrote Block Headers backwards", "from", header.Number.Uint64(),
"now", cfg.hd.Progress(), "blk/sec", float64(diff)/float64(logInterval/time.Second))
prevProgress = cfg.hd.Progress()
}
case <-timer.C:
log.Trace("RequestQueueTime (header) ticked")
}
// Cleanup timer
timer.Stop()
}
// If the user stopped it, we dont update anything
if !cfg.hd.Synced() {
return nil
}

if err := headerCollector.Load(tx, kv.Headers, etl.IdentityLoadFunc, etl.TransformArgs{
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
}); err != nil {
return err
}
if err = canonicalHeadersCollector.Load(tx, kv.HeaderCanonical, etl.IdentityLoadFunc, etl.TransformArgs{
LogDetailsLoad: func(k, v []byte) (additionalLogArguments []interface{}) {
return []interface{}{"block", binary.BigEndian.Uint64(k)}
},
}); err != nil {
return err
}
if s.BlockNumber >= cfg.hd.Progress() {
u.UnwindTo(cfg.hd.Progress(), common.Hash{})
} else {
if err := s.Update(tx, header.Number.Uint64()); err != nil {
return err
}
}
return tx.Commit()
}

// HeadersForward progresses Headers stage in the forward direction
func HeadersForward(
func HeadersPOW(
s *StageState,
u Unwinder,
ctx context.Context,
@@ -261,7 +373,6 @@ func HeadersForward(
if header.Eip3675 {
return nil
}

if td.Cmp(cfg.terminalTotalDifficulty) > 0 {
return rawdb.MarkTransition(tx, blockNum)
}
@@ -305,6 +416,7 @@ func HeadersForward(
if err = cfg.hd.ReadProgressFromDb(tx); err != nil {
return err
}
cfg.hd.SetPOSSync(false)
cfg.hd.SetFetching(true)
defer cfg.hd.SetFetching(false)
headerProgress = cfg.hd.Progress()
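The core of the new HeadersPOS stage above is the backward walk: the stage sets the hash it expects next to the incoming payload's parent hash, accepts a header only if it hashes to that expectation, moves the expectation to that header's own parent, and buffers everything (via the ETL collectors) until the chain links up with blocks already in the database, at which point the buffered headers are loaded in ascending order. A self-contained sketch of just that linking rule follows, under simplified assumptions: string hashes and an in-memory map stand in for peers, the DB and the collectors.

package main

import (
	"fmt"
)

// header is a simplified stand-in for types.Header.
type header struct {
	number     uint64
	hash       string
	parentHash string
}

// linkBackwards walks from headHash down towards anchorNumber, the height
// we already have locally. A header is accepted only when its hash matches
// the hash we expect next, and the result is returned lowest-first so it
// can be written to the DB in ascending order.
func linkBackwards(byHash map[string]header, headHash string, anchorNumber uint64) ([]header, error) {
	expected := headHash
	var reversed []header
	for {
		h, ok := byHash[expected]
		if !ok {
			return nil, fmt.Errorf("missing header for expected hash %s", expected)
		}
		reversed = append(reversed, h)
		if h.number <= anchorNumber+1 {
			break // the next parent is already known locally
		}
		expected = h.parentHash // move the expectation one block down
	}
	// Reverse into ascending block-number order.
	out := make([]header, 0, len(reversed))
	for i := len(reversed) - 1; i >= 0; i-- {
		out = append(out, reversed[i])
	}
	return out, nil
}

func main() {
	byHash := map[string]header{
		"h5": {5, "h5", "h4"},
		"h4": {4, "h4", "h3"},
		"h3": {3, "h3", "h2"},
	}
	chain, err := linkBackwards(byHash, "h5", 2) // block 2 is already stored
	if err != nil {
		panic(err)
	}
	for _, h := range chain {
		fmt.Println(h.number, h.hash)
	}
}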
ethdb/privateapi/ethbackend.go (0 additions, 1 deletion)

@@ -239,7 +239,6 @@ func (s *EthBackendServer) EngineExecutePayloadV1(ctx context.Context, req *type
if header.Hash() != blockHash {
return nil, fmt.Errorf("invalid hash for payload. got: %s, wanted: %s", common.Bytes2Hex(blockHash[:]), common.Bytes2Hex(header.Hash().Bytes()))
}
log.Info("Received Payload from beacon-chain", "hash", blockHash)
// Send the block over
s.numberSent = req.BlockNumber
s.reverseDownloadCh <- header
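For context on where the removed log line sits: EngineExecutePayloadV1 is the engine-API entry point that hands the payload header to the headers stage over reverseDownloadCh, while the stage reports progress back on statusCh (both visible in the diffs above). A rough sketch of that channel handoff follows; the channel names come from the diff, the surrounding types and flow are simplified assumptions.

package main

import (
	"fmt"
	"time"
)

type header struct {
	number uint64
	hash   string
}

type executionStatus struct {
	status   string
	headHash string
}

func main() {
	reverseDownloadCh := make(chan header)  // backend -> headers stage
	statusCh := make(chan executionStatus)  // headers stage -> backend

	// Headers-stage side: block until the beacon client delivers a payload
	// header, pretend to sync backwards from it, then report a status.
	go func() {
		h := <-reverseDownloadCh
		fmt.Println("stage: received payload header", h.number)
		time.Sleep(10 * time.Millisecond) // stand-in for the backward sync
		statusCh <- executionStatus{status: "Syncing", headHash: h.hash}
	}()

	// Engine-API side: hand the header over and wait for the stage's answer,
	// mirroring the reverseDownloadCh <- header line in the diff.
	reverseDownloadCh <- header{number: 13_000_000, hash: "0xabc"}
	st := <-statusCh
	fmt.Println("backend: stage reported", st.status, "head", st.headHash)
}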