diff --git a/core/blockchain.go b/core/blockchain.go index 66bc395c9dc4..b1bc92cb801b 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -198,6 +198,7 @@ type BlockChain struct { quit chan struct{} // blockchain quit channel wg sync.WaitGroup // chain processing wait group for shutting down + wgMu sync.Mutex // Mutex to provide safety between calls to Add and Wait running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing @@ -995,7 +996,9 @@ func (bc *BlockChain) Stop() { bc.scope.Close() close(bc.quit) bc.StopInsert() + bc.wgMu.Lock() bc.wg.Wait() + bc.wgMu.Unlock() // Ensure that the entirety of the state snapshot is journalled to disk. var snapBase common.Hash @@ -1095,7 +1098,9 @@ type numberHash struct { func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain []types.Receipts, ancientLimit uint64) (int, error) { // We don't require the chainMu here since we want to maximize the // concurrency of header insertion and receipt insertion. + bc.wgMu.Lock() bc.wg.Add(1) + bc.wgMu.Unlock() defer bc.wg.Done() var ( @@ -1372,7 +1377,9 @@ var lastWrite uint64 // but does not write any state. This is used to construct competing side forks // up to the point where they exceed the canonical total difficulty. func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (err error) { + bc.wgMu.Lock() bc.wg.Add(1) + bc.wgMu.Unlock() defer bc.wg.Done() batch := bc.db.NewBatch() @@ -1387,7 +1394,9 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e // writeKnownBlock updates the head block flag with a known block // and introduces chain reorg if necessary. func (bc *BlockChain) writeKnownBlock(block *types.Block) error { + bc.wgMu.Lock() bc.wg.Add(1) + bc.wgMu.Unlock() defer bc.wg.Done() current := bc.CurrentBlock() @@ -1411,7 +1420,9 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. // writeBlockWithState writes the block and all associated state to the database, // but is expects the chain mutex to be held. func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) { + bc.wgMu.Lock() bc.wg.Add(1) + bc.wgMu.Unlock() defer bc.wg.Done() // Calculate the total difficulty of the block @@ -1594,7 +1605,9 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { } } // Pre-checks passed, start the full block imports + bc.wgMu.Lock() bc.wg.Add(1) + bc.wgMu.Unlock() bc.chainmu.Lock() n, err := bc.insertChain(chain, true) bc.chainmu.Unlock() @@ -1610,7 +1623,9 @@ func (bc *BlockChain) InsertChainWithoutSealVerification(block *types.Block) (in defer bc.blockProcFeed.Send(false) // Pre-checks passed, start the full block imports + bc.wgMu.Lock() bc.wg.Add(1) + bc.wgMu.Unlock() bc.chainmu.Lock() n, err := bc.insertChain(types.Blocks([]*types.Block{block}), false) bc.chainmu.Unlock() @@ -2355,7 +2370,9 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i bc.chainmu.Lock() defer bc.chainmu.Unlock() + bc.wgMu.Lock() bc.wg.Add(1) + bc.wgMu.Unlock() defer bc.wg.Done() _, err := bc.hc.InsertHeaderChain(chain, start) return 0, err