Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

miner: discard interrupted blocks #24638

Merged
merged 6 commits into from
May 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/beacon"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
Expand Down Expand Up @@ -349,11 +348,3 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.Pa
}
return beacon.BlockToExecutableData(block), nil
}

// Used in tests to add a the list of transactions from a block to the tx pool.
func (api *ConsensusAPI) insertTransactions(txs types.Transactions) error {
for _, tx := range txs {
api.eth.TxPool().AddLocal(tx)
}
return nil
}
2 changes: 1 addition & 1 deletion eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
api := NewConsensusAPI(ethservice)

// Put the 10th block's tx in the pool and produce a new block
api.insertTransactions(blocks[9].Transactions())
api.eth.TxPool().AddRemotesSync(blocks[9].Transactions())
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
Expand Down
32 changes: 23 additions & 9 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ const (
staleThreshold = 7
)

var (
errBlockInterruptedByNewHead = errors.New("new head arrived while building block")
errBlockInterruptedByRecommit = errors.New("recommit interrupt while building block")
)

// environment is the worker's current environment and holds all
// information of the sealing block generation.
type environment struct {
Expand Down Expand Up @@ -841,7 +846,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) error {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -866,8 +871,9 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
ratio: ratio,
inc: true,
}
return errBlockInterruptedByRecommit
}
return atomic.LoadInt32(interrupt) == commitInterruptNewHead
return errBlockInterruptedByNewHead
}
// If we don't have enough gas for any further transactions then we're done
if env.gasPool.Gas() < params.TxGas {
Expand Down Expand Up @@ -951,7 +957,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
if interrupt != nil {
w.resubmitAdjustCh <- &intervalAdjust{inc: false}
}
return false
return nil
}

// generateParams wraps various of settings for generating sealing task.
Expand Down Expand Up @@ -1050,7 +1056,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) {
// fillTransactions retrieves the pending transactions from the txpool and fills them
// into the given sealing block. The transaction selection and ordering strategy can
// be customized with the plugin in the future.
func (w *worker) fillTransactions(interrupt *int32, env *environment) {
func (w *worker) fillTransactions(interrupt *int32, env *environment) error {
// Split the pending transactions into locals and remotes
// Fill the block with all available pending transactions.
pending := w.eth.TxPool().Pending(true)
Expand All @@ -1063,16 +1069,17 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) {
}
if len(localTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
if err := w.commitTransactions(env, txs, interrupt); err != nil {
return err
}
}
if len(remoteTxs) > 0 {
txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee)
if w.commitTransactions(env, txs, interrupt) {
return
if err := w.commitTransactions(env, txs, interrupt); err != nil {
return err
}
}
return nil
}

// generateWork generates a sealing block based on the given parameters.
Expand All @@ -1084,6 +1091,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
defer work.discard()

w.fillTransactions(nil, work)

return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

Expand Down Expand Up @@ -1113,8 +1121,14 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
w.commit(work.copy(), nil, false, start)
}

// Fill pending transactions from the txpool
w.fillTransactions(interrupt, work)
err = w.fillTransactions(interrupt, work)
if errors.Is(err, errBlockInterruptedByNewHead) {
work.discard()
return
}

w.commit(work.copy(), w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down