Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, eth: merge snap-sync chain download progress logs #26676

Merged
merged 1 commit into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if stats.ignored > 0 {
context = append(context, []interface{}{"ignored", stats.ignored}...)
}
log.Info("Imported new block receipts", context...)
log.Debug("Imported new block receipts", context...)

return 0, nil
}
Expand Down
2 changes: 1 addition & 1 deletion core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (hc *HeaderChain) InsertHeaderChain(chain []*types.Header, start time.Time,
if res.ignored > 0 {
context = append(context, []interface{}{"ignored", res.ignored}...)
}
log.Info("Imported new block headers", context...)
log.Debug("Imported new block headers", context...)
return res.status, err
}

Expand Down
26 changes: 13 additions & 13 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash {
var data []byte
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
data, _ = reader.Ancient(chainFreezerHashTable, number)
data, _ = reader.Ancient(ChainFreezerHashTable, number)
if len(data) == 0 {
// Get it by hash from leveldb
data, _ = db.Get(headerHashKey(number))
Expand Down Expand Up @@ -334,7 +334,7 @@ func ReadHeaderRange(db ethdb.Reader, number uint64, count uint64) []rlp.RawValu
}
// read remaining from ancients
max := count * 700
data, err := db.AncientRange(chainFreezerHeaderTable, i+1-count, count, max)
data, err := db.AncientRange(ChainFreezerHeaderTable, i+1-count, count, max)
if err == nil && uint64(len(data)) == count {
// the data is on the order [h, h+1, .., n] -- reordering needed
for i := range data {
Expand All @@ -351,7 +351,7 @@ func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValu
// First try to look up the data in ancient database. Extra hash
// comparison is necessary since ancient database only maintains
// the canonical data.
data, _ = reader.Ancient(chainFreezerHeaderTable, number)
data, _ = reader.Ancient(ChainFreezerHeaderTable, number)
if len(data) > 0 && crypto.Keccak256Hash(data) == hash {
return nil
}
Expand Down Expand Up @@ -427,7 +427,7 @@ func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number
// isCanon is an internal utility method, to check whether the given number/hash
// is part of the ancient (canon) set.
func isCanon(reader ethdb.AncientReaderOp, number uint64, hash common.Hash) bool {
h, err := reader.Ancient(chainFreezerHashTable, number)
h, err := reader.Ancient(ChainFreezerHashTable, number)
if err != nil {
return false
}
Expand All @@ -443,7 +443,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerBodiesTable, number)
data, _ = reader.Ancient(ChainFreezerBodiesTable, number)
return nil
}
// If not, try reading from leveldb
Expand All @@ -458,7 +458,7 @@ func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue
func ReadCanonicalBodyRLP(db ethdb.Reader, number uint64) rlp.RawValue {
var data []byte
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
data, _ = reader.Ancient(chainFreezerBodiesTable, number)
data, _ = reader.Ancient(ChainFreezerBodiesTable, number)
if len(data) > 0 {
return nil
}
Expand Down Expand Up @@ -526,7 +526,7 @@ func ReadTdRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue {
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerDifficultyTable, number)
data, _ = reader.Ancient(ChainFreezerDifficultyTable, number)
return nil
}
// If not, try reading from leveldb
Expand Down Expand Up @@ -586,7 +586,7 @@ func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawVa
db.ReadAncients(func(reader ethdb.AncientReaderOp) error {
// Check if the data is in ancients
if isCanon(reader, number, hash) {
data, _ = reader.Ancient(chainFreezerReceiptTable, number)
data, _ = reader.Ancient(ChainFreezerReceiptTable, number)
return nil
}
// If not, try reading from leveldb
Expand Down Expand Up @@ -787,19 +787,19 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts

func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
num := block.NumberU64()
if err := op.AppendRaw(chainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
if err := op.AppendRaw(ChainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
return fmt.Errorf("can't add block %d hash: %v", num, err)
}
if err := op.Append(chainFreezerHeaderTable, num, header); err != nil {
if err := op.Append(ChainFreezerHeaderTable, num, header); err != nil {
return fmt.Errorf("can't append block header %d: %v", num, err)
}
if err := op.Append(chainFreezerBodiesTable, num, block.Body()); err != nil {
if err := op.Append(ChainFreezerBodiesTable, num, block.Body()); err != nil {
return fmt.Errorf("can't append block body %d: %v", num, err)
}
if err := op.Append(chainFreezerReceiptTable, num, receipts); err != nil {
if err := op.Append(ChainFreezerReceiptTable, num, receipts); err != nil {
return fmt.Errorf("can't append block %d receipts: %v", num, err)
}
if err := op.Append(chainFreezerDifficultyTable, num, td); err != nil {
if err := op.Append(ChainFreezerDifficultyTable, num, td); err != nil {
return fmt.Errorf("can't append block %d total difficulty: %v", num, err)
}
return nil
Expand Down
30 changes: 15 additions & 15 deletions core/rawdb/ancient_scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,30 @@ package rawdb

// The list of table names of chain freezer.
const (
// chainFreezerHeaderTable indicates the name of the freezer header table.
chainFreezerHeaderTable = "headers"
// ChainFreezerHeaderTable indicates the name of the freezer header table.
ChainFreezerHeaderTable = "headers"

// chainFreezerHashTable indicates the name of the freezer canonical hash table.
chainFreezerHashTable = "hashes"
// ChainFreezerHashTable indicates the name of the freezer canonical hash table.
ChainFreezerHashTable = "hashes"

// chainFreezerBodiesTable indicates the name of the freezer block body table.
chainFreezerBodiesTable = "bodies"
// ChainFreezerBodiesTable indicates the name of the freezer block body table.
ChainFreezerBodiesTable = "bodies"

// chainFreezerReceiptTable indicates the name of the freezer receipts table.
chainFreezerReceiptTable = "receipts"
// ChainFreezerReceiptTable indicates the name of the freezer receipts table.
ChainFreezerReceiptTable = "receipts"

// chainFreezerDifficultyTable indicates the name of the freezer total difficulty table.
chainFreezerDifficultyTable = "diffs"
// ChainFreezerDifficultyTable indicates the name of the freezer total difficulty table.
ChainFreezerDifficultyTable = "diffs"
)

// chainFreezerNoSnappy configures whether compression is disabled for the ancient-tables.
// Hashes and difficulties don't compress well.
var chainFreezerNoSnappy = map[string]bool{
chainFreezerHeaderTable: false,
chainFreezerHashTable: true,
chainFreezerBodiesTable: false,
chainFreezerReceiptTable: false,
chainFreezerDifficultyTable: true,
ChainFreezerHeaderTable: false,
ChainFreezerHashTable: true,
ChainFreezerBodiesTable: false,
ChainFreezerReceiptTable: false,
ChainFreezerDifficultyTable: true,
}

// The list of identifiers of ancient stores.
Expand Down
10 changes: 5 additions & 5 deletions core/rawdb/chain_freezer.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,19 +280,19 @@ func (f *chainFreezer) freezeRange(nfdb *nofreezedb, number, limit uint64) (hash
}

// Write to the batch.
if err := op.AppendRaw(chainFreezerHashTable, number, hash[:]); err != nil {
if err := op.AppendRaw(ChainFreezerHashTable, number, hash[:]); err != nil {
return fmt.Errorf("can't write hash to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerHeaderTable, number, header); err != nil {
if err := op.AppendRaw(ChainFreezerHeaderTable, number, header); err != nil {
return fmt.Errorf("can't write header to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerBodiesTable, number, body); err != nil {
if err := op.AppendRaw(ChainFreezerBodiesTable, number, body); err != nil {
return fmt.Errorf("can't write body to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerReceiptTable, number, receipts); err != nil {
if err := op.AppendRaw(ChainFreezerReceiptTable, number, receipts); err != nil {
return fmt.Errorf("can't write receipts to Freezer: %v", err)
}
if err := op.AppendRaw(chainFreezerDifficultyTable, number, td); err != nil {
if err := op.AppendRaw(ChainFreezerDifficultyTable, number, td); err != nil {
return fmt.Errorf("can't write td to Freezer: %v", err)
}

Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/chain_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func InitDatabaseFromFreezer(db ethdb.Database) {
if i+count > frozen {
count = frozen - i
}
data, err := db.AncientRange(chainFreezerHashTable, i, count, 32*count)
data, err := db.AncientRange(ChainFreezerHashTable, i, count, 32*count)
if err != nil {
log.Crit("Failed to init database from freezer", "err", err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/rawdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func NewDatabaseWithFreezer(db ethdb.KeyValueStore, ancient string, namespace st
// If the freezer already contains something, ensure that the genesis blocks
// match, otherwise we might mix up freezers across chains and destroy both
// the freezer and the key-value store.
frgenesis, err := frdb.Ancient(chainFreezerHashTable, 0)
frgenesis, err := frdb.Ancient(ChainFreezerHashTable, 0)
if err != nil {
return nil, fmt.Errorf("failed to retrieve genesis from ancient %v", err)
} else if !bytes.Equal(kvgenesis, frgenesis) {
Expand Down
65 changes: 64 additions & 1 deletion eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ type Downloader struct {
bodyFetchHook func([]*types.Header) // Method to call upon starting a block body fetch
receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)

// Progress reporting metrics
syncStartBlock uint64 // Head snap block when Geth was started
syncStartTime time.Time // Time instance when chain sync started
syncLogTime time.Time // Time instance when status was last reported
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -231,7 +236,9 @@ func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain Bl
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentFastBlock().NumberU64(),
}
// Create the post-merge skeleton syncer and start the process
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))

go dl.stateFetcher()
Expand Down Expand Up @@ -1614,6 +1621,7 @@ func (d *Downloader) processSnapSyncContent() error {
if len(results) == 0 {
// If pivot sync is done, stop
if oldPivot == nil {
d.reportSnapSyncProgress(true)
return sync.Cancel()
}
// If sync failed, stop
Expand All @@ -1627,6 +1635,8 @@ func (d *Downloader) processSnapSyncContent() error {
if d.chainInsertHook != nil {
d.chainInsertHook(results)
}
d.reportSnapSyncProgress(false)

// If we haven't downloaded the pivot block yet, check pivot staleness
// notifications from the header downloader
d.pivotLock.RLock()
Expand Down Expand Up @@ -1739,7 +1749,7 @@ func (d *Downloader) commitSnapSyncData(results []*fetchResult, stateSync *state
}
default:
}
// Retrieve the a batch of results to import
// Retrieve the batch of results to import
first, last := results[0].Header, results[len(results)-1].Header
log.Debug("Inserting snap-sync blocks", "items", len(results),
"firstnum", first.Number, "firsthash", first.Hash(),
Expand Down Expand Up @@ -1820,3 +1830,56 @@ func (d *Downloader) readHeaderRange(last *types.Header, count int) []*types.Hea
}
return headers
}

// reportSnapSyncProgress calculates various status reports and provides it to the user.
func (d *Downloader) reportSnapSyncProgress(force bool) {
// Initialize the sync start time if it's the first time we're reporting
if d.syncStartTime.IsZero() {
d.syncStartTime = time.Now().Add(-time.Millisecond) // -1ms offset to avoid division by zero
}
// Don't report all the events, just occasionally
if !force && time.Since(d.syncLogTime) < 8*time.Second {
return
}
// Don't report anything until we have a meaningful progress
var (
headerBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerHeaderTable)
bodyBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerBodiesTable)
receiptBytes, _ = d.stateDB.AncientSize(rawdb.ChainFreezerReceiptTable)
karalabe marked this conversation as resolved.
Show resolved Hide resolved
)
syncedBytes := common.StorageSize(headerBytes + bodyBytes + receiptBytes)
if syncedBytes == 0 {
return
}
var (
header = d.blockchain.CurrentHeader()
block = d.blockchain.CurrentFastBlock()
)
syncedBlocks := block.NumberU64() - d.syncStartBlock
if syncedBlocks == 0 {
return
}
// Retrieve the current chain head and calculate the ETA
latest, _, err := d.skeleton.Bounds()
if err != nil {
// We're going to cheat for non-merged networks, but that's fine
latest = d.pivotHeader
}
if latest == nil {
// This should really never happen, but add some defensive code for now.
// TODO(karalabe): Remove it eventually if we don't see it blow.
log.Error("Nil latest block in sync progress report")
return
}
var (
karalabe marked this conversation as resolved.
Show resolved Hide resolved
left = latest.Number.Uint64() - block.NumberU64()
karalabe marked this conversation as resolved.
Show resolved Hide resolved
eta = time.Since(d.syncStartTime) / time.Duration(syncedBlocks) * time.Duration(left)

progress = fmt.Sprintf("%.2f%%", float64(block.NumberU64())*100/float64(latest.Number.Uint64()))
headers = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(header.Number.Uint64()), common.StorageSize(headerBytes).TerminalString())
bodies = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(bodyBytes).TerminalString())
receipts = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(block.NumberU64()), common.StorageSize(receiptBytes).TerminalString())
)
log.Info("Syncing: chain download in progress", "synced", progress, "chain", syncedBytes, "headers", headers, "bodies", bodies, "receipts", receipts, "eta", common.PrettyDuration(eta))
d.syncLogTime = time.Now()
}
9 changes: 5 additions & 4 deletions eth/downloader/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type queue struct {
active *sync.Cond
closed bool

lastStatLog time.Time
logTime time.Time // Time instance when status was last reported
}

// newQueue creates a new download queue for scheduling block retrieval.
Expand Down Expand Up @@ -390,11 +390,12 @@ func (q *queue) Results(block bool) []*fetchResult {
}
}
// Log some info at certain times
if time.Since(q.lastStatLog) > 60*time.Second {
q.lastStatLog = time.Now()
if time.Since(q.logTime) >= 60*time.Second {
q.logTime = time.Now()

info := q.Stats()
info = append(info, "throttle", throttleThreshold)
log.Info("Downloader queue stats", info...)
log.Debug("Downloader queue stats", info...)
}
return results
}
Expand Down