From 37a9621b07069e8fab08255366e049b4c9ad5f2c Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 21 Oct 2024 19:07:20 +0300 Subject: [PATCH 1/2] Introduce dedicated BlockHeader type for usage in newHeads subscription endpoint --- api/models.go | 18 ++++++++++ api/stream.go | 55 ++++++++++++++++++++++++++---- bootstrap/bootstrap.go | 1 - tests/web3js/eth_streaming_test.js | 37 +++++++++++++++----- 4 files changed, 96 insertions(+), 15 deletions(-) diff --git a/api/models.go b/api/models.go index 1aaba88a4..98b5ba7d8 100644 --- a/api/models.go +++ b/api/models.go @@ -291,6 +291,24 @@ type Block struct { BaseFeePerGas hexutil.Big `json:"baseFeePerGas"` } +type BlockHeader struct { + Number hexutil.Uint64 `json:"number"` + Hash common.Hash `json:"hash"` + ParentHash common.Hash `json:"parentHash"` + Nonce types.BlockNonce `json:"nonce"` + Sha3Uncles common.Hash `json:"sha3Uncles"` + LogsBloom hexutil.Bytes `json:"logsBloom"` + TransactionsRoot common.Hash `json:"transactionsRoot"` + StateRoot common.Hash `json:"stateRoot"` + ReceiptsRoot common.Hash `json:"receiptsRoot"` + Miner common.Address `json:"miner"` + ExtraData hexutil.Bytes `json:"extraData"` + GasLimit hexutil.Uint64 `json:"gasLimit"` + GasUsed hexutil.Uint64 `json:"gasUsed"` + Timestamp hexutil.Uint64 `json:"timestamp"` + Difficulty hexutil.Uint64 `json:"difficulty"` +} + type SyncStatus struct { StartingBlock hexutil.Uint64 `json:"startingBlock"` CurrentBlock hexutil.Uint64 `json:"currentBlock"` diff --git a/api/stream.go b/api/stream.go index f84e8df17..11e81f816 100644 --- a/api/stream.go +++ b/api/stream.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + evmTypes "github.com/onflow/flow-go/fvm/evm/types" + "github.com/onflow/go-ethereum/common/hexutil" gethTypes "github.com/onflow/go-ethereum/core/types" "github.com/onflow/go-ethereum/eth/filters" "github.com/onflow/go-ethereum/rpc" @@ -11,13 +13,13 @@ import ( "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/models" + errs "github.com/onflow/flow-evm-gateway/models/errors" "github.com/onflow/flow-evm-gateway/services/logs" "github.com/onflow/flow-evm-gateway/storage" ) type StreamAPI struct { logger zerolog.Logger - api *BlockChainAPI config *config.Config blocks storage.BlockIndexer transactions storage.TransactionIndexer @@ -30,7 +32,6 @@ type StreamAPI struct { func NewStreamAPI( logger zerolog.Logger, config *config.Config, - api *BlockChainAPI, blocks storage.BlockIndexer, transactions storage.TransactionIndexer, receipts storage.ReceiptIndexer, @@ -41,7 +42,6 @@ func NewStreamAPI( return &StreamAPI{ logger: logger, config: config, - api: api, blocks: blocks, transactions: transactions, receipts: receipts, @@ -59,12 +59,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { s.blocksPublisher, func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error { return func(block *models.Block) error { - response, err := s.api.prepareBlockResponse(block, false) + blockHeader, err := s.prepareBlockHeader(block) if err != nil { - return fmt.Errorf("failed to get block response: %w", err) + return fmt.Errorf("failed to get block header response: %w", err) } - return notifier.Notify(sub.ID, response) + return notifier.Notify(sub.ID, blockHeader) } }, ) @@ -121,6 +121,49 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( ) } +func (s *StreamAPI) prepareBlockHeader( + block *models.Block, +) (*BlockHeader, error) { + h, err := block.Hash() + if err != nil { + s.logger.Error().Err(err).Msg("failed to calculate hash for block by number") + return nil, errs.ErrInternal + } + + blockHeader := &BlockHeader{ + Number: hexutil.Uint64(block.Height), + Hash: h, + ParentHash: block.ParentBlockHash, + Nonce: gethTypes.BlockNonce{0x1}, + Sha3Uncles: gethTypes.EmptyUncleHash, + LogsBloom: gethTypes.LogsBloom([]*gethTypes.Log{}), + TransactionsRoot: block.TransactionHashRoot, + ReceiptsRoot: block.ReceiptRoot, + Miner: evmTypes.CoinbaseAddress.ToCommon(), + GasLimit: hexutil.Uint64(blockGasLimit), + Timestamp: hexutil.Uint64(block.Timestamp), + } + + txHashes := block.TransactionHashes + if len(txHashes) > 0 { + totalGasUsed := hexutil.Uint64(0) + logs := make([]*gethTypes.Log, 0) + for _, txHash := range txHashes { + txReceipt, err := s.receipts.GetByTransactionID(txHash) + if err != nil { + return nil, err + } + totalGasUsed += hexutil.Uint64(txReceipt.GasUsed) + logs = append(logs, txReceipt.Logs...) + } + blockHeader.GasUsed = totalGasUsed + // TODO(m-Peter): Consider if its worthwhile to move this in storage. + blockHeader.LogsBloom = gethTypes.LogsBloom(logs) + } + + return blockHeader, nil +} + func newSubscription[T any]( ctx context.Context, logger zerolog.Logger, diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index 1d926700a..dbed6f484 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -292,7 +292,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { streamAPI := api.NewStreamAPI( b.logger, b.config, - blockchainAPI, b.storages.Blocks, b.storages.Transactions, b.storages.Receipts, diff --git a/tests/web3js/eth_streaming_test.js b/tests/web3js/eth_streaming_test.js index fc8e90772..b2c5ee1e6 100644 --- a/tests/web3js/eth_streaming_test.js +++ b/tests/web3js/eth_streaming_test.js @@ -2,6 +2,7 @@ const conf = require('./config') const helpers = require('./helpers') const { assert } = require('chai') const { Web3 } = require('web3') +const web3 = conf.web3 it('streaming of blocks, transactions, logs using filters', async () => { // this is a failsafe if socket is kept open since test node process won't finish otherwise @@ -24,21 +25,21 @@ it('streaming of blocks, transactions, logs using filters', async () => { // wait for subscription for a bit await new Promise((res, rej) => setTimeout(() => res(), 1000)) - // subscribe to new blocks being produced by bellow transaction submission - let blockTxHashes = [] + // subscribe to new blocks being produced by transaction submissions below + let blocksHeaders = [] let subBlocks = await ws.eth.subscribe('newBlockHeaders') subBlocks.on('error', async (err) => { assert.fail(err.message) }) subBlocks.on('data', async (block) => { - blockTxHashes.push(block.transactions[0]) // add received tx hash + blocksHeaders.push(block) // add received tx hash - if (blockTxHashes.length === testValues.length) { + if (blocksHeaders.length === testValues.length) { subBlocks.unsubscribe() } }) - // subscribe to all new transaction events being produced by transaction submission bellow + // subscribe to all new transaction events being produced by transaction submissions below let txHashes = [] let subTx = await ws.eth.subscribe('pendingTransactions') subTx.on('error', async (err) => { @@ -52,7 +53,7 @@ it('streaming of blocks, transactions, logs using filters', async () => { } }) - // subscribe to events being emitted by a deployed contract and bellow transaction interactions + // subscribe to events being emitted by a deployed contract and transaction interactions below let logTxHashes = [] let subLog = await ws.eth.subscribe('logs', { address: contractAddress, @@ -86,8 +87,28 @@ it('streaming of blocks, transactions, logs using filters', async () => { await new Promise((res, rej) => setTimeout(() => res(), 1000)) // check that transaction hashes we received when submitting transactions above - // match array of transaction hashes received from events for blocks and txs - assert.deepEqual(blockTxHashes, sentHashes) + // match array of transaction hashes received from subscriptions assert.deepEqual(txHashes, sentHashes) assert.deepEqual(logTxHashes, sentHashes) + + assert.lengthOf(blocksHeaders, testValues.length) + for (let blockHeader of blocksHeaders) { + let block = await web3.eth.getBlock(blockHeader.number) + + assert.equal(blockHeader.number, block.number) + assert.equal(blockHeader.hash, block.hash) + assert.equal(blockHeader.parentHash, block.parentHash) + assert.equal(blockHeader.nonce, block.nonce) + assert.equal(blockHeader.sha3Uncles, block.sha3Uncles) + assert.equal(blockHeader.logsBloom, block.logsBloom) + assert.equal(blockHeader.transactionsRoot, block.transactionsRoot) + assert.equal(blockHeader.stateRoot, block.stateRoot) + assert.equal(blockHeader.receiptsRoot, block.receiptsRoot) + assert.equal(blockHeader.miner, block.miner) + assert.equal(blockHeader.extraData, block.extraData) + assert.equal(blockHeader.gasLimit, block.gasLimit) + assert.equal(blockHeader.gasUsed, block.gasUsed) + assert.equal(blockHeader.timestamp, block.timestamp) + assert.equal(blockHeader.difficulty, block.difficulty) + } }) From 5e57a617ce0cb7875cd9b4126e09b666c4183573 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Mon, 21 Oct 2024 19:45:48 +0300 Subject: [PATCH 2/2] Add more assertive tests for log subscription --- tests/web3js/eth_streaming_test.js | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tests/web3js/eth_streaming_test.js b/tests/web3js/eth_streaming_test.js index b2c5ee1e6..2f82a90e0 100644 --- a/tests/web3js/eth_streaming_test.js +++ b/tests/web3js/eth_streaming_test.js @@ -54,7 +54,7 @@ it('streaming of blocks, transactions, logs using filters', async () => { }) // subscribe to events being emitted by a deployed contract and transaction interactions below - let logTxHashes = [] + let logs = [] let subLog = await ws.eth.subscribe('logs', { address: contractAddress, }) @@ -62,9 +62,9 @@ it('streaming of blocks, transactions, logs using filters', async () => { assert.fail(err.message) }) subLog.on('data', async (log) => { - logTxHashes.push(log.transactionHash) + logs.push(log) - if (logTxHashes.length === testValues.length) { + if (logs.length === testValues.length) { subLog.unsubscribe() } }) @@ -89,7 +89,6 @@ it('streaming of blocks, transactions, logs using filters', async () => { // check that transaction hashes we received when submitting transactions above // match array of transaction hashes received from subscriptions assert.deepEqual(txHashes, sentHashes) - assert.deepEqual(logTxHashes, sentHashes) assert.lengthOf(blocksHeaders, testValues.length) for (let blockHeader of blocksHeaders) { @@ -111,4 +110,14 @@ it('streaming of blocks, transactions, logs using filters', async () => { assert.equal(blockHeader.timestamp, block.timestamp) assert.equal(blockHeader.difficulty, block.difficulty) } + + assert.lengthOf(logs, testValues.length) + for (let log of logs) { + let matchingLogs = await web3.eth.getPastLogs({ + address: log.address, + blockHash: log.blockHash + }) + assert.lengthOf(matchingLogs, 1) + assert.deepEqual(log, matchingLogs[0]) + } })