From 1c555b2defdeb420f83361a73ddb6f5fdd45e0ee Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Mon, 23 Dec 2024 19:28:54 +0800 Subject: [PATCH 1/6] core, light: get rid of the dual mutexes (#18436) --- core/blockchain.go | 45 ++++++++++++++++++++-------------------- core/blockchain_test.go | 11 +++++----- light/lightchain.go | 26 +++++++++-------------- light/lightchain_test.go | 7 ++++--- 4 files changed, 42 insertions(+), 47 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index b6d328cc642b..6496abd2f9fa 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -149,7 +149,6 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - mu sync.RWMutex // global mutex for locking chain operations chainmu sync.RWMutex // blockchain insertion lock procmu sync.RWMutex // block processor lock @@ -411,8 +410,8 @@ func (bc *BlockChain) loadLastState() error { func (bc *BlockChain) SetHead(head uint64) error { log.Warn("Rewinding blockchain", "target", head) - bc.mu.Lock() - defer bc.mu.Unlock() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() // Rewind the header chain, deleting all block bodies until then delFn := func(hash common.Hash, num uint64) { @@ -476,10 +475,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { return err } // If all checks out, manually set the head block - bc.mu.Lock() + bc.chainmu.Lock() bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) - bc.mu.Unlock() + bc.chainmu.Unlock() log.Info("Committed new head block", "number", block.Number(), "hash", hash) return nil @@ -598,8 +597,8 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - bc.mu.Lock() - defer bc.mu.Unlock() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { @@ -672,8 +671,8 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - bc.mu.RLock() - defer bc.mu.RUnlock() + bc.chainmu.RLock() + defer bc.chainmu.RUnlock() if first > last { return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) @@ -690,7 +689,6 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return err } } - return nil } @@ -1053,8 +1051,8 @@ const ( // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. func (bc *BlockChain) Rollback(chain []common.Hash) { - bc.mu.Lock() - defer bc.mu.Unlock() + bc.chainmu.Lock() + defer bc.chainmu.Unlock() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] @@ -1146,7 +1144,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Update the head fast sync block if better - bc.mu.Lock() + bc.chainmu.Lock() head := blockChain[len(blockChain)-1] if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case currentFastBlock := bc.CurrentFastBlock() @@ -1158,7 +1156,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ headFastBlockGauge.Update(int64(head.NumberU64())) } } - bc.mu.Unlock() + bc.chainmu.Unlock() log.Info("Imported new block receipts", "count", stats.processed, @@ -1188,6 +1186,15 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { + bc.chainmu.Lock() + defer bc.chainmu.Unlock() + + return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState) +} + +// writeBlockWithState writes the block and all associated state to the database, +// but is expects the chain mutex to be held. +func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { bc.wg.Add(1) defer bc.wg.Done() @@ -1197,9 +1204,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. return NonStatTy, consensus.ErrUnknownAncestor } // Make sure no inconsistent state is leaked during insertion - bc.mu.Lock() - defer bc.mu.Unlock() - currentBlock := bc.CurrentBlock() localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) externTd := new(big.Int).Add(block.Difficulty(), ptd) @@ -1710,7 +1714,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] proctime := time.Since(bstart) // Write the block to the chain and get the status. - status, err := bc.WriteBlockWithState(block, receipts, statedb, tradingState, lendingState) + status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState) t3 := time.Now() if err != nil { return i, events, coalescedLogs, err @@ -2061,7 +2065,7 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) { return events, coalescedLogs, nil } - status, err := bc.WriteBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState) + status, err := bc.writeBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState) if err != nil { return events, coalescedLogs, err @@ -2425,9 +2429,6 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i defer bc.wg.Done() whFunc := func(header *types.Header) error { - bc.mu.Lock() - defer bc.mu.Unlock() - _, err := bc.hc.WriteHeader(header) return err } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index de15d9ca6c5a..41283aa592aa 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -25,10 +25,9 @@ import ( "testing" "time" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" - "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/core/vm" @@ -129,11 +128,11 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - blockchain.mu.Lock() + blockchain.chainmu.Lock() WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) rawdb.WriteBlock(blockchain.db, block) statedb.Commit(true) - blockchain.mu.Unlock() + blockchain.chainmu.Unlock() } return nil } @@ -147,10 +146,10 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return err } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) - blockchain.mu.Lock() + blockchain.chainmu.Lock() WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(blockchain.db, header) - blockchain.mu.Unlock() + blockchain.chainmu.Unlock() } return nil } diff --git a/light/lightchain.go b/light/lightchain.go index 9cbf95ddf9c6..7664750615a7 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -56,7 +56,6 @@ type LightChain struct { scope event.SubscriptionScope genesisBlock *types.Block - mu sync.RWMutex chainmu sync.RWMutex bodyCache *lru.Cache[common.Hash, *types.Body] @@ -159,8 +158,8 @@ func (lc *LightChain) loadLastState() error { // SetHead rewinds the local chain to a new head. Everything above the new // head will be deleted and the new one set. func (lc *LightChain) SetHead(head uint64) { - lc.mu.Lock() - defer lc.mu.Unlock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() lc.hc.SetHead(head, nil) lc.loadLastState() @@ -182,8 +181,8 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) { // Dump the entire block chain and purge the caches lc.SetHead(0) - lc.mu.Lock() - defer lc.mu.Unlock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain if err := core.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { @@ -297,8 +296,8 @@ func (lc *LightChain) Stop() { // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. func (lc *LightChain) Rollback(chain []common.Hash) { - lc.mu.Lock() - defer lc.mu.Unlock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] @@ -344,19 +343,13 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i // Make sure only one thread manipulates the chain at once lc.chainmu.Lock() - defer func() { - lc.chainmu.Unlock() - time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation - }() + defer lc.chainmu.Unlock() lc.wg.Add(1) defer lc.wg.Done() var events []interface{} whFunc := func(header *types.Header) error { - lc.mu.Lock() - defer lc.mu.Unlock() - status, err := lc.hc.WriteHeader(header) switch status { @@ -450,11 +443,12 @@ func (lc *LightChain) SyncCht(ctx context.Context) bool { num := chtCount*CHTFrequencyClient - 1 header, err := GetHeaderByNumber(ctx, lc.odr, num) if header != nil && err == nil { - lc.mu.Lock() + lc.chainmu.Lock() + defer lc.chainmu.Unlock() + if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() { lc.hc.SetCurrentHeader(header) } - lc.mu.Unlock() return true } } diff --git a/light/lightchain_test.go b/light/lightchain_test.go index 7d762e2e3f73..4733810e94d9 100644 --- a/light/lightchain_test.go +++ b/light/lightchain_test.go @@ -18,10 +18,11 @@ package light import ( "context" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "math/big" "testing" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" + "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" @@ -122,10 +123,10 @@ func testHeaderChainImport(chain []*types.Header, lightchain *LightChain) error return err } // Manually insert the header into the database, but don't reorganize (allows subsequent testing) - lightchain.mu.Lock() + lightchain.chainmu.Lock() core.WriteTd(lightchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, lightchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(lightchain.chainDb, header) - lightchain.mu.Unlock() + lightchain.chainmu.Unlock() } return nil } From d7e0e9d734a519835d2a461daf997ba09859045d Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 24 Dec 2024 15:09:18 +0800 Subject: [PATCH 2/6] core: write chain data in atomic way (#20287) --- core/blockchain.go | 302 ++++++++++++++++++------------- core/database_util.go | 5 - core/database_util_test.go | 4 +- core/headerchain.go | 159 +++++++++------- core/rawdb/accessors_chain.go | 130 ++++++++++++- core/rawdb/accessors_indexes.go | 56 ++++++ core/rawdb/accessors_metadata.go | 34 ++++ core/rawdb/schema.go | 33 ++++ light/lightchain.go | 28 ++- light/txpool.go | 5 +- 10 files changed, 548 insertions(+), 208 deletions(-) create mode 100644 core/rawdb/accessors_indexes.go create mode 100644 core/rawdb/accessors_metadata.go diff --git a/core/blockchain.go b/core/blockchain.go index 6496abd2f9fa..480fa606017b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -413,12 +413,69 @@ func (bc *BlockChain) SetHead(head uint64) error { bc.chainmu.Lock() defer bc.chainmu.Unlock() + updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { + // Rewind the block chain, ensuring we don't end up with a stateless head block + if currentBlock := bc.CurrentBlock(); currentBlock != nil && header.Number.Uint64() < currentBlock.NumberU64() { + newHeadBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) + if newHeadBlock == nil { + newHeadBlock = bc.genesisBlock + } else { + if _, err := state.New(newHeadBlock.Root(), bc.stateCache); err != nil { + // Rewound state missing, rolled back to before pivot, reset to genesis + newHeadBlock = bc.genesisBlock + } + } + rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. + bc.currentBlock.Store(newHeadBlock) + headBlockGauge.Update(int64(newHeadBlock.NumberU64())) + } + + // Rewind the fast block in a simpleton way to the target head + if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && header.Number.Uint64() < currentFastBlock.NumberU64() { + newHeadFastBlock := bc.GetBlock(header.Hash(), header.Number.Uint64()) + // If either blocks reached nil, reset to the genesis state + if newHeadFastBlock == nil { + newHeadFastBlock = bc.genesisBlock + } + rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) + + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of SetHead is from high + // to low, so it's safe the update in-memory markers directly. + bc.currentFastBlock.Store(newHeadFastBlock) + headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) + } + } + // Rewind the header chain, deleting all block bodies until then - delFn := func(hash common.Hash, num uint64) { - DeleteBody(bc.db, hash, num) + delFn := func(db ethdb.KeyValueWriter, hash common.Hash, num uint64) { + // Ignore the error here since light client won't hit this path + frozen, _ := bc.db.Ancients() + if num+1 <= frozen { + // Truncate all relative data(header, total difficulty, body, receipt + // and canonical hash) from ancient store. + if err := bc.db.TruncateAncients(num + 1); err != nil { + log.Crit("Failed to truncate ancient data", "number", num, "err", err) + } + + // Remove the hash <-> number mapping from the active store. + rawdb.DeleteHeaderNumber(db, hash) + } else { + // Remove relative body and receipts from the active store. + // The header, total difficulty and canonical hash will be + // removed in the hc.SetHead function. + rawdb.DeleteBody(db, hash, num) + rawdb.DeleteReceipts(db, hash, num) + } + // Todo(rjl493456442) txlookup, bloombits, etc } - bc.hc.SetHead(head, delFn) - currentHeader := bc.hc.CurrentHeader() + bc.hc.SetHead(head, updateFn, delFn) // Clear out any stale content from the caches bc.bodyCache.Purge() @@ -428,38 +485,6 @@ func (bc *BlockChain) SetHead(head uint64) error { bc.futureBlocks.Purge() bc.blocksHashCache.Purge() - // Rewind the block chain, ensuring we don't end up with a stateless head block - if currentBlock := bc.CurrentBlock(); currentBlock != nil && currentHeader.Number.Uint64() < currentBlock.NumberU64() { - bc.currentBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) - headBlockGauge.Update(int64(currentHeader.Number.Uint64())) - } - if currentBlock := bc.CurrentBlock(); currentBlock != nil { - if _, err := state.New(currentBlock.Root(), bc.stateCache); err != nil { - // Rewound state missing, rolled back to before pivot, reset to genesis - bc.currentBlock.Store(bc.genesisBlock) - headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - } - } - // Rewind the fast block in a simpleton way to the target head - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock != nil && currentHeader.Number.Uint64() < currentFastBlock.NumberU64() { - bc.currentFastBlock.Store(bc.GetBlock(currentHeader.Hash(), currentHeader.Number.Uint64())) - headFastBlockGauge.Update(int64(currentHeader.Number.Uint64())) - } - // If either blocks reached nil, reset to the genesis state - if currentBlock := bc.CurrentBlock(); currentBlock == nil { - bc.currentBlock.Store(bc.genesisBlock) - headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - } - if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock == nil { - bc.currentFastBlock.Store(bc.genesisBlock) - headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - } - currentBlock := bc.CurrentBlock() - currentFastBlock := bc.CurrentFastBlock() - rawdb.WriteHeadBlockHash(bc.db, currentBlock.Hash()) - if err := WriteHeadFastBlockHash(bc.db, currentFastBlock.Hash()); err != nil { - log.Crit("Failed to reset head fast block", "err", err) - } return bc.loadLastState() } @@ -601,20 +626,22 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain - if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { - log.Crit("Failed to write genesis block TD", "err", err) + batch := bc.db.NewBatch() + rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) + rawdb.WriteBlock(batch, genesis) + if err := batch.Write(); err != nil { + log.Crit("Failed to write genesis block", "err", err) } - rawdb.WriteBlock(bc.db, genesis) + bc.writeHeadBlock(genesis, false) + + // Last update all in-memory chain markers bc.genesisBlock = genesis - bc.insert(bc.genesisBlock, false) bc.currentBlock.Store(bc.genesisBlock) headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - bc.hc.SetGenesis(bc.genesisBlock.Header()) bc.hc.SetCurrentHeader(bc.genesisBlock.Header()) bc.currentFastBlock.Store(bc.genesisBlock) headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) - return nil } @@ -692,27 +719,45 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { return nil } -// insert injects a new head block into the current block chain. This method +// writeHeadBlock injects a new head block into the current block chain. This method // assumes that the block is indeed a true head. It will also reset the head // header and the head fast sync block to this very same block if they are older // or if they are on a different side chain. // // Note, this function assumes that the `mu` mutex is held! -func (bc *BlockChain) insert(block *types.Block, writeBlock bool) { - +func (bc *BlockChain) writeHeadBlock(block *types.Block, writeBlock bool) { blockHash := block.Hash() blockNumberU64 := block.NumberU64() // If the block is on a side chain or an unknown one, force other heads onto it too - updateHeads := GetCanonicalHash(bc.db, blockNumberU64) != blockHash + updateHeads := rawdb.ReadCanonicalHash(bc.db, blockNumberU64) != blockHash // Add the block to the canonical chain number scheme and mark as the head - rawdb.WriteCanonicalHash(bc.db, blockHash, blockNumberU64) - rawdb.WriteHeadBlockHash(bc.db, blockHash) + batch := bc.db.NewBatch() + rawdb.WriteCanonicalHash(batch, blockHash, blockNumberU64) + rawdb.WriteTxLookupEntries(batch, block) + rawdb.WriteHeadBlockHash(batch, blockHash) if writeBlock { - rawdb.WriteBlock(bc.db, block) + rawdb.WriteBlock(batch, block) + } + + // If the block is better than our head or is on a different chain, force update heads + if updateHeads { + rawdb.WriteHeadHeaderHash(batch, blockHash) + rawdb.WriteHeadFastBlockHash(batch, blockHash) + } + + // Flush the whole batch into the disk, exit the node if failed + if err := batch.Write(); err != nil { + log.Crit("Failed to update chain indexes and markers", "err", err) } + // Update all in-memory chain markers in the last step + if updateHeads { + bc.hc.SetCurrentHeader(block.Header()) + bc.currentFastBlock.Store(block) + headFastBlockGauge.Update(int64(blockNumberU64)) + } bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) @@ -723,17 +768,6 @@ func (bc *BlockChain) insert(block *types.Block, writeBlock bool) { engine.CacheNoneTIPSigningTxs(block.Header(), block.Transactions(), bc.GetReceiptsByHash(blockHash)) } } - - // If the block is better than our head or is on a different chain, force update heads - if updateHeads { - bc.hc.SetCurrentHeader(block.Header()) - - if err := WriteHeadFastBlockHash(bc.db, blockHash); err != nil { - log.Crit("Failed to insert head fast block hash", "err", err) - } - bc.currentFastBlock.Store(block) - headFastBlockGauge.Update(int64(block.NumberU64())) - } } // Genesis retrieves the chain's genesis block. @@ -1054,26 +1088,37 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { bc.chainmu.Lock() defer bc.chainmu.Unlock() + batch := bc.db.NewBatch() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of rollback is from high + // to low, so it's safe the update in-memory markers directly. currentHeader := bc.hc.CurrentHeader() if currentHeader.Hash() == hash { - bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)) + newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1) + rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash) + bc.hc.SetCurrentHeader(newHeadHeader) } if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash { newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) + rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash()) bc.currentFastBlock.Store(newFastBlock) - WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) headFastBlockGauge.Update(int64(newFastBlock.NumberU64())) } if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) + rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash()) bc.currentBlock.Store(newBlock) headBlockGauge.Update(int64(newBlock.NumberU64())) - rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) } } + if err := batch.Write(); err != nil { + log.Crit("Failed to rollback chain markers", "err", err) + } + // TODO: Truncate ancient data which exceeds the current header. } // InsertReceiptChain attempts to complete an already existing header chain with @@ -1126,8 +1171,10 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ if err := WriteTxLookupEntries(batch, block); err != nil { return i, fmt.Errorf("failed to write lookup metadata: %v", err) } - stats.processed++ + // Write everything belongs to the blocks into the database. So that + // we can ensure all components of body is completed(body, receipts, + // tx indexes) if batch.ValueSize() >= ethdb.IdealBatchSize { if err := batch.Write(); err != nil { return 0, err @@ -1135,7 +1182,11 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ bytes += batch.ValueSize() batch.Reset() } + stats.processed++ } + // Write everything belongs to the blocks into the database. So that + // we can ensure all components of body is completed(body, receipts, + // tx indexes) if batch.ValueSize() > 0 { bytes += batch.ValueSize() if err := batch.Write(); err != nil { @@ -1177,10 +1228,12 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e bc.wg.Add(1) defer bc.wg.Done() - if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil { - return err + batch := bc.db.NewBatch() + rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) + rawdb.WriteBlock(batch, block) + if err := batch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) } - rawdb.WriteBlock(bc.db, block) return nil } @@ -1208,17 +1261,25 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) externTd := new(big.Int).Add(block.Difficulty(), ptd) - // Irrelevant of the canonical status, write the block itself to the database - if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil { - return NonStatTy, err - } - // Write other block data using a batch. - batch := bc.db.NewBatch() - rawdb.WriteBlock(batch, block) + // Irrelevant of the canonical status, write the block itself to the database. + // + // Note all the components of block(td, hash->number map, header, body, receipts) + // should be written atomically. BlockBatch is used for containing all components. + blockBatch := bc.db.NewBatch() + rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) + rawdb.WriteBlock(blockBatch, block) + rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) + rawdb.WritePreimages(blockBatch, state.Preimages()) + if err := blockBatch.Write(); err != nil { + log.Crit("Failed to write block into disk", "err", err) + } + // Commit all cached state changes into underlying memory database. root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) if err != nil { return NonStatTy, err } + triedb := bc.stateCache.TrieDB() + tradingRoot := common.Hash{} if tradingState != nil { tradingRoot, err = tradingState.Commit() @@ -1233,6 +1294,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. return NonStatTy, err } } + engine, _ := bc.Engine().(*XDPoS.XDPoS) var tradingTrieDb *trie.Database var tradingService utils.TradingService @@ -1248,7 +1310,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. lendingTrieDb = lendingService.GetStateCache().TrieDB() } } - triedb := bc.stateCache.TrieDB() + // If we're running an archive node, always flush if bc.cacheConfig.Disabled { if err := triedb.Commit(root, false); err != nil { @@ -1356,9 +1418,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. } } } - if err := WriteBlockReceipts(batch, block.Hash(), block.NumberU64(), receipts); err != nil { - return NonStatTy, err - } + // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf @@ -1368,24 +1428,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // Split same-difficulty blocks by number reorg = block.NumberU64() > currentBlock.NumberU64() } - - // This is the ETH fix. We shall ultimately have this workflow, - // but due to below code has diverged significantly between ETH and XDC, and current issue we have, - // it's best to have it in a different PR with more investigations. - // if reorg { - // // Write the positional metadata for transaction and receipt lookups - // if err := WriteTxLookupEntries(batch, block); err != nil { - // return NonStatTy, err - // } - // // Write hash preimages - // if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil { - // return NonStatTy, err - // } - // } - // if err := batch.Write(); err != nil { - // return NonStatTy, err - // } - if reorg { // Reorganise the chain if the parent is not the head block if block.ParentHash() != currentBlock.Hash() { @@ -1393,26 +1435,15 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. return NonStatTy, err } } - // Write the positional metadata for transaction and receipt lookups - if err := WriteTxLookupEntries(batch, block); err != nil { - return NonStatTy, err - } - // Write hash preimages - if err := WritePreimages(bc.db, block.NumberU64(), state.Preimages()); err != nil { - return NonStatTy, err - } status = CanonStatTy } else { status = SideStatTy } - if err := batch.Write(); err != nil { - return NonStatTy, err - } // Set new head. if status == CanonStatTy { // WriteBlock has already been called, no need to write again - bc.insert(block, false) + bc.writeHeadBlock(block, false) // prepare set of masternodes for the next epoch if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) { err := bc.UpdateM1() @@ -2181,15 +2212,16 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { return logs } -// reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them -// to be part of the new canonical chain and accumulates potential missing transactions and post an -// event about them +// reorg takes two blocks, an old chain and a new chain and will reconstruct the +// blocks and inserts them to be part of the new canonical chain and accumulates +// potential missing transactions and post an event about them. func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { var ( newChain types.Blocks oldChain types.Blocks commonBlock *types.Block deletedTxs types.Transactions + addedTxs types.Transactions deletedLogs []*types.Log ) log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64()) @@ -2238,6 +2270,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { return errors.New("invalid new chain") } } + // Ensure XDPoS engine committed block will be not reverted if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok { latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo() @@ -2263,6 +2296,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } } } + // Ensure the user sees large reorgs if len(oldChain) > 0 && len(newChain) > 0 { logFn := log.Warn @@ -2277,16 +2311,16 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } else { log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash()) } - // Insert the new chain, taking care of the proper incremental order - var addedTxs types.Transactions + + // Insert the new chain(except the head block(reverse order)), + // taking care of the proper incremental order. for i := len(newChain) - 1; i >= 0; i-- { // insert the block in the canonical way, re-writing history - bc.insert(newChain[i], true) - // write lookup entries for hash based transaction/receipt searches - if err := WriteTxLookupEntries(bc.db, newChain[i]); err != nil { - return err - } + bc.writeHeadBlock(newChain[i], true) + + // Collect the new added transactions. addedTxs = append(addedTxs, newChain[i].Transactions()...) + // prepare set of masternodes for the next epoch if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) { err := bc.UpdateM1() @@ -2295,20 +2329,36 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { } } } - // calculate the difference between deleted and added transactions - diff := types.TxDifference(deletedTxs, addedTxs) - // When transactions get deleted from the database that means the - // receipts that were created in the fork must also be deleted - for _, tx := range diff { - DeleteTxLookupEntry(bc.db, tx.Hash()) + + // Delete useless indexes right now which includes the non-canonical + // transaction indexes, canonical chain indexes which above the head. + indexesBatch := bc.db.NewBatch() + for _, tx := range types.TxDifference(deletedTxs, addedTxs) { + rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) + } + // Delete any canonical number assignments above the new head + number := bc.CurrentBlock().NumberU64() + for i := number + 1; ; i++ { + hash := rawdb.ReadCanonicalHash(bc.db, i) + if hash == (common.Hash{}) { + break + } + rawdb.DeleteCanonicalHash(indexesBatch, i) + } + if err := indexesBatch.Write(); err != nil { + log.Crit("Failed to delete useless indexes", "err", err) } + // If any logs need to be fired, do it now. In theory we could avoid creating + // this goroutine if there are no events to fire, but realistcally that only + // ever happens if we're reorging empty blocks, which will only happen on idle + // networks where performance is not an issue either way. if len(deletedLogs) > 0 { go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs}) } if len(oldChain) > 0 { go func() { - for _, block := range oldChain { - bc.chainSideFeed.Send(ChainSideEvent{Block: block}) + for i := len(oldChain) - 1; i >= 0; i-- { + bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]}) } }() } diff --git a/core/database_util.go b/core/database_util.go index e65ac0ff8e2c..d25e06ac3a6f 100644 --- a/core/database_util.go +++ b/core/database_util.go @@ -487,11 +487,6 @@ func DeleteBlockReceipts(db DatabaseDeleter, hash common.Hash, number uint64) { db.Delete(append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)) } -// DeleteTxLookupEntry removes all transaction data associated with a hash. -func DeleteTxLookupEntry(db DatabaseDeleter, hash common.Hash) { - db.Delete(append(lookupPrefix, hash.Bytes()...)) -} - // PreimageTable returns a Database instance with the key prefix for preimage entries. func PreimageTable(db ethdb.Database) ethdb.Database { return rawdb.NewTable(db, preimagePrefix) diff --git a/core/database_util_test.go b/core/database_util_test.go index 1f29908b963f..0c1494ba527e 100644 --- a/core/database_util_test.go +++ b/core/database_util_test.go @@ -24,8 +24,8 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" - "golang.org/x/crypto/sha3" "github.com/XinFinOrg/XDPoSChain/rlp" + "golang.org/x/crypto/sha3" ) // Tests block header storage and retrieval operations. @@ -304,7 +304,7 @@ func TestLookupStorage(t *testing.T) { } // Delete the transactions and check purge for i, tx := range txs { - DeleteTxLookupEntry(db, tx.Hash()) + rawdb.DeleteTxLookupEntry(db, tx.Hash()) if txn, _, _, _ := GetTransaction(db, tx.Hash()); txn != nil { t.Fatalf("tx #%d [%x]: deleted transaction returned: %v", i, tx.Hash(), txn) } diff --git a/core/headerchain.go b/core/headerchain.go index 51a876d2ecac..633803f2faab 100644 --- a/core/headerchain.go +++ b/core/headerchain.go @@ -45,6 +45,14 @@ const ( // HeaderChain implements the basic block header chain logic that is shared by // core.BlockChain and light.LightChain. It is not usable in itself, only as // a part of either structure. +// +// HeaderChain is responsible for maintaining the header chain including the +// header query and updating. +// +// The components maintained by headerchain includes: (1) total difficult +// (2) header (3) block hash -> number mapping (4) canonical number -> hash mapping +// and (5) head header flag. +// // It is not thread safe either, the encapsulating chain structures should do // the necessary mutex locking/unlocking. type HeaderChain struct { @@ -66,11 +74,8 @@ type HeaderChain struct { engine consensus.Engine } -// NewHeaderChain creates a new HeaderChain structure. -// -// getValidator should return the parent's validator -// procInterrupt points to the parent's interrupt semaphore -// wg points to the parent's shutdown wait group +// NewHeaderChain creates a new HeaderChain structure. ProcInterrupt points +// to the parent's interrupt semaphore. func NewHeaderChain(chainDb ethdb.Database, config *params.ChainConfig, engine consensus.Engine, procInterrupt func() bool) (*HeaderChain, error) { // Seed a fast but crypto originating random generator seed, err := crand.Int(crand.Reader, big.NewInt(math.MaxInt64)) @@ -143,41 +148,54 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er externTd := new(big.Int).Add(header.Difficulty, ptd) // Irrelevant of the canonical status, write the td and header to the database - if err := hc.WriteTd(hash, number, externTd); err != nil { - log.Crit("Failed to write header total difficulty", "err", err) + // + // Note all the components of header(td, hash->number index and header) should + // be written atomically. + headerBatch := hc.chainDb.NewBatch() + rawdb.WriteTd(headerBatch, hash, number, externTd) + rawdb.WriteHeader(headerBatch, header) + if err := headerBatch.Write(); err != nil { + log.Crit("Failed to write header into disk", "err", err) } - rawdb.WriteHeader(hc.chainDb, header) // If the total difficulty is higher than our known, add it to the canonical chain // Second clause in the if statement reduces the vulnerability to selfish mining. // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf if externTd.Cmp(localTd) > 0 || (externTd.Cmp(localTd) == 0 && mrand.Float64() < 0.5) { + // If the header can be added into canonical chain, adjust the + // header chain markers(canonical indexes and head header flag). + // + // Note all markers should be written atomically. + // Delete any canonical number assignments above the new head + markerBatch := hc.chainDb.NewBatch() for i := number + 1; ; i++ { - hash := GetCanonicalHash(hc.chainDb, i) + hash := rawdb.ReadCanonicalHash(hc.chainDb, i) if hash == (common.Hash{}) { break } - DeleteCanonicalHash(hc.chainDb, i) + rawdb.DeleteCanonicalHash(markerBatch, i) } + // Overwrite any stale canonical number assignments var ( headHash = header.ParentHash headNumber = header.Number.Uint64() - 1 headHeader = hc.GetHeader(headHash, headNumber) ) - for GetCanonicalHash(hc.chainDb, headNumber) != headHash { - rawdb.WriteCanonicalHash(hc.chainDb, headHash, headNumber) + for rawdb.ReadCanonicalHash(hc.chainDb, headNumber) != headHash { + rawdb.WriteCanonicalHash(markerBatch, headHash, headNumber) headHash = headHeader.ParentHash headNumber = headHeader.Number.Uint64() - 1 headHeader = hc.GetHeader(headHash, headNumber) } // Extend the canonical chain with the new header - rawdb.WriteCanonicalHash(hc.chainDb, hash, number) - if err := WriteHeadHeaderHash(hc.chainDb, hash); err != nil { - log.Crit("Failed to insert head header hash", "err", err) + rawdb.WriteCanonicalHash(markerBatch, hash, number) + rawdb.WriteHeadHeaderHash(markerBatch, hash) + if err := markerBatch.Write(); err != nil { + log.Crit("Failed to write header markers into disk", "err", err) } - + // Last step update all in-memory head header markers hc.currentHeaderHash = hash hc.currentHeader.Store(types.CopyHeader(header)) headHeaderGauge.Update(header.Number.Int64()) @@ -186,10 +204,9 @@ func (hc *HeaderChain) WriteHeader(header *types.Header) (status WriteStatus, er } else { status = SideStatTy } - + hc.tdCache.Add(hash, externTd) hc.headerCache.Add(hash, header) hc.numberCache.Add(hash, number) - return } @@ -328,16 +345,6 @@ func (hc *HeaderChain) GetTdByHash(hash common.Hash) *big.Int { return hc.GetTd(hash, hc.GetBlockNumber(hash)) } -// WriteTd stores a block's total difficulty into the database, also caching it -// along the way. -func (hc *HeaderChain) WriteTd(hash common.Hash, number uint64, td *big.Int) error { - if err := WriteTd(hc.chainDb, hash, number, td); err != nil { - return err - } - hc.tdCache.Add(hash, new(big.Int).Set(td)) - return nil -} - // GetHeader retrieves a block header from the database by hash and number, // caching it if found. func (hc *HeaderChain) GetHeader(hash common.Hash, number uint64) *types.Header { @@ -361,12 +368,13 @@ func (hc *HeaderChain) GetHeaderByHash(hash common.Hash) *types.Header { } // HasHeader checks if a block header is present in the database or not. +// In theory, if header is present in the database, all relative components +// like td and hash->number should be present too. func (hc *HeaderChain) HasHeader(hash common.Hash, number uint64) bool { if hc.numberCache.Contains(hash) || hc.headerCache.Contains(hash) { return true } - ok, _ := hc.chainDb.Has(headerKey(hash, number)) - return ok + return rawdb.HasHeader(hc.chainDb, hash, number) } // GetHeaderByNumber retrieves a block header from the database by number, @@ -390,58 +398,79 @@ func (hc *HeaderChain) CurrentHeader() *types.Header { return hc.currentHeader.Load().(*types.Header) } -// SetCurrentHeader sets the current head header of the canonical chain. +// SetCurrentHeader sets the in-memory head header marker of the canonical chan +// as the given header. func (hc *HeaderChain) SetCurrentHeader(head *types.Header) { - if err := WriteHeadHeaderHash(hc.chainDb, head.Hash()); err != nil { - log.Crit("Failed to insert head header hash", "err", err) - } - hc.currentHeader.Store(head) hc.currentHeaderHash = head.Hash() headHeaderGauge.Update(head.Number.Int64()) } -// DeleteCallback is a callback function that is called by SetHead before -// each header is deleted. -type DeleteCallback func(common.Hash, uint64) - -// SetHead rewinds the local chain to a new head. Everything above the new head -// will be deleted and the new one set. -func (hc *HeaderChain) SetHead(head uint64, delFn DeleteCallback) { - height := uint64(0) +type ( + // UpdateHeadBlocksCallback is a callback function that is called by SetHead + // before head header is updated. + UpdateHeadBlocksCallback func(ethdb.KeyValueWriter, *types.Header) - if hdr := hc.CurrentHeader(); hdr != nil { - height = hdr.Number.Uint64() - } + // DeleteBlockContentCallback is a callback function that is called by SetHead + // before each header is deleted. + DeleteBlockContentCallback func(ethdb.KeyValueWriter, common.Hash, uint64) +) +// SetHead rewinds the local chain to a new head. In the case of headers, everything +// above the new head will be deleted and the new one set. In the case of blocks +// though, the head may be further rewound if block bodies are missing (non-archive +// nodes after a fast sync). +func (hc *HeaderChain) SetHead(head uint64, updateFn UpdateHeadBlocksCallback, delFn DeleteBlockContentCallback) { + var ( + parentHash common.Hash + batch = hc.chainDb.NewBatch() + ) for hdr := hc.CurrentHeader(); hdr != nil && hdr.Number.Uint64() > head; hdr = hc.CurrentHeader() { - hash := hdr.Hash() - num := hdr.Number.Uint64() + hash, num := hdr.Hash(), hdr.Number.Uint64() + + // Rewind block chain to new head. + parent := hc.GetHeader(hdr.ParentHash, num-1) + if parent == nil { + parent = hc.genesisHeader + } + parentHash = hdr.ParentHash + // Notably, since geth has the possibility for setting the head to a low + // height which is even lower than ancient head. + // In order to ensure that the head is always no higher than the data in + // the database(ancient store or active store), we need to update head + // first then remove the relative data from the database. + // + // Update head first(head fast block, head full block) before deleting the data. + markerBatch := hc.chainDb.NewBatch() + if updateFn != nil { + updateFn(markerBatch, parent) + } + // Update head header then. + rawdb.WriteHeadHeaderHash(markerBatch, parentHash) + if err := markerBatch.Write(); err != nil { + log.Crit("Failed to update chain markers", "error", err) + } + hc.currentHeader.Store(parent) + hc.currentHeaderHash = parentHash + headHeaderGauge.Update(parent.Number.Int64()) + + // Remove the relative data from the database. if delFn != nil { - delFn(hash, num) + delFn(batch, hash, num) } - DeleteHeader(hc.chainDb, hash, num) - DeleteTd(hc.chainDb, hash, num) - hc.currentHeader.Store(hc.GetHeader(hdr.ParentHash, hdr.Number.Uint64()-1)) + // Rewind header chain to new head. + rawdb.DeleteHeader(batch, hash, num) + rawdb.DeleteTd(batch, hash, num) + rawdb.DeleteCanonicalHash(batch, num) } - // Roll back the canonical chain numbering - for i := height; i > head; i-- { - DeleteCanonicalHash(hc.chainDb, i) + // Flush all accumulated deletions. + if err := batch.Write(); err != nil { + log.Crit("Failed to rewind block", "error", err) } // Clear out any stale content from the caches hc.headerCache.Purge() hc.tdCache.Purge() hc.numberCache.Purge() - - if hc.CurrentHeader() == nil { - hc.currentHeader.Store(hc.genesisHeader) - } - hc.currentHeaderHash = hc.CurrentHeader().Hash() - headHeaderGauge.Update(hc.CurrentHeader().Number.Int64()) - - if err := WriteHeadHeaderHash(hc.chainDb, hc.currentHeaderHash); err != nil { - log.Crit("Failed to reset head header hash", "err", err) - } } // SetGenesis sets a new genesis block header for the chain diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 7073e8431a60..b8cb9cbd7650 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -24,12 +24,32 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/params" "github.com/XinFinOrg/XDPoSChain/rlp" ) +// ReadCanonicalHash retrieves the hash assigned to a canonical block number. +func ReadCanonicalHash(db ethdb.Reader, number uint64) common.Hash { + data, _ := db.Ancient(freezerHashTable, number) + if len(data) == 0 { + data, _ = db.Get(headerHashKey(number)) + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + if len(data) == 0 { + data, _ = db.Ancient(freezerHashTable, number) + } + } + if len(data) == 0 { + return common.Hash{} + } + return common.BytesToHash(data) +} + // WriteCanonicalHash stores the hash assigned to a canonical block number. func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { if err := db.Put(headerHashKey(number), hash.Bytes()); err != nil { @@ -37,6 +57,13 @@ func WriteCanonicalHash(db ethdb.KeyValueWriter, hash common.Hash, number uint64 } } +// DeleteCanonicalHash removes the number to hash canonical mapping. +func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) { + if err := db.Delete(headerHashKey(number)); err != nil { + log.Crit("Failed to delete number to hash mapping", "err", err) + } +} + // ReadHeaderNumber returns the header number assigned to a hash. func ReadHeaderNumber(db ethdb.KeyValueReader, hash common.Hash) *uint64 { data, _ := db.Get(headerNumberKey(hash)) @@ -56,6 +83,20 @@ func WriteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) } } +// DeleteHeaderNumber removes hash->number mapping. +func DeleteHeaderNumber(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(headerNumberKey(hash)); err != nil { + log.Crit("Failed to delete hash to number mapping", "err", err) + } +} + +// WriteHeadHeaderHash stores the hash of the current canonical head header. +func WriteHeadHeaderHash(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Put(headHeaderKey, hash.Bytes()); err != nil { + log.Crit("Failed to store last header's hash", "err", err) + } +} + // WriteHeadBlockHash stores the head block's hash. func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { if err := db.Put(headBlockKey, hash.Bytes()); err != nil { @@ -63,10 +104,47 @@ func WriteHeadBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { } } +// WriteHeadFastBlockHash stores the hash of the current fast-sync head block. +func WriteHeadFastBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Put(headFastBlockKey, hash.Bytes()); err != nil { + log.Crit("Failed to store last fast block's hash", "err", err) + } +} + // ReadHeaderRLP retrieves a block header in its raw RLP database encoding. func ReadHeaderRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { - data, _ := db.Get(headerKey(number, hash)) - return data + // First try to look up the data in ancient database. Extra hash + // comparison is necessary since ancient database only maintains + // the canonical data. + data, _ := db.Ancient(freezerHeaderTable, number) + if len(data) > 0 && crypto.Keccak256Hash(data) == hash { + return data + } + // Then try to look up the data in leveldb. + data, _ = db.Get(headerKey(number, hash)) + if len(data) > 0 { + return data + } + // In the background freezer is moving data from leveldb to flatten files. + // So during the first check for ancient db, the data is not yet in there, + // but when we reach into leveldb, the data was already moved. That would + // result in a not found error. + data, _ = db.Ancient(freezerHeaderTable, number) + if len(data) > 0 && crypto.Keccak256Hash(data) == hash { + return data + } + return nil // Can't find the data anywhere. +} + +// HasHeader verifies the existence of a block header corresponding to the hash. +func HasHeader(db ethdb.Reader, hash common.Hash, number uint64) bool { + if has, err := db.Ancient(freezerHashTable, number); err == nil && common.BytesToHash(has) == hash { + return true + } + if has, err := db.Has(headerKey(number, hash)); !has || err != nil { + return false + } + return true } // ReadHeader retrieves the block header corresponding to the hash. @@ -104,6 +182,22 @@ func WriteHeader(db ethdb.KeyValueWriter, header *types.Header) { } } +// DeleteHeader removes all block header data associated with a hash. +func DeleteHeader(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + deleteHeaderWithoutNumber(db, hash, number) + if err := db.Delete(headerNumberKey(hash)); err != nil { + log.Crit("Failed to delete hash to number mapping", "err", err) + } +} + +// deleteHeaderWithoutNumber removes only the block header but does not remove +// the hash to number mapping. +func deleteHeaderWithoutNumber(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + if err := db.Delete(headerKey(number, hash)); err != nil { + log.Crit("Failed to delete header", "err", err) + } +} + // ReadBodyRLP retrieves the block body (transactions and uncles) in RLP encoding. func ReadBodyRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash @@ -165,6 +259,31 @@ func WriteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64, body *t WriteBodyRLP(db, hash, number, data) } +// DeleteBody removes all block body data associated with a hash. +func DeleteBody(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + if err := db.Delete(blockBodyKey(number, hash)); err != nil { + log.Crit("Failed to delete block body", "err", err) + } +} + +// WriteTd stores the total difficulty of a block into the database. +func WriteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64, td *big.Int) { + data, err := rlp.EncodeToBytes(td) + if err != nil { + log.Crit("Failed to RLP encode block total difficulty", "err", err) + } + if err := db.Put(headerTDKey(number, hash), data); err != nil { + log.Crit("Failed to store block total difficulty", "err", err) + } +} + +// DeleteTd removes all block total difficulty data associated with a hash. +func DeleteTd(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + if err := db.Delete(headerTDKey(number, hash)); err != nil { + log.Crit("Failed to delete block total difficulty", "err", err) + } +} + // ReadReceiptsRLP retrieves all the transaction receipts belonging to a block in RLP encoding. func ReadReceiptsRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.RawValue { // First try to look up the data in ancient database. Extra hash @@ -270,6 +389,13 @@ func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, rec } } +// DeleteReceipts removes all receipt data associated with a block hash. +func DeleteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64) { + if err := db.Delete(blockReceiptsKey(number, hash)); err != nil { + log.Crit("Failed to delete block receipts", "err", err) + } +} + // storedReceiptRLP is the storage encoding of a receipt. // Re-definition in core/types/receipt.go. type storedReceiptRLP struct { diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go new file mode 100644 index 000000000000..edb263325705 --- /dev/null +++ b/core/rawdb/accessors_indexes.go @@ -0,0 +1,56 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/core/types" + "github.com/XinFinOrg/XDPoSChain/ethdb" + "github.com/XinFinOrg/XDPoSChain/log" + "github.com/XinFinOrg/XDPoSChain/rlp" +) + +type TxLookupEntry struct { + BlockHash common.Hash + BlockIndex uint64 + Index uint64 +} + +// WriteTxLookupEntries stores a positional metadata for every transaction from +// a block, enabling hash based transaction and receipt lookups. +func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { + // Iterate over each transaction and encode its metadata + for i, tx := range block.Transactions() { + entry := TxLookupEntry{ + BlockHash: block.Hash(), + BlockIndex: block.NumberU64(), + Index: uint64(i), + } + data, err := rlp.EncodeToBytes(entry) + if err != nil { + log.Crit("Failed to RLP encode TxLookupEntry", "err", err) + } + if err := db.Put(txLookupKey(tx.Hash()), data); err != nil { + log.Crit("Failed to store tx lookup entry", "err", err) + } + } +} + +// DeleteTxLookupEntry removes all transaction data associated with a hash. +func DeleteTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash) { + db.Delete(txLookupKey(hash)) +} diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go new file mode 100644 index 000000000000..a56ab22d7b4c --- /dev/null +++ b/core/rawdb/accessors_metadata.go @@ -0,0 +1,34 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/ethdb" + "github.com/XinFinOrg/XDPoSChain/log" +) + +// WritePreimages writes the provided set of preimages to the database. +func WritePreimages(db ethdb.KeyValueWriter, preimages map[common.Hash][]byte) { + for hash, preimage := range preimages { + if err := db.Put(preimageKey(hash), preimage); err != nil { + log.Crit("Failed to store trie preimage", "err", err) + } + } + preimageCounter.Inc(int64(len(preimages))) + preimageHitCounter.Inc(int64(len(preimages))) +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 26d098347e74..a0a1852c7550 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -21,23 +21,41 @@ import ( "encoding/binary" "github.com/XinFinOrg/XDPoSChain/common" + "github.com/XinFinOrg/XDPoSChain/metrics" ) // The fields below define the low level database schema prefixing. var ( + // headHeaderKey tracks the latest known header's hash. + headHeaderKey = []byte("LastHeader") + // headBlockKey tracks the latest known full block's hash. headBlockKey = []byte("LastBlock") + // headFastBlockKey tracks the latest known incomplete block's hash during fast sync. + headFastBlockKey = []byte("LastFast") + // Data item prefixes (use single byte to avoid mixing data types, avoid `i`, used for indexes). headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header + headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td headerHashSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + headerHashSuffix -> hash headerNumberPrefix = []byte("H") // headerNumberPrefix + hash -> num (uint64 big endian) blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts + + txLookupPrefix = []byte("l") // txLookupPrefix + hash -> transaction/receipt lookup metadata + + preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage + + preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil) + preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil) ) const ( + // freezerHeaderTable indicates the name of the freezer header table. + freezerHeaderTable = "headers" + // freezerHashTable indicates the name of the freezer canonical hash table. freezerHashTable = "hashes" @@ -60,6 +78,11 @@ func headerKey(number uint64, hash common.Hash) []byte { return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...) } +// headerTDKey = headerPrefix + num (uint64 big endian) + hash + headerTDSuffix +func headerTDKey(number uint64, hash common.Hash) []byte { + return append(headerKey(number, hash), headerTDSuffix...) +} + // headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix func headerHashKey(number uint64) []byte { return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...) @@ -79,3 +102,13 @@ func blockBodyKey(number uint64, hash common.Hash) []byte { func blockReceiptsKey(number uint64, hash common.Hash) []byte { return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...) } + +// txLookupKey = txLookupPrefix + hash +func txLookupKey(hash common.Hash) []byte { + return append(txLookupPrefix, hash.Bytes()...) +} + +// preimageKey = preimagePrefix + hash +func preimageKey(hash common.Hash) []byte { + return append(preimagePrefix, hash.Bytes()...) +} diff --git a/light/lightchain.go b/light/lightchain.go index 7664750615a7..4b90cf3cca7a 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -146,7 +146,6 @@ func (lc *LightChain) loadLastState() error { lc.hc.SetCurrentHeader(header) } } - // Issue a status log and return header := lc.hc.CurrentHeader() headerTd := lc.GetTd(header.Hash(), header.Number.Uint64()) @@ -161,7 +160,7 @@ func (lc *LightChain) SetHead(head uint64) { lc.chainmu.Lock() defer lc.chainmu.Unlock() - lc.hc.SetHead(head, nil) + lc.hc.SetHead(head, nil, nil) lc.loadLastState() } @@ -185,10 +184,13 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) { defer lc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain - if err := core.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { - log.Crit("Failed to write genesis block TD", "err", err) + batch := lc.chainDb.NewBatch() + rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) + rawdb.WriteBlock(batch, genesis) + rawdb.WriteHeadHeaderHash(batch, genesis.Hash()) + if err := batch.Write(); err != nil { + log.Crit("Failed to reset genesis block", "err", err) } - rawdb.WriteBlock(lc.chainDb, genesis) lc.genesisBlock = genesis lc.hc.SetGenesis(lc.genesisBlock.Header()) lc.hc.SetCurrentHeader(lc.genesisBlock.Header()) @@ -299,13 +301,22 @@ func (lc *LightChain) Rollback(chain []common.Hash) { lc.chainmu.Lock() defer lc.chainmu.Unlock() + batch := lc.chainDb.NewBatch() for i := len(chain) - 1; i >= 0; i-- { hash := chain[i] + // Degrade the chain markers if they are explicitly reverted. + // In theory we should update all in-memory markers in the + // last step, however the direction of rollback is from high + // to low, so it's safe the update in-memory markers directly. if head := lc.hc.CurrentHeader(); head.Hash() == hash { + rawdb.WriteHeadHeaderHash(batch, head.ParentHash) lc.hc.SetCurrentHeader(lc.GetHeader(head.ParentHash, head.Number.Uint64()-1)) } } + if err := batch.Write(); err != nil { + log.Crit("Failed to rollback light chain", "error", err) + } } // postChainEvents iterates over the events generated by a chain insertion and @@ -441,12 +452,15 @@ func (lc *LightChain) SyncCht(ctx context.Context) bool { chtCount, _, _ := lc.odr.ChtIndexer().Sections() if headNum+1 < chtCount*CHTFrequencyClient { num := chtCount*CHTFrequencyClient - 1 - header, err := GetHeaderByNumber(ctx, lc.odr, num) - if header != nil && err == nil { + // Retrieve the latest useful header and update to it + if header, err := GetHeaderByNumber(ctx, lc.odr, num); header != nil && err == nil { lc.chainmu.Lock() defer lc.chainmu.Unlock() + // Ensure the chain didn't move past the latest block while retrieving it if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() { + log.Info("Updated latest header based on CHT", "number", header.Number, "hash", header.Hash()) + rawdb.WriteHeadHeaderHash(lc.chainDb, header.Hash()) lc.hc.SetCurrentHeader(header) } return true diff --git a/light/txpool.go b/light/txpool.go index a1d9190a3874..7a7bf619d7f3 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -25,6 +25,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/core" + "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/state" "github.com/XinFinOrg/XDPoSChain/core/txpool" "github.com/XinFinOrg/XDPoSChain/core/types" @@ -205,15 +206,17 @@ func (p *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number uin // rollbackTxs marks the transactions contained in recently rolled back blocks // as rolled back. It also removes any positional lookup entries. func (p *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) { + batch := p.chainDb.NewBatch() if list, ok := p.mined[hash]; ok { for _, tx := range list { txHash := tx.Hash() - core.DeleteTxLookupEntry(p.chainDb, txHash) + rawdb.DeleteTxLookupEntry(batch, txHash) p.pending[txHash] = tx txc.setState(txHash, false) } delete(p.mined, hash) } + batch.Write() } // reorgOnNewHead sets a new head header, processing (and rolling back if necessary) From 2d5dc550d0c4daa648446c9de1b2dc797624116d Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 24 Dec 2024 17:36:11 +0800 Subject: [PATCH 3/6] core/rawdb: rename WriteTxLookupEntries to WriteTxLookupEntriesByBlock (#21480) --- core/blockchain.go | 2 +- core/rawdb/accessors_indexes.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 480fa606017b..d38b3eec6ace 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -735,7 +735,7 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block, writeBlock bool) { // Add the block to the canonical chain number scheme and mark as the head batch := bc.db.NewBatch() rawdb.WriteCanonicalHash(batch, blockHash, blockNumberU64) - rawdb.WriteTxLookupEntries(batch, block) + rawdb.WriteTxLookupEntriesByBlock(batch, block) rawdb.WriteHeadBlockHash(batch, blockHash) if writeBlock { rawdb.WriteBlock(batch, block) diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go index edb263325705..7f9409875193 100644 --- a/core/rawdb/accessors_indexes.go +++ b/core/rawdb/accessors_indexes.go @@ -30,9 +30,9 @@ type TxLookupEntry struct { Index uint64 } -// WriteTxLookupEntries stores a positional metadata for every transaction from +// WriteTxLookupEntriesByBlock stores a positional metadata for every transaction from // a block, enabling hash based transaction and receipt lookups. -func WriteTxLookupEntries(db ethdb.KeyValueWriter, block *types.Block) { +func WriteTxLookupEntriesByBlock(db ethdb.KeyValueWriter, block *types.Block) { // Iterate over each transaction and encode its metadata for i, tx := range block.Transactions() { entry := TxLookupEntry{ From 2baadc6e8768607c4b33cdeb510c882575e3e98b Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 24 Dec 2024 23:30:33 +0800 Subject: [PATCH 4/6] core: improve shutdown synchronization in BlockChain (#22853) --- core/blockchain.go | 179 +++++++++++++++++++++++++++------------- core/blockchain_test.go | 4 +- internal/syncx/mutex.go | 64 ++++++++++++++ 3 files changed, 189 insertions(+), 58 deletions(-) create mode 100644 internal/syncx/mutex.go diff --git a/core/blockchain.go b/core/blockchain.go index d38b3eec6ace..be6281b6cabe 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -48,6 +48,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/ethclient" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" + "github.com/XinFinOrg/XDPoSChain/internal/syncx" "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/params" @@ -81,6 +82,9 @@ var ( blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) + errInsertionInterrupted = errors.New("insertion is interrupted") + errChainStopped = errors.New("blockchain is stopped") + CheckpointCh = make(chan int) ) @@ -149,8 +153,11 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - chainmu sync.RWMutex // blockchain insertion lock - procmu sync.RWMutex // block processor lock + // This mutex synchronizes chain write operations. + // Readers don't need to take it, they can just read the database. + chainmu *syncx.ClosableMutex + + procmu sync.RWMutex // block processor lock currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) @@ -169,10 +176,10 @@ type BlockChain struct { // future blocks are blocks added for later processing futureBlocks *lru.Cache[common.Hash, *types.Block] - wg sync.WaitGroup // chain processing wait group for shutting down - quit chan struct{} // shutdown signal, closed in Stop. - running int32 // 0 if chain is running, 1 when stopped - procInterrupt int32 // interrupt signaler for block processing + wg sync.WaitGroup + quit chan struct{} // shutdown signal, closed in Stop. + running int32 // 0 if chain is running, 1 when stopped + procInterrupt int32 // interrupt signaler for block processing engine consensus.Engine processor Processor // block processor interface @@ -211,6 +218,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par triegc: prque.New[int64, common.Hash](nil), stateCache: state.NewDatabase(db), quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), receiptsCache: lru.NewCache[common.Hash, types.Receipts](receiptsCacheLimit), @@ -261,8 +269,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } - // Take ownership of this particular state - go bc.update() + + // Start future block processor. + bc.wg.Add(1) + go bc.futureBlocksLoop() + return bc, nil } @@ -410,7 +421,9 @@ func (bc *BlockChain) loadLastState() error { func (bc *BlockChain) SetHead(head uint64) error { log.Warn("Rewinding blockchain", "target", head) - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return errChainStopped + } defer bc.chainmu.Unlock() updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { @@ -499,8 +512,11 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil { return err } - // If all checks out, manually set the head block - bc.chainmu.Lock() + + // If all checks out, manually set the head block. + if !bc.chainmu.TryLock() { + return errChainStopped + } bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() @@ -622,7 +638,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return errChainStopped + } defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain @@ -698,8 +716,10 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - bc.chainmu.RLock() - defer bc.chainmu.RUnlock() + if !bc.chainmu.TryLock() { + return errChainStopped + } + defer bc.chainmu.Unlock() if first > last { return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) @@ -1033,15 +1053,38 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } - // Unsubscribe all subscriptions registered from blockchain + + // Unsubscribe all subscriptions registered from blockchain. bc.scope.Close() + + // Signal shutdown to all goroutines. close(bc.quit) - atomic.StoreInt32(&bc.procInterrupt, 1) + bc.StopInsert() + + // Now wait for all chain modifications to end and persistent goroutines to exit. + // + // Note: Close waits for the mutex to become available, i.e. any running chain + // modification will have exited when Close returns. Since we also called StopInsert, + // the mutex should become available quickly. It cannot be taken again after Close has + // returned. + bc.chainmu.Close() bc.wg.Wait() bc.saveData() log.Info("Blockchain manager stopped") } +// StopInsert interrupts all insertion methods, causing them to return +// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after +// calling this method. +func (bc *BlockChain) StopInsert() { + atomic.StoreInt32(&bc.procInterrupt, 1) +} + +// insertStopped returns true after StopInsert has been called. +func (bc *BlockChain) insertStopped() bool { + return atomic.LoadInt32(&bc.procInterrupt) == 1 +} + func (bc *BlockChain) procFutureBlocks() { blocks := make([]*types.Block, 0, bc.futureBlocks.Len()) for _, hash := range bc.futureBlocks.Keys() { @@ -1085,7 +1128,9 @@ const ( // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. func (bc *BlockChain) Rollback(chain []common.Hash) { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return + } defer bc.chainmu.Unlock() batch := bc.db.NewBatch() @@ -1124,6 +1169,8 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { // InsertReceiptChain attempts to complete an already existing header chain with // transaction and receipt data. func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + // We don't require the chainMu here since we want to maximize the + // concurrency of header insertion and receipt insertion. bc.wg.Add(1) defer bc.wg.Done() @@ -1195,7 +1242,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Update the head fast sync block if better - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } head := blockChain[len(blockChain)-1] if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case currentFastBlock := bc.CurrentFastBlock() @@ -1221,12 +1270,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var lastWrite uint64 -// WriteBlockWithoutState writes only the block and its metadata to the database, +// writeBlockWithoutState writes only the block and its metadata to the database, // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. -func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) { - bc.wg.Add(1) - defer bc.wg.Done() +func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { + if bc.insertStopped() { + return errInsertionInterrupted + } batch := bc.db.NewBatch() rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) @@ -1239,17 +1289,19 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return NonStatTy, errInsertionInterrupted + } defer bc.chainmu.Unlock() - return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState) } // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if bc.insertStopped() { + return NonStatTy, errInsertionInterrupted + } // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) @@ -1470,38 +1522,51 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // // After insertion is done, all accumulated events will be fired. func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { - n, events, logs, err := bc.insertChain(chain, true) - bc.PostChainEvents(events, logs) - return n, err -} - -// insertChain will execute the actual chain insertion and event aggregation. The -// only reason this method exists as a separate one is to make locking cleaner -// with deferred statements. -func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { // Sanity check that we have something meaningful to import if len(chain) == 0 { - return 0, nil, nil, nil + return 0, nil } - engine, _ := bc.Engine().(*XDPoS.XDPoS) // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { - if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { + block, prev := chain[i], chain[i-1] + if block.NumberU64() != chain[i-1].NumberU64()+1 || block.ParentHash() != chain[i-1].Hash() { // Chain broke ancestry, log a messge (programming error) and skip insertion - log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), - "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) + log.Error("Non contiguous block insert", + "number", block.Number(), + "hash", block.Hash(), + "parent", block.ParentHash(), + "prevnumber", prev.Number(), + "prevhash", prev.Hash()) - return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, chain[i-1].NumberU64(), - chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) + return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, prev.NumberU64(), + prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4]) } } - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - defer bc.wg.Done() - bc.chainmu.Lock() + // Pre-check passed, start the full block imports. + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() + n, events, logs, err := bc.insertChain(chain, true) + bc.PostChainEvents(events, logs) + return n, err +} + +// insertChain is the internal implementation of InsertChain, which assumes that +// 1) chains are contiguous, and 2) The chain mutex is held. +// +// This method is split out so that import batches that require re-injecting +// historical blocks can do so without releasing the lock, which could lead to +// racey behaviour. If a sidechain import is in progress, and the historic state +// is imported, but then new canon-head is added before the actual sidechain +// completes, then the historic state could be pruned again +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { + // If the chain is terminating, don't even bother starting up. + if bc.insertStopped() { + return 0, nil, nil, nil + } // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex @@ -1578,7 +1643,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) if localTd.Cmp(externTd) > 0 { - if err = bc.WriteBlockWithoutState(block, externTd); err != nil { + if err = bc.writeBlockWithoutState(block, externTd); err != nil { return i, events, coalescedLogs, err } continue @@ -1596,10 +1661,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner)) // Import all the pruned blocks to make the state available - bc.chainmu.Unlock() // During reorg, we use verifySeals=false _, evs, logs, err := bc.insertChain(winner, false) - bc.chainmu.Lock() events, coalescedLogs = evs, logs if err != nil { @@ -1628,6 +1691,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] var tradingService utils.TradingService var lendingService utils.LendingService isSDKNode := false + engine, _ := bc.Engine().(*XDPoS.XDPoS) if bc.Config().IsTIPXDCXReceiver(block.Number()) && bc.chainConfig.XDPoS != nil && engine != nil && block.NumberU64() > bc.chainConfig.XDPoS.Epoch { author, err := bc.Engine().Author(block.Header()) // Ignore error, we're past header validation if err != nil { @@ -2091,7 +2155,9 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L bc.wg.Add(1) defer bc.wg.Done() // Write the block to the chain and get the status. - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return nil, nil, errChainStopped + } defer bc.chainmu.Unlock() if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) { return events, coalescedLogs, nil @@ -2387,7 +2453,10 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { } } -func (bc *BlockChain) update() { +// futureBlocksLoop processes the 'future block' queue. +func (bc *BlockChain) futureBlocksLoop() { + defer bc.wg.Done() + futureTimer := time.NewTicker(10 * time.Millisecond) defer futureTimer.Stop() for { @@ -2471,13 +2540,11 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i return i, err } - // Make sure only one thread manipulates the chain at once - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() - bc.wg.Add(1) - defer bc.wg.Done() - whFunc := func(header *types.Header) error { _, err := bc.hc.WriteHeader(header) return err diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 41283aa592aa..31ba9bb4a701 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -128,7 +128,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) rawdb.WriteBlock(blockchain.db, block) statedb.Commit(true) @@ -146,7 +146,7 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return err } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(blockchain.db, header) blockchain.chainmu.Unlock() diff --git a/internal/syncx/mutex.go b/internal/syncx/mutex.go new file mode 100644 index 000000000000..96a21986c60c --- /dev/null +++ b/internal/syncx/mutex.go @@ -0,0 +1,64 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package syncx contains exotic synchronization primitives. +package syncx + +// ClosableMutex is a mutex that can also be closed. +// Once closed, it can never be taken again. +type ClosableMutex struct { + ch chan struct{} +} + +func NewClosableMutex() *ClosableMutex { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return &ClosableMutex{ch} +} + +// TryLock attempts to lock cm. +// If the mutex is closed, TryLock returns false. +func (cm *ClosableMutex) TryLock() bool { + _, ok := <-cm.ch + return ok +} + +// MustLock locks cm. +// If the mutex is closed, MustLock panics. +func (cm *ClosableMutex) MustLock() { + _, ok := <-cm.ch + if !ok { + panic("mutex closed") + } +} + +// Unlock unlocks cm. +func (cm *ClosableMutex) Unlock() { + select { + case cm.ch <- struct{}{}: + default: + panic("Unlock of already-unlocked ClosableMutex") + } +} + +// Close locks the mutex, then closes it. +func (cm *ClosableMutex) Close() { + _, ok := <-cm.ch + if !ok { + panic("Close of already-closed ClosableMutex") + } + close(cm.ch) +} From d03859d49e8297aed73958d9ec0a60aebf97d694 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Wed, 25 Dec 2024 11:02:44 +0800 Subject: [PATCH 5/6] core: update the fast block on chain import at the end of the fast sync phase (#23576) --- core/blockchain.go | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index be6281b6cabe..9fd7582960a8 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -749,11 +749,10 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block, writeBlock bool) { blockHash := block.Hash() blockNumberU64 := block.NumberU64() - // If the block is on a side chain or an unknown one, force other heads onto it too - updateHeads := rawdb.ReadCanonicalHash(bc.db, blockNumberU64) != blockHash - // Add the block to the canonical chain number scheme and mark as the head batch := bc.db.NewBatch() + rawdb.WriteHeadHeaderHash(batch, blockHash) + rawdb.WriteHeadFastBlockHash(batch, blockHash) rawdb.WriteCanonicalHash(batch, blockHash, blockNumberU64) rawdb.WriteTxLookupEntriesByBlock(batch, block) rawdb.WriteHeadBlockHash(batch, blockHash) @@ -761,23 +760,17 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block, writeBlock bool) { rawdb.WriteBlock(batch, block) } - // If the block is better than our head or is on a different chain, force update heads - if updateHeads { - rawdb.WriteHeadHeaderHash(batch, blockHash) - rawdb.WriteHeadFastBlockHash(batch, blockHash) - } - // Flush the whole batch into the disk, exit the node if failed if err := batch.Write(); err != nil { log.Crit("Failed to update chain indexes and markers", "err", err) } // Update all in-memory chain markers in the last step - if updateHeads { - bc.hc.SetCurrentHeader(block.Header()) - bc.currentFastBlock.Store(block) - headFastBlockGauge.Update(int64(blockNumberU64)) - } + bc.hc.SetCurrentHeader(block.Header()) + + bc.currentFastBlock.Store(block) + headFastBlockGauge.Update(int64(blockNumberU64)) + bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) From c8c39efb630a9b4c573722b7101f97268e4a4b29 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Wed, 25 Dec 2024 11:33:56 +0800 Subject: [PATCH 6/6] core: remove unnecessary log copy (#27475) --- core/blockchain.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 9fd7582960a8..66b6fdcddc67 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2263,9 +2263,10 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log { var logs []*types.Log for _, receipt := range receipts { for _, log := range receipt.Logs { - l := *log - l.Removed = removed - logs = append(logs, &l) + if removed { + log.Removed = true + } + logs = append(logs, log) } } return logs