Skip to content

Commit

Permalink
core, light: get rid of the dual mutexes (ethereum#18436)
Browse files Browse the repository at this point in the history
  • Loading branch information
gzliudan committed Dec 28, 2024
1 parent 3a0d5b6 commit 1c555b2
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 47 deletions.
45 changes: 23 additions & 22 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ type BlockChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block

mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock

Expand Down Expand Up @@ -411,8 +410,8 @@ func (bc *BlockChain) loadLastState() error {
func (bc *BlockChain) SetHead(head uint64) error {
log.Warn("Rewinding blockchain", "target", head)

bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Rewind the header chain, deleting all block bodies until then
delFn := func(hash common.Hash, num uint64) {
Expand Down Expand Up @@ -476,10 +475,10 @@ func (bc *BlockChain) FastSyncCommitHead(hash common.Hash) error {
return err
}
// If all checks out, manually set the head block
bc.mu.Lock()
bc.chainmu.Lock()
bc.currentBlock.Store(block)
headBlockGauge.Update(int64(block.NumberU64()))
bc.mu.Unlock()
bc.chainmu.Unlock()

log.Info("Committed new head block", "number", block.Number(), "hash", hash)
return nil
Expand Down Expand Up @@ -598,8 +597,8 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error {
if err := bc.SetHead(0); err != nil {
return err
}
bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

// Prepare the genesis block and reinitialise the chain
if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
Expand Down Expand Up @@ -672,8 +671,8 @@ func (bc *BlockChain) Export(w io.Writer) error {

// ExportN writes a subset of the active chain to the given writer.
func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
bc.mu.RLock()
defer bc.mu.RUnlock()
bc.chainmu.RLock()
defer bc.chainmu.RUnlock()

if first > last {
return fmt.Errorf("export failed: first (%d) is greater than last (%d)", first, last)
Expand All @@ -690,7 +689,6 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error {
return err
}
}

return nil
}

Expand Down Expand Up @@ -1053,8 +1051,8 @@ const (
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (bc *BlockChain) Rollback(chain []common.Hash) {
bc.mu.Lock()
defer bc.mu.Unlock()
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
Expand Down Expand Up @@ -1146,7 +1144,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}

// Update the head fast sync block if better
bc.mu.Lock()
bc.chainmu.Lock()
head := blockChain[len(blockChain)-1]
if td := bc.GetTd(head.Hash(), head.NumberU64()); td != nil { // Rewind may have occurred, skip in that case
currentFastBlock := bc.CurrentFastBlock()
Expand All @@ -1158,7 +1156,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
headFastBlockGauge.Update(int64(head.NumberU64()))
}
}
bc.mu.Unlock()
bc.chainmu.Unlock()

log.Info("Imported new block receipts",
"count", stats.processed,
Expand Down Expand Up @@ -1188,6 +1186,15 @@ func (bc *BlockChain) WriteBlockWithoutState(block *types.Block, td *big.Int) (e

// WriteBlockWithState writes the block and all associated state to the database.
func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

return bc.writeBlockWithState(block, receipts, state, tradingState, lendingState)
}

// writeBlockWithState writes the block and all associated state to the database,
// but is expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.Receipt, state *state.StateDB, tradingState *tradingstate.TradingStateDB, lendingState *lendingstate.LendingStateDB) (status WriteStatus, err error) {
bc.wg.Add(1)
defer bc.wg.Done()

Expand All @@ -1197,9 +1204,6 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
return NonStatTy, consensus.ErrUnknownAncestor
}
// Make sure no inconsistent state is leaked during insertion
bc.mu.Lock()
defer bc.mu.Unlock()

currentBlock := bc.CurrentBlock()
localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
externTd := new(big.Int).Add(block.Difficulty(), ptd)
Expand Down Expand Up @@ -1710,7 +1714,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, []
proctime := time.Since(bstart)

// Write the block to the chain and get the status.
status, err := bc.WriteBlockWithState(block, receipts, statedb, tradingState, lendingState)
status, err := bc.writeBlockWithState(block, receipts, statedb, tradingState, lendingState)
t3 := time.Now()
if err != nil {
return i, events, coalescedLogs, err
Expand Down Expand Up @@ -2061,7 +2065,7 @@ func (bc *BlockChain) insertBlock(block *types.Block) ([]interface{}, []*types.L
if bc.HasBlockAndFullState(block.Hash(), block.NumberU64()) {
return events, coalescedLogs, nil
}
status, err := bc.WriteBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState)
status, err := bc.writeBlockWithState(block, result.receipts, result.state, result.tradingState, result.lendingState)

if err != nil {
return events, coalescedLogs, err
Expand Down Expand Up @@ -2425,9 +2429,6 @@ func (bc *BlockChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i
defer bc.wg.Done()

whFunc := func(header *types.Header) error {
bc.mu.Lock()
defer bc.mu.Unlock()

_, err := bc.hc.WriteHeader(header)
return err
}
Expand Down
11 changes: 5 additions & 6 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ import (
"testing"
"time"

"github.com/XinFinOrg/XDPoSChain/core/rawdb"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"github.com/XinFinOrg/XDPoSChain/core/state"
"github.com/XinFinOrg/XDPoSChain/core/types"
"github.com/XinFinOrg/XDPoSChain/core/vm"
Expand Down Expand Up @@ -129,11 +128,11 @@ func testBlockChainImport(chain types.Blocks, blockchain *BlockChain) error {
blockchain.reportBlock(block, receipts, err)
return err
}
blockchain.mu.Lock()
blockchain.chainmu.Lock()
WriteTd(blockchain.db, block.Hash(), block.NumberU64(), new(big.Int).Add(block.Difficulty(), blockchain.GetTdByHash(block.ParentHash())))
rawdb.WriteBlock(blockchain.db, block)
statedb.Commit(true)
blockchain.mu.Unlock()
blockchain.chainmu.Unlock()
}
return nil
}
Expand All @@ -147,10 +146,10 @@ func testHeaderChainImport(chain []*types.Header, blockchain *BlockChain) error
return err
}
// Manually insert the header into the database, but don't reorganise (allows subsequent testing)
blockchain.mu.Lock()
blockchain.chainmu.Lock()
WriteTd(blockchain.db, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, blockchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(blockchain.db, header)
blockchain.mu.Unlock()
blockchain.chainmu.Unlock()
}
return nil
}
Expand Down
26 changes: 10 additions & 16 deletions light/lightchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type LightChain struct {
scope event.SubscriptionScope
genesisBlock *types.Block

mu sync.RWMutex
chainmu sync.RWMutex

bodyCache *lru.Cache[common.Hash, *types.Body]
Expand Down Expand Up @@ -159,8 +158,8 @@ func (lc *LightChain) loadLastState() error {
// SetHead rewinds the local chain to a new head. Everything above the new
// head will be deleted and the new one set.
func (lc *LightChain) SetHead(head uint64) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.chainmu.Lock()
defer lc.chainmu.Unlock()

lc.hc.SetHead(head, nil)
lc.loadLastState()
Expand All @@ -182,8 +181,8 @@ func (lc *LightChain) ResetWithGenesisBlock(genesis *types.Block) {
// Dump the entire block chain and purge the caches
lc.SetHead(0)

lc.mu.Lock()
defer lc.mu.Unlock()
lc.chainmu.Lock()
defer lc.chainmu.Unlock()

// Prepare the genesis block and reinitialise the chain
if err := core.WriteTd(lc.chainDb, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil {
Expand Down Expand Up @@ -297,8 +296,8 @@ func (lc *LightChain) Stop() {
// Rollback is designed to remove a chain of links from the database that aren't
// certain enough to be valid.
func (lc *LightChain) Rollback(chain []common.Hash) {
lc.mu.Lock()
defer lc.mu.Unlock()
lc.chainmu.Lock()
defer lc.chainmu.Unlock()

for i := len(chain) - 1; i >= 0; i-- {
hash := chain[i]
Expand Down Expand Up @@ -344,19 +343,13 @@ func (lc *LightChain) InsertHeaderChain(chain []*types.Header, checkFreq int) (i

// Make sure only one thread manipulates the chain at once
lc.chainmu.Lock()
defer func() {
lc.chainmu.Unlock()
time.Sleep(time.Millisecond * 10) // ugly hack; do not hog chain lock in case syncing is CPU-limited by validation
}()
defer lc.chainmu.Unlock()

lc.wg.Add(1)
defer lc.wg.Done()

var events []interface{}
whFunc := func(header *types.Header) error {
lc.mu.Lock()
defer lc.mu.Unlock()

status, err := lc.hc.WriteHeader(header)

switch status {
Expand Down Expand Up @@ -450,11 +443,12 @@ func (lc *LightChain) SyncCht(ctx context.Context) bool {
num := chtCount*CHTFrequencyClient - 1
header, err := GetHeaderByNumber(ctx, lc.odr, num)
if header != nil && err == nil {
lc.mu.Lock()
lc.chainmu.Lock()
defer lc.chainmu.Unlock()

if lc.hc.CurrentHeader().Number.Uint64() < header.Number.Uint64() {
lc.hc.SetCurrentHeader(header)
}
lc.mu.Unlock()
return true
}
}
Expand Down
7 changes: 4 additions & 3 deletions light/lightchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package light

import (
"context"
"github.com/XinFinOrg/XDPoSChain/core/rawdb"
"math/big"
"testing"

"github.com/XinFinOrg/XDPoSChain/core/rawdb"

"github.com/XinFinOrg/XDPoSChain/common"
"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
"github.com/XinFinOrg/XDPoSChain/core"
Expand Down Expand Up @@ -122,10 +123,10 @@ func testHeaderChainImport(chain []*types.Header, lightchain *LightChain) error
return err
}
// Manually insert the header into the database, but don't reorganize (allows subsequent testing)
lightchain.mu.Lock()
lightchain.chainmu.Lock()
core.WriteTd(lightchain.chainDb, header.Hash(), header.Number.Uint64(), new(big.Int).Add(header.Difficulty, lightchain.GetTdByHash(header.ParentHash)))
rawdb.WriteHeader(lightchain.chainDb, header)
lightchain.mu.Unlock()
lightchain.chainmu.Unlock()
}
return nil
}
Expand Down

0 comments on commit 1c555b2

Please sign in to comment.