diff --git a/core/blockchain.go b/core/blockchain.go index d38b3eec6ace..be6281b6cabe 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -48,6 +48,7 @@ import ( "github.com/XinFinOrg/XDPoSChain/ethclient" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" + "github.com/XinFinOrg/XDPoSChain/internal/syncx" "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/metrics" "github.com/XinFinOrg/XDPoSChain/params" @@ -81,6 +82,9 @@ var ( blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil) blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil) + errInsertionInterrupted = errors.New("insertion is interrupted") + errChainStopped = errors.New("blockchain is stopped") + CheckpointCh = make(chan int) ) @@ -149,8 +153,11 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - chainmu sync.RWMutex // blockchain insertion lock - procmu sync.RWMutex // block processor lock + // This mutex synchronizes chain write operations. + // Readers don't need to take it, they can just read the database. + chainmu *syncx.ClosableMutex + + procmu sync.RWMutex // block processor lock currentBlock atomic.Value // Current head of the block chain currentFastBlock atomic.Value // Current head of the fast-sync chain (may be above the block chain!) @@ -169,10 +176,10 @@ type BlockChain struct { // future blocks are blocks added for later processing futureBlocks *lru.Cache[common.Hash, *types.Block] - wg sync.WaitGroup // chain processing wait group for shutting down - quit chan struct{} // shutdown signal, closed in Stop. - running int32 // 0 if chain is running, 1 when stopped - procInterrupt int32 // interrupt signaler for block processing + wg sync.WaitGroup + quit chan struct{} // shutdown signal, closed in Stop. + running int32 // 0 if chain is running, 1 when stopped + procInterrupt int32 // interrupt signaler for block processing engine consensus.Engine processor Processor // block processor interface @@ -211,6 +218,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par triegc: prque.New[int64, common.Hash](nil), stateCache: state.NewDatabase(db), quit: make(chan struct{}), + chainmu: syncx.NewClosableMutex(), bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit), bodyRLPCache: lru.NewCache[common.Hash, rlp.RawValue](bodyCacheLimit), receiptsCache: lru.NewCache[common.Hash, types.Receipts](receiptsCacheLimit), @@ -261,8 +269,11 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par } } } - // Take ownership of this particular state - go bc.update() + + // Start future block processor. + bc.wg.Add(1) + go bc.futureBlocksLoop() + return bc, nil } @@ -410,7 +421,9 @@ func (bc *BlockChain) loadLastState() error { func (bc *BlockChain) SetHead(head uint64) error { log.Warn("Rewinding blockchain", "target", head) - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return errChainStopped + } defer bc.chainmu.Unlock() updateFn := func(db ethdb.KeyValueWriter, header *types.Header) { @@ -499,8 +512,11 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error { if _, err := trie.NewSecure(block.Root(), bc.stateCache.TrieDB()); err != nil { return err } - // If all checks out, manually set the head block - bc.chainmu.Lock() + + // If all checks out, manually set the head block. + if !bc.chainmu.TryLock() { + return errChainStopped + } bc.currentBlock.Store(block) headBlockGauge.Update(int64(block.NumberU64())) bc.chainmu.Unlock() @@ -622,7 +638,9 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { if err := bc.SetHead(0); err != nil { return err } - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return errChainStopped + } defer bc.chainmu.Unlock() // Prepare the genesis block and reinitialise the chain @@ -698,8 +716,10 @@ func (bc *BlockChain) Export(w io.Writer) error { // ExportN writes a subset of the active chain to the given writer. func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { - bc.chainmu.RLock() - defer bc.chainmu.RUnlock() + if !bc.chainmu.TryLock() { + return errChainStopped + } + defer bc.chainmu.Unlock() if first > last { return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last) @@ -1033,15 +1053,38 @@ func (bc *BlockChain) Stop() { if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) { return } - // Unsubscribe all subscriptions registered from blockchain + + // Unsubscribe all subscriptions registered from blockchain. bc.scope.Close() + + // Signal shutdown to all goroutines. close(bc.quit) - atomic.StoreInt32(&bc.procInterrupt, 1) + bc.StopInsert() + + // Now wait for all chain modifications to end and persistent goroutines to exit. + // + // Note: Close waits for the mutex to become available, i.e. any running chain + // modification will have exited when Close returns. Since we also called StopInsert, + // the mutex should become available quickly. It cannot be taken again after Close has + // returned. + bc.chainmu.Close() bc.wg.Wait() bc.saveData() log.Info("Blockchain manager stopped") } +// StopInsert interrupts all insertion methods, causing them to return +// errInsertionInterrupted as soon as possible. Insertion is permanently disabled after +// calling this method. +func (bc *BlockChain) StopInsert() { + atomic.StoreInt32(&bc.procInterrupt, 1) +} + +// insertStopped returns true after StopInsert has been called. +func (bc *BlockChain) insertStopped() bool { + return atomic.LoadInt32(&bc.procInterrupt) == 1 +} + func (bc *BlockChain) procFutureBlocks() { blocks := make([]*types.Block, 0, bc.futureBlocks.Len()) for _, hash := range bc.futureBlocks.Keys() { @@ -1085,7 +1128,9 @@ const ( // Rollback is designed to remove a chain of links from the database that aren't // certain enough to be valid. func (bc *BlockChain) Rollback(chain []common.Hash) { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return + } defer bc.chainmu.Unlock() batch := bc.db.NewBatch() @@ -1124,6 +1169,8 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { // InsertReceiptChain attempts to complete an already existing header chain with // transaction and receipt data. func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) { + // We don't require the chainMu here since we want to maximize the + // concurrency of header insertion and receipt insertion. bc.wg.Add(1) defer bc.wg.Done() @@ -1195,7 +1242,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ } // Update the head fast sync block if better - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } head := blockChain[len(blockChain)-1] if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case currentFastBlock := bc.CurrentFastBlock() @@ -1221,12 +1270,13 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ var lastWrite uint64 -// WriteBlockWithoutState writes only the block and its metadata to the database, +// writeBlockWithoutState writes only the block and its metadata to the database, // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. -func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (err error) { - bc.wg.Add(1) - defer bc.wg.Done() +func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { + if bc.insertStopped() { + return errInsertionInterrupted + } batch := bc.db.NewBatch() rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) @@ -1239,17 +1289,19 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e // WriteBlockWithState writes the block and all associated state to the database. func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return NonStatTy, errInsertionInterrupted + } defer bc.chainmu.Unlock() - return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState) } // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) { - bc.wg.Add(1) - defer bc.wg.Done() + if bc.insertStopped() { + return NonStatTy, errInsertionInterrupted + } // Calculate the total difficulty of the block ptd := bc.GetTd(block.ParentHash(), block.NumberU64()-1) @@ -1470,38 +1522,51 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // // After insertion is done, all accumulated events will be fired. func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { - n, events, logs, err := bc.insertChain(chain, true) - bc.PostChainEvents(events, logs) - return n, err -} - -// insertChain will execute the actual chain insertion and event aggregation. The -// only reason this method exists as a separate one is to make locking cleaner -// with deferred statements. -func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { // Sanity check that we have something meaningful to import if len(chain) == 0 { - return 0, nil, nil, nil + return 0, nil } - engine, _ := bc.Engine().(*XDPoS.XDPoS) // Do a sanity check that the provided chain is actually ordered and linked for i := 1; i < len(chain); i++ { - if chain[i].NumberU64() != chain[i-1].NumberU64()+1 || chain[i].ParentHash() != chain[i-1].Hash() { + block, prev := chain[i], chain[i-1] + if block.NumberU64() != chain[i-1].NumberU64()+1 || block.ParentHash() != chain[i-1].Hash() { // Chain broke ancestry, log a messge (programming error) and skip insertion - log.Error("Non contiguous block insert", "number", chain[i].Number(), "hash", chain[i].Hash(), - "parent", chain[i].ParentHash(), "prevnumber", chain[i-1].Number(), "prevhash", chain[i-1].Hash()) + log.Error("Non contiguous block insert", + "number", block.Number(), + "hash", block.Hash(), + "parent", block.ParentHash(), + "prevnumber", prev.Number(), + "prevhash", prev.Hash()) - return 0, nil, nil, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, chain[i-1].NumberU64(), - chain[i-1].Hash().Bytes()[:4], i, chain[i].NumberU64(), chain[i].Hash().Bytes()[:4], chain[i].ParentHash().Bytes()[:4]) + return 0, fmt.Errorf("non contiguous insert: item %d is #%d [%x..], item %d is #%d [%x..] (parent [%x..])", i-1, prev.NumberU64(), + prev.Hash().Bytes()[:4], i, block.NumberU64(), block.Hash().Bytes()[:4], block.ParentHash().Bytes()[:4]) } } - // Pre-checks passed, start the full block imports - bc.wg.Add(1) - defer bc.wg.Done() - bc.chainmu.Lock() + // Pre-check passed, start the full block imports. + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() + n, events, logs, err := bc.insertChain(chain, true) + bc.PostChainEvents(events, logs) + return n, err +} + +// insertChain is the internal implementation of InsertChain, which assumes that +// 1) chains are contiguous, and 2) The chain mutex is held. +// +// This method is split out so that import batches that require re-injecting +// historical blocks can do so without releasing the lock, which could lead to +// racey behaviour. If a sidechain import is in progress, and the historic state +// is imported, but then new canon-head is added before the actual sidechain +// completes, then the historic state could be pruned again +func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []interface{}, []*types.Log, error) { + // If the chain is terminating, don't even bother starting up. + if bc.insertStopped() { + return 0, nil, nil, nil + } // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex @@ -1578,7 +1643,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) externTd := new(big.Int).Add(bc.GetTd(block.ParentHash(), block.NumberU64()-1), block.Difficulty()) if localTd.Cmp(externTd) > 0 { - if err = bc.WriteBlockWithoutState(block, externTd); err != nil { + if err = bc.writeBlockWithoutState(block, externTd); err != nil { return i, events, coalescedLogs, err } continue @@ -1596,10 +1661,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] } log.Debug("Number block need calculated again", "number", block.NumberU64(), "hash", block.Hash().Hex(), "winners", len(winner)) // Import all the pruned blocks to make the state available - bc.chainmu.Unlock() // During reorg, we use verifySeals=false _, evs, logs, err := bc.insertChain(winner, false) - bc.chainmu.Lock() events, coalescedLogs = evs, logs if err != nil { @@ -1628,6 +1691,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, [] var tradingService utils.TradingService var lendingService utils.LendingService isSDKNode := false + engine, _ := bc.Engine().(*XDPoS.XDPoS) if bc.Config().IsTIPXDCXReceiver(block.Number()) && bc.chainConfig.XDPoS != nil && engine != nil && block.NumberU64() > bc.chainConfig.XDPoS.Epoch { author, err := bc.Engine().Author(block.Header()) // Ignore error, we're past header validation if err != nil { @@ -2091,7 +2155,9 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L bc.wg.Add(1) defer bc.wg.Done() // Write the block to the chain and get the status. - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return nil, nil, errChainStopped + } defer bc.chainmu.Unlock() if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) { return events, coalescedLogs, nil @@ -2387,7 +2453,10 @@ func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) { } } -func (bc *BlockChain) update() { +// futureBlocksLoop processes the 'future block' queue. +func (bc *BlockChain) futureBlocksLoop() { + defer bc.wg.Done() + futureTimer := time.NewTicker(10 * time.Millisecond) defer futureTimer.Stop() for { @@ -2471,13 +2540,11 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i return i, err } - // Make sure only one thread manipulates the chain at once - bc.chainmu.Lock() + if !bc.chainmu.TryLock() { + return 0, errChainStopped + } defer bc.chainmu.Unlock() - bc.wg.Add(1) - defer bc.wg.Done() - whFunc := func(header *types.Header) error { _, err := bc.hc.WriteHeader(header) return err diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 41283aa592aa..31ba9bb4a701 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -128,7 +128,7 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error { blockchain.reportBlock(block, receipts, err) return err } - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash()))) rawdb.WriteBlock(blockchain.db, block) statedb.Commit(true) @@ -146,7 +146,7 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error return err } // Manually insert the header into the database, but don't reorganise (allows subsequent testing) - blockchain.chainmu.Lock() + blockchain.chainmu.MustLock() WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash))) rawdb.WriteHeader(blockchain.db, header) blockchain.chainmu.Unlock() diff --git a/internal/syncx/mutex.go b/internal/syncx/mutex.go new file mode 100644 index 000000000000..96a21986c60c --- /dev/null +++ b/internal/syncx/mutex.go @@ -0,0 +1,64 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package syncx contains exotic synchronization primitives. +package syncx + +// ClosableMutex is a mutex that can also be closed. +// Once closed, it can never be taken again. +type ClosableMutex struct { + ch chan struct{} +} + +func NewClosableMutex() *ClosableMutex { + ch := make(chan struct{}, 1) + ch <- struct{}{} + return &ClosableMutex{ch} +} + +// TryLock attempts to lock cm. +// If the mutex is closed, TryLock returns false. +func (cm *ClosableMutex) TryLock() bool { + _, ok := <-cm.ch + return ok +} + +// MustLock locks cm. +// If the mutex is closed, MustLock panics. +func (cm *ClosableMutex) MustLock() { + _, ok := <-cm.ch + if !ok { + panic("mutex closed") + } +} + +// Unlock unlocks cm. +func (cm *ClosableMutex) Unlock() { + select { + case cm.ch <- struct{}{}: + default: + panic("Unlock of already-unlocked ClosableMutex") + } +} + +// Close locks the mutex, then closes it. +func (cm *ClosableMutex) Close() { + _, ok := <-cm.ch + if !ok { + panic("Close of already-closed ClosableMutex") + } + close(cm.ch) +}