From eae36aafff226ef77c5cd2cf1799c359a666e210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 16 Mar 2022 15:23:31 +0200 Subject: [PATCH 1/2] eth/downloader: terminate beacon sync early when linked to local chain --- eth/downloader/beaconsync.go | 45 ++++++++++++----- eth/downloader/downloader.go | 7 ++- eth/downloader/skeleton.go | 97 ++++++++++++++++++++++++------------ 3 files changed, 101 insertions(+), 48 deletions(-) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index 2a2e0d6b0507..d8ea58c239fc 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -175,7 +175,7 @@ func (d *Downloader) beaconSync(mode SyncMode, head *types.Header, force bool) e // sync and on the correct chain, checking the top N links should already get us // a match. In the rare scenario when we ended up on a long reorganisation (i.e. // none of the head links match), we do a binary search to find the ancestor. -func (d *Downloader) findBeaconAncestor() uint64 { +func (d *Downloader) findBeaconAncestor() (uint64, error) { // Figure out the current local head position var chainHead *types.Header @@ -189,17 +189,36 @@ func (d *Downloader) findBeaconAncestor() uint64 { } number := chainHead.Number.Uint64() - // If the head is present in the skeleton chain, return that - if chainHead.Hash() == d.skeleton.Header(number).Hash() { - return number - } - // Head header not present, binary search to find the ancestor - start, end := uint64(0), number - - beaconHead, err := d.skeleton.Head() + // Retrieve the skeleton bounds and ensure they are linked to the local chain + beaconHead, beaconTail, err := d.skeleton.Bounds() if err != nil { - panic(fmt.Sprintf("failed to read skeleton head: %v", err)) // can't reach this method without a head + // This is a programming error. The chain backfiller was called with an + // invalid beacon sync state. Ideally we would panic here, but erroring + // gives us at least a remote chance to recover. It's still a big fault! + log.Error("Failed to retrieve beacon bounds", "err", err) + return 0, err + } + var linked bool + switch d.getMode() { + case FullSync: + linked = d.blockchain.HasBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1) + case SnapSync: + linked = d.blockchain.HasFastBlock(beaconTail.ParentHash, beaconTail.Number.Uint64()-1) + default: + linked = d.blockchain.HasHeader(beaconTail.ParentHash, beaconTail.Number.Uint64()-1) + } + if !linked { + // This is a programming error. The chain backfiller was called with a + // tail that's not linked to the local chain. Whilst this should never + // happen, there might be some weirdnesses if beacon sync backfilling + // races with the user (or beacon client) calling setHead. Whilst panic + // would be the ideal thing to do, it is safer long term to attempt a + // recovery and fix any noticed issue after the fact. + log.Error("Beacon sync linkup unavailable", "number", beaconTail.Number.Uint64()-1, "hash", beaconTail.ParentHash) + return 0, fmt.Errorf("beacon linkup unavailable locally: %d [%x]", beaconTail.Number.Uint64()-1, beaconTail.ParentHash) } + // Binary search to find the ancestor + start, end := beaconTail.Number.Uint64()-1, number if number := beaconHead.Number.Uint64(); end > number { // This shouldn't really happen in a healty network, but if the consensus // clients feeds us a shorter chain as the canonical, we should not attempt @@ -229,13 +248,13 @@ func (d *Downloader) findBeaconAncestor() uint64 { } start = check } - return start + return start, nil } // fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling // until sync errors or is finished. func (d *Downloader) fetchBeaconHeaders(from uint64) error { - head, err := d.skeleton.Head() + head, _, err := d.skeleton.Bounds() if err != nil { return err } @@ -281,7 +300,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { case <-d.cancelCh: return errCanceled } - head, err = d.skeleton.Head() + head, _, err = d.skeleton.Bounds() if err != nil { return err } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 9cd4cfda0191..c1b1fdf79d0e 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -472,7 +472,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } } else { // In beacon mode, user the skeleton chain to retrieve the headers from - latest, err = d.skeleton.Head() + latest, _, err = d.skeleton.Bounds() if err != nil { return err } @@ -498,7 +498,10 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * } } else { // In beacon mode, use the skeleton chain for the ancestor lookup - origin = d.findBeaconAncestor() + origin, err = d.findBeaconAncestor() + if err != nil { + return err + } } d.syncStatsLock.Lock() if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index 29cbd2ef8c1e..82d3065c3530 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -352,7 +352,10 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // If the sync is already done, resume the backfiller. When the loop stops, // terminate the backfiller too. - if s.scratchHead == 0 { + linked := len(s.progress.Subchains) == 1 && + rawdb.HasBody(s.db, s.progress.Subchains[0].Next, s.scratchHead) && + rawdb.HasReceipts(s.db, s.progress.Subchains[0].Next, s.scratchHead) + if linked { s.filler.resume() } defer s.filler.suspend() @@ -391,8 +394,9 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { } for { // Something happened, try to assign new tasks to any idle peers - s.assignTasks(responses, requestFails, cancel) - + if !linked { + s.assignTasks(responses, requestFails, cancel) + } // Wait for something to happen select { case event := <-peering: @@ -443,23 +447,20 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { case res := <-responses: // Process the batch of headers. If though processing we managed to - // link the curret subchain to a previously downloaded one, abort the - // sync and restart with the merged subchains. We could probably hack - // the internal state to switch the scratch space over to the tail of - // the extended subchain, but since the scenario is rare, it's cleaner - // to rely on the restart mechanism than a stateful modification. - if merged := s.processResponse(res); merged { + // link the current subchain to a previously downloaded one, abort the + // sync and restart with the merged subchains. + // + // If we managed to link to the existing local chain or genesis block, + // abort sync altogether. + linked, merged := s.processResponse(res) + if linked { + log.Debug("Beacon sync linked to local chain") + return nil, errSyncLinked + } + if merged { log.Debug("Beacon sync merged subchains") return nil, errSyncMerged } - // If we've just reached the genesis block, tear down the sync cycle - // and restart it to resume the backfiller. We could just as well do - // a signalling here, but it's a tad cleaner to have only one entry - // pathway to suspending/resuming it. - if len(s.progress.Subchains) == 1 && s.progress.Subchains[0].Tail == 1 { - log.Debug("Beacon sync linked to genesis") - return nil, errSyncLinked - } // We still have work to do, loop and repeat } } @@ -852,7 +853,7 @@ func (s *skeleton) revertRequest(req *headerRequest) { s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = "" } -func (s *skeleton) processResponse(res *headerResponse) bool { +func (s *skeleton) processResponse(res *headerResponse) (linked bool, merged bool) { res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers)) // Whether the response is valid, we can mark the peer as idle and notify @@ -866,7 +867,7 @@ func (s *skeleton) processResponse(res *headerResponse) bool { // gets fulfilled successfully. It should not be possible to deliver a // response to a non-existing request. res.peer.log.Error("Unexpected header packet") - return false + return false, false } delete(s.requests, res.reqid) @@ -877,11 +878,9 @@ func (s *skeleton) processResponse(res *headerResponse) bool { // If there's still a gap in the head of the scratch space, abort if s.scratchSpace[0] == nil { - return false + return false, false } // Try to consume any head headers, validating the boundary conditions - var merged bool // Whether subchains were merged - batch := s.db.NewBatch() for s.scratchSpace[0] != nil { // Next batch of headers available, cross-reference with the subchain @@ -916,15 +915,44 @@ func (s *skeleton) processResponse(res *headerResponse) bool { s.progress.Subchains[0].Tail-- s.progress.Subchains[0].Next = header.ParentHash + + // If we've reached an existing block in the chain, stop retrieving + // headers. Note, if we want to support light clients with the same + // code we'd need to switch here based on the downloader mode. That + // said, there's no such functionality for now, so don't complicate. + // + // In the case of full sync it would be enough to check for the body, + // but even a full syncing node will generate a receipt once block + // processing is done, so it's just one more "needless" check. + var ( + hasBody = rawdb.HasBody(s.db, header.ParentHash, header.Number.Uint64()-1) + hasReceipt = rawdb.HasReceipts(s.db, header.ParentHash, header.Number.Uint64()-1) + ) + if hasBody && hasReceipt { + linked = true + break + } } } - // Batch of headers consumed, shift the download window forward head := s.progress.Subchains[0].Head tail := s.progress.Subchains[0].Tail next := s.progress.Subchains[0].Next log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next) + // If the beacon chain was linked to the local chain, completely swap out + // all internal progress and abort header synchronization. + if linked { + // Note, linking into the local chain should also mean that there are + // no leftover subchains, but just in case there's some junk due to + // strange conditions or bugs, clean up all internal state. + if len(s.progress.Subchains) > 1 { + log.Error("Cleaning up leftovers after beacon link") + s.progress.Subchains = s.progress.Subchains[:1] + } + break + } + // Batch of headers consumed, shift the download window forward copy(s.scratchSpace, s.scratchSpace[requestHeaders:]) for i := 0; i < requestHeaders; i++ { s.scratchSpace[scratchHeaders-i-1] = nil @@ -979,6 +1007,9 @@ func (s *skeleton) processResponse(res *headerResponse) bool { } // Print a progress report making the UX a bit nicer left := s.progress.Subchains[0].Tail - 1 + if linked { + left = 0 + } if time.Since(s.logged) > 8*time.Second || left == 0 { s.logged = time.Now() @@ -989,11 +1020,11 @@ func (s *skeleton) processResponse(res *headerResponse) bool { log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta)) } } - return merged + return linked, merged } -// Head retrieves the current head tracked by the skeleton syncer. This method -// is meant to be used by the backfiller, whose life cycle is controlled by the +// Bounds retrieves the current head and tail tracked by the skeleton syncer. +// This method is used by the backfiller, whose life cycle is controlled by the // skeleton syncer. // // Note, the method will not use the internal state of the skeleton, but will @@ -1002,23 +1033,23 @@ func (s *skeleton) processResponse(res *headerResponse) bool { // There might be new heads appended, but those are atomic from the perspective // of this method. Any head reorg will first tear down the backfiller and only // then make the modification. -func (s *skeleton) Head() (*types.Header, error) { +func (s *skeleton) Bounds() (head *types.Header, tail *types.Header, err error) { // Read the current sync progress from disk and figure out the current head. // Although there's a lot of error handling here, these are mostly as sanity // checks to avoid crashing if a programming error happens. These should not // happen in live code. status := rawdb.ReadSkeletonSyncStatus(s.db) if len(status) == 0 { - return nil, errors.New("beacon sync not yet started") + return nil, nil, errors.New("beacon sync not yet started") } progress := new(skeletonProgress) if err := json.Unmarshal(status, progress); err != nil { - return nil, err + return nil, nil, err } - if progress.Subchains[0].Tail != 1 { - return nil, errors.New("beacon sync not yet finished") - } - return rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head), nil + head = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Head) + tail = rawdb.ReadSkeletonHeader(s.db, progress.Subchains[0].Tail) + + return head, tail, nil } // Header retrieves a specific header tracked by the skeleton syncer. This method From 847be24f48939be2d527a2ae43041ec8d46d3d77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 18 Mar 2022 18:10:52 +0200 Subject: [PATCH 2/2] eth/downloader: fix backfiller resume on early beacon termination --- eth/downloader/skeleton.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index 82d3065c3530..bebf273da52e 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -438,7 +438,7 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // New head was integrated into the skeleton chain. If the backfiller // is still running, it will pick it up. If it already terminated, // a new cycle needs to be spun up. - if s.scratchHead == 0 { + if linked { s.filler.resume() }