Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, light: get rid of the dual mutexes, hard to reason with #18436

Merged
merged 1 commit into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 27 additions & 47 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ type BlockChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block

mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock

Expand Down Expand Up @@ -281,8 +280,8 @@ func (bc *BlockChain) loadLastState() error {
func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)

bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Rewind the header chain, deleting all block bodies until then
delFn := func(db rawdb.DatabaseDeleter, hash common.Hash, num uint64) {
Expand Down Expand Up @@ -340,9 +339,9 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
return err
}
// If all checks out, manually set the head block
bc.mu.Lock()
bc.chainmu.Lock()
bc.currentBlock.Store(block)
bc.mu.Unlock()
bc.chainmu.Unlock()

log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
Expand Down Expand Up @@ -420,8 +419,8 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
if err := bc.SetHead(0); err != nil {
return err
}
bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Prepare the genesis block and reinitialise the chain
if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
Expand Down Expand Up @@ -468,8 +467,8 @@ func (bc *BlockChain) Export(w io.Writer) error {

// ExportN writes a subset of the active chain to the given writer.
func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
bc.mu.RLock()
defer bc.mu.RUnlock()
bc.chainmu.RLock()
defer bc.chainmu.RUnlock()

if first > last {
return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
Expand All @@ -490,7 +489,6 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
reported = time.Now()
}
}

return nil
}

Expand Down Expand Up @@ -756,8 +754,8 @@ const (
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash) {
bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
Expand Down Expand Up @@ -881,7 +879,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}

// Update the head fast sync block if better
bc.mu.Lock()
bc.chainmu.Lock()
head := blockChain[len(blockChain)-1]
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
currentFastBlock := bc.CurrentFastBlock()
Expand All @@ -890,7 +888,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
bc.currentFastBlock.Store(head)
}
}
bc.mu.Unlock()
bc.chainmu.Unlock()

context := []interface{}{
"count", stats.processed, "elapsed", common.PrettyDuration(time.Since(start)),
Expand Down Expand Up @@ -924,6 +922,15 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e

// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

return bc.writeBlockWithState(block, receipts, state)
}

// writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()

Expand All @@ -933,9 +940,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, consensus.ErrUnknownAncestor
}
// Make sure no inconsistent state is leaked during insertion
bc.mu.Lock()
defer bc.mu.Unlock()

currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
Expand Down Expand Up @@ -1212,7 +1216,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
proctime := time.Since(start)

// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, state)
status, err := bc.writeBlockWithState(block, receipts, state)
t3 := time.Now()
if err != nil {
return it.index, events, coalescedLogs, err
Expand Down Expand Up @@ -1281,7 +1285,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, []*types.Log, error) {
var (
externTd *big.Int
current = bc.CurrentBlock().NumberU64()
current = bc.CurrentBlock()
)
// The first sidechain block error is already verified to be ErrPrunedAncestor.
// Since we don't import them here, we expect ErrUnknownAncestor for the remaining
Expand All @@ -1290,7 +1294,7 @@ func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, [
block, err := it.current(), consensus.ErrPrunedAncestor
for ; block != nil && (err == consensus.ErrPrunedAncestor); block, err = it.next() {
// Check the canonical state root for that number
if number := block.NumberU64(); current >= number {
if number := block.NumberU64(); current.NumberU64() >= number {
if canonical := bc.GetBlockByNumber(number); canonical != nil && canonical.Root() == block.Root() {
// This is most likely a shadow-state attack. When a fork is imported into the
// database, and it eventually reaches a block height which is not pruned, we
Expand Down Expand Up @@ -1329,7 +1333,7 @@ func (bc *BlockChain) insertSidechain(it *insertIterator) (int, []interface{}, [
//
// If the externTd was larger than our local TD, we now need to reimport the previous
// blocks to regenerate the required state
localTd := bc.GetTd(bc.CurrentBlock().Hash(), current)
localTd := bc.GetTd(current.Hash(), current.NumberU64())
if localTd.Cmp(externTd) > 0 {
log.Info("Sidechain written to disk", "start", it.first().NumberU64(), "end", it.previous().NumberU64(), "sidetd", externTd, "localtd", localTd)
return it.index, nil, nil, err
Expand Down Expand Up @@ -1597,36 +1601,12 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
defer bc.wg.Done()

whFunc := func(header *types.Header) error {
bc.mu.Lock()
defer bc.mu.Unlock()

_, err := bc.hc.WriteHeader(header)
return err
}

return bc.hc.InsertHeaderChain(chain, whFunc, start)
}

// writeHeader writes a header into the local chain, given that its parent is
// already known. If the total difficulty of the newly inserted header becomes
// greater than the current known TD, the canonical chain is re-routed.
//
// Note: This method is not concurrent-safe with inserting blocks simultaneously
// into the chain, as side effects caused by reorganisations cannot be emulated
// without the real blocks. Hence, writing headers directly should only be done
// in two scenarios: pure-header mode of operation (light clients), or properly
// separated header/block phases (non-archive clients).
func (bc *BlockChain) writeHeader(header *types.Header) error {
bc.wg.Add(1)
defer bc.wg.Done()

bc.mu.Lock()
defer bc.mu.Unlock()

_, err := bc.hc.WriteHeader(header)
return err
}

// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
func (bc *BlockChain) CurrentHeader() *types.Header {
Expand Down Expand Up @@ -1675,8 +1655,8 @@ func (bc *BlockChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []com
//
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
func (bc *BlockChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
bc.chainmu.RLock()
defer bc.chainmu.RUnlock()

return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
}
Expand Down
8 changes: 4 additions & 4 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,11 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
blockchain.mu.Lock()
blockchain.chainmu.Lock()
rawdb.WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash())))
rawdb.WriteBlock(blockchain.db, block)
statedb.Commit(false)
blockchain.mu.Unlock()
blockchain.chainmu.Unlock()
}
return nil
}
Expand All @@ -180,10 +180,10 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error
return err
}
// Manually insert the header into the database, but don't reorganise (allows subsequent testing)
blockchain.mu.Lock()
blockchain.chainmu.Lock()
rawdb.WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(blockchain.db, header)
blockchain.mu.Unlock()
blockchain.chainmu.Unlock()
}
return nil
}
Expand Down
29 changes: 11 additions & 18 deletions light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type LightChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block

mu sync.RWMutex
chainmu sync.RWMutex

bodyCache *lru.Cache // Cache for the most recent block bodies
Expand Down Expand Up @@ -165,8 +164,8 @@ func (self *LightChain) loadLastState() error {
// SetHead rewinds the local chain to a new head. Everything above the new
// head will be deleted and the new one set.
func (bc *LightChain) SetHead(head uint64) {
bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

bc.hc.SetHead(head, nil)
bc.loadLastState()
Expand All @@ -188,8 +187,8 @@ func (bc *LightChain) ResetWithGenesisBlock(genesis *types.Block) {
// Dump the entire block chain and purge the caches
bc.SetHead(0)

bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Prepare the genesis block and reinitialise the chain
rawdb.WriteTd(bc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty())
Expand Down Expand Up @@ -315,8 +314,8 @@ func (bc *LightChain) Stop() {
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (self *LightChain) Rollback(chain []common.Hash) {
self.mu.Lock()
defer self.mu.Unlock()
self.chainmu.Lock()
defer self.chainmu.Unlock()

for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
Expand Down Expand Up @@ -362,19 +361,13 @@ func (self *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int)

// Make sure only one thread manipulates the chain at once
self.chainmu.Lock()
defer func() {
self.chainmu.Unlock()
time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation
}()
defer self.chainmu.Unlock()

self.wg.Add(1)
defer self.wg.Done()

var events []interface{}
whFunc := func(header *types.Header) error {
self.mu.Lock()
defer self.mu.Unlock()

status, err := self.hc.WriteHeader(header)

switch status {
Expand Down Expand Up @@ -441,8 +434,8 @@ func (self *LightChain) GetBlockHashesFromHash(hash common.Hash, max uint64) []c
//
// Note: ancestor == 0 returns the same block, 1 returns its parent and so on.
func (bc *LightChain) GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()
bc.chainmu.RLock()
defer bc.chainmu.RUnlock()

return bc.hc.GetAncestor(hash, number, ancestor, maxNonCanonical)
}
Expand Down Expand Up @@ -483,8 +476,8 @@ func (self *LightChain) SyncCht(ctx context.Context) bool {
}
// Retrieve the latest useful header and update to it
if header, err := GetHeaderByNumber(ctx, self.odr, latest); header != nil && err == nil {
self.mu.Lock()
defer self.mu.Unlock()
self.chainmu.Lock()
defer self.chainmu.Unlock()

// Ensure the chain didn't move past the latest block while retrieving it
if self.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() {
Expand Down
4 changes: 2 additions & 2 deletions light/lightchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ func testHeaderChainImport(chain []*types.Header, lightchain *LightChain) error
return err
}
// Manually insert the header into the database, but don't reorganize (allows subsequent testing)
lightchain.mu.Lock()
lightchain.chainmu.Lock()
rawdb.WriteTd(lightchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, lightchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(lightchain.chainDb, header)
lightchain.mu.Unlock()
lightchain.chainmu.Unlock()
}
return nil
}
Expand Down