Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge 1.10.26 #5

Merged
merged 13 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions eth/filters/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash

block common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks

matcher *bloombits.Matcher
}
Expand Down Expand Up @@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a block filter
filter := newFilter(sys, addresses, topics)
filter.block = block
filter.block = &block
return filter
}

Expand All @@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// If we're doing singleton block filtering, execute and return
if f.block != (common.Hash{}) {
header, err := f.sys.backend.HeaderByHash(ctx, f.block)
if f.block != nil {
header, err := f.sys.backend.HeaderByHash(ctx, *f.block)
if err != nil {
return nil, err
}
Expand Down
18 changes: 14 additions & 4 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
if h.checkpointHash != (common.Hash{}) {
// Request the peer's checkpoint header for chain height/weight validation
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh)
if err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
go func() {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If we have any explicit peer required block hashes, request them
for number, hash := range h.requiredBlocks {
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {

req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh)
if err != nil {
return err
}
go func(number uint64, hash common.Hash) {
go func(number uint64, hash common.Hash, req *eth.Request) {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()

timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()

Expand Down Expand Up @@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
h.removePeer(peer.ID())
}
}(number, hash)
}(number, hash, req)
}
// Handle incoming messages until the connection is torn down
return handler(peer)
Expand Down
107 changes: 100 additions & 7 deletions eth/protocols/snap/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
gomath "math"
"math/big"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -78,6 +80,29 @@ const (
// and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512

// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005

// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
minTrienodeHealThrottle = 1

// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount

// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33

// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
)

var (
Expand Down Expand Up @@ -431,6 +456,11 @@ type Syncer struct {
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running

trienodeHealRate float64 // Average heal rate for processing trie node data
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated

trienodeHealSynced uint64 // Number of state trie nodes downloaded
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
trienodeHealDups uint64 // Number of state trie nodes already processed
Expand Down Expand Up @@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
trienodeHealIdlers: make(map[string]struct{}),
bytecodeHealIdlers: make(map[string]struct{}),

trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
stateWriter: db.NewBatch(),
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(),

extProgress: new(SyncProgress),
}
Expand Down Expand Up @@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
if cap > maxTrieRequestCount {
cap = maxTrieRequestCount
}
cap = int(float64(cap) / s.trienodeHealThrottle)
if cap <= 0 {
cap = 1
}
var (
hashes = make([]common.Hash, 0, cap)
paths = make([]string, 0, cap)
Expand Down Expand Up @@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
var (
start = time.Now()
fills int
)
for i, hash := range res.hashes {
node := res.nodes[i]

Expand All @@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
res.task.trieTasks[res.paths[i]] = res.hashes[i]
continue
}
fills++

// Push the trie node into the state syncer
s.trienodeHealSynced++
s.trienodeHealBytes += common.StorageSize(len(node))
Expand All @@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
log.Crit("Failed to persist healing data", "err", err)
}
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))

// Calculate the processing rate of one filled trie node
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))

// Update the currently measured trienode queueing and processing throughput.
//
// The processing rate needs to be updated uniformly independent if we've
// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
// the face of varying network packets. As such, we cannot just measure the
// time it took to process N trie nodes and update once, we need one update
// per trie node.
//
// Naively, that would be:
//
// for i:=0; i<fills; i++ {
// healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
// }
//
// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
//
// We can expand that formula for the Nth item as:
// HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
//
// The above is a geometric sequence that can be summed to:
// HR(N) = (1-MI)^N*(OR-NR) + NR
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate

pending := atomic.LoadUint64(&s.trienodeHealPend)
if time.Since(s.trienodeHealThrottled) > time.Second {
// Periodically adjust the trie node throttler
if float64(pending) > 2*s.trienodeHealRate {
s.trienodeHealThrottle *= trienodeHealThrottleIncrease
} else {
s.trienodeHealThrottle /= trienodeHealThrottleDecrease
}
if s.trienodeHealThrottle > maxTrienodeHealThrottle {
s.trienodeHealThrottle = maxTrienodeHealThrottle
} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
s.trienodeHealThrottle = minTrienodeHealThrottle
}
s.trienodeHealThrottled = time.Now()

log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
}
}

// processBytecodeHealResponse integrates an already validated bytecode response
Expand Down Expand Up @@ -2655,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error

// Cross reference the requested trienodes with the response to find gaps
// that the serving node is missing
hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash := make([]byte, 32)

nodes := make([][]byte, len(req.hashes))
var (
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
nodes = make([][]byte, len(req.hashes))
fills uint64
)
for i, j := 0, 0; i < len(trienodes); i++ {
// Find the next hash that we've been served, leaving misses with nils
hasher.Reset()
Expand All @@ -2670,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
}
if j < len(req.hashes) {
nodes[j] = trienodes[i]
fills++
j++
continue
}
// We've either ran out of hashes, or got unrequested data
logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)

// Signal this request as failed, and ready for rescheduling
s.scheduleRevertTrienodeHealRequest(req)
return errors.New("unexpected healing trienode")
}
// Response validated, send it to the scheduler for filling
atomic.AddUint64(&s.trienodeHealPend, fills)
defer func() {
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
}()
response := &trienodeHealResponse{
paths: req.paths,
task: req.task,
Expand Down
Loading