Skip to content
This repository has been archived by the owner on Nov 30, 2021. It is now read-only.

rpc: event websocket subscription #308

Merged
merged 49 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
61b0d31
rpc: event websocket subscription
fedekunze May 19, 2020
aabb8ff
rpc: use tendermint event subscriptions
fedekunze May 22, 2020
9c9b18a
new log events
fedekunze May 22, 2020
bd83591
filter evm transactions
fedekunze May 25, 2020
d0e0d1b
filter logs
fedekunze May 25, 2020
0df44f0
wip: refactor filters
fedekunze May 28, 2020
0670d8a
remove custom BlockNumber
fedekunze May 28, 2020
f4f9f98
Merge branch 'development' of github.com:ChainSafe/ethermint into rpc…
fedekunze May 28, 2020
b97dd0b
Merge branch 'development' into rpc-websocket
fedekunze Jun 1, 2020
08542aa
wip: refactor rpc
fedekunze Jun 1, 2020
b5d2a60
Merge branch 'rpc-websocket' of github.com:ChainSafe/ethermint into r…
fedekunze Jun 1, 2020
7881b78
Merge branch 'development' of github.com:ChainSafe/ethermint into rpc…
fedekunze Jun 1, 2020
44ae722
HeaderByNumber and HeaderByHash
fedekunze Jun 1, 2020
c0d3b86
Merge branch 'development' of github.com:ChainSafe/ethermint into rpc…
fedekunze Jun 4, 2020
bc4ddf3
update Tendermint event system
fedekunze Jun 4, 2020
6c16452
update Filter
fedekunze Jun 4, 2020
b804b3e
update EventSystem
fedekunze Jun 4, 2020
050bd0e
fix lint issues
fedekunze Jun 5, 2020
1b888f0
update rpc filters
fedekunze Jun 8, 2020
a882732
Merge branch 'development' of github.com:ChainSafe/ethermint into rpc…
fedekunze Jun 8, 2020
74db0fe
Merge branch 'development' of github.com:ChainSafe/ethermint into rpc…
fedekunze Jun 22, 2020
820e3f2
upgrade to tendermint v0.33.4
fedekunze Jun 22, 2020
638fe36
update filters
fedekunze Jun 22, 2020
4bb35b3
Merge branch 'development' of github.com:ChainSafe/ethermint into rpc…
fedekunze Jun 24, 2020
0efc30a
fix unsubscription
fedekunze Jun 25, 2020
5efe8e1
updates wip
fedekunze Jun 25, 2020
ef2c903
merge development
fedekunze Jun 30, 2020
8a443f9
initialize channels
fedekunze Jun 30, 2020
5cac4a7
cleanup go routines
fedekunze Jun 30, 2020
fd959dd
pass ResultEvent channel on subscription
fedekunze Jun 30, 2020
5df76d0
error channel
fedekunze Jun 30, 2020
21eec1e
fix conflicts
fedekunze Jul 2, 2020
883100f
merge development
fedekunze Jul 2, 2020
dea0702
add block filter changes test
noot Jul 2, 2020
3c0fa38
add eventCh loop
noot Jul 2, 2020
35c8f2f
pass funcs in select go func, block filter working
noot Jul 2, 2020
324c692
cleanup
noot Jul 2, 2020
7130bea
lint
fedekunze Jul 2, 2020
73ce10c
lint
fedekunze Jul 2, 2020
5a4b15c
NewFilter and GetFilterChanges working
noot Jul 2, 2020
76c37d9
merge
noot Jul 2, 2020
4f6f172
eth_getLogs working
noot Jul 2, 2020
d839c25
lint
noot Jul 2, 2020
2b046b0
lint
noot Jul 2, 2020
414c943
cleanup
fedekunze Jul 3, 2020
ec79038
remove logs and minor fixes
fedekunze Jul 3, 2020
936b0e8
changelog
fedekunze Jul 3, 2020
216a0c2
address @noot comments
fedekunze Jul 3, 2020
322dac2
revert BlockNumber removal
fedekunze Jul 3, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Features

* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement NewBlockFilter in rpc/filters.go which instantiates a polling block filter
* Polls for new blocks via BlockNumber rpc call; if block number changes, it requests the new block via GetBlockByNumber rpc call and adds it to its internal list of blocks
* (rpc) [\#330](https://github.com/ChainSafe/ethermint/issues/330) Implement `PublicFilterAPI`'s `EventSystem` which subscribes to Tendermint events upon `Filter` creation.
* (rpc) [\#231](https://github.com/ChainSafe/ethermint/issues/231) Implement `NewBlockFilter` in rpc/filters.go which instantiates a polling block filter
* Polls for new blocks via `BlockNumber` rpc call; if block number changes, it requests the new block via `GetBlockByNumber` rpc call and adds it to its internal list of blocks
* Update uninstallFilter and getFilterChanges accordingly
* uninstallFilter stops the polling goroutine
* getFilterChanges returns the filter's internal list of block hashes and resets it
Expand Down
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.14
require (
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20200331225509-2cc472e8fbd6 // indirect
github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/cespare/cp v1.1.1 // indirect
github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5
github.com/deckarep/golang-set v1.7.1 // indirect
Expand All @@ -22,15 +21,16 @@ require (
github.com/regen-network/cosmos-proto v0.1.1-0.20200213154359-02baa11ea7c2
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v0.0.7
github.com/spf13/cobra v1.0.0
github.com/spf13/viper v1.7.0
github.com/status-im/keycard-go v0.0.0-20190424133014-d95853db0f48 // indirect
github.com/stretchr/testify v1.6.1
github.com/tendermint/go-amino v0.15.1
github.com/tendermint/tendermint v0.33.3
github.com/tendermint/tendermint v0.33.4
github.com/tendermint/tm-db v0.5.1
golang.org/x/crypto v0.0.0-20200311171314-f7b00557c8c4
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71
gopkg.in/yaml.v2 v2.3.0
)

replace github.com/cosmos/cosmos-sdk => github.com/cosmos/cosmos-sdk v0.34.4-0.20200403200637-7f78e61b93a5
// forked SDK to avoid breaking changes
replace github.com/cosmos/cosmos-sdk => github.com/Chainsafe/cosmos-sdk v0.34.4-0.20200622114457-35ea97f29c5f
46 changes: 31 additions & 15 deletions go.sum

Large diffs are not rendered by default.

23 changes: 14 additions & 9 deletions rpc/apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,15 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

const Web3Namespace = "web3"
const EthNamespace = "eth"
const PersonalNamespace = "personal"
const NetNamespace = "net"
// RPC namespaces and API version
const (
Web3Namespace = "web3"
EthNamespace = "eth"
PersonalNamespace = "personal"
NetNamespace = "net"

apiVersion = "1.0"
)

// GetRPCAPIs returns the list of all APIs
func GetRPCAPIs(cliCtx context.CLIContext, key emintcrypto.PrivKeySecp256k1) []rpc.API {
Expand All @@ -22,31 +27,31 @@ func GetRPCAPIs(cliCtx context.CLIContext, key emintcrypto.PrivKeySecp256k1) []r
return []rpc.API{
{
Namespace: Web3Namespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicWeb3API(),
Public: true,
},
{
Namespace: EthNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicEthAPI(cliCtx, backend, nonceLock, key),
Public: true,
},
{
Namespace: PersonalNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPersonalEthAPI(cliCtx, nonceLock),
Public: false,
},
{
Namespace: EthNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicFilterAPI(cliCtx, backend),
Public: true,
},
{
Namespace: NetNamespace,
Version: "1.0",
Version: apiVersion,
Service: NewPublicNetAPI(cliCtx),
Public: true,
},
Expand Down
135 changes: 128 additions & 7 deletions rpc/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"math/big"
"strconv"

tmtypes "github.com/tendermint/tendermint/types"

evmtypes "github.com/cosmos/ethermint/x/evm/types"

"github.com/cosmos/cosmos-sdk/client/context"
Expand All @@ -19,20 +21,26 @@ import (
type Backend interface {
// Used by block filter; also used for polling
BlockNumber() (hexutil.Uint64, error)
HeaderByNumber(blockNum BlockNumber) (*ethtypes.Header, error)
HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error)
GetBlockByNumber(blockNum BlockNumber, fullTx bool) (map[string]interface{}, error)
GetBlockByHash(hash common.Hash, fullTx bool) (map[string]interface{}, error)
getEthBlockByNumber(height int64, fullTx bool) (map[string]interface{}, error)
getGasLimit() (int64, error)
// returns the logs of a given block
GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error)

// Used by pending transaction filter
PendingTransactions() ([]*Transaction, error)

// Used by log filter
GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error)
// TODO: Bloom methods
BloomStatus() (uint64, uint64)
}

// EthermintBackend implements Backend
var _ Backend = (*EthermintBackend)(nil)

// EthermintBackend implements the Backend interface
type EthermintBackend struct {
cliCtx context.CLIContext
gasLimit int64
Expand Down Expand Up @@ -68,7 +76,7 @@ func (e *EthermintBackend) GetBlockByNumber(blockNum BlockNumber, fullTx bool) (

// GetBlockByHash returns the block identified by hash.
func (e *EthermintBackend) GetBlockByHash(hash common.Hash, fullTx bool) (map[string]interface{}, error) {
res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
res, height, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, hash.Hex()))
if err != nil {
return nil, err
}
Expand All @@ -78,9 +86,60 @@ func (e *EthermintBackend) GetBlockByHash(hash common.Hash, fullTx bool) (map[st
return nil, err
}

e.cliCtx = e.cliCtx.WithHeight(height)
return e.getEthBlockByNumber(out.Number, fullTx)
}

// HeaderByNumber returns the block header identified by height.
func (e *EthermintBackend) HeaderByNumber(blockNum BlockNumber) (*ethtypes.Header, error) {
return e.getBlockHeader(blockNum.Int64())
}

// HeaderByHash returns the block header identified by hash.
func (e *EthermintBackend) HeaderByHash(blockHash common.Hash) (*ethtypes.Header, error) {
res, height, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, blockHash.Hex()))
if err != nil {
return nil, err
}
var out evmtypes.QueryResBlockNumber
if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil {
return nil, err
}

e.cliCtx = e.cliCtx.WithHeight(height)
return e.getBlockHeader(out.Number)
}

func (e *EthermintBackend) getBlockHeader(height int64) (*ethtypes.Header, error) {
if height <= 0 {
// get latest block height
num, err := e.BlockNumber()
if err != nil {
return nil, err
}

height = int64(num)
}

block, err := e.cliCtx.Client.Block(&height)
if err != nil {
return nil, err
}

res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryBloom, strconv.FormatInt(height, 10)))
if err != nil {
return nil, err
}

var bloomRes evmtypes.QueryBloomFilter
e.cliCtx.Codec.MustUnmarshalJSON(res, &bloomRes)

ethHeader := EthHeaderFromTendermint(block.Block.Header)
ethHeader.Bloom = bloomRes.Bloom

return ethHeader, nil
}

func (e *EthermintBackend) getEthBlockByNumber(height int64, fullTx bool) (map[string]interface{}, error) {
// Remove this check when 0 query is fixed ref: (https://github.com/tendermint/tendermint/issues/4014)
var blkNumPtr *int64
Expand Down Expand Up @@ -128,7 +187,6 @@ func (e *EthermintBackend) getEthBlockByNumber(height int64, fullTx bool) (map[s

var out evmtypes.QueryBloomFilter
e.cliCtx.Codec.MustUnmarshalJSON(res, &out)

return formatBlock(header, block.Block.Size(), gasLimit, gasUsed, transactions, out.Bloom), nil
}

Expand Down Expand Up @@ -158,11 +216,12 @@ func (e *EthermintBackend) getGasLimit() (int64, error) {
}

// GetTransactionLogs returns the logs given a transaction hash.
// It returns an error if there's an encoding error.
// If no logs are found for the tx hash, the error is nil.
func (e *EthermintBackend) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.Log, error) {
// do we need to use the block height somewhere?
ctx := e.cliCtx

res, _, err := ctx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, txHash.Hex()), nil)
res, height, err := ctx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, txHash.Hex()), nil)
if err != nil {
return nil, err
}
Expand All @@ -172,6 +231,7 @@ func (e *EthermintBackend) GetTransactionLogs(txHash common.Hash) ([]*ethtypes.L
return nil, err
}

e.cliCtx = e.cliCtx.WithHeight(height)
return out.Logs, nil
}

Expand All @@ -183,7 +243,7 @@ func (e *EthermintBackend) PendingTransactions() ([]*Transaction, error) {
return nil, err
}

transactions := make([]*Transaction, 0, 100)
transactions := make([]*Transaction, pendingTxs.Count)
for _, tx := range pendingTxs.Txs {
ethTx, err := bytesToEthTx(e.cliCtx, tx)
if err != nil {
Expand All @@ -201,3 +261,64 @@ func (e *EthermintBackend) PendingTransactions() ([]*Transaction, error) {

return transactions, nil
}

// GetLogs returns all the logs from all the ethreum transactions in a block.
func (e *EthermintBackend) GetLogs(blockHash common.Hash) ([][]*ethtypes.Log, error) {
res, _, err := e.cliCtx.Query(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryHashToHeight, blockHash.Hex()))
if err != nil {
return nil, err
}

var out evmtypes.QueryResBlockNumber
if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil {
return nil, err
}

block, err := e.cliCtx.Client.Block(&out.Number)
if err != nil {
return nil, err
}

var blockLogs = [][]*ethtypes.Log{}
for _, tx := range block.Block.Txs {
// NOTE: we query the state in case the tx result logs are not persisted after an upgrade.
res, _, err := e.cliCtx.QueryWithData(fmt.Sprintf("custom/%s/%s/%s", evmtypes.ModuleName, evmtypes.QueryTransactionLogs, common.BytesToHash(tx.Hash()).Hex()), nil)
if err != nil {
continue
}

out := new(evmtypes.QueryETHLogs)
if err := e.cliCtx.Codec.UnmarshalJSON(res, &out); err != nil {
return nil, err
}

blockLogs = append(blockLogs, out.Logs)
}

return blockLogs, nil
}

// BloomStatus returns the BloomBitsBlocks and the number of processed sections maintained
// by the chain indexer.
func (e *EthermintBackend) BloomStatus() (uint64, uint64) {
return 4096, 0
}

// EthHeaderFromTendermint is an util function that returns an Ethereum Header
// from a tendermint Header.
func EthHeaderFromTendermint(header tmtypes.Header) *ethtypes.Header {
return &ethtypes.Header{
ParentHash: common.BytesToHash(header.LastBlockID.Hash.Bytes()),
UncleHash: common.Hash{},
Coinbase: common.Address{},
Root: common.BytesToHash(header.AppHash),
TxHash: common.BytesToHash(header.DataHash),
ReceiptHash: common.Hash{},
Difficulty: nil,
Number: big.NewInt(header.Height),
Time: uint64(header.Time.Unix()),
Extra: nil,
MixDigest: common.Hash{},
Nonce: ethtypes.BlockNonce{},
}
}
10 changes: 5 additions & 5 deletions rpc/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"

"github.com/cosmos/cosmos-sdk/client/context"
"github.com/cosmos/cosmos-sdk/client/flags"
Expand Down Expand Up @@ -388,7 +387,7 @@ type CallArgs struct {
}

// Call performs a raw contract call.
func (e *PublicEthAPI) Call(args CallArgs, blockNr rpc.BlockNumber, overrides *map[common.Address]account) (hexutil.Bytes, error) {
func (e *PublicEthAPI) Call(args CallArgs, blockNr BlockNumber, overrides *map[common.Address]account) (hexutil.Bytes, error) {
simRes, err := e.doCall(args, blockNr, big.NewInt(emint.DefaultRPCGasLimit))
if err != nil {
return []byte{}, err
Expand Down Expand Up @@ -419,7 +418,7 @@ type account struct {
// DoCall performs a simulated call operation through the evmtypes. It returns the
// estimated gas used on the operation or an error if fails.
func (e *PublicEthAPI) doCall(
args CallArgs, blockNr rpc.BlockNumber, globalGasCap *big.Int,
args CallArgs, blockNr BlockNumber, globalGasCap *big.Int,
) (*sdk.SimulationResponse, error) {
// Set height for historical queries
ctx := e.cliCtx
Expand Down Expand Up @@ -561,7 +560,8 @@ func convertTransactionsToRPC(cliCtx context.CLIContext, txs []tmtypes.Tx, block
for i, tx := range txs {
ethTx, err := bytesToEthTx(cliCtx, tx)
if err != nil {
return nil, nil, err
// continue to next transaction in case it's not a MsgEthereumTx
continue
}
// TODO: Remove gas usage calculation if saving gasUsed per block
gasUsed.Add(gasUsed, ethTx.Fee())
Expand Down Expand Up @@ -602,7 +602,7 @@ func bytesToEthTx(cliCtx context.CLIContext, bz []byte) (*evmtypes.MsgEthereumTx

ethTx, ok := stdTx.(evmtypes.MsgEthereumTx)
if !ok {
return nil, fmt.Errorf("invalid transaction type, must be an amino encoded Ethereum transaction")
return nil, fmt.Errorf("invalid transaction type %T, expected MsgEthereumTx", stdTx)
}
return &ethTx, nil
}
Expand Down
Loading