Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth, eth/downloader: remove references to LightChain, LightSync #29711

Merged
merged 4 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
chainHead = d.blockchain.CurrentBlock()
case SnapSync:
chainHead = d.blockchain.CurrentSnapBlock()
default:
chainHead = d.lightchain.CurrentHeader()
jwasinger marked this conversation as resolved.
Show resolved Hide resolved
}
number := chainHead.Number.Uint64()

Expand Down Expand Up @@ -256,8 +254,6 @@ func (d *Downloader) findBeaconAncestor() (uint64, error) {
known = d.blockchain.HasBlock(h.Hash(), n)
case SnapSync:
known = d.blockchain.HasFastBlock(h.Hash(), n)
default:
known = d.lightchain.HasHeader(h.Hash(), n)
jwasinger marked this conversation as resolved.
Show resolved Hide resolved
}
if !known {
end = check
Expand Down
65 changes: 25 additions & 40 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ var (
errCancelContentProcessing = errors.New("content processing canceled (requested)")
errCanceled = errors.New("syncing canceled (requested)")
errNoPivotHeader = errors.New("pivot header is not found")
ErrMergeTransition = errors.New("legacy sync reached the merge")
)

// peerDropFn is a callback type for dropping a peer detected as malicious.
Expand Down Expand Up @@ -98,7 +97,6 @@ type Downloader struct {
syncStatsChainHeight uint64 // Highest block number known when syncing started
syncStatsLock sync.RWMutex // Lock protecting the sync stats fields

lightchain LightChain
blockchain BlockChain

// Callbacks
Expand Down Expand Up @@ -143,8 +141,8 @@ type Downloader struct {
syncLogTime time.Time // Time instance when status was last reported
}

// LightChain encapsulates functions required to synchronise a light chain.
type LightChain interface {
// BlockChain encapsulates functions required to sync a (full or snap) blockchain.
type BlockChain interface {
// HasHeader verifies a header's presence in the local chain.
HasHeader(common.Hash, uint64) bool

Expand All @@ -162,11 +160,6 @@ type LightChain interface {

// SetHead rewinds the local chain to a new head.
SetHead(uint64) error
}

// BlockChain encapsulates functions required to sync a (full or snap) blockchain.
type BlockChain interface {
LightChain

// HasBlock verifies a block's presence in the local chain.
HasBlock(common.Hash, uint64) bool
Expand Down Expand Up @@ -201,17 +194,13 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader {
if lightchain == nil {
lightchain = chain
}
func New(stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, dropPeer peerDropFn, success func()) *Downloader {
dl := &Downloader{
stateDB: stateDb,
mux: mux,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
Expand Down Expand Up @@ -240,15 +229,13 @@ func (d *Downloader) Progress() ethereum.SyncProgress {

current := uint64(0)
mode := d.getMode()
switch {
case d.blockchain != nil && mode == FullSync:
switch mode {
case FullSync:
current = d.blockchain.CurrentBlock().Number.Uint64()
case d.blockchain != nil && mode == SnapSync:
case SnapSync:
current = d.blockchain.CurrentSnapBlock().Number.Uint64()
case d.lightchain != nil:
current = d.lightchain.CurrentHeader().Number.Uint64()
default:
log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
log.Error("Unknown downloader mode", "mode", mode)
}
progress, pending := d.SnapSyncer.Progress()

Expand Down Expand Up @@ -402,7 +389,7 @@ func (d *Downloader) syncToHead() (err error) {
if err != nil {
d.mux.Post(FailedEvent{err})
} else {
latest := d.lightchain.CurrentHeader()
latest := d.blockchain.CurrentHeader()
d.mux.Post(DoneEvent{latest})
}
}()
Expand Down Expand Up @@ -520,7 +507,7 @@ func (d *Downloader) syncToHead() (err error) {
}
// Rewind the ancient store and blockchain if reorg happens.
if origin+1 < frozen {
if err := d.lightchain.SetHead(origin); err != nil {
if err := d.blockchain.SetHead(origin); err != nil {
return err
}
log.Info("Truncated excess ancient chain segment", "oldhead", frozen-1, "newhead", origin)
Expand Down Expand Up @@ -690,34 +677,32 @@ func (d *Downloader) processHeaders(origin uint64) error {
chunkHashes := hashes[:limit]

// In case of header only syncing, validate the chunk immediately
if mode == SnapSync || mode == LightSync {
if mode == SnapSync {
// Although the received headers might be all valid, a legacy
// PoW/PoA sync must not accept post-merge headers. Make sure
// that any transition is rejected at this point.
if len(chunkHeaders) > 0 {
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders); err != nil {
if n, err := d.blockchain.InsertHeaderChain(chunkHeaders); err != nil {
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync || mode == SnapSync {
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
return errCanceled
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
return fmt.Errorf("%w: stale headers", errBadPeer)
// If we've reached the allowed number of pending headers, stall a bit
for d.queue.PendingBodies() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
timer.Reset(time.Second)
select {
case <-d.cancelCh:
return errCanceled
case <-timer.C:
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
return fmt.Errorf("%w: stale headers", errBadPeer)
}

headers = headers[limit:]
hashes = hashes[limit:]
origin += uint64(limit)
Expand Down Expand Up @@ -1056,7 +1041,7 @@ func (d *Downloader) readHeaderRange(last *types.Header, count int) []*types.Hea
headers []*types.Header
)
for {
parent := d.lightchain.GetHeaderByHash(current.ParentHash)
parent := d.blockchain.GetHeaderByHash(current.ParentHash)
if parent == nil {
break // The chain is not continuous, or the chain is exhausted
}
Expand Down
44 changes: 13 additions & 31 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newTesterWithNotification(t *testing.T, success func()) *downloadTester {
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
tester.downloader = New(db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success)
tester.downloader = New(db, new(event.TypeMux), tester.chain, tester.dropPeer, success)
return tester
}

Expand Down Expand Up @@ -390,9 +390,6 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
t.Helper()

headers, blocks, receipts := length, length, length
if tester.downloader.getMode() == LightSync {
blocks, receipts = 1, 1
}
if hs := int(tester.chain.CurrentHeader().Number.Uint64()) + 1; hs != headers {
t.Fatalf("synchronised headers mismatch: have %v, want %v", hs, headers)
}
Expand All @@ -404,9 +401,8 @@ func assertOwnChain(t *testing.T, tester *downloadTester, length int) {
}
}

func TestCanonicalSynchronisation68Full(t *testing.T) { testCanonSync(t, eth.ETH68, FullSync) }
func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH68, SnapSync) }
func TestCanonicalSynchronisation68Light(t *testing.T) { testCanonSync(t, eth.ETH68, LightSync) }
func TestCanonicalSynchronisation68Full(t *testing.T) { testCanonSync(t, eth.ETH68, FullSync) }
func TestCanonicalSynchronisation68Snap(t *testing.T) { testCanonSync(t, eth.ETH68, SnapSync) }

func testCanonSync(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{})
Expand Down Expand Up @@ -511,9 +507,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) {
}

// Tests that a canceled download wipes all previously accumulated state.
func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) }
func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) }
func TestCancel68Light(t *testing.T) { testCancel(t, eth.ETH68, LightSync) }
func TestCancel68Full(t *testing.T) { testCancel(t, eth.ETH68, FullSync) }
func TestCancel68Snap(t *testing.T) { testCancel(t, eth.ETH68, SnapSync) }

func testCancel(t *testing.T, protocol uint, mode SyncMode) {
complete := make(chan struct{})
Expand Down Expand Up @@ -544,9 +539,8 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) {

// Tests that synchronisations behave well in multi-version protocol environments
// and not wreak havoc on other nodes in the network.
func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) }
func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH68, SnapSync) }
func TestMultiProtoSynchronisation68Light(t *testing.T) { testMultiProtoSync(t, eth.ETH68, LightSync) }
func TestMultiProtoSynchronisation68Full(t *testing.T) { testMultiProtoSync(t, eth.ETH68, FullSync) }
func TestMultiProtoSynchronisation68Snap(t *testing.T) { testMultiProtoSync(t, eth.ETH68, SnapSync) }

func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {
complete := make(chan struct{})
Expand Down Expand Up @@ -584,9 +578,8 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) {

// Tests that if a block is empty (e.g. header only), no body request should be
// made, and instead the header should be assembled into a whole block in itself.
func TestEmptyShortCircuit68Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, FullSync) }
func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, SnapSync) }
func TestEmptyShortCircuit68Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, LightSync) }
func TestEmptyShortCircuit68Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, FullSync) }
func TestEmptyShortCircuit68Snap(t *testing.T) { testEmptyShortCircuit(t, eth.ETH68, SnapSync) }

func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{})
Expand Down Expand Up @@ -625,7 +618,7 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) {
// Validate the number of block bodies that should have been requested
bodiesNeeded, receiptsNeeded := 0, 0
for _, block := range chain.blocks[1:] {
if mode != LightSync && (len(block.Transactions()) > 0 || len(block.Uncles()) > 0) {
if len(block.Transactions()) > 0 || len(block.Uncles()) > 0 {
bodiesNeeded++
}
}
Expand Down Expand Up @@ -700,9 +693,8 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {

// Tests that synchronisation progress (origin block number, current block number
// and highest block number) is tracked and updated correctly.
func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) }
func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) }
func TestSyncProgress68Light(t *testing.T) { testSyncProgress(t, eth.ETH68, LightSync) }
func TestSyncProgress68Full(t *testing.T) { testSyncProgress(t, eth.ETH68, FullSync) }
func TestSyncProgress68Snap(t *testing.T) { testSyncProgress(t, eth.ETH68, SnapSync) }

func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
success := make(chan struct{})
Expand Down Expand Up @@ -740,17 +732,7 @@ func testSyncProgress(t *testing.T, protocol uint, mode SyncMode) {
if err := tester.downloader.BeaconSync(mode, chain.blocks[len(chain.blocks)-1].Header(), nil); err != nil {
t.Fatalf("failed to beacon-sync chain: %v", err)
}
var startingBlock uint64
if mode == LightSync {
// in light-sync mode:
// * the starting block is 0 on the second sync cycle because blocks
// are never downloaded.
// * The current/highest blocks reported in the progress reflect the
// current/highest header.
startingBlock = 0
} else {
startingBlock = uint64(len(chain.blocks)/2 - 1)
}
startingBlock := uint64(len(chain.blocks)/2 - 1)

select {
case <-success:
Expand Down
2 changes: 1 addition & 1 deletion eth/downloader/modes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
)

func (mode SyncMode) IsValid() bool {
return mode >= FullSync && mode <= LightSync
return mode >= FullSync && mode < LightSync
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also get rid of downloader.LightSync constant IMO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we are okay to remove the deprecated light flags as well then?

}

// String implements the stringer interface.
Expand Down
2 changes: 1 addition & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
return nil, errors.New("snap sync not supported with snapshots disabled")
}
// Construct the downloader (long sync)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, nil, h.removePeer, h.enableSyncedFeatures)
h.downloader = downloader.New(config.Database, h.eventMux, h.chain, h.removePeer, h.enableSyncedFeatures)

fetchTx := func(peer string, hashes []common.Hash) error {
p := h.peers.peer(peer)
Expand Down
Loading