Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: terminate beacon sync early when linked to local chain #24550

Merged
merged 2 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) e
// sync and on the correct chain, checking the top N links should already get us
// a match. In the rare scenario when we ended up on a long reorganisation (i.e.
// none of the head links match), we do a binary search to find the ancestor.
func (d *Downloader) findBeaconAncestor() uint64 {
func (d *Downloader) findBeaconAncestor() (uint64, error) {
// Figure out the current local head position
var chainHead *types.Header

Expand All @@ -189,17 +189,36 @@ func (d *Downloader) findBeaconAncestor() uint64 {
}
number := chainHead.Number.Uint64()

// If the head is present in the skeleton chain, return that
if chainHead.Hash() == d.skeleton.Header(number).Hash() {
return number
}
// Head header not present, binary search to find the ancestor
start, end := uint64(0), number

beaconHead, err := d.skeleton.Head()
// Retrieve the skeleton bounds and ensure they are linked to the local chain
beaconHead, beaconTail, err := d.skeleton.Bounds()
if err != nil {
panic(fmt.Sprintf("failed to read skeleton head: %v", err)) // can't reach this method without a head
// This is a programming error. The chain backfiller was called with an
// invalid beacon sync state. Ideally we would panic here, but erroring
// gives us at least a remote chance to recover. It's still a big fault!
log.Error("Failed to retrieve beacon bounds", "err", err)
return 0, err
}
var linked bool
switch d.getMode() {
case FullSync:
linked = d.blockchain.HasBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
case SnapSync:
linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
default:
linked = d.blockchain.HasHeader(beaconTail.ParentHash, beaconTail.Number.Uint64()-1)
}
if !linked {
// This is a programming error. The chain backfiller was called with a
// tail that's not linked to the local chain. Whilst this should never
// happen, there might be some weirdnesses if beacon sync backfilling
// races with the user (or beacon client) calling setHead. Whilst panic
// would be the ideal thing to do, it is safer long term to attempt a
// recovery and fix any noticed issue after the fact.
log.Error("Beacon sync linkup unavailable", "number", beaconTail.Number.Uint64()-1, "hash", beaconTail.ParentHash)
return 0, fmt.Errorf("beacon linkup unavailable locally: %d [%x]", beaconTail.Number.Uint64()-1, beaconTail.ParentHash)
}
// Binary search to find the ancestor
start, end := beaconTail.Number.Uint64()-1, number
if number := beaconHead.Number.Uint64(); end > number {
// This shouldn't really happen in a healty network, but if the consensus
// clients feeds us a shorter chain as the canonical, we should not attempt
Expand Down Expand Up @@ -229,13 +248,13 @@ func (d *Downloader) findBeaconAncestor() uint64 {
}
start = check
}
return start
return start, nil
}

// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling
// until sync errors or is finished.
func (d *Downloader) fetchBeaconHeaders(from uint64) error {
head, err := d.skeleton.Head()
head, _, err := d.skeleton.Bounds()
if err != nil {
return err
}
Expand Down Expand Up @@ -281,7 +300,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
case <-d.cancelCh:
return errCanceled
}
head, err = d.skeleton.Head()
head, _, err = d.skeleton.Bounds()
if err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
}
} else {
// In beacon mode, user the skeleton chain to retrieve the headers from
latest, err = d.skeleton.Head()
latest, _, err = d.skeleton.Bounds()
if err != nil {
return err
}
Expand All @@ -498,7 +498,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
}
} else {
// In beacon mode, use the skeleton chain for the ancestor lookup
origin = d.findBeaconAncestor()
origin, err = d.findBeaconAncestor()
if err != nil {
return err
}
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
Expand Down
99 changes: 65 additions & 34 deletions eth/downloader/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,10 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {

// If the sync is already done, resume the backfiller. When the loop stops,
// terminate the backfiller too.
if s.scratchHead == 0 {
linked := len(s.progress.Subchains) == 1 &&
rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) &&
rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead)
if linked {
s.filler.resume()
}
defer s.filler.suspend()
Expand Down Expand Up @@ -391,8 +394,9 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
}
for {
// Something happened, try to assign new tasks to any idle peers
s.assignTasks(responses, requestFails, cancel)

if !linked {
s.assignTasks(responses, requestFails, cancel)
}
// Wait for something to happen
select {
case event := <-peering:
Expand Down Expand Up @@ -434,7 +438,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {
// New head was integrated into the skeleton chain. If the backfiller
// is still running, it will pick it up. If it already terminated,
// a new cycle needs to be spun up.
if s.scratchHead == 0 {
if linked {
s.filler.resume()
}

Expand All @@ -443,23 +447,20 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) {

case res := <-responses:
// Process the batch of headers. If though processing we managed to
// link the curret subchain to a previously downloaded one, abort the
// sync and restart with the merged subchains. We could probably hack
// the internal state to switch the scratch space over to the tail of
// the extended subchain, but since the scenario is rare, it's cleaner
// to rely on the restart mechanism than a stateful modification.
if merged := s.processResponse(res); merged {
// link the current subchain to a previously downloaded one, abort the
// sync and restart with the merged subchains.
//
// If we managed to link to the existing local chain or genesis block,
// abort sync altogether.
linked, merged := s.processResponse(res)
if linked {
log.Debug("Beacon sync linked to local chain")
return nil, errSyncLinked
}
if merged {
log.Debug("Beacon sync merged subchains")
return nil, errSyncMerged
}
// If we've just reached the genesis block, tear down the sync cycle
// and restart it to resume the backfiller. We could just as well do
// a signalling here, but it's a tad cleaner to have only one entry
// pathway to suspending/resuming it.
if len(s.progress.Subchains) == 1 && s.progress.Subchains[0].Tail == 1 {
log.Debug("Beacon sync linked to genesis")
return nil, errSyncLinked
}
// We still have work to do, loop and repeat
}
}
Expand Down Expand Up @@ -852,7 +853,7 @@ func (s *skeleton) revertRequest(req *headerRequest) {
s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = ""
}

func (s *skeleton) processResponse(res *headerResponse) bool {
func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) {
res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers))

// Whether the response is valid, we can mark the peer as idle and notify
Expand All @@ -866,7 +867,7 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
// gets fulfilled successfully. It should not be possible to deliver a
// response to a non-existing request.
res.peer.log.Error("Unexpected header packet")
return false
return false, false
}
delete(s.requests, res.reqid)

Expand All @@ -877,11 +878,9 @@ func (s *skeleton) processResponse(res *headerResponse) bool {

// If there's still a gap in the head of the scratch space, abort
if s.scratchSpace[0] == nil {
return false
return false, false
}
// Try to consume any head headers, validating the boundary conditions
var merged bool // Whether subchains were merged

batch := s.db.NewBatch()
for s.scratchSpace[0] != nil {
// Next batch of headers available, cross-reference with the subchain
Expand Down Expand Up @@ -916,15 +915,44 @@ func (s *skeleton) processResponse(res *headerResponse) bool {

s.progress.Subchains[0].Tail--
s.progress.Subchains[0].Next = header.ParentHash

// If we've reached an existing block in the chain, stop retrieving
// headers. Note, if we want to support light clients with the same
// code we'd need to switch here based on the downloader mode. That
// said, there's no such functionality for now, so don't complicate.
//
// In the case of full sync it would be enough to check for the body,
// but even a full syncing node will generate a receipt once block
// processing is done, so it's just one more "needless" check.
var (
hasBody = rawdb.HasBody(s.db, header.ParentHash, header.Number.Uint64()-1)
hasReceipt = rawdb.HasReceipts(s.db, header.ParentHash, header.Number.Uint64()-1)
)
if hasBody && hasReceipt {
linked = true
break
}
}
}
// Batch of headers consumed, shift the download window forward
head := s.progress.Subchains[0].Head
tail := s.progress.Subchains[0].Tail
next := s.progress.Subchains[0].Next

log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next)

// If the beacon chain was linked to the local chain, completely swap out
// all internal progress and abort header synchronization.
if linked {
// Note, linking into the local chain should also mean that there are
// no leftover subchains, but just in case there's some junk due to
// strange conditions or bugs, clean up all internal state.
if len(s.progress.Subchains) > 1 {
log.Error("Cleaning up leftovers after beacon link")
s.progress.Subchains = s.progress.Subchains[:1]
}
break
}
// Batch of headers consumed, shift the download window forward
copy(s.scratchSpace, s.scratchSpace[requestHeaders:])
for i := 0; i < requestHeaders; i++ {
s.scratchSpace[scratchHeaders-i-1] = nil
Expand Down Expand Up @@ -979,6 +1007,9 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
}
// Print a progress report making the UX a bit nicer
left := s.progress.Subchains[0].Tail - 1
if linked {
left = 0
}
if time.Since(s.logged) > 8*time.Second || left == 0 {
s.logged = time.Now()

Expand All @@ -989,11 +1020,11 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta))
}
}
return merged
return linked, merged
}

// Head retrieves the current head tracked by the skeleton syncer. This method
// is meant to be used by the backfiller, whose life cycle is controlled by the
// Bounds retrieves the current head and tail tracked by the skeleton syncer.
// This method is used by the backfiller, whose life cycle is controlled by the
// skeleton syncer.
//
// Note, the method will not use the internal state of the skeleton, but will
Expand All @@ -1002,23 +1033,23 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
// There might be new heads appended, but those are atomic from the perspective
// of this method. Any head reorg will first tear down the backfiller and only
// then make the modification.
func (s *skeleton) Head() (*types.Header, error) {
func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) {
// Read the current sync progress from disk and figure out the current head.
// Although there's a lot of error handling here, these are mostly as sanity
// checks to avoid crashing if a programming error happens. These should not
// happen in live code.
status := rawdb.ReadSkeletonSyncStatus(s.db)
if len(status) == 0 {
return nil, errors.New("beacon sync not yet started")
return nil, nil, errors.New("beacon sync not yet started")
}
progress := new(skeletonProgress)
if err := json.Unmarshal(status, progress); err != nil {
return nil, err
return nil, nil, err
}
if progress.Subchains[0].Tail != 1 {
return nil, errors.New("beacon sync not yet finished")
}
return rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head), nil
head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head)
tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail)

return head, tail, nil
}

// Header retrieves a specific header tracked by the skeleton syncer. This method
Expand Down