Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce dedicated BlockHeader type for usage in newHeads subscription endpoint #620

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions api/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,24 @@ type Block struct {
BaseFeePerGas hexutil.Big `json:"baseFeePerGas"`
}

type BlockHeader struct {
Number hexutil.Uint64 `json:"number"`
Hash common.Hash `json:"hash"`
ParentHash common.Hash `json:"parentHash"`
Nonce types.BlockNonce `json:"nonce"`
Sha3Uncles common.Hash `json:"sha3Uncles"`
LogsBloom hexutil.Bytes `json:"logsBloom"`
TransactionsRoot common.Hash `json:"transactionsRoot"`
StateRoot common.Hash `json:"stateRoot"`
ReceiptsRoot common.Hash `json:"receiptsRoot"`
Miner common.Address `json:"miner"`
ExtraData hexutil.Bytes `json:"extraData"`
GasLimit hexutil.Uint64 `json:"gasLimit"`
GasUsed hexutil.Uint64 `json:"gasUsed"`
Timestamp hexutil.Uint64 `json:"timestamp"`
Difficulty hexutil.Uint64 `json:"difficulty"`
}

type SyncStatus struct {
StartingBlock hexutil.Uint64 `json:"startingBlock"`
CurrentBlock hexutil.Uint64 `json:"currentBlock"`
Expand Down
55 changes: 49 additions & 6 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@ import (
"context"
"fmt"

evmTypes "github.com/onflow/flow-go/fvm/evm/types"
"github.com/onflow/go-ethereum/common/hexutil"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/onflow/go-ethereum/eth/filters"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/models"
errs "github.com/onflow/flow-evm-gateway/models/errors"
"github.com/onflow/flow-evm-gateway/services/logs"
"github.com/onflow/flow-evm-gateway/storage"
)

type StreamAPI struct {
logger zerolog.Logger
api *BlockChainAPI
config *config.Config
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
Expand All @@ -30,7 +32,6 @@ type StreamAPI struct {
func NewStreamAPI(
logger zerolog.Logger,
config *config.Config,
api *BlockChainAPI,
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
Expand All @@ -41,7 +42,6 @@ func NewStreamAPI(
return &StreamAPI{
logger: logger,
config: config,
api: api,
blocks: blocks,
transactions: transactions,
receipts: receipts,
Expand All @@ -59,12 +59,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
s.blocksPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error {
return func(block *models.Block) error {
response, err := s.api.prepareBlockResponse(block, false)
blockHeader, err := s.prepareBlockHeader(block)
if err != nil {
return fmt.Errorf("failed to get block response: %w", err)
return fmt.Errorf("failed to get block header response: %w", err)
}

return notifier.Notify(sub.ID, response)
return notifier.Notify(sub.ID, blockHeader)
m-Peter marked this conversation as resolved.
Show resolved Hide resolved
}
},
)
Expand Down Expand Up @@ -121,6 +121,49 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
)
}

func (s *StreamAPI) prepareBlockHeader(
block *models.Block,
) (*BlockHeader, error) {
h, err := block.Hash()
if err != nil {
s.logger.Error().Err(err).Msg("failed to calculate hash for block by number")
return nil, errs.ErrInternal
m-Peter marked this conversation as resolved.
Show resolved Hide resolved
}

blockHeader := &BlockHeader{
Number: hexutil.Uint64(block.Height),
Hash: h,
ParentHash: block.ParentBlockHash,
Nonce: gethTypes.BlockNonce{0x1},
Sha3Uncles: gethTypes.EmptyUncleHash,
LogsBloom: gethTypes.LogsBloom([]*gethTypes.Log{}),
TransactionsRoot: block.TransactionHashRoot,
ReceiptsRoot: block.ReceiptRoot,
Miner: evmTypes.CoinbaseAddress.ToCommon(),
GasLimit: hexutil.Uint64(blockGasLimit),
Timestamp: hexutil.Uint64(block.Timestamp),
}

txHashes := block.TransactionHashes
if len(txHashes) > 0 {
totalGasUsed := hexutil.Uint64(0)
logs := make([]*gethTypes.Log, 0)
for _, txHash := range txHashes {
txReceipt, err := s.receipts.GetByTransactionID(txHash)
if err != nil {
return nil, err
}
totalGasUsed += hexutil.Uint64(txReceipt.GasUsed)
m-Peter marked this conversation as resolved.
Show resolved Hide resolved
logs = append(logs, txReceipt.Logs...)
m-Peter marked this conversation as resolved.
Show resolved Hide resolved
}
blockHeader.GasUsed = totalGasUsed
// TODO(m-Peter): Consider if its worthwhile to move this in storage.
blockHeader.LogsBloom = gethTypes.LogsBloom(logs)
}

return blockHeader, nil
}
m-Peter marked this conversation as resolved.
Show resolved Hide resolved

func newSubscription[T any](
ctx context.Context,
logger zerolog.Logger,
Expand Down
1 change: 0 additions & 1 deletion bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
streamAPI := api.NewStreamAPI(
b.logger,
b.config,
blockchainAPI,
b.storages.Blocks,
b.storages.Transactions,
b.storages.Receipts,
Expand Down
54 changes: 42 additions & 12 deletions tests/web3js/eth_streaming_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const conf = require('./config')
const helpers = require('./helpers')
const { assert } = require('chai')
const { Web3 } = require('web3')
const web3 = conf.web3

it('streaming of blocks, transactions, logs using filters', async () => {
// this is a failsafe if socket is kept open since test node process won't finish otherwise
Expand All @@ -24,21 +25,21 @@ it('streaming of blocks, transactions, logs using filters', async () => {
// wait for subscription for a bit
await new Promise((res, rej) => setTimeout(() => res(), 1000))

// subscribe to new blocks being produced by bellow transaction submission
let blockTxHashes = []
// subscribe to new blocks being produced by transaction submissions below
let blocksHeaders = []
let subBlocks = await ws.eth.subscribe('newBlockHeaders')
subBlocks.on('error', async (err) => {
assert.fail(err.message)
})
subBlocks.on('data', async (block) => {
blockTxHashes.push(block.transactions[0]) // add received tx hash
blocksHeaders.push(block) // add received tx hash

if (blockTxHashes.length === testValues.length) {
if (blocksHeaders.length === testValues.length) {
subBlocks.unsubscribe()
}
})

// subscribe to all new transaction events being produced by transaction submission bellow
// subscribe to all new transaction events being produced by transaction submissions below
let txHashes = []
let subTx = await ws.eth.subscribe('pendingTransactions')
subTx.on('error', async (err) => {
Expand All @@ -52,18 +53,18 @@ it('streaming of blocks, transactions, logs using filters', async () => {
}
})

// subscribe to events being emitted by a deployed contract and bellow transaction interactions
let logTxHashes = []
// subscribe to events being emitted by a deployed contract and transaction interactions below
let logs = []
let subLog = await ws.eth.subscribe('logs', {
address: contractAddress,
})
subLog.on('error', async err => {
assert.fail(err.message)
})
subLog.on('data', async (log) => {
logTxHashes.push(log.transactionHash)
logs.push(log)

if (logTxHashes.length === testValues.length) {
if (logs.length === testValues.length) {
subLog.unsubscribe()
}
})
Expand All @@ -86,8 +87,37 @@ it('streaming of blocks, transactions, logs using filters', async () => {
await new Promise((res, rej) => setTimeout(() => res(), 1000))

// check that transaction hashes we received when submitting transactions above
// match array of transaction hashes received from events for blocks and txs
assert.deepEqual(blockTxHashes, sentHashes)
// match array of transaction hashes received from subscriptions
assert.deepEqual(txHashes, sentHashes)
assert.deepEqual(logTxHashes, sentHashes)

assert.lengthOf(blocksHeaders, testValues.length)
for (let blockHeader of blocksHeaders) {
let block = await web3.eth.getBlock(blockHeader.number)

assert.equal(blockHeader.number, block.number)
assert.equal(blockHeader.hash, block.hash)
assert.equal(blockHeader.parentHash, block.parentHash)
assert.equal(blockHeader.nonce, block.nonce)
assert.equal(blockHeader.sha3Uncles, block.sha3Uncles)
assert.equal(blockHeader.logsBloom, block.logsBloom)
assert.equal(blockHeader.transactionsRoot, block.transactionsRoot)
assert.equal(blockHeader.stateRoot, block.stateRoot)
assert.equal(blockHeader.receiptsRoot, block.receiptsRoot)
assert.equal(blockHeader.miner, block.miner)
assert.equal(blockHeader.extraData, block.extraData)
assert.equal(blockHeader.gasLimit, block.gasLimit)
assert.equal(blockHeader.gasUsed, block.gasUsed)
assert.equal(blockHeader.timestamp, block.timestamp)
assert.equal(blockHeader.difficulty, block.difficulty)
}

assert.lengthOf(logs, testValues.length)
for (let log of logs) {
let matchingLogs = await web3.eth.getPastLogs({
address: log.address,
blockHash: log.blockHash
})
assert.lengthOf(matchingLogs, 1)
assert.deepEqual(log, matchingLogs[0])
}
})
Loading