Skip to content

Commit

Permalink
Remove concurrency from fee history and support request cancelation (e…
Browse files Browse the repository at this point in the history
…rigontech#2504)

* remove fee history concurrency

* remove fee history concurrency

* add concext cancel support
  • Loading branch information
AskAlexSharov authored Aug 8, 2021
1 parent 6c2beb2 commit b666c67
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 43 deletions.
71 changes: 31 additions & 40 deletions eth/gasprice/feehistory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync/atomic"

"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/consensus/misc"
"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/erigon/rpc"
Expand All @@ -40,10 +41,6 @@ const (
// maxFeeHistory is the maximum number of blocks that can be retrieved for a
// fee history request.
maxFeeHistory = 1024

// maxBlockFetchers is the max number of goroutines to spin up to pull blocks
// for the fee history calculation (mostly relevant for LES).
maxBlockFetchers = 4
)

// blockFees represents a single block for processing
Expand Down Expand Up @@ -235,50 +232,44 @@ func (oracle *Oracle) FeeHistory(ctx context.Context, blocks int, lastBlock rpc.
oldestBlock := lastBlock + 1 - rpc.BlockNumber(blocks)

var (
next = int64(oldestBlock)
results = make(chan *blockFees, blocks)
next = int64(oldestBlock)
)
for i := 0; i < maxBlockFetchers && i < blocks; i++ {
go func() {
for {
// Retrieve the next block number to fetch with this goroutine
blockNumber := rpc.BlockNumber(atomic.AddInt64(&next, 1) - 1)
if blockNumber > lastBlock {
return
}

fees := &blockFees{blockNumber: blockNumber}
if pendingBlock != nil && blockNumber >= rpc.BlockNumber(pendingBlock.NumberU64()) {
fees.block, fees.receipts = pendingBlock, pendingReceipts
} else {
if len(rewardPercentiles) != 0 {
fees.block, fees.err = oracle.backend.BlockByNumber(ctx, blockNumber)
if fees.block != nil && fees.err == nil {
fees.receipts, fees.err = oracle.backend.GetReceipts(ctx, fees.block.Hash())
}
} else {
fees.header, fees.err = oracle.backend.HeaderByNumber(ctx, blockNumber)
}
}
if fees.block != nil {
fees.header = fees.block.Header()
}
if fees.header != nil {
oracle.processBlock(fees, rewardPercentiles)
}
// send to results even if empty to guarantee that blocks items are sent in total
results <- fees
}
}()
}
var (
reward = make([][]*big.Int, blocks)
baseFee = make([]*big.Int, blocks+1)
gasUsedRatio = make([]float64, blocks)
firstMissing = blocks
)
for ; blocks > 0; blocks-- {
fees := <-results
if err = common.Stopped(ctx.Done()); err != nil {
return 0, nil, nil, nil, err
}
// Retrieve the next block number to fetch with this goroutine
blockNumber := rpc.BlockNumber(atomic.AddInt64(&next, 1) - 1)
if blockNumber > lastBlock {
continue
}

fees := &blockFees{blockNumber: blockNumber}
if pendingBlock != nil && blockNumber >= rpc.BlockNumber(pendingBlock.NumberU64()) {
fees.block, fees.receipts = pendingBlock, pendingReceipts
} else {
if len(rewardPercentiles) != 0 {
fees.block, fees.err = oracle.backend.BlockByNumber(ctx, blockNumber)
if fees.block != nil && fees.err == nil {
fees.receipts, fees.err = oracle.backend.GetReceipts(ctx, fees.block.Hash())
}
} else {
fees.header, fees.err = oracle.backend.HeaderByNumber(ctx, blockNumber)
}
}
if fees.block != nil {
fees.header = fees.block.Header()
}
if fees.header != nil {
oracle.processBlock(fees, rewardPercentiles)
}

if fees.err != nil {
return 0, nil, nil, nil, fees.err
}
Expand Down
4 changes: 3 additions & 1 deletion eth/stagedsync/stage_bodies.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ Loop:
if stopped {
return common.ErrStopped
}
log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", bodyProgress)
if bodyProgress > s.BlockNumber+16 {
log.Info(fmt.Sprintf("[%s] Processed", logPrefix), "highest", bodyProgress)
}
return nil
}

Expand Down
2 changes: 0 additions & 2 deletions eth/stagedsync/stage_interhashes.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,6 @@ func incrementIntermediateHashes(logPrefix string, s *StageState, db kv.RwTx, to
if cfg.checkRoot && hash != expectedRootHash {
return hash, nil
}
log.Info(fmt.Sprintf("[%s] Trie root", logPrefix),
" hash", hash.Hex())

if err := accTrieCollector.Load(logPrefix, db, kv.TrieOfAccounts, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit}); err != nil {
return trie.EmptyRoot, err
Expand Down

0 comments on commit b666c67

Please sign in to comment.