diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 0a70c9ece1db..8d80647fc241 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -34,8 +34,8 @@ type Filter struct { addresses []common.Address topics [][]common.Hash - block common.Hash // Block hash if filtering a single block - begin, end int64 // Range interval if filtering multiple blocks + block *common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks matcher *bloombits.Matcher } @@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { // Create a generic filter and convert it into a block filter filter := newFilter(sys, addresses, topics) - filter.block = block + filter.block = &block return filter } @@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common. // first block that contains matches, updating the start of the filter accordingly. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // If we're doing singleton block filtering, execute and return - if f.block != (common.Hash{}) { - header, err := f.sys.backend.HeaderByHash(ctx, f.block) + if f.block != nil { + header, err := f.sys.backend.HeaderByHash(ctx, *f.block) if err != nil { return nil, err } diff --git a/eth/handler.go b/eth/handler.go index 4224a9f33a84..143147b0c815 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { if h.checkpointHash != (common.Hash{}) { // Request the peer's checkpoint header for chain height/weight validation resCh := make(chan *eth.Response) - if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil { + + req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh) + if err != nil { return err } // Start a timer to disconnect if the peer doesn't reply in time go func() { + // Ensure the request gets cancelled in case of error/drop + defer req.Close() + timeout := time.NewTimer(syncChallengeTimeout) defer timeout.Stop() @@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // If we have any explicit peer required block hashes, request them for number, hash := range h.requiredBlocks { resCh := make(chan *eth.Response) - if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil { + + req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh) + if err != nil { return err } - go func(number uint64, hash common.Hash) { + go func(number uint64, hash common.Hash, req *eth.Request) { + // Ensure the request gets cancelled in case of error/drop + defer req.Close() + timeout := time.NewTimer(syncChallengeTimeout) defer timeout.Stop() @@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name()) h.removePeer(peer.ID()) } - }(number, hash) + }(number, hash, req) } // Handle incoming messages until the connection is torn down return handler(peer) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index 1455bacbcb88..eb8260bf7c97 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -21,10 +21,12 @@ import ( "encoding/json" "errors" "fmt" + gomath "math" "math/big" "math/rand" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -78,6 +80,29 @@ const ( // and waste round trip times. If it's too high, we're capping responses and // waste bandwidth. maxTrieRequestCount = maxRequestSize / 512 + + // trienodeHealRateMeasurementImpact is the impact a single measurement has on + // the local node's trienode processing capacity. A value closer to 0 reacts + // slower to sudden changes, but it is also more stable against temporary hiccups. + trienodeHealRateMeasurementImpact = 0.005 + + // minTrienodeHealThrottle is the minimum divisor for throttling trie node + // heal requests to avoid overloading the local node and exessively expanding + // the state trie bedth wise. + minTrienodeHealThrottle = 1 + + // maxTrienodeHealThrottle is the maximum divisor for throttling trie node + // heal requests to avoid overloading the local node and exessively expanding + // the state trie bedth wise. + maxTrienodeHealThrottle = maxTrieRequestCount + + // trienodeHealThrottleIncrease is the multiplier for the throttle when the + // rate of arriving data is higher than the rate of processing it. + trienodeHealThrottleIncrease = 1.33 + + // trienodeHealThrottleDecrease is the divisor for the throttle when the + // rate of arriving data is lower than the rate of processing it. + trienodeHealThrottleDecrease = 1.25 ) var ( @@ -431,6 +456,11 @@ type Syncer struct { trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running + trienodeHealRate float64 // Average heal rate for processing trie node data + trienodeHealPend uint64 // Number of trie nodes currently pending for processing + trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested + trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated + trienodeHealSynced uint64 // Number of state trie nodes downloaded trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk trienodeHealDups uint64 // Number of state trie nodes already processed @@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer { trienodeHealIdlers: make(map[string]struct{}), bytecodeHealIdlers: make(map[string]struct{}), - trienodeHealReqs: make(map[uint64]*trienodeHealRequest), - bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), - stateWriter: db.NewBatch(), + trienodeHealReqs: make(map[uint64]*trienodeHealRequest), + bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), + trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk + stateWriter: db.NewBatch(), extProgress: new(SyncProgress), } @@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai if cap > maxTrieRequestCount { cap = maxTrieRequestCount } + cap = int(float64(cap) / s.trienodeHealThrottle) + if cap <= 0 { + cap = 1 + } var ( hashes = make([]common.Hash, 0, cap) paths = make([]string, 0, cap) @@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // processTrienodeHealResponse integrates an already validated trienode response // into the healer tasks. func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { + var ( + start = time.Now() + fills int + ) for i, hash := range res.hashes { node := res.nodes[i] @@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { res.task.trieTasks[res.paths[i]] = res.hashes[i] continue } + fills++ + // Push the trie node into the state syncer s.trienodeHealSynced++ s.trienodeHealBytes += common.StorageSize(len(node)) @@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { log.Crit("Failed to persist healing data", "err", err) } log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) + + // Calculate the processing rate of one filled trie node + rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second)) + + // Update the currently measured trienode queueing and processing throughput. + // + // The processing rate needs to be updated uniformly independent if we've + // processed 1x100 trie nodes or 100x1 to keep the rate consistent even in + // the face of varying network packets. As such, we cannot just measure the + // time it took to process N trie nodes and update once, we need one update + // per trie node. + // + // Naively, that would be: + // + // for i:=0; i time.Second { + // Periodically adjust the trie node throttler + if float64(pending) > 2*s.trienodeHealRate { + s.trienodeHealThrottle *= trienodeHealThrottleIncrease + } else { + s.trienodeHealThrottle /= trienodeHealThrottleDecrease + } + if s.trienodeHealThrottle > maxTrienodeHealThrottle { + s.trienodeHealThrottle = maxTrienodeHealThrottle + } else if s.trienodeHealThrottle < minTrienodeHealThrottle { + s.trienodeHealThrottle = minTrienodeHealThrottle + } + s.trienodeHealThrottled = time.Now() + + log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle) + } } // processBytecodeHealResponse integrates an already validated bytecode response @@ -2655,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error // Cross reference the requested trienodes with the response to find gaps // that the serving node is missing - hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState) - hash := make([]byte, 32) - - nodes := make([][]byte, len(req.hashes)) + var ( + hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState) + hash = make([]byte, 32) + nodes = make([][]byte, len(req.hashes)) + fills uint64 + ) for i, j := 0, 0; i < len(trienodes); i++ { // Find the next hash that we've been served, leaving misses with nils hasher.Reset() @@ -2670,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error } if j < len(req.hashes) { nodes[j] = trienodes[i] + fills++ j++ continue } // We've either ran out of hashes, or got unrequested data logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i) + // Signal this request as failed, and ready for rescheduling s.scheduleRevertTrienodeHealRequest(req) return errors.New("unexpected healing trienode") } // Response validated, send it to the scheduler for filling + atomic.AddUint64(&s.trienodeHealPend, fills) + defer func() { + atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1)) + }() response := &trienodeHealResponse{ paths: req.paths, task: req.task, diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go index d55f4e063486..491c73152113 100644 --- a/graphql/graphql_test.go +++ b/graphql/graphql_test.go @@ -17,6 +17,8 @@ package graphql import ( + "context" + "encoding/json" "fmt" "io" "math/big" @@ -51,15 +53,21 @@ func TestBuildSchema(t *testing.T) { } defer stack.Close() // Make sure the schema can be parsed and matched up to the object model. - if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { + if _, err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { t.Errorf("Could not construct GraphQL handler: %v", err) } } // Tests that a graphQL request is successfully handled when graphql is enabled on the specified endpoint func TestGraphQLBlockSerialization(t *testing.T) { - stack := createNode(t, true, false) + stack := createNode(t) defer stack.Close() + genesis := &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + } + newGQLService(t, stack, genesis, 10, func(i int, gen *core.BlockGen) {}) // start node if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -161,8 +169,55 @@ func TestGraphQLBlockSerialization(t *testing.T) { } func TestGraphQLBlockSerializationEIP2718(t *testing.T) { - stack := createNode(t, true, true) + // Account for signing txes + var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + dad = common.HexToAddress("0x0000000000000000000000000000000000000dad") + ) + stack := createNode(t) defer stack.Close() + genesis := &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + Alloc: core.GenesisAlloc{ + address: {Balance: funds}, + // The address 0xdad sloads 0x00 and 0x01 + dad: { + Code: []byte{byte(vm.PC), byte(vm.PC), byte(vm.SLOAD), byte(vm.SLOAD)}, + Nonce: 0, + Balance: big.NewInt(0), + }, + }, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer := types.LatestSigner(genesis.Config) + newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) { + gen.SetCoinbase(common.Address{1}) + tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{ + Nonce: uint64(0), + To: &dad, + Value: big.NewInt(100), + Gas: 50000, + GasPrice: big.NewInt(params.InitialBaseFee), + }) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.AccessListTx{ + ChainID: genesis.Config.ChainID, + Nonce: uint64(1), + To: &dad, + Gas: 30000, + GasPrice: big.NewInt(params.InitialBaseFee), + Value: big.NewInt(50), + AccessList: types.AccessList{{ + Address: dad, + StorageKeys: []common.Hash{{0}}, + }}, + }) + gen.AddTx(tx) + }) // start node if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -198,7 +253,7 @@ func TestGraphQLBlockSerializationEIP2718(t *testing.T) { // Tests that a graphQL request is not handled successfully when graphql is not enabled on the specified endpoint func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) { - stack := createNode(t, false, false) + stack := createNode(t) defer stack.Close() if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -212,7 +267,59 @@ func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) { assert.Equal(t, http.StatusNotFound, resp.StatusCode) } -func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node { +func TestGraphQLTransactionLogs(t *testing.T) { + var ( + key, _ = crypto.GenerateKey() + addr = crypto.PubkeyToAddress(key.PublicKey) + dadStr = "0x0000000000000000000000000000000000000dad" + dad = common.HexToAddress(dadStr) + genesis = &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + Alloc: core.GenesisAlloc{ + addr: {Balance: big.NewInt(params.Ether)}, + dad: { + // LOG0(0, 0), LOG0(0, 0), RETURN(0, 0) + Code: common.Hex2Bytes("60006000a060006000a060006000f3"), + Nonce: 0, + Balance: big.NewInt(0), + }, + }, + } + signer = types.LatestSigner(genesis.Config) + stack = createNode(t) + ) + defer stack.Close() + + handler := newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) { + tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 1, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 2, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + }) + // start node + if err := stack.Start(); err != nil { + t.Fatalf("could not start node: %v", err) + } + query := `{block { transactions { logs { account { address } } } } }` + res := handler.Schema.Exec(context.Background(), query, "", map[string]interface{}{}) + if res.Errors != nil { + t.Fatalf("graphql query failed: %v", res.Errors) + } + have, err := json.Marshal(res.Data) + if err != nil { + t.Fatalf("failed to encode graphql response: %s", err) + } + want := fmt.Sprintf(`{"block":{"transactions":[{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]}]}}`, dadStr, dadStr, dadStr, dadStr, dadStr, dadStr) + if string(have) != want { + t.Errorf("response unmatch. expected %s, got %s", want, have) + } +} + +func createNode(t *testing.T) *node.Node { stack, err := node.New(&node.Config{ HTTPHost: "127.0.0.1", HTTPPort: 0, @@ -222,83 +329,12 @@ func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node { if err != nil { t.Fatalf("could not create node: %v", err) } - if !gqlEnabled { - return stack - } - if !txEnabled { - createGQLService(t, stack) - } else { - createGQLServiceWithTransactions(t, stack) - } return stack } -func createGQLService(t *testing.T, stack *node.Node) { - // create backend - ethConf := ðconfig.Config{ - Genesis: &core.Genesis{ - Config: params.AllEthashProtocolChanges, - GasLimit: 11500000, - Difficulty: big.NewInt(1048576), - }, - Ethash: ethash.Config{ - PowMode: ethash.ModeFake, - }, - NetworkId: 1337, - TrieCleanCache: 5, - TrieCleanCacheJournal: "triecache", - TrieCleanCacheRejournal: 60 * time.Minute, - TrieDirtyCache: 5, - TrieTimeout: 60 * time.Minute, - SnapshotCache: 5, - } - ethBackend, err := eth.New(stack, ethConf) - if err != nil { - t.Fatalf("could not create eth backend: %v", err) - } - // Create some blocks and import them - chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(), - ethash.NewFaker(), ethBackend.ChainDb(), 10, func(i int, gen *core.BlockGen) {}) - _, err = ethBackend.BlockChain().InsertChain(chain) - if err != nil { - t.Fatalf("could not create import blocks: %v", err) - } - // create gql service - filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) - err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) - if err != nil { - t.Fatalf("could not create graphql service: %v", err) - } -} - -func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) { - // create backend - key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - address := crypto.PubkeyToAddress(key.PublicKey) - funds := big.NewInt(1000000000000000) - dad := common.HexToAddress("0x0000000000000000000000000000000000000dad") - +func newGQLService(t *testing.T, stack *node.Node, gspec *core.Genesis, genBlocks int, genfunc func(i int, gen *core.BlockGen)) *handler { ethConf := ðconfig.Config{ - Genesis: &core.Genesis{ - Config: params.AllEthashProtocolChanges, - GasLimit: 11500000, - Difficulty: big.NewInt(1048576), - Alloc: core.GenesisAlloc{ - address: {Balance: funds}, - // The address 0xdad sloads 0x00 and 0x01 - dad: { - Code: []byte{ - byte(vm.PC), - byte(vm.PC), - byte(vm.SLOAD), - byte(vm.SLOAD), - }, - Nonce: 0, - Balance: big.NewInt(0), - }, - }, - BaseFee: big.NewInt(params.InitialBaseFee), - }, + Genesis: gspec, Ethash: ethash.Config{ PowMode: ethash.ModeFake, }, @@ -310,49 +346,22 @@ func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) { TrieTimeout: 60 * time.Minute, SnapshotCache: 5, } - ethBackend, err := eth.New(stack, ethConf) if err != nil { t.Fatalf("could not create eth backend: %v", err) } - signer := types.LatestSigner(ethConf.Genesis.Config) - - legacyTx, _ := types.SignNewTx(key, signer, &types.LegacyTx{ - Nonce: uint64(0), - To: &dad, - Value: big.NewInt(100), - Gas: 50000, - GasPrice: big.NewInt(params.InitialBaseFee), - }) - envelopTx, _ := types.SignNewTx(key, signer, &types.AccessListTx{ - ChainID: ethConf.Genesis.Config.ChainID, - Nonce: uint64(1), - To: &dad, - Gas: 30000, - GasPrice: big.NewInt(params.InitialBaseFee), - Value: big.NewInt(50), - AccessList: types.AccessList{{ - Address: dad, - StorageKeys: []common.Hash{{0}}, - }}, - }) - // Create some blocks and import them chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(), - ethash.NewFaker(), ethBackend.ChainDb(), 1, func(i int, b *core.BlockGen) { - b.SetCoinbase(common.Address{1}) - b.AddTx(legacyTx) - b.AddTx(envelopTx) - }) - + ethash.NewFaker(), ethBackend.ChainDb(), genBlocks, genfunc) _, err = ethBackend.BlockChain().InsertChain(chain) if err != nil { t.Fatalf("could not create import blocks: %v", err) } - // create gql service + // Set up handler filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) - err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) + handler, err := newHandler(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) if err != nil { t.Fatalf("could not create graphql service: %v", err) } + return handler } diff --git a/graphql/service.go b/graphql/service.go index 019026bc7ea7..6f6e58335991 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -57,17 +57,18 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // New constructs a new GraphQL service instance. func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { - return newHandler(stack, backend, filterSystem, cors, vhosts) + _, err := newHandler(stack, backend, filterSystem, cors, vhosts) + return err } // newHandler returns a new `http.Handler` that will answer GraphQL queries. // It additionally exports an interactive query browser on the / endpoint. -func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { +func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) (*handler, error) { q := Resolver{backend, filterSystem} s, err := graphql.ParseSchema(schema, &q) if err != nil { - return err + return nil, err } h := handler{Schema: s} handler := node.NewHTTPHandlerStack(h, cors, vhosts, nil) @@ -76,5 +77,5 @@ func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters. stack.RegisterHandler("GraphQL", "/graphql", handler) stack.RegisterHandler("GraphQL", "/graphql/", handler) - return nil + return &h, nil } diff --git a/params/config.go b/params/config.go index d535d230493c..af08b87f1066 100644 --- a/params/config.go +++ b/params/config.go @@ -59,25 +59,26 @@ var ( // MainnetChainConfig is the chain parameters to run a node on the main network. MainnetChainConfig = &ChainConfig{ - ChainID: big.NewInt(1), - HomesteadBlock: big.NewInt(1_150_000), - DAOForkBlock: big.NewInt(1_920_000), - DAOForkSupport: true, - EIP150Block: big.NewInt(2_463_000), - EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), - EIP155Block: big.NewInt(2_675_000), - EIP158Block: big.NewInt(2_675_000), - ByzantiumBlock: big.NewInt(4_370_000), - ConstantinopleBlock: big.NewInt(7_280_000), - PetersburgBlock: big.NewInt(7_280_000), - IstanbulBlock: big.NewInt(9_069_000), - MuirGlacierBlock: big.NewInt(9_200_000), - BerlinBlock: big.NewInt(12_244_000), - LondonBlock: big.NewInt(12_965_000), - ArrowGlacierBlock: big.NewInt(13_773_000), - GrayGlacierBlock: big.NewInt(15_050_000), - TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000 - Ethash: new(EthashConfig), + ChainID: big.NewInt(1), + HomesteadBlock: big.NewInt(1_150_000), + DAOForkBlock: big.NewInt(1_920_000), + DAOForkSupport: true, + EIP150Block: big.NewInt(2_463_000), + EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), + EIP155Block: big.NewInt(2_675_000), + EIP158Block: big.NewInt(2_675_000), + ByzantiumBlock: big.NewInt(4_370_000), + ConstantinopleBlock: big.NewInt(7_280_000), + PetersburgBlock: big.NewInt(7_280_000), + IstanbulBlock: big.NewInt(9_069_000), + MuirGlacierBlock: big.NewInt(9_200_000), + BerlinBlock: big.NewInt(12_244_000), + LondonBlock: big.NewInt(12_965_000), + ArrowGlacierBlock: big.NewInt(13_773_000), + GrayGlacierBlock: big.NewInt(15_050_000), + TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000 + TerminalTotalDifficultyPassed: true, + Ethash: new(EthashConfig), } // MainnetTrustedCheckpoint contains the light client trusted checkpoint for the main network. diff --git a/params/version.go b/params/version.go index 317241539325..d9a3958512c3 100644 --- a/params/version.go +++ b/params/version.go @@ -21,10 +21,10 @@ import ( ) const ( - VersionMajor = 1 // Major version component of the current release - VersionMinor = 11 // Minor version component of the current release - VersionPatch = 0 // Patch version component of the current release - VersionMeta = "unstable" // Version metadata to append to the version string + VersionMajor = 1 // Major version component of the current release + VersionMinor = 10 // Minor version component of the current release + VersionPatch = 26 // Patch version component of the current release + VersionMeta = "stable" // Version metadata to append to the version string ) // Version holds the textual version string. diff --git a/rpc/client.go b/rpc/client.go index d3ce0297754c..fcd31319a3c5 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -31,6 +31,7 @@ import ( ) var ( + ErrBadResult = errors.New("bad result in JSON-RPC response") ErrClientQuit = errors.New("client is closed") ErrNoResult = errors.New("no result in JSON-RPC response") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") diff --git a/rpc/client_test.go b/rpc/client_test.go index 04c847d0d626..5ae549d86533 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -19,6 +19,7 @@ package rpc import ( "context" "encoding/json" + "errors" "fmt" "math/rand" "net" @@ -144,6 +145,53 @@ func TestClientBatchRequest(t *testing.T) { } } +func TestClientBatchRequest_len(t *testing.T) { + b, err := json.Marshal([]jsonrpcMessage{ + {Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)}, + {Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)}, + }) + if err != nil { + t.Fatal("failed to encode jsonrpc message:", err) + } + s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + _, err := rw.Write(b) + if err != nil { + t.Error("failed to write response:", err) + } + })) + t.Cleanup(s.Close) + + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + + t.Run("too-few", func(t *testing.T) { + batch := []BatchElem{ + {Method: "foo"}, + {Method: "bar"}, + {Method: "baz"}, + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + defer cancelFn() + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) + } + }) + + t.Run("too-many", func(t *testing.T) { + batch := []BatchElem{ + {Method: "foo"}, + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + defer cancelFn() + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) + } + }) +} + func TestClientNotify(t *testing.T) { server := newTestServer() defer server.Stop() diff --git a/rpc/http.go b/rpc/http.go index 858d80858652..b82ea2707a54 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -173,6 +173,9 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } + if len(respmsgs) != len(msgs) { + return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult) + } for i := 0; i < len(respmsgs); i++ { op.resp <- &respmsgs[i] } diff --git a/trie/sync.go b/trie/sync.go index 303fcbfa22e2..862ce7e16e6c 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -19,6 +19,7 @@ package trie import ( "errors" "fmt" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" @@ -381,11 +382,11 @@ func (s *Sync) scheduleCodeRequest(req *codeRequest) { // retrieval scheduling. func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { // Gather all the children of the node, irrelevant whether known or not - type child struct { + type childNode struct { path []byte node node } - var children []child + var children []childNode switch node := (object).(type) { case *shortNode: @@ -393,14 +394,14 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { if hasTerm(key) { key = key[:len(key)-1] } - children = []child{{ + children = []childNode{{ node: node.Val, path: append(append([]byte(nil), req.path...), key...), }} case *fullNode: for i := 0; i < 17; i++ { if node.Children[i] != nil { - children = append(children, child{ + children = append(children, childNode{ node: node.Children[i], path: append(append([]byte(nil), req.path...), byte(i)), }) @@ -410,7 +411,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { panic(fmt.Sprintf("unknown node: %+v", node)) } // Iterate over the children, and request all unknown ones - requests := make([]*nodeRequest, 0, len(children)) + var ( + missing = make(chan *nodeRequest, len(children)) + pending sync.WaitGroup + ) for _, child := range children { // Notify any external watcher of a new key/value node if req.callback != nil { @@ -433,19 +437,36 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { if s.membatch.hasNode(child.path) { continue } - // If database says duplicate, then at least the trie node is present - // and we hold the assumption that it's NOT legacy contract code. - chash := common.BytesToHash(node) - if rawdb.HasTrieNode(s.database, chash) { - continue - } - // Locally unknown node, schedule for retrieval - requests = append(requests, &nodeRequest{ - path: child.path, - hash: chash, - parent: req, - callback: req.callback, - }) + // Check the presence of children concurrently + pending.Add(1) + go func(child childNode) { + defer pending.Done() + + // If database says duplicate, then at least the trie node is present + // and we hold the assumption that it's NOT legacy contract code. + chash := common.BytesToHash(node) + if rawdb.HasTrieNode(s.database, chash) { + return + } + // Locally unknown node, schedule for retrieval + missing <- &nodeRequest{ + path: child.path, + hash: chash, + parent: req, + callback: req.callback, + } + }(child) + } + } + pending.Wait() + + requests := make([]*nodeRequest, 0, len(children)) + for done := false; !done; { + select { + case miss := <-missing: + requests = append(requests, miss) + default: + done = true } } return requests, nil