Skip to content

Commit

Permalink
Merge branch 'merge-upstream-v1.101304.1' into feature/optimize_chain…
Browse files Browse the repository at this point in the history
…config
  • Loading branch information
bnoieh authored Feb 22, 2024
2 parents be5d67d + eb1c8f7 commit 72af5a0
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 42 deletions.
13 changes: 6 additions & 7 deletions .github/workflows/unit-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Unit Test

on:
pull_request:
types: [ opened, synchronize, reopened ]
types: [opened, synchronize, reopened]
push:
branches: [main, develop]

Expand All @@ -11,9 +11,8 @@ jobs:
name: unit_test
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version: '^1.21.3'
- run: make test

- uses: actions/checkout@v4
- uses: actions/setup-go@v4
with:
go-version-file: go.mod
- run: make test
14 changes: 2 additions & 12 deletions core/txpool/invalid.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
)

const (
AlreadyKnown = "AlreadyKnown"
TypeNotSupportDeposit = "TypeNotSupportDeposit"
TypeNotSupport1559 = "TypeNotSupport1559"
TypeNotSupport2718 = "TypeNotSupport2718"
Expand All @@ -23,22 +22,17 @@ const (
InsufficientFunds = "InsufficientFunds"
Overdraft = "Overdraft"
IntrinsicGas = "IntrinsicGas"
Throttle = "Throttle"
Overflow = "Overflow"
FutureReplacePending = "FutureReplacePending"
ReplaceUnderpriced = "ReplaceUnderpriced"
QueuedDiscard = "QueueDiscard"
GasUnitOverflow = "GasUnitOverflow"
)

func meter(err string) metrics.Meter {
func Meter(err string) metrics.Meter {
return metrics.GetOrRegisterMeter("txpool/invalid/"+err, nil)
}

func init() {
// init the metrics
for _, err := range []string{
AlreadyKnown,
TypeNotSupportDeposit,
TypeNotSupport1559,
TypeNotSupport2718,
Expand All @@ -56,13 +50,9 @@ func init() {
InsufficientFunds,
Overdraft,
IntrinsicGas,
Throttle,
Overflow,
FutureReplacePending,
ReplaceUnderpriced,
QueuedDiscard,
GasUnitOverflow,
} {
meter(err).Mark(0)
Meter(err).Mark(0)
}
}
2 changes: 2 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.priced.Put(dropTx, false)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
txpool.Meter(txpool.FutureReplacePending).Mark(1)
return false, txpool.ErrFutureReplacePending
}
}
Expand Down Expand Up @@ -940,6 +941,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
if pool.all.Get(hash) == nil && !addAll {
txpool.Meter(txpool.MissingTransaction).Mark(1)
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
}
if addAll {
Expand Down
28 changes: 16 additions & 12 deletions core/txpool/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
// This is for spam protection, not consensus,
// as the external engine-API user authenticates deposits.
if tx.Type() == types.DepositTxType {
meter(TypeNotSupportDeposit).Mark(1)
Meter(TypeNotSupportDeposit).Mark(1)
return core.ErrTxTypeNotSupported
}
// Ensure transactions not implemented by the calling pool are rejected
Expand All @@ -81,25 +81,25 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
}
// Ensure only transactions that have been enabled are accepted
if !opts.Config.IsBerlin(head.Number) && tx.Type() != types.LegacyTxType {
meter(TypeNotSupport2718).Mark(1)
Meter(TypeNotSupport2718).Mark(1)
return fmt.Errorf("%w: type %d rejected, pool not yet in Berlin", core.ErrTxTypeNotSupported, tx.Type())
}
if !opts.Config.IsLondon(head.Number) && tx.Type() == types.DynamicFeeTxType {
meter(TypeNotSupport1559).Mark(1)
Meter(TypeNotSupport1559).Mark(1)
return fmt.Errorf("%w: type %d rejected, pool not yet in London", core.ErrTxTypeNotSupported, tx.Type())
}
if !opts.Config.IsCancun(head.Number, head.Time) && tx.Type() == types.BlobTxType {
return fmt.Errorf("%w: type %d rejected, pool not yet in Cancun", core.ErrTxTypeNotSupported, tx.Type())
}
// Check whether the init code size has been exceeded
if opts.Config.IsShanghai(head.Number, head.Time) && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
meter(MaxInitCodeSizeExceeded).Mark(1)
Meter(MaxInitCodeSizeExceeded).Mark(1)
return fmt.Errorf("%w: code size %v, limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur for transactions created using the RPC.
if tx.Value().Sign() < 0 {
meter(NegativeValue).Mark(1)
Meter(NegativeValue).Mark(1)
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas
Expand All @@ -108,36 +108,38 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
}
// Sanity check for extremely large numbers (supported by RLP or RPC)
if tx.GasFeeCap().BitLen() > 256 {
meter(FeeCapVeryHigh).Mark(1)
Meter(FeeCapVeryHigh).Mark(1)
return core.ErrFeeCapVeryHigh
}
if tx.GasTipCap().BitLen() > 256 {
meter(TipVeryHigh).Mark(1)
Meter(TipVeryHigh).Mark(1)
return core.ErrTipVeryHigh
}
// Ensure gasFeeCap is greater than or equal to gasTipCap
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
meter(TipAboveFeeCap).Mark(1)
Meter(TipAboveFeeCap).Mark(1)
return core.ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly
if _, err := types.Sender(signer, tx); err != nil {
meter(InvalidSender).Mark(1)
Meter(InvalidSender).Mark(1)
return ErrInvalidSender
}
// Ensure the transaction has more gas than the bare minimum needed to cover
// the transaction metadata
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, opts.Config.IsIstanbul(head.Number), opts.Config.IsShanghai(head.Number, head.Time))
if err != nil {
Meter(GasUnitOverflow).Mark(1)
return err
}
if tx.Gas() < intrGas {
Meter(IntrinsicGas).Mark(1)
return fmt.Errorf("%w: needed %v, allowed %v", core.ErrIntrinsicGas, intrGas, tx.Gas())
}
// Ensure the gasprice is high enough to cover the requirement of the calling
// pool and/or block producer
if tx.GasTipCapIntCmp(opts.MinTip) < 0 {
meter(Underpriced).Mark(1)
Meter(Underpriced).Mark(1)
return fmt.Errorf("%w: tip needed %v, tip permitted %v", ErrUnderpriced, opts.MinTip, tx.GasTipCap())
}
// Ensure blob transactions have valid commitments
Expand Down Expand Up @@ -240,7 +242,7 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
}
next := opts.State.GetNonce(from)
if next > tx.Nonce() {
meter(NonceTooLow).Mark(1)
Meter(NonceTooLow).Mark(1)
return fmt.Errorf("%w: next nonce %v, tx nonce %v", core.ErrNonceTooLow, next, tx.Nonce())
}
// Ensure the transaction doesn't produce a nonce gap in pools that do not
Expand All @@ -261,7 +263,7 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
}
}
if balance.Cmp(cost) < 0 {
meter(InsufficientFunds).Mark(1)
Meter(InsufficientFunds).Mark(1)
return fmt.Errorf("%w: balance %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, cost, new(big.Int).Sub(cost, balance))
}
// Ensure the transactor has enough funds to cover for replacements or nonce
Expand All @@ -271,11 +273,13 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
bump := new(big.Int).Sub(cost, prev)
need := new(big.Int).Add(spent, bump)
if balance.Cmp(need) < 0 {
Meter(Overdraft).Mark(1)
return fmt.Errorf("%w: balance %v, queued cost %v, tx bumped %v, overshot %v", core.ErrInsufficientFunds, balance, spent, bump, new(big.Int).Sub(need, balance))
}
} else {
need := new(big.Int).Add(spent, cost)
if balance.Cmp(need) < 0 {
Meter(Overdraft).Mark(1)
return fmt.Errorf("%w: balance %v, queued cost %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, spent, cost, new(big.Int).Sub(need, balance))
}
// Transaction takes a new nonce value out of the pool. Ensure it doesn't
Expand Down
23 changes: 23 additions & 0 deletions eth/downloader/fetchers_concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ import (
// to each request. Failing to do so is considered a protocol violation.
var timeoutGracePeriod = 2 * time.Minute

// peersRetryInterval is the retry interval when all peers cannot get the request data.
var peersRetryInterval = 100 * time.Millisecond

// maxRetries is the max retry time for unreserved download task
var maxRetries = 5

// typedQueue is an interface defining the adaptor needed to translate the type
// specific downloader/queue schedulers into the type-agnostic general concurrent
// fetcher algorithm calls.
Expand Down Expand Up @@ -125,6 +131,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {

// Prepare the queue and fetch block parts until the block header fetcher's done
finished := false

requestRetried := 0
for {
// Short circuit if we lost all our peers
if d.peers.Len() == 0 && !beaconMode {
Expand Down Expand Up @@ -195,6 +203,10 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
// to the queue, that is async, and we can do better here by
// immediately pushing the unfulfilled requests.
queue.unreserve(peer.id) // TODO(karalabe): This needs a non-expiration method
//reset progressed
if len(pending) == 0 {
progressed = false
}
continue
}
pending[peer.id] = req
Expand All @@ -212,6 +224,17 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode {
return errPeersUnavailable
}
// Retry the unreserved task in next loop
if beaconMode && len(pending) == 0 && queued > 0 && !progressed && !throttled && len(idles) == d.peers.Len() {
log.Warn("All idle peers are not valid for current task, will retry ...")
requestRetried++
if requestRetried > maxRetries {
log.Info("max retry exceeded, cancel request")
return errCanceled
}
time.Sleep(peersRetryInterval)
continue
}
}
// Wait for something to happen
select {
Expand Down
18 changes: 13 additions & 5 deletions eth/downloader/skeleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/etherror"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -73,6 +74,9 @@ var errTerminated = errors.New("terminated")
// with a new header, but it does not link up to the existing sync.
var errReorgDenied = errors.New("non-forced head reorg denied")

// maxBlockNumGapTolerance is the max gap tolerance by peer
var maxBlockNumGapTolerance = uint64(30)

func init() {
// Tuning parameters is nice, but the scratch space must be assignable in
// full to peers. It's a useless cornercase to support a dangling half-group.
Expand Down Expand Up @@ -804,25 +808,29 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) {
case len(headers) == 0:
// No headers were delivered, reject the response and reschedule
peer.log.Debug("No headers delivered")
res.Done <- errors.New("no headers delivered")
res.Done <- etherror.ErrNoHeadersDelivered
s.scheduleRevertRequest(req)

case headers[0].Number.Uint64() != req.head:
// Header batch anchored at non-requested number
peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head)
res.Done <- errors.New("invalid header batch anchor")
if req.head-headers[0].Number.Uint64() < maxBlockNumGapTolerance {
res.Done <- etherror.ErrHeaderBatchAnchorLow
} else {
res.Done <- etherror.ErrInvalidHeaderBatchAnchor
}
s.scheduleRevertRequest(req)

case req.head >= requestHeaders && len(headers) != requestHeaders:
// Invalid number of non-genesis headers delivered, reject the response and reschedule
peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders)
res.Done <- errors.New("not enough non-genesis headers delivered")
res.Done <- etherror.ErrNotEnoughNonGenesisHeaders
s.scheduleRevertRequest(req)

case req.head < requestHeaders && uint64(len(headers)) != req.head:
// Invalid number of genesis headers delivered, reject the response and reschedule
peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64())
res.Done <- errors.New("not enough genesis headers delivered")
res.Done <- etherror.ErrNotEnoughGenesisHeaders
s.scheduleRevertRequest(req)

default:
Expand All @@ -831,7 +839,7 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) {
for i := 0; i < len(headers)-1; i++ {
if headers[i].ParentHash != headers[i+1].Hash() {
peer.log.Debug("Invalid hash progression", "index", i, "wantparenthash", headers[i].ParentHash, "haveparenthash", headers[i+1].Hash())
res.Done <- errors.New("invalid hash progression")
res.Done <- etherror.ErrInvalidHashProgression
s.scheduleRevertRequest(req)
return
}
Expand Down
12 changes: 12 additions & 0 deletions eth/etherror/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package etherror

import "errors"

var (
ErrNoHeadersDelivered = errors.New("no headers delivered")
ErrInvalidHeaderBatchAnchor = errors.New("invalid header batch anchor")
ErrNotEnoughNonGenesisHeaders = errors.New("not enough non-genesis headers delivered")
ErrNotEnoughGenesisHeaders = errors.New("not enough genesis headers delivered")
ErrInvalidHashProgression = errors.New("invalid hash progression")
ErrHeaderBatchAnchorLow = errors.New("header batch anchor is lower than requested")
)
8 changes: 4 additions & 4 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,14 +461,14 @@ func TestTransactionPendingReannounce(t *testing.T) {

sink := newTestHandler()
defer sink.close()
sink.handler.acceptTxs = 1 // mark synced to accept transactions
sink.handler.synced.Store(true) // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
sourcePeer := eth.NewPeer(eth.ETH68, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH68, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
defer sourcePeer.Close()
defer sinkPeer.Close()

Expand All @@ -481,7 +481,7 @@ func TestTransactionPendingReannounce(t *testing.T) {

// Subscribe transaction pools
txCh := make(chan core.NewTxsEvent, 1024)
sub := sink.txpool.SubscribeNewTxsEvent(txCh)
sub := sink.txpool.SubscribeTransactions(txCh, false)
defer sub.Unsubscribe()

txs := make([]*types.Transaction, 64)
Expand Down
14 changes: 13 additions & 1 deletion eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,19 @@ func nodeInfo(chain *core.BlockChain, network uint64) *NodeInfo {
// connection is torn down.
func Handle(backend Backend, peer *Peer) error {
for {
if err := handleMessage(backend, peer); err != nil {
err := handleMessage(backend, peer)
switch {
// TODO: currently no headers not ignored as it may leads to a dead peer not removing as expected
/*
case errors.Is(err, etherror.ErrNoHeadersDelivered):
// ignore no headers delivered
peer.Log().Warn("Message handling failed with no headers")
case errors.Is(err, etherror.ErrHeaderBatchAnchorLow):
// ignore lower header anchor within tolerance
peer.Log().Warn("Message handling failed with lower batch anchor")
*/
case err != nil:
peer.Log().Debug("Message handling failed in `eth`", "err", err)
return err
}
Expand Down
3 changes: 2 additions & 1 deletion miner/payload_building.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ func (w *worker) buildPayload(args *BuildPayloadArgs) (*Payload, error) {
}
start := time.Now()
empty := w.getSealingBlock(emptyParams)
log.Debug("Built initial payload", "id", args.Id(), "number", empty.block.NumberU64(), "hash", empty.block.Hash(), "elapsed", common.PrettyDuration(time.Since(start)))
if empty.err != nil {
log.Error("Built initial payload error", "id", args.Id(), "error", empty.err)
return nil, empty.err
}
log.Debug("Built initial payload", "id", args.Id(), "number", empty.block.NumberU64(), "hash", empty.block.Hash(), "elapsed", common.PrettyDuration(time.Since(start)))

// Construct a payload object for return.
payload := newPayload(empty.block, args.Id())
Expand Down

0 comments on commit 72af5a0

Please sign in to comment.