diff --git a/.circleci/config.yml b/.circleci/config.yml index 295b3e6500c..391d15921be 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -811,11 +811,6 @@ workflows: - gofmt - gen-check - docs-check - - test: - name: test-itest-actor_events - suite: itest-actor_events - target: "./itests/actor_events_test.go" - - test: name: test-itest-api suite: itest-api @@ -916,6 +911,16 @@ workflows: suite: itest-dup_mpool_messages target: "./itests/dup_mpool_messages_test.go" + - test: + name: test-itest-eth_filter + suite: itest-eth_filter + target: "./itests/eth_filter_test.go" + + - test: + name: test-itest-fevm_events + suite: itest-fevm_events + target: "./itests/fevm_events_test.go" + - test: name: test-itest-fevm suite: itest-fevm diff --git a/api/api_full.go b/api/api_full.go index 211cb3b5948..3265551c027 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -181,6 +181,9 @@ type FullNode interface { // ChainBlockstoreInfo returns some basic information about the blockstore ChainBlockstoreInfo(context.Context) (map[string]interface{}, error) //perm:read + // ChainGetEvents returns the events under an event AMT root CID. + ChainGetEvents(context.Context, cid.Cid) ([]types.Event, error) //perm:read + // GasEstimateFeeCap estimates gas fee cap GasEstimateFeeCap(context.Context, *types.Message, int64, types.TipSetKey) (types.BigInt, error) //perm:read @@ -824,7 +827,7 @@ type FullNode interface { // - logs: notify new event logs that match a criteria // params contains additional parameters used with the log event type // The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called. - EthSubscribe(ctx context.Context, eventTypes []string, params EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write + EthSubscribe(ctx context.Context, eventType string, params *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write // Unsubscribe from a websocket subscription EthUnsubscribe(ctx context.Context, id EthSubscriptionID) (bool, error) //perm:write diff --git a/api/eth_types.go b/api/eth_types.go index 7f2e52dbbd4..82fca07c281 100644 --- a/api/eth_types.go +++ b/api/eth_types.go @@ -19,11 +19,17 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/big" builtintypes "github.com/filecoin-project/go-state-types/builtin" - "github.com/filecoin-project/go-state-types/builtin/v10/eam" "github.com/filecoin-project/lotus/build" ) +var ( + EthTopic1 = "topic1" + EthTopic2 = "topic2" + EthTopic3 = "topic3" + EthTopic4 = "topic4" +) + type EthUint64 uint64 func (e EthUint64) MarshalJSON() ([]byte, error) { @@ -47,6 +53,14 @@ func (e *EthUint64) UnmarshalJSON(b []byte) error { return nil } +func EthUint64FromHex(s string) (EthUint64, error) { + parsedInt, err := strconv.ParseUint(strings.Replace(s, "0x", "", -1), 16, 64) + if err != nil { + return EthUint64(0), err + } + return EthUint64(parsedInt), nil +} + type EthBigInt big.Int var EthBigIntZero = EthBigInt{Int: big.Zero().Int} @@ -177,14 +191,12 @@ func (c *EthCall) UnmarshalJSON(b []byte) error { } type EthTxReceipt struct { - TransactionHash EthHash `json:"transactionHash"` - TransactionIndex EthUint64 `json:"transactionIndex"` - BlockHash EthHash `json:"blockHash"` - BlockNumber EthUint64 `json:"blockNumber"` - From EthAddress `json:"from"` - To *EthAddress `json:"to"` - // Logs - // LogsBloom + TransactionHash EthHash `json:"transactionHash"` + TransactionIndex EthUint64 `json:"transactionIndex"` + BlockHash EthHash `json:"blockHash"` + BlockNumber EthUint64 `json:"blockNumber"` + From EthAddress `json:"from"` + To *EthAddress `json:"to"` StateRoot EthHash `json:"root"` Status EthUint64 `json:"status"` ContractAddress *EthAddress `json:"contractAddress"` @@ -192,47 +204,7 @@ type EthTxReceipt struct { GasUsed EthUint64 `json:"gasUsed"` EffectiveGasPrice EthBigInt `json:"effectiveGasPrice"` LogsBloom EthBytes `json:"logsBloom"` - Logs []string `json:"logs"` -} - -func NewEthTxReceipt(tx EthTx, lookup *MsgLookup, replay *InvocResult) (EthTxReceipt, error) { - receipt := EthTxReceipt{ - TransactionHash: tx.Hash, - TransactionIndex: tx.TransactionIndex, - BlockHash: tx.BlockHash, - BlockNumber: tx.BlockNumber, - From: tx.From, - To: tx.To, - StateRoot: EmptyEthHash, - LogsBloom: []byte{0}, - Logs: []string{}, - } - - if receipt.To == nil && lookup.Receipt.ExitCode.IsSuccess() { - // Create and Create2 return the same things. - var ret eam.CreateReturn - if err := ret.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { - return EthTxReceipt{}, xerrors.Errorf("failed to parse contract creation result: %w", err) - } - addr := EthAddress(ret.EthAddress) - receipt.ContractAddress = &addr - } - - if lookup.Receipt.ExitCode.IsSuccess() { - receipt.Status = 1 - } - if lookup.Receipt.ExitCode.IsError() { - receipt.Status = 0 - } - - receipt.GasUsed = EthUint64(lookup.Receipt.GasUsed) - - // TODO: handle CumulativeGasUsed - receipt.CumulativeGasUsed = EmptyEthInt - - effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(lookup.Receipt.GasUsed)) - receipt.EffectiveGasPrice = EthBigInt(effectiveGasPrice) - return receipt, nil + Logs []EthLog `json:"logs"` } const ( @@ -484,6 +456,9 @@ type EthFilterSpec struct { type EthAddressList []EthAddress func (e *EthAddressList) UnmarshalJSON(b []byte) error { + if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) { + return nil + } if len(b) > 0 && b[0] == '[' { var addrs []EthAddress err := json.Unmarshal(b, &addrs) @@ -542,34 +517,29 @@ func (e *EthHashList) UnmarshalJSON(b []byte) error { return nil } -// FilterResult represents the response from executing a filter: a list of bloack hashes, a list of transaction hashes +// FilterResult represents the response from executing a filter: a list of block hashes, a list of transaction hashes // or a list of logs // This is a union type. Only one field will be populated. // The JSON encoding must produce an array of the populated field. type EthFilterResult struct { - // List of block hashes. Only populated when the filter has been installed via EthNewBlockFilter - NewBlockHashes []EthHash - - // List of transaction hashes. Only populated when the filter has been installed via EthNewPendingTransactionFilter - NewTransactionHashes []EthHash - - // List of event logs. Only populated when the filter has been installed via EthNewFilter - NewLogs []EthLog + Results []interface{} } func (h EthFilterResult) MarshalJSON() ([]byte, error) { - if h.NewBlockHashes != nil { - return json.Marshal(h.NewBlockHashes) - } - if h.NewTransactionHashes != nil { - return json.Marshal(h.NewTransactionHashes) - } - if h.NewLogs != nil { - return json.Marshal(h.NewLogs) + if h.Results != nil { + return json.Marshal(h.Results) } return []byte{'[', ']'}, nil } +func (h *EthFilterResult) UnmarshalJSON(b []byte) error { + if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) { + return nil + } + err := json.Unmarshal(b, &h.Results) + return err +} + // EthLog represents the results of an event filter execution. type EthLog struct { // Address is the address of the actor that produced the event log. diff --git a/api/eth_types_test.go b/api/eth_types_test.go index e11a31ce4e8..c7268c0bc49 100644 --- a/api/eth_types_test.go +++ b/api/eth_types_test.go @@ -193,30 +193,33 @@ func TestEthFilterResultMarshalJSON(t *testing.T) { { res: EthFilterResult{ - NewBlockHashes: []EthHash{hash1, hash2}, + Results: []any{hash1, hash2}, }, want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, }, { res: EthFilterResult{ - NewTransactionHashes: []EthHash{hash1, hash2}, + Results: []any{hash1, hash2}, }, want: `["0x013dbb9442ca9667baccc6230fcd5c1c4b2d4d2870f4bd20681d4d47cfd15184","0xab8653edf9f51785664a643b47605a7ba3d917b5339a0724e7642c114d0e4738"]`, }, { res: EthFilterResult{ - NewLogs: []EthLog{log}, + Results: []any{log}, }, want: `[` + string(logjson) + `]`, }, } for _, tc := range testcases { - data, err := json.Marshal(tc.res) - require.NoError(t, err) - require.Equal(t, tc.want, string(data)) + tc := tc + t.Run("", func(t *testing.T) { + data, err := json.Marshal(tc.res) + require.NoError(t, err) + require.Equal(t, tc.want, string(data)) + }) } } @@ -325,12 +328,23 @@ func TestEthAddressListUnmarshalJSON(t *testing.T) { input: `"0xd4c5fb16488Aa48081296299d54b0c648C9333dA"`, want: EthAddressList{addr1}, }, + { + input: `[]`, + want: EthAddressList{}, + }, + { + input: `null`, + want: EthAddressList(nil), + }, } for _, tc := range testcases { - var got EthAddressList - err := json.Unmarshal([]byte(tc.input), &got) - require.NoError(t, err) - require.Equal(t, tc.want, got) + tc := tc + t.Run("", func(t *testing.T) { + var got EthAddressList + err := json.Unmarshal([]byte(tc.input), &got) + require.NoError(t, err) + require.Equal(t, tc.want, got) + }) } } diff --git a/api/mocks/mock_full.go b/api/mocks/mock_full.go index 7801497f4c6..e4338763ce9 100644 --- a/api/mocks/mock_full.go +++ b/api/mocks/mock_full.go @@ -183,6 +183,21 @@ func (mr *MockFullNodeMockRecorder) ChainGetBlockMessages(arg0, arg1 interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetBlockMessages", reflect.TypeOf((*MockFullNode)(nil).ChainGetBlockMessages), arg0, arg1) } +// ChainGetEvents mocks base method. +func (m *MockFullNode) ChainGetEvents(arg0 context.Context, arg1 cid.Cid) ([]types.Event, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainGetEvents", arg0, arg1) + ret0, _ := ret[0].([]types.Event) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainGetEvents indicates an expected call of ChainGetEvents. +func (mr *MockFullNodeMockRecorder) ChainGetEvents(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainGetEvents", reflect.TypeOf((*MockFullNode)(nil).ChainGetEvents), arg0, arg1) +} + // ChainGetGenesis mocks base method. func (m *MockFullNode) ChainGetGenesis(arg0 context.Context) (*types.TipSet, error) { m.ctrl.T.Helper() @@ -1342,7 +1357,7 @@ func (mr *MockFullNodeMockRecorder) EthSendRawTransaction(arg0, arg1 interface{} } // EthSubscribe mocks base method. -func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 []string, arg2 api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { +func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 string, arg2 *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EthSubscribe", arg0, arg1, arg2) ret0, _ := ret[0].(<-chan api.EthSubscriptionResponse) diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 2677c48bb51..5f28d5b0b04 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -123,6 +123,8 @@ type FullNodeStruct struct { ChainGetBlockMessages func(p0 context.Context, p1 cid.Cid) (*BlockMessages, error) `perm:"read"` + ChainGetEvents func(p0 context.Context, p1 cid.Cid) ([]types.Event, error) `perm:"read"` + ChainGetGenesis func(p0 context.Context) (*types.TipSet, error) `perm:"read"` ChainGetMessage func(p0 context.Context, p1 cid.Cid) (*types.Message, error) `perm:"read"` @@ -275,7 +277,7 @@ type FullNodeStruct struct { EthSendRawTransaction func(p0 context.Context, p1 EthBytes) (EthHash, error) `perm:"read"` - EthSubscribe func(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) `perm:"write"` + EthSubscribe func(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) `perm:"write"` EthUninstallFilter func(p0 context.Context, p1 EthFilterID) (bool, error) `perm:"write"` @@ -1330,6 +1332,17 @@ func (s *FullNodeStub) ChainGetBlockMessages(p0 context.Context, p1 cid.Cid) (*B return nil, ErrNotSupported } +func (s *FullNodeStruct) ChainGetEvents(p0 context.Context, p1 cid.Cid) ([]types.Event, error) { + if s.Internal.ChainGetEvents == nil { + return *new([]types.Event), ErrNotSupported + } + return s.Internal.ChainGetEvents(p0, p1) +} + +func (s *FullNodeStub) ChainGetEvents(p0 context.Context, p1 cid.Cid) ([]types.Event, error) { + return *new([]types.Event), ErrNotSupported +} + func (s *FullNodeStruct) ChainGetGenesis(p0 context.Context) (*types.TipSet, error) { if s.Internal.ChainGetGenesis == nil { return nil, ErrNotSupported @@ -2166,14 +2179,14 @@ func (s *FullNodeStub) EthSendRawTransaction(p0 context.Context, p1 EthBytes) (E return *new(EthHash), ErrNotSupported } -func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) { +func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) { if s.Internal.EthSubscribe == nil { return nil, ErrNotSupported } return s.Internal.EthSubscribe(p0, p1, p2) } -func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) { +func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) { return nil, ErrNotSupported } diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index aa746530ac9..e88f7a07662 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 0a451717e54..aeb2fba2906 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 60110421c0e..e06a28adec5 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 2e9fb4cee2d..a035f920a4a 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/chain/events/filter/event.go b/chain/events/filter/event.go index cf92fc95cf0..010390182cc 100644 --- a/chain/events/filter/event.go +++ b/chain/events/filter/event.go @@ -3,14 +3,17 @@ package filter import ( "bytes" "context" + "math" "sync" "time" "github.com/google/uuid" "github.com/ipfs/go-cid" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + amt4 "github.com/filecoin-project/go-amt-ipld/v4" "github.com/filecoin-project/go-state-types/abi" blockadt "github.com/filecoin-project/specs-actors/actors/util/adt" @@ -18,10 +21,6 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -type RobustAddresser interface { - LookupRobustAddress(ctx context.Context, idAddr address.Address, ts *types.TipSet) (address.Address, error) -} - const indexed uint8 = 0x01 type EventFilter struct { @@ -42,7 +41,7 @@ type EventFilter struct { var _ Filter = (*EventFilter)(nil) type CollectedEvent struct { - Event *types.Event + Entries []types.EventEntry EmitterAddr address.Address // f4 address of emitter EventIdx int // index of the event within the list of emitted events Reverted bool @@ -104,7 +103,7 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever // event matches filter, so record it cev := &CollectedEvent{ - Event: ev, + Entries: ev.Entries, EmitterAddr: addr, EventIdx: evIdx, Reverted: revert, @@ -134,6 +133,12 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } +func (f *EventFilter) setCollectedEvents(ces []*CollectedEvent) { + f.mu.Lock() + f.collected = ces + f.mu.Unlock() +} + func (f *EventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent { f.mu.Lock() collected := f.collected @@ -282,24 +287,33 @@ type EventFilterManager struct { ChainStore *cstore.ChainStore AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) MaxFilterResults int + EventIndex *EventIndex - mu sync.Mutex // guards mutations to filters - filters map[string]*EventFilter + mu sync.Mutex // guards mutations to filters + filters map[string]*EventFilter + currentHeight abi.ChainEpoch } func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) error { m.mu.Lock() defer m.mu.Unlock() - if len(m.filters) == 0 { + m.currentHeight = to.Height() + + if len(m.filters) == 0 && m.EventIndex == nil { return nil } - tse := &TipSetEvents{ msgTs: from, rctTs: to, load: m.loadExecutedMessages, } + if m.EventIndex != nil { + if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { + return err + } + } + // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil { @@ -313,7 +327,9 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet) func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) error { m.mu.Lock() defer m.mu.Unlock() - if len(m.filters) == 0 { + m.currentHeight = to.Height() + + if len(m.filters) == 0 && m.EventIndex == nil { return nil } @@ -323,6 +339,12 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) load: m.loadExecutedMessages, } + if m.EventIndex != nil { + if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { + return err + } + } + // TODO: could run this loop in parallel with errgroup if there are many filters for _, f := range m.filters { if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil { @@ -334,6 +356,14 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet) } func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight abi.ChainEpoch, tipsetCid cid.Cid, addresses []address.Address, keys map[string][][]byte) (*EventFilter, error) { + m.mu.Lock() + currentHeight := m.currentHeight + m.mu.Unlock() + + if m.EventIndex == nil && minHeight < currentHeight { + return nil, xerrors.Errorf("historic event index disabled") + } + id, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) @@ -349,7 +379,17 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a maxResults: m.MaxFilterResults, } + if m.EventIndex != nil && minHeight < currentHeight { + // Filter needs historic events + if err := m.EventIndex.PrefillFilter(ctx, f); err != nil { + return nil, err + } + } + m.mu.Lock() + if m.filters == nil { + m.filters = make(map[string]*EventFilter) + } m.filters[id.String()] = f m.mu.Unlock() @@ -402,19 +442,30 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc continue } - evtArr, err := blockadt.AsArray(st, *rct.EventsRoot) + evtArr, err := amt4.LoadAMT(ctx, st, *rct.EventsRoot, amt4.UseTreeBitWidth(5)) if err != nil { return nil, xerrors.Errorf("load events amt: %w", err) } - ems[i].evs = make([]*types.Event, evtArr.Length()) + ems[i].evs = make([]*types.Event, evtArr.Len()) var evt types.Event - _ = arr.ForEach(&evt, func(i int64) error { + err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error { + if u > math.MaxInt { + return xerrors.Errorf("too many events") + } + if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil { + return err + } + cpy := evt - ems[i].evs[int(i)] = &cpy + ems[i].evs[int(u)] = &cpy //nolint:scopelint return nil }) + if err != nil { + return nil, xerrors.Errorf("read events: %w", err) + } + } return ems, nil diff --git a/chain/events/filter/event_test.go b/chain/events/filter/event_test.go index 24c788218a3..80c4c7fc014 100644 --- a/chain/events/filter/event_test.go +++ b/chain/events/filter/event_test.go @@ -60,7 +60,7 @@ func TestEventFilterCollectEvents(t *testing.T) { noCollectedEvents := []*CollectedEvent{} oneCollectedEvent := []*CollectedEvent{ { - Event: ev1, + Entries: ev1.Entries, EmitterAddr: a1, EventIdx: 0, Reverted: false, diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go new file mode 100644 index 00000000000..c5d0b62effa --- /dev/null +++ b/chain/events/filter/index.go @@ -0,0 +1,399 @@ +package filter + +import ( + "context" + "database/sql" + "errors" + "fmt" + "sort" + "strings" + + "github.com/ipfs/go-cid" + _ "github.com/mattn/go-sqlite3" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/types" +) + +var pragmas = []string{ + "PRAGMA synchronous = normal", + "PRAGMA temp_store = memory", + "PRAGMA mmap_size = 30000000000", + "PRAGMA page_size = 32768", + "PRAGMA auto_vacuum = NONE", + "PRAGMA automatic_index = OFF", + "PRAGMA journal_mode = WAL", + "PRAGMA read_uncommitted = ON", +} + +var ddls = []string{ + `CREATE TABLE IF NOT EXISTS event ( + id INTEGER PRIMARY KEY, + height INTEGER NOT NULL, + tipset_key BLOB NOT NULL, + tipset_key_cid BLOB NOT NULL, + emitter_addr BLOB NOT NULL, + event_index INTEGER NOT NULL, + message_cid BLOB NOT NULL, + message_index INTEGER NOT NULL, + reverted INTEGER NOT NULL + )`, + + `CREATE TABLE IF NOT EXISTS event_entry ( + event_id INTEGER, + indexed INTEGER NOT NULL, + flags BLOB NOT NULL, + key TEXT NOT NULL, + value BLOB NOT NULL + )`, + + // metadata containing version of schema + `CREATE TABLE IF NOT EXISTS _meta ( + version UINT64 NOT NULL UNIQUE + )`, + + // version 1. + `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, +} + +const schemaVersion = 1 + +const ( + insertEvent = `INSERT OR IGNORE INTO event + (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) + VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + + insertEntry = `INSERT OR IGNORE INTO event_entry + (event_id, indexed, flags, key, value) + VALUES(?, ?, ?, ?, ?)` +) + +type EventIndex struct { + db *sql.DB +} + +func NewEventIndex(path string) (*EventIndex, error) { + db, err := sql.Open("sqlite3", path+"?mode=rwc") + if err != nil { + return nil, xerrors.Errorf("open sqlite3 database: %w", err) + } + + for _, pragma := range pragmas { + if _, err := db.Exec(pragma); err != nil { + _ = db.Close() + return nil, xerrors.Errorf("exec pragma %q: %w", pragma, err) + } + } + + q, err := db.Query("SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';") + if err == sql.ErrNoRows || !q.Next() { + // empty database, create the schema + for _, ddl := range ddls { + if _, err := db.Exec(ddl); err != nil { + _ = db.Close() + return nil, xerrors.Errorf("exec ddl %q: %w", ddl, err) + } + } + } else if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("looking for _meta table: %w", err) + } else { + // Ensure we don't open a database from a different schema version + + row := db.QueryRow("SELECT max(version) FROM _meta") + var version int + err := row.Scan(&version) + if err != nil { + _ = db.Close() + return nil, xerrors.Errorf("invalid database version: no version found") + } + if version != schemaVersion { + _ = db.Close() + return nil, xerrors.Errorf("invalid database version: got %d, expected %d", version, schemaVersion) + } + } + + return &EventIndex{ + db: db, + }, nil +} + +func (ei *EventIndex) Close() error { + if ei.db == nil { + return nil + } + return ei.db.Close() +} + +func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error { + // cache of lookups between actor id and f4 address + + addressLookups := make(map[abi.ActorID]address.Address) + + ems, err := te.messages(ctx) + if err != nil { + return xerrors.Errorf("load executed messages: %w", err) + } + + tx, err := ei.db.Begin() + if err != nil { + return xerrors.Errorf("begin transaction: %w", err) + } + stmtEvent, err := tx.Prepare(insertEvent) + if err != nil { + return xerrors.Errorf("prepare insert event: %w", err) + } + stmtEntry, err := tx.Prepare(insertEntry) + if err != nil { + return xerrors.Errorf("prepare insert entry: %w", err) + } + + for msgIdx, em := range ems { + for evIdx, ev := range em.Events() { + addr, found := addressLookups[ev.Emitter] + if !found { + var ok bool + addr, ok = resolver(ctx, ev.Emitter, te.rctTs) + if !ok { + // not an address we will be able to match against + continue + } + addressLookups[ev.Emitter] = addr + } + + tsKeyCid, err := te.msgTs.Key().Cid() + if err != nil { + return xerrors.Errorf("tipset key cid: %w", err) + } + + res, err := stmtEvent.Exec( + te.msgTs.Height(), // height + te.msgTs.Key().Bytes(), // tipset_key + tsKeyCid.Bytes(), // tipset_key_cid + addr.Bytes(), // emitter_addr + evIdx, // event_index + em.Message().Cid().Bytes(), // message_cid + msgIdx, // message_index + revert, // reverted + ) + if err != nil { + return xerrors.Errorf("exec insert event: %w", err) + } + + lastID, err := res.LastInsertId() + if err != nil { + return xerrors.Errorf("get last row id: %w", err) + } + + for _, entry := range ev.Entries { + _, err := stmtEntry.Exec( + lastID, // event_id + entry.Flags&indexed == indexed, // indexed + []byte{entry.Flags}, // flags + entry.Key, // key + entry.Value, // value + ) + if err != nil { + return xerrors.Errorf("exec insert entry: %w", err) + } + } + } + } + + if err := tx.Commit(); err != nil { + return xerrors.Errorf("commit transaction: %w", err) + } + + return nil +} + +// PrefillFilter fills a filter's collection of events from the historic index +func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { + clauses := []string{} + values := []any{} + joins := []string{} + + if f.tipsetCid != cid.Undef { + clauses = append(clauses, "event.tipset_key_cid=?") + values = append(values, f.tipsetCid.Bytes()) + } else { + if f.minHeight >= 0 { + clauses = append(clauses, "event.height>=?") + values = append(values, f.minHeight) + } + if f.maxHeight >= 0 { + clauses = append(clauses, "event.height<=?") + values = append(values, f.maxHeight) + } + } + + if len(f.addresses) > 0 { + subclauses := []string{} + for _, addr := range f.addresses { + subclauses = append(subclauses, "emitter_addr=?") + values = append(values, addr.Bytes()) + } + clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") + } + + if len(f.keys) > 0 { + join := 0 + for key, vals := range f.keys { + join++ + joinAlias := fmt.Sprintf("ee%d", join) + joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias)) + clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias)) + values = append(values, key) + subclauses := []string{} + for _, val := range vals { + subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias)) + values = append(values, val) + } + clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") + + } + } + + s := `SELECT + event.id, + event.height, + event.tipset_key, + event.tipset_key_cid, + event.emitter_addr, + event.event_index, + event.message_cid, + event.message_index, + event.reverted, + event_entry.flags, + event_entry.key, + event_entry.value + FROM event JOIN event_entry ON event.id=event_entry.event_id` + + if len(joins) > 0 { + s = s + ", " + strings.Join(joins, ", ") + } + + if len(clauses) > 0 { + s = s + " WHERE " + strings.Join(clauses, " AND ") + } + + s += " ORDER BY event.height DESC" + + stmt, err := ei.db.Prepare(s) + if err != nil { + return xerrors.Errorf("prepare prefill query: %w", err) + } + + q, err := stmt.QueryContext(ctx, values...) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil + } + return xerrors.Errorf("exec prefill query: %w", err) + } + + var ces []*CollectedEvent + var currentID int64 = -1 + var ce *CollectedEvent + + for q.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + var row struct { + id int64 + height uint64 + tipsetKey []byte + tipsetKeyCid []byte + emitterAddr []byte + eventIndex int + messageCid []byte + messageIndex int + reverted bool + flags []byte + key string + value []byte + } + + if err := q.Scan( + &row.id, + &row.height, + &row.tipsetKey, + &row.tipsetKeyCid, + &row.emitterAddr, + &row.eventIndex, + &row.messageCid, + &row.messageIndex, + &row.reverted, + &row.flags, + &row.key, + &row.value, + ); err != nil { + return xerrors.Errorf("read prefill row: %w", err) + } + + if row.id != currentID { + if ce != nil { + ces = append(ces, ce) + ce = nil + // Unfortunately we can't easily incorporate the max results limit into the query due to the + // unpredictable number of rows caused by joins + // Break here to stop collecting rows + if f.maxResults > 0 && len(ces) >= f.maxResults { + break + } + } + + currentID = row.id + ce = &CollectedEvent{ + EventIdx: row.eventIndex, + Reverted: row.reverted, + Height: abi.ChainEpoch(row.height), + MsgIdx: row.messageIndex, + } + + ce.EmitterAddr, err = address.NewFromBytes(row.emitterAddr) + if err != nil { + return xerrors.Errorf("parse emitter addr: %w", err) + } + + ce.TipSetKey, err = types.TipSetKeyFromBytes(row.tipsetKey) + if err != nil { + return xerrors.Errorf("parse tipsetkey: %w", err) + } + + ce.MsgCid, err = cid.Cast(row.messageCid) + if err != nil { + return xerrors.Errorf("parse message cid: %w", err) + } + } + + ce.Entries = append(ce.Entries, types.EventEntry{ + Flags: row.flags[0], + Key: row.key, + Value: row.value, + }) + + } + + if ce != nil { + ces = append(ces, ce) + } + + if len(ces) == 0 { + return nil + } + + // collected event list is in inverted order since we selected only the most recent events + // sort it into height order + sort.Slice(ces, func(i, j int) bool { return ces[i].Height < ces[j].Height }) + f.setCollectedEvents(ces) + + return nil +} diff --git a/chain/events/filter/index_test.go b/chain/events/filter/index_test.go new file mode 100644 index 00000000000..ee2ae8611b5 --- /dev/null +++ b/chain/events/filter/index_test.go @@ -0,0 +1,283 @@ +package filter + +import ( + "context" + pseudo "math/rand" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/chain/types" +) + +func TestEventIndexPrefillFilter(t *testing.T) { + rng := pseudo.New(pseudo.NewSource(299792458)) + a1 := randomF4Addr(t, rng) + a2 := randomF4Addr(t, rng) + + a1ID := abi.ActorID(1) + a2ID := abi.ActorID(2) + + addrMap := addressMap{} + addrMap.add(a1ID, a1) + addrMap.add(a2ID, a2) + + ev1 := fakeEvent( + a1ID, + []kv{ + {k: "type", v: []byte("approval")}, + {k: "signer", v: []byte("addr1")}, + }, + []kv{ + {k: "amount", v: []byte("2988181")}, + }, + ) + + st := newStore() + events := []*types.Event{ev1} + em := executedMessage{ + msg: fakeMessage(randomF4Addr(t, rng), randomF4Addr(t, rng)), + rct: fakeReceipt(t, rng, st, events), + evs: events, + } + + events14000 := buildTipSetEvents(t, rng, 14000, em) + cid14000, err := events14000.msgTs.Key().Cid() + require.NoError(t, err, "tipset cid") + + noCollectedEvents := []*CollectedEvent{} + oneCollectedEvent := []*CollectedEvent{ + { + Entries: ev1.Entries, + EmitterAddr: a1, + EventIdx: 0, + Reverted: false, + Height: 14000, + TipSetKey: events14000.msgTs.Key(), + MsgIdx: 0, + MsgCid: em.msg.Cid(), + }, + } + + workDir, err := os.MkdirTemp("", "lotusevents") + require.NoError(t, err, "create temporary work directory") + + defer func() { + _ = os.RemoveAll(workDir) + }() + t.Logf("using work dir %q", workDir) + + dbPath := filepath.Join(workDir, "actorevents.db") + + ei, err := NewEventIndex(dbPath) + require.NoError(t, err, "create event index") + if err := ei.CollectEvents(context.Background(), events14000, false, addrMap.ResolveAddress); err != nil { + require.NoError(t, err, "collect events") + } + + testCases := []struct { + name string + filter *EventFilter + te *TipSetEvents + want []*CollectedEvent + }{ + { + name: "nomatch tipset min height", + filter: &EventFilter{ + minHeight: 14001, + maxHeight: -1, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch tipset max height", + filter: &EventFilter{ + minHeight: -1, + maxHeight: 13999, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match tipset min height", + filter: &EventFilter{ + minHeight: 14000, + maxHeight: -1, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match tipset cid", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + tipsetCid: cid14000, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "nomatch address", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a2}, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match address", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + addresses: []address.Address{a1}, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match one entry", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + }, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "match one entry with alternate values", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("cancel"), + []byte("propose"), + []byte("approval"), + }, + }, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "nomatch one entry by missing value", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("cancel"), + []byte("propose"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry by missing key", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "method": { + []byte("approval"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "match one entry with multiple keys", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr1"), + }, + }, + }, + te: events14000, + want: oneCollectedEvent, + }, + { + name: "nomatch one entry with one mismatching key", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + "approver": { + []byte("addr1"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one mismatching value", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "type": { + []byte("approval"), + }, + "signer": { + []byte("addr2"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + { + name: "nomatch one entry with one unindexed key", + filter: &EventFilter{ + minHeight: -1, + maxHeight: -1, + keys: map[string][][]byte{ + "amount": { + []byte("2988181"), + }, + }, + }, + te: events14000, + want: noCollectedEvents, + }, + } + + for _, tc := range testCases { + tc := tc // appease lint + t.Run(tc.name, func(t *testing.T) { + if err := ei.PrefillFilter(context.Background(), tc.filter); err != nil { + require.NoError(t, err, "prefill filter events") + } + + coll := tc.filter.TakeCollectedEvents(context.Background()) + require.ElementsMatch(t, coll, tc.want) + }) + } +} diff --git a/chain/events/filter/mempool.go b/chain/events/filter/mempool.go index dcea0f54c62..5be35064450 100644 --- a/chain/events/filter/mempool.go +++ b/chain/events/filter/mempool.go @@ -124,6 +124,9 @@ func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, err } m.mu.Lock() + if m.filters == nil { + m.filters = make(map[string]*MemPoolFilter) + } m.filters[id.String()] = f m.mu.Unlock() diff --git a/chain/events/filter/tipset.go b/chain/events/filter/tipset.go index 1f43b09a348..0e43c96efe6 100644 --- a/chain/events/filter/tipset.go +++ b/chain/events/filter/tipset.go @@ -111,6 +111,9 @@ func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error } m.mu.Lock() + if m.filters == nil { + m.filters = make(map[string]*TipSetFilter) + } m.filters[id.String()] = f m.mu.Unlock() diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 6047f8d0218..9af3b699ef0 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -15,6 +15,7 @@ * [ChainExport](#ChainExport) * [ChainGetBlock](#ChainGetBlock) * [ChainGetBlockMessages](#ChainGetBlockMessages) + * [ChainGetEvents](#ChainGetEvents) * [ChainGetGenesis](#ChainGetGenesis) * [ChainGetMessage](#ChainGetMessage) * [ChainGetMessagesInTipset](#ChainGetMessagesInTipset) @@ -612,6 +613,37 @@ Response: } ``` +### ChainGetEvents +ChainGetEvents returns the events under an event AMT root CID. + + +Perms: read + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +] +``` + +Response: +```json +[ + { + "Emitter": 1000, + "Entries": [ + { + "Flags": 7, + "Key": "string value", + "Value": "Ynl0ZSBhcnJheQ==" + } + ] + } +] +``` + ### ChainGetGenesis ChainGetGenesis returns the genesis tipset. @@ -2485,7 +2517,7 @@ Inputs: Response: ```json [ - "0x0707070707070707070707070707070707070707070707070707070707070707" + {} ] ``` @@ -2506,7 +2538,7 @@ Inputs: Response: ```json [ - "0x0707070707070707070707070707070707070707070707070707070707070707" + {} ] ``` @@ -2532,7 +2564,7 @@ Inputs: Response: ```json [ - "0x0707070707070707070707070707070707070707070707070707070707070707" + {} ] ``` @@ -2703,7 +2735,21 @@ Response: "effectiveGasPrice": "0x0", "logsBloom": "0x07", "logs": [ - "string value" + { + "address": "0x0707070707070707070707070707070707070707", + "data": [ + "0x0707070707070707070707070707070707070707070707070707070707070707" + ], + "topics": [ + "0x0707070707070707070707070707070707070707070707070707070707070707" + ], + "removed": true, + "logIndex": "0x5", + "transactionIndex": "0x5", + "transactionHash": "0x0707070707070707070707070707070707070707070707070707070707070707", + "blockHash": "0x0707070707070707070707070707070707070707070707070707070707070707", + "blockNumber": "0x5" + } ] } ``` @@ -2796,9 +2842,7 @@ Perms: write Inputs: ```json [ - [ - "string value" - ], + "string value", { "topics": [ [ diff --git a/extern/filecoin-ffi b/extern/filecoin-ffi index 14151f5e623..39bed0d7a47 160000 --- a/extern/filecoin-ffi +++ b/extern/filecoin-ffi @@ -1 +1 @@ -Subproject commit 14151f5e623b37c9d47aee5192af6b3eeeae4a35 +Subproject commit 39bed0d7a477eae618d310a476233eafe3e6b571 diff --git a/go.mod b/go.mod index 37dc2ccfca0..9886821e59b 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( github.com/libp2p/go-libp2p-routing-helpers v0.2.3 github.com/libp2p/go-maddr-filter v0.1.0 github.com/mattn/go-isatty v0.0.16 + github.com/mattn/go-sqlite3 v1.14.16 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-base32 v0.0.4 diff --git a/go.sum b/go.sum index f34e4dbde78..615415ad75a 100644 --- a/go.sum +++ b/go.sum @@ -1317,6 +1317,8 @@ github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzp github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.10 h1:CoZ3S2P7pvtP45xOtBw+/mDL2z0RKI576gSkzRRpdGg= github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= +github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y= +github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mattn/go-xmlrpc v0.0.3/go.mod h1:mqc2dz7tP5x5BKlCahN/n+hs7OSZKJkS9JsHNBRlrxA= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/itests/actor_events_test.go b/itests/actor_events_test.go deleted file mode 100644 index f0f05418e3f..00000000000 --- a/itests/actor_events_test.go +++ /dev/null @@ -1,176 +0,0 @@ -// stm: #integration -package itests - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-state-types/big" - - "github.com/filecoin-project/lotus/chain/store" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/itests/kit" -) - -func TestActorEventsMpool(t *testing.T) { - ctx := context.Background() - - kit.QuietMiningLogs() - - client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(10 * time.Millisecond) - - // create a new address where to send funds. - addr, err := client.WalletNew(ctx, types.KTBLS) - require.NoError(t, err) - - // get the existing balance from the default wallet to then split it. - bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) - require.NoError(t, err) - - // install filter - filterID, err := client.EthNewPendingTransactionFilter(ctx) - require.NoError(t, err) - - const iterations = 100 - - // we'll send half our balance (saving the other half for gas), - // in `iterations` increments. - toSend := big.Div(bal, big.NewInt(2)) - each := big.Div(toSend, big.NewInt(iterations)) - - waitAllCh := make(chan struct{}) - go func() { - headChangeCh, err := client.ChainNotify(ctx) - require.NoError(t, err) - <-headChangeCh // skip hccurrent - - count := 0 - for { - select { - case headChanges := <-headChangeCh: - for _, change := range headChanges { - if change.Type == store.HCApply { - msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) - require.NoError(t, err) - count += len(msgs) - if count == iterations { - waitAllCh <- struct{}{} - } - } - } - } - } - }() - - var sms []*types.SignedMessage - for i := 0; i < iterations; i++ { - msg := &types.Message{ - From: client.DefaultKey.Address, - To: addr, - Value: each, - } - - sm, err := client.MpoolPushMessage(ctx, msg, nil) - require.NoError(t, err) - require.EqualValues(t, i, sm.Message.Nonce) - - sms = append(sms, sm) - } - - select { - case <-waitAllCh: - case <-time.After(time.Minute): - t.Errorf("timeout to wait for pack messages") - } - - // collect filter results - res, err := client.EthGetFilterChanges(ctx, filterID) - require.NoError(t, err) - - // expect to have seen iteration number of mpool messages - require.Equal(t, iterations, len(res.NewTransactionHashes)) -} - -func TestActorEventsTipsets(t *testing.T) { - ctx := context.Background() - - kit.QuietMiningLogs() - - client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(10 * time.Millisecond) - - // create a new address where to send funds. - addr, err := client.WalletNew(ctx, types.KTBLS) - require.NoError(t, err) - - // get the existing balance from the default wallet to then split it. - bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) - require.NoError(t, err) - - // install filter - filterID, err := client.EthNewBlockFilter(ctx) - require.NoError(t, err) - - const iterations = 100 - - // we'll send half our balance (saving the other half for gas), - // in `iterations` increments. - toSend := big.Div(bal, big.NewInt(2)) - each := big.Div(toSend, big.NewInt(iterations)) - - waitAllCh := make(chan struct{}) - go func() { - headChangeCh, err := client.ChainNotify(ctx) - require.NoError(t, err) - <-headChangeCh // skip hccurrent - - count := 0 - for { - select { - case headChanges := <-headChangeCh: - for _, change := range headChanges { - if change.Type == store.HCApply { - msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) - require.NoError(t, err) - count += len(msgs) - if count == iterations { - waitAllCh <- struct{}{} - } - } - } - } - } - }() - - var sms []*types.SignedMessage - for i := 0; i < iterations; i++ { - msg := &types.Message{ - From: client.DefaultKey.Address, - To: addr, - Value: each, - } - - sm, err := client.MpoolPushMessage(ctx, msg, nil) - require.NoError(t, err) - require.EqualValues(t, i, sm.Message.Nonce) - - sms = append(sms, sm) - } - - select { - case <-waitAllCh: - case <-time.After(time.Minute): - t.Errorf("timeout to wait for pack messages") - } - - // collect filter results - res, err := client.EthGetFilterChanges(ctx, filterID) - require.NoError(t, err) - - // expect to have seen iteration number of tipsets - require.Equal(t, iterations, len(res.NewBlockHashes)) -} diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go new file mode 100644 index 00000000000..d3b703281d0 --- /dev/null +++ b/itests/eth_filter_test.go @@ -0,0 +1,594 @@ +// stm: #integration +package itests + +import ( + "context" + "encoding/hex" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/big" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/itests/kit" +) + +func TestEthNewPendingTransactionFilter(t *testing.T) { + ctx := context.Background() + + kit.QuietMiningLogs() + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // create a new address where to send funds. + addr, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + // get the existing balance from the default wallet to then split it. + bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) + require.NoError(t, err) + + // install filter + filterID, err := client.EthNewPendingTransactionFilter(ctx) + require.NoError(t, err) + + const iterations = 100 + + // we'll send half our balance (saving the other half for gas), + // in `iterations` increments. + toSend := big.Div(bal, big.NewInt(2)) + each := big.Div(toSend, big.NewInt(iterations)) + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(t, err) + <-headChangeCh // skip hccurrent + + count := 0 + for { + select { + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply { + msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) + require.NoError(t, err) + count += len(msgs) + if count == iterations { + waitAllCh <- struct{}{} + } + } + } + } + } + }() + + var sms []*types.SignedMessage + for i := 0; i < iterations; i++ { + msg := &types.Message{ + From: client.DefaultKey.Address, + To: addr, + Value: each, + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + select { + case <-waitAllCh: + case <-time.After(time.Minute): + t.Errorf("timeout to wait for pack messages") + } + + // collect filter results + res, err := client.EthGetFilterChanges(ctx, filterID) + require.NoError(t, err) + + // expect to have seen iteration number of mpool messages + require.Equal(t, iterations, len(res.Results)) +} + +func TestEthNewBlockFilter(t *testing.T) { + ctx := context.Background() + + kit.QuietMiningLogs() + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // create a new address where to send funds. + addr, err := client.WalletNew(ctx, types.KTBLS) + require.NoError(t, err) + + // get the existing balance from the default wallet to then split it. + bal, err := client.WalletBalance(ctx, client.DefaultKey.Address) + require.NoError(t, err) + + // install filter + filterID, err := client.EthNewBlockFilter(ctx) + require.NoError(t, err) + + const iterations = 30 + + // we'll send half our balance (saving the other half for gas), + // in `iterations` increments. + toSend := big.Div(bal, big.NewInt(2)) + each := big.Div(toSend, big.NewInt(iterations)) + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(t, err) + <-headChangeCh // skip hccurrent + + count := 0 + for { + select { + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply || change.Type == store.HCRevert { + count++ + if count == iterations { + waitAllCh <- struct{}{} + } + } + } + } + } + }() + + var sms []*types.SignedMessage + for i := 0; i < iterations; i++ { + msg := &types.Message{ + From: client.DefaultKey.Address, + To: addr, + Value: each, + } + + sm, err := client.MpoolPushMessage(ctx, msg, nil) + require.NoError(t, err) + require.EqualValues(t, i, sm.Message.Nonce) + + sms = append(sms, sm) + } + + select { + case <-waitAllCh: + case <-time.After(time.Minute): + t.Errorf("timeout to wait for pack messages") + } + + // collect filter results + res, err := client.EthGetFilterChanges(ctx, filterID) + require.NoError(t, err) + + // expect to have seen iteration number of tipsets + require.Equal(t, iterations, len(res.Results)) +} + +func TestEthNewFilterCatchAll(t *testing.T) { + require := require.New(t) + + kit.QuietMiningLogs() + + blockTime := 100 * time.Millisecond + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI()) + ens.InterconnectAll().BeginMining(blockTime) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // install contract + contractHex, err := os.ReadFile("contracts/events.bin") + require.NoError(err) + + contract, err := hex.DecodeString(string(contractHex)) + require.NoError(err) + + fromAddr, err := client.WalletDefaultAddress(ctx) + require.NoError(err) + + result := client.EVM().DeployContract(ctx, fromAddr, contract) + + idAddr, err := address.NewIDAddress(result.ActorID) + require.NoError(err) + t.Logf("actor ID address is %s", idAddr) + + // install filter + filterID, err := client.EthNewFilter(ctx, &api.EthFilterSpec{}) + require.NoError(err) + + const iterations = 10 + + type msgInTipset struct { + msg api.Message + ts *types.TipSet + } + + msgChan := make(chan msgInTipset, iterations) + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(err) + <-headChangeCh // skip hccurrent + + count := 0 + for { + select { + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply || change.Type == store.HCRevert { + msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) + require.NoError(err) + + count += len(msgs) + for _, m := range msgs { + select { + case msgChan <- msgInTipset{msg: m, ts: change.Val}: + default: + } + } + + if count == iterations { + close(msgChan) + close(waitAllCh) + return + } + } + } + } + } + }() + + time.Sleep(blockTime * 6) + + for i := 0; i < iterations; i++ { + // log a four topic event with data + ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil) + require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") + } + + select { + case <-waitAllCh: + case <-time.After(time.Minute): + t.Errorf("timeout to wait for pack messages") + } + + received := make(map[api.EthHash]msgInTipset) + for m := range msgChan { + eh, err := api.NewEthHashFromCid(m.msg.Cid) + require.NoError(err) + received[eh] = m + } + require.Equal(iterations, len(received), "all messages on chain") + + ts, err := client.ChainHead(ctx) + require.NoError(err) + + actor, err := client.StateGetActor(ctx, idAddr, ts.Key()) + require.NoError(err) + require.NotNil(actor.Address) + ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address) + require.NoError(err) + + // collect filter results + res, err := client.EthGetFilterChanges(ctx, filterID) + require.NoError(err) + + // expect to have seen iteration number of events + require.Equal(iterations, len(res.Results)) + + topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11}) + topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22}) + topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33}) + topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44}) + data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}) + + for _, r := range res.Results { + // since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results + rc, ok := r.(map[string]interface{}) + require.True(ok, "result type") + + elog, err := ParseEthLog(rc) + require.NoError(err) + + require.Equal(ethContractAddr, elog.Address, "event address") + require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset + + msg, exists := received[elog.TransactionHash] + require.True(exists, "message seen on chain") + + tsCid, err := msg.ts.Key().Cid() + require.NoError(err) + + tsCidHash, err := api.NewEthHashFromCid(tsCid) + require.NoError(err) + + require.Equal(tsCidHash, elog.BlockHash, "block hash") + + require.Equal(4, len(elog.Topics), "number of topics") + require.Equal(topic1Hash, elog.Topics[0], "topic1") + require.Equal(topic2Hash, elog.Topics[1], "topic2") + require.Equal(topic3Hash, elog.Topics[2], "topic3") + require.Equal(topic4Hash, elog.Topics[3], "topic4") + + require.Equal(1, len(elog.Data), "number of data") + require.Equal(data1Hash, elog.Data[0], "data1") + + } +} + +func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) { + el := &api.EthLog{} + + ethHash := func(k string, v interface{}) (api.EthHash, error) { + s, ok := v.(string) + if !ok { + return api.EthHash{}, xerrors.Errorf(k + " not a string") + } + return api.EthHashFromHex(s) + } + + ethUint64 := func(k string, v interface{}) (api.EthUint64, error) { + s, ok := v.(string) + if !ok { + return 0, xerrors.Errorf(k + " not a string") + } + parsedInt, err := strconv.ParseUint(strings.Replace(s, "0x", "", -1), 16, 64) + if err != nil { + return 0, err + } + return api.EthUint64(parsedInt), nil + } + + var err error + for k, v := range in { + switch k { + case "removed": + b, ok := v.(bool) + if ok { + el.Removed = b + continue + } + s, ok := v.(string) + if !ok { + return nil, xerrors.Errorf(k + " not a string") + } + el.Removed, err = strconv.ParseBool(s) + if err != nil { + return nil, err + } + case "address": + s, ok := v.(string) + if !ok { + return nil, xerrors.Errorf(k + " not a string") + } + el.Address, err = api.EthAddressFromHex(s) + if err != nil { + return nil, err + } + case "logIndex": + el.LogIndex, err = ethUint64(k, v) + if err != nil { + return nil, err + } + case "transactionIndex": + el.TransactionIndex, err = ethUint64(k, v) + if err != nil { + return nil, err + } + case "blockNumber": + el.BlockNumber, err = ethUint64(k, v) + if err != nil { + return nil, err + } + case "transactionHash": + el.TransactionHash, err = ethHash(k, v) + if err != nil { + return nil, err + } + case "blockHash": + el.BlockHash, err = ethHash(k, v) + if err != nil { + return nil, err + } + case "data": + sl, ok := v.([]interface{}) + if !ok { + return nil, xerrors.Errorf(k + " not a slice") + } + for _, s := range sl { + data, err := ethHash(k, s) + if err != nil { + return nil, err + } + el.Data = append(el.Data, data) + } + case "topics": + sl, ok := v.([]interface{}) + if !ok { + return nil, xerrors.Errorf(k + " not a slice") + } + for _, s := range sl { + topic, err := ethHash(k, s) + if err != nil { + return nil, err + } + el.Topics = append(el.Topics, topic) + } + } + } + + return el, err +} + +func TestEthGetLogsAll(t *testing.T) { + require := require.New(t) + + kit.QuietMiningLogs() + + blockTime := 100 * time.Millisecond + dbpath := filepath.Join(t.TempDir(), "actorevents.db") + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath)) + ens.InterconnectAll().BeginMining(blockTime) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // install contract + contractHex, err := os.ReadFile("contracts/events.bin") + require.NoError(err) + + contract, err := hex.DecodeString(string(contractHex)) + require.NoError(err) + + fromAddr, err := client.WalletDefaultAddress(ctx) + require.NoError(err) + + result := client.EVM().DeployContract(ctx, fromAddr, contract) + + idAddr, err := address.NewIDAddress(result.ActorID) + require.NoError(err) + t.Logf("actor ID address is %s", idAddr) + + const iterations = 10 + + type msgInTipset struct { + msg api.Message + ts *types.TipSet + } + + msgChan := make(chan msgInTipset, iterations) + + waitAllCh := make(chan struct{}) + go func() { + headChangeCh, err := client.ChainNotify(ctx) + require.NoError(err) + <-headChangeCh // skip hccurrent + + count := 0 + for { + select { + case headChanges := <-headChangeCh: + for _, change := range headChanges { + if change.Type == store.HCApply || change.Type == store.HCRevert { + msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key()) + require.NoError(err) + + count += len(msgs) + for _, m := range msgs { + select { + case msgChan <- msgInTipset{msg: m, ts: change.Val}: + default: + } + } + + if count == iterations { + close(msgChan) + close(waitAllCh) + return + } + } + } + } + } + }() + + time.Sleep(blockTime * 6) + + for i := 0; i < iterations; i++ { + // log a four topic event with data + ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil) + require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") + } + + select { + case <-waitAllCh: + case <-time.After(time.Minute): + t.Errorf("timeout to wait for pack messages") + } + + received := make(map[api.EthHash]msgInTipset) + for m := range msgChan { + eh, err := api.NewEthHashFromCid(m.msg.Cid) + require.NoError(err) + received[eh] = m + } + require.Equal(iterations, len(received), "all messages on chain") + + head, err := client.ChainHead(ctx) + require.NoError(err) + + actor, err := client.StateGetActor(ctx, idAddr, head.Key()) + require.NoError(err) + require.NotNil(actor.Address) + ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address) + require.NoError(err) + + topic1Hash := api.EthHashData([]byte{0x42, 0x11, 0x11}) + topic2Hash := api.EthHashData([]byte{0x42, 0x22, 0x22}) + topic3Hash := api.EthHashData([]byte{0x42, 0x33, 0x33}) + topic4Hash := api.EthHashData([]byte{0x42, 0x44, 0x44}) + data1Hash := api.EthHashData([]byte{0x48, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}) + + pstring := func(s string) *string { return &s } + + // get logs + res, err := client.EthGetLogs(ctx, &api.EthFilterSpec{ + FromBlock: pstring("0x0"), + }) + require.NoError(err) + + // expect to have all messages sent + require.Equal(len(received), len(res.Results)) + + for _, r := range res.Results { + // since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results + rc, ok := r.(map[string]interface{}) + require.True(ok, "result type") + + elog, err := ParseEthLog(rc) + require.NoError(err) + + require.Equal(ethContractAddr, elog.Address, "event address") + require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset + + msg, exists := received[elog.TransactionHash] + require.True(exists, "message seen on chain") + + tsCid, err := msg.ts.Key().Cid() + require.NoError(err) + + tsCidHash, err := api.NewEthHashFromCid(tsCid) + require.NoError(err) + + require.Equal(tsCidHash, elog.BlockHash, "block hash") + + require.Equal(4, len(elog.Topics), "number of topics") + require.Equal(topic1Hash, elog.Topics[0], "topic1") + require.Equal(topic2Hash, elog.Topics[1], "topic2") + require.Equal(topic3Hash, elog.Topics[2], "topic3") + require.Equal(topic4Hash, elog.Topics[3], "topic4") + + require.Equal(1, len(elog.Data), "number of data") + require.Equal(data1Hash, elog.Data[0], "data1") + + } +} diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 9c482700c26..a315763e81d 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -280,3 +280,19 @@ func SplitstoreMessges() NodeOpt { return nil }) } + +func RealTimeFilterAPI() NodeOpt { + return WithCfgOpt(func(cfg *config.FullNode) error { + cfg.ActorEvent.EnableRealTimeFilterAPI = true + return nil + }) +} + +func HistoricFilterAPI(dbpath string) NodeOpt { + return WithCfgOpt(func(cfg *config.FullNode) error { + cfg.ActorEvent.EnableRealTimeFilterAPI = true + cfg.ActorEvent.EnableHistoricFilterAPI = true + cfg.ActorEvent.ActorEventDatabasePath = dbpath + return nil + }) +} diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index f8f4c5fdb1d..6dd12f6f83a 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -69,6 +69,14 @@ this time become eligible for automatic deletion.`, Comment: `MaxFilterHeightRange specifies the maximum range of heights that can be used in a filter (to avoid querying the entire chain)`, }, + { + Name: "ActorEventDatabasePath", + Type: "string", + + Comment: `EventHistoryDatabasePath is the full path to a sqlite database that will be used to index actor events to +support the historic filter APIs. If the database does not exist it will be created. The directory containing +the database must already exist and be writeable.`, + }, }, "Backup": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index 0032930fd9e..197a5c6b362 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -624,6 +624,11 @@ type ActorEventConfig struct { // the entire chain) MaxFilterHeightRange uint64 + // EventHistoryDatabasePath is the full path to a sqlite database that will be used to index actor events to + // support the historic filter APIs. If the database does not exist it will be created. The directory containing + // the database must already exist and be writeable. + ActorEventDatabasePath string + // Others, not implemented yet: // Set a limit on the number of active websocket subscriptions (may be zero) // Set a timeout for subscription clients diff --git a/node/impl/full/chain.go b/node/impl/full/chain.go index ca245dcdad3..d06c854021d 100644 --- a/node/impl/full/chain.go +++ b/node/impl/full/chain.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "io" + "math" "strconv" "strings" "sync" @@ -24,6 +25,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + amt4 "github.com/filecoin-project/go-amt-ipld/v4" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/actors/util/adt" @@ -654,6 +656,33 @@ func (a *ChainAPI) ChainBlockstoreInfo(ctx context.Context) (map[string]interfac return info.Info(), nil } +// ChainGetEvents returns the events under an event AMT root CID. +// +// TODO (raulk) make copies of this logic elsewhere use this (e.g. itests, CLI, events filter). +func (a *ChainAPI) ChainGetEvents(ctx context.Context, root cid.Cid) ([]types.Event, error) { + store := cbor.NewCborStore(a.ExposedBlockstore) + evtArr, err := amt4.LoadAMT(ctx, store, root, amt4.UseTreeBitWidth(5)) + if err != nil { + return nil, xerrors.Errorf("load events amt: %w", err) + } + + ret := make([]types.Event, 0, evtArr.Len()) + var evt types.Event + err = evtArr.ForEach(ctx, func(u uint64, deferred *cbg.Deferred) error { + if u > math.MaxInt { + return xerrors.Errorf("too many events") + } + if err := evt.UnmarshalCBOR(bytes.NewReader(deferred.Raw)); err != nil { + return err + } + + ret = append(ret, evt) + return nil + }) + + return ret, err +} + func (a *ChainAPI) ChainPrune(ctx context.Context, opts api.PruneOpts) error { pruner, ok := a.BaseBlockstore.(interface { PruneChain(opts api.PruneOpts) error diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index 49890f3d9de..a5e8b446250 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -59,7 +59,6 @@ type EthModuleAPI interface { EthCall(ctx context.Context, tx api.EthCall, blkParam string) (api.EthBytes, error) EthMaxPriorityFeePerGas(ctx context.Context) (api.EthBigInt, error) EthSendRawTransaction(ctx context.Context, rawTx api.EthBytes) (api.EthHash, error) - // EthFeeHistory(ctx context.Context, blkCount string) } type EthEventAPI interface { @@ -70,7 +69,7 @@ type EthEventAPI interface { EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error) EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error) EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error) - EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) + EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) } @@ -97,12 +96,13 @@ type EthModule struct { var _ EthModuleAPI = (*EthModule)(nil) type EthEvent struct { + EthModuleAPI Chain *store.ChainStore EventFilterManager *filter.EventFilterManager TipSetFilterManager *filter.TipSetFilterManager MemPoolFilterManager *filter.MemPoolFilterManager FilterStore filter.FilterStore - SubManager ethSubscriptionManager + SubManager *EthSubscriptionManager MaxFilterHeightRange abi.ChainEpoch } @@ -236,10 +236,19 @@ func (a *EthModule) EthGetTransactionReceipt(ctx context.Context, txHash api.Eth return nil, nil } - receipt, err := api.NewEthTxReceipt(tx, msgLookup, replay) + var events []types.Event + if rct := replay.MsgRct; rct != nil && rct.EventsRoot != nil { + events, err = a.ChainAPI.ChainGetEvents(ctx, *rct.EventsRoot) + if err != nil { + return nil, nil + } + } + + receipt, err := a.newEthTxReceipt(ctx, tx, msgLookup, replay, events) if err != nil { return nil, nil } + return &receipt, nil } @@ -870,9 +879,96 @@ func (a *EthModule) newEthTxFromFilecoinMessageLookup(ctx context.Context, msgLo return tx, nil } -func (e *EthEvent) EthGetLogs(ctx context.Context, filter *api.EthFilterSpec) (*api.EthFilterResult, error) { - // TODO: implement EthGetLogs - return nil, api.ErrNotSupported +func (a *EthModule) newEthTxReceipt(ctx context.Context, tx api.EthTx, lookup *api.MsgLookup, replay *api.InvocResult, events []types.Event) (api.EthTxReceipt, error) { + receipt := api.EthTxReceipt{ + TransactionHash: tx.Hash, + TransactionIndex: tx.TransactionIndex, + BlockHash: tx.BlockHash, + BlockNumber: tx.BlockNumber, + From: tx.From, + To: tx.To, + StateRoot: api.EmptyEthHash, + LogsBloom: []byte{0}, + } + + if receipt.To == nil && lookup.Receipt.ExitCode.IsSuccess() { + // Create and Create2 return the same things. + var ret eam.CreateReturn + if err := ret.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { + return api.EthTxReceipt{}, xerrors.Errorf("failed to parse contract creation result: %w", err) + } + addr := api.EthAddress(ret.EthAddress) + receipt.ContractAddress = &addr + } + + if lookup.Receipt.ExitCode.IsSuccess() { + receipt.Status = 1 + } + if lookup.Receipt.ExitCode.IsError() { + receipt.Status = 0 + } + + if len(events) > 0 { + receipt.Logs = make([]api.EthLog, 0, len(events)) + for i, evt := range events { + l := api.EthLog{ + Removed: false, + LogIndex: api.EthUint64(i), + TransactionIndex: tx.TransactionIndex, + TransactionHash: tx.Hash, + BlockHash: tx.BlockHash, + BlockNumber: tx.BlockNumber, + } + + for _, entry := range evt.Entries { + hash := api.EthHashData(entry.Value) + if entry.Key == api.EthTopic1 || entry.Key == api.EthTopic2 || entry.Key == api.EthTopic3 || entry.Key == api.EthTopic4 { + l.Topics = append(l.Topics, hash) + } else { + l.Data = append(l.Data, hash) + } + } + + addr, err := address.NewIDAddress(uint64(evt.Emitter)) + if err != nil { + return api.EthTxReceipt{}, xerrors.Errorf("failed to create ID address: %w", err) + } + + l.Address, err = a.lookupEthAddress(ctx, addr) + if err != nil { + return api.EthTxReceipt{}, xerrors.Errorf("failed to resolve Ethereum address: %w", err) + } + + receipt.Logs = append(receipt.Logs, l) + } + } + + receipt.GasUsed = api.EthUint64(lookup.Receipt.GasUsed) + + // TODO: handle CumulativeGasUsed + receipt.CumulativeGasUsed = api.EmptyEthInt + + effectiveGasPrice := big.Div(replay.GasCost.TotalCost, big.NewInt(lookup.Receipt.GasUsed)) + receipt.EffectiveGasPrice = api.EthBigInt(effectiveGasPrice) + + return receipt, nil +} + +func (e *EthEvent) EthGetLogs(ctx context.Context, filterSpec *api.EthFilterSpec) (*api.EthFilterResult, error) { + if e.EventFilterManager == nil { + return nil, api.ErrNotSupported + } + + // Create a temporary filter + f, err := e.installEthFilterSpec(ctx, filterSpec) + if err != nil { + return nil, err + } + ces := f.TakeCollectedEvents(ctx) + + _ = e.uninstallFilter(ctx, f) + + return ethFilterResultFromEvents(ces) } func (e *EthEvent) EthGetFilterChanges(ctx context.Context, id api.EthFilterID) (*api.EthFilterResult, error) { @@ -915,11 +1011,7 @@ func (e *EthEvent) EthGetFilterLogs(ctx context.Context, id api.EthFilterID) (*a return nil, xerrors.Errorf("wrong filter type") } -func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) (api.EthFilterID, error) { - if e.FilterStore == nil || e.EventFilterManager == nil { - return "", api.ErrNotSupported - } - +func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *api.EthFilterSpec) (*filter.EventFilter, error) { var ( minHeight abi.ChainEpoch maxHeight abi.ChainEpoch @@ -928,39 +1020,39 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) keys = map[string][][]byte{} ) - if filter.BlockHash != nil { - if filter.FromBlock != nil || filter.ToBlock != nil { - return "", xerrors.Errorf("must not specify block hash and from/to block") + if filterSpec.BlockHash != nil { + if filterSpec.FromBlock != nil || filterSpec.ToBlock != nil { + return nil, xerrors.Errorf("must not specify block hash and from/to block") } // TODO: derive a tipset hash from eth hash - might need to push this down into the EventFilterManager } else { - if filter.FromBlock == nil || *filter.FromBlock == "latest" { + if filterSpec.FromBlock == nil || *filterSpec.FromBlock == "latest" { ts := e.Chain.GetHeaviestTipSet() minHeight = ts.Height() - } else if *filter.FromBlock == "earliest" { + } else if *filterSpec.FromBlock == "earliest" { minHeight = 0 - } else if *filter.FromBlock == "pending" { - return "", api.ErrNotSupported + } else if *filterSpec.FromBlock == "pending" { + return nil, api.ErrNotSupported } else { - epoch, err := strconv.ParseUint(*filter.FromBlock, 10, 64) + epoch, err := api.EthUint64FromHex(*filterSpec.FromBlock) if err != nil { - return "", xerrors.Errorf("invalid epoch") + return nil, xerrors.Errorf("invalid epoch") } minHeight = abi.ChainEpoch(epoch) } - if filter.ToBlock == nil || *filter.ToBlock == "latest" { + if filterSpec.ToBlock == nil || *filterSpec.ToBlock == "latest" { // here latest means the latest at the time maxHeight = -1 - } else if *filter.ToBlock == "earliest" { + } else if *filterSpec.ToBlock == "earliest" { maxHeight = 0 - } else if *filter.ToBlock == "pending" { - return "", api.ErrNotSupported + } else if *filterSpec.ToBlock == "pending" { + return nil, api.ErrNotSupported } else { - epoch, err := strconv.ParseUint(*filter.ToBlock, 10, 64) + epoch, err := api.EthUint64FromHex(*filterSpec.FromBlock) if err != nil { - return "", xerrors.Errorf("invalid epoch") + return nil, xerrors.Errorf("invalid epoch") } maxHeight = abi.ChainEpoch(epoch) } @@ -970,33 +1062,33 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) // Here the client is looking for events between the head and some future height ts := e.Chain.GetHeaviestTipSet() if maxHeight-ts.Height() > e.MaxFilterHeightRange { - return "", xerrors.Errorf("invalid epoch range") + return nil, xerrors.Errorf("invalid epoch range") } } else if minHeight >= 0 && maxHeight == -1 { // Here the client is looking for events between some time in the past and the current head ts := e.Chain.GetHeaviestTipSet() if ts.Height()-minHeight > e.MaxFilterHeightRange { - return "", xerrors.Errorf("invalid epoch range") + return nil, xerrors.Errorf("invalid epoch range") } } else if minHeight >= 0 && maxHeight >= 0 { if minHeight > maxHeight || maxHeight-minHeight > e.MaxFilterHeightRange { - return "", xerrors.Errorf("invalid epoch range") + return nil, xerrors.Errorf("invalid epoch range") } } } // Convert all addresses to filecoin f4 addresses - for _, ea := range filter.Address { + for _, ea := range filterSpec.Address { a, err := ea.ToFilecoinAddress() if err != nil { - return "", xerrors.Errorf("invalid address %x", ea) + return nil, xerrors.Errorf("invalid address %x", ea) } addresses = append(addresses, a) } - for idx, vals := range filter.Topics { + for idx, vals := range filterSpec.Topics { // Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4 key := fmt.Sprintf("topic%d", idx+1) keyvals := make([][]byte, len(vals)) @@ -1006,7 +1098,15 @@ func (e *EthEvent) EthNewFilter(ctx context.Context, filter *api.EthFilterSpec) keys[key] = keyvals } - f, err := e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys) + return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys) +} + +func (e *EthEvent) EthNewFilter(ctx context.Context, filterSpec *api.EthFilterSpec) (api.EthFilterID, error) { + if e.FilterStore == nil || e.EventFilterManager == nil { + return "", api.ErrNotSupported + } + + f, err := e.installEthFilterSpec(ctx, filterSpec) if err != nil { return "", err } @@ -1119,39 +1219,31 @@ const ( EthSubscribeEventTypeLogs = "logs" ) -func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { +func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { + if e.SubManager == nil { + return nil, api.ErrNotSupported + } // Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the // method to be "eth_subscription". This probably doesn't matter in practice. - // Validate event types and parameters first - for _, et := range eventTypes { - switch et { - case EthSubscribeEventTypeHeads: - case EthSubscribeEventTypeLogs: - default: - return nil, xerrors.Errorf("unsupported event type: %s", et) - - } - } - sub, err := e.SubManager.StartSubscription(ctx) if err != nil { return nil, err } - for _, et := range eventTypes { - switch et { - case EthSubscribeEventTypeHeads: - f, err := e.TipSetFilterManager.Install(ctx) - if err != nil { - // clean up any previous filters added and stop the sub - _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) - return nil, err - } - sub.addFilter(ctx, f) + switch eventType { + case EthSubscribeEventTypeHeads: + f, err := e.TipSetFilterManager.Install(ctx) + if err != nil { + // clean up any previous filters added and stop the sub + _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) + return nil, err + } + sub.addFilter(ctx, f) - case EthSubscribeEventTypeLogs: - keys := map[string][][]byte{} + case EthSubscribeEventTypeLogs: + keys := map[string][][]byte{} + if params != nil { for idx, vals := range params.Topics { // Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4 key := fmt.Sprintf("topic%d", idx+1) @@ -1161,21 +1253,27 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params } keys[key] = keyvals } + } - f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys) - if err != nil { - // clean up any previous filters added and stop the sub - _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) - return nil, err - } - sub.addFilter(ctx, f) + f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys) + if err != nil { + // clean up any previous filters added and stop the sub + _, _ = e.EthUnsubscribe(ctx, api.EthSubscriptionID(sub.id)) + return nil, err } + sub.addFilter(ctx, f) + default: + return nil, xerrors.Errorf("unsupported event type: %s", eventType) } return sub.out, nil } func (e *EthEvent) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) { + if e.SubManager == nil { + return false, api.ErrNotSupported + } + filters, err := e.SubManager.StopSubscription(ctx, string(id)) if err != nil { return false, nil @@ -1227,16 +1325,8 @@ type filterTipSetCollector interface { TakeCollectedTipSets(context.Context) []types.TipSetKey } -var ( - ethTopic1 = "topic1" - ethTopic2 = "topic2" - ethTopic3 = "topic3" - ethTopic4 = "topic4" -) - func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResult, error) { res := &api.EthFilterResult{} - for _, ev := range evs { log := api.EthLog{ Removed: ev.Reverted, @@ -1247,9 +1337,9 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu var err error - for _, entry := range ev.Event.Entries { + for _, entry := range ev.Entries { hash := api.EthHashData(entry.Value) - if entry.Key == ethTopic1 || entry.Key == ethTopic2 || entry.Key == ethTopic3 || entry.Key == ethTopic4 { + if entry.Key == api.EthTopic1 || entry.Key == api.EthTopic2 || entry.Key == api.EthTopic3 || entry.Key == api.EthTopic4 { log.Topics = append(log.Topics, hash) } else { log.Data = append(log.Data, hash) @@ -1275,7 +1365,7 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu return nil, err } - res.NewLogs = append(res.NewLogs, log) + res.Results = append(res.Results, log) } return res, nil @@ -1294,7 +1384,7 @@ func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*api.EthFilterResult, e return nil, err } - res.NewBlockHashes = append(res.NewBlockHashes, hash) + res.Results = append(res.Results, hash) } return res, nil @@ -1309,18 +1399,19 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) { return nil, err } - res.NewTransactionHashes = append(res.NewTransactionHashes, hash) + res.Results = append(res.Results, hash) } return res, nil } -type ethSubscriptionManager struct { +type EthSubscriptionManager struct { + EthModuleAPI mu sync.Mutex subs map[string]*ethSubscription } -func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { +func (e *EthSubscriptionManager) StartSubscription(ctx context.Context) (*ethSubscription, error) { id, err := uuid.NewRandom() if err != nil { return nil, xerrors.Errorf("new uuid: %w", err) @@ -1329,10 +1420,11 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub ctx, quit := context.WithCancel(ctx) sub := ðSubscription{ - id: id.String(), - in: make(chan interface{}, 200), - out: make(chan api.EthSubscriptionResponse), - quit: quit, + EthModuleAPI: e.EthModuleAPI, + id: id.String(), + in: make(chan interface{}, 200), + out: make(chan api.EthSubscriptionResponse), + quit: quit, } e.mu.Lock() @@ -1347,7 +1439,7 @@ func (e *ethSubscriptionManager) StartSubscription(ctx context.Context) (*ethSub return sub, nil } -func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) { +func (e *EthSubscriptionManager) StopSubscription(ctx context.Context, id string) ([]filter.Filter, error) { e.mu.Lock() defer e.mu.Unlock() @@ -1362,6 +1454,7 @@ func (e *ethSubscriptionManager) StopSubscription(ctx context.Context, id string } type ethSubscription struct { + EthModuleAPI id string in chan interface{} out chan api.EthSubscriptionResponse @@ -1394,7 +1487,24 @@ func (e *ethSubscription) start(ctx context.Context) { case *filter.CollectedEvent: resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt}) case *types.TipSet: - resp.Result = vt + // Sadly convoluted since the logic for conversion to eth block is long and buried away + // in unexported methods of EthModule + tsCid, err := vt.Key().Cid() + if err != nil { + break + } + + hash, err := api.NewEthHashFromCid(tsCid) + if err != nil { + break + } + + eb, err := e.EthGetBlockByHash(ctx, hash, true) + if err != nil { + break + } + + resp.Result = eb default: log.Warnf("unexpected subscription value type: %T", vt) } diff --git a/node/modules/actorevent.go b/node/modules/actorevent.go index 20405c3e792..4f25f07220c 100644 --- a/node/modules/actorevent.go +++ b/node/modules/actorevent.go @@ -31,18 +31,23 @@ type EventAPI struct { var _ events.EventAPI = &EventAPI{} -func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool) (*full.EthEvent, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool) (*full.EthEvent, error) { +func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecycle, *store.ChainStore, *stmgr.StateManager, EventAPI, *messagepool.MessagePool, full.EthModuleAPI) (*full.EthEvent, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, sm *stmgr.StateManager, evapi EventAPI, mp *messagepool.MessagePool, em full.EthModuleAPI) (*full.EthEvent, error) { ee := &full.EthEvent{ + EthModuleAPI: em, Chain: cs, MaxFilterHeightRange: abi.ChainEpoch(cfg.MaxFilterHeightRange), } - if !cfg.EnableRealTimeFilterAPI && !cfg.EnableHistoricFilterAPI { + if !cfg.EnableRealTimeFilterAPI { // all event functionality is disabled + // the historic filter API relies on the real time one return ee, nil } + ee.SubManager = &full.EthSubscriptionManager{ + EthModuleAPI: em, + } ee.FilterStore = filter.NewMemFilterStore(cfg.MaxFilters) // Start garbage collection for filters @@ -53,68 +58,80 @@ func EthEventAPI(cfg config.ActorEventConfig) func(helpers.MetricsCtx, fx.Lifecy }, }) - if cfg.EnableRealTimeFilterAPI { - ee.EventFilterManager = &filter.EventFilterManager{ - ChainStore: cs, - AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { - // we only want to match using f4 addresses - idAddr, err := address.NewIDAddress(uint64(emitter)) - if err != nil { - return address.Undef, false - } - addr, err := sm.LookupRobustAddress(ctx, idAddr, ts) - if err != nil { - return address.Undef, false - } - // if robust address is not f4 then we won't match against it so bail early - if addr.Protocol() != address.Delegated { - return address.Undef, false - } - // we have an f4 address, make sure it's assigned by the EAM - if namespace, _, err := varint.FromUvarint(addr.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { - return address.Undef, false - } - return addr, true - }, - - MaxFilterResults: cfg.MaxFilterResults, - } - ee.TipSetFilterManager = &filter.TipSetFilterManager{ - MaxFilterResults: cfg.MaxFilterResults, - } - ee.MemPoolFilterManager = &filter.MemPoolFilterManager{ - MaxFilterResults: cfg.MaxFilterResults, + // Enable indexing of actor events + var eventIndex *filter.EventIndex + if cfg.EnableHistoricFilterAPI { + var err error + eventIndex, err = filter.NewEventIndex(cfg.ActorEventDatabasePath) + if err != nil { + return nil, err } - const ChainHeadConfidence = 1 - - ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ - OnStart: func(context.Context) error { - ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence) - if err != nil { - return err - } - // ignore returned tipsets - _ = ev.Observe(ee.EventFilterManager) - _ = ev.Observe(ee.TipSetFilterManager) - - ch, err := mp.Updates(ctx) - if err != nil { - return err - } - go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch) - - return nil + OnStop: func(ctx context.Context) error { + return eventIndex.Close() }, }) - } - if cfg.EnableHistoricFilterAPI { - // TODO: enable indexer + ee.EventFilterManager = &filter.EventFilterManager{ + ChainStore: cs, + EventIndex: eventIndex, // will be nil unless EnableHistoricFilterAPI is true + AddressResolver: func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { + // we only want to match using f4 addresses + idAddr, err := address.NewIDAddress(uint64(emitter)) + if err != nil { + return address.Undef, false + } + + actor, err := sm.LoadActor(ctx, idAddr, ts) + if err != nil || actor.Address == nil { + return address.Undef, false + } + + // if robust address is not f4 then we won't match against it so bail early + if actor.Address.Protocol() != address.Delegated { + return address.Undef, false + } + // we have an f4 address, make sure it's assigned by the EAM + if namespace, _, err := varint.FromUvarint(actor.Address.Payload()); err != nil || namespace != builtintypes.EthereumAddressManagerActorID { + return address.Undef, false + } + return *actor.Address, true + }, + + MaxFilterResults: cfg.MaxFilterResults, + } + ee.TipSetFilterManager = &filter.TipSetFilterManager{ + MaxFilterResults: cfg.MaxFilterResults, + } + ee.MemPoolFilterManager = &filter.MemPoolFilterManager{ + MaxFilterResults: cfg.MaxFilterResults, } + const ChainHeadConfidence = 1 + + ctx := helpers.LifecycleCtx(mctx, lc) + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + ev, err := events.NewEventsWithConfidence(ctx, &evapi, ChainHeadConfidence) + if err != nil { + return err + } + // ignore returned tipsets + _ = ev.Observe(ee.EventFilterManager) + _ = ev.Observe(ee.TipSetFilterManager) + + ch, err := mp.Updates(ctx) + if err != nil { + return err + } + go ee.MemPoolFilterManager.WaitForMpoolUpdates(ctx, ch) + + return nil + }, + }) + return ee, nil } } diff --git a/node/rpc.go b/node/rpc.go index 2648056f64a..58854ad36c2 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -99,6 +99,16 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server rpcServer.AliasMethod("eth_estimateGas", "Filecoin.EthEstimateGas") rpcServer.AliasMethod("eth_call", "Filecoin.EthCall") + rpcServer.AliasMethod("eth_getLogs", "Filecoin.EthGetLogs") + rpcServer.AliasMethod("eth_getFilterChanges", "Filecoin.EthGetFilterChanges") + rpcServer.AliasMethod("eth_getFilterLogs", "Filecoin.EthGetFilterLogs") + rpcServer.AliasMethod("eth_newFilter", "Filecoin.EthNewFilter") + rpcServer.AliasMethod("eth_newBlockFilter", "Filecoin.EthNewBlockFilter") + rpcServer.AliasMethod("eth_newPendingTransactionFilter", "Filecoin.EthNewPendingTransactionFilter") + rpcServer.AliasMethod("eth_uninstallFilter", "Filecoin.EthUninstallFilter") + rpcServer.AliasMethod("eth_subscribe", "Filecoin.EthSubscribe") + rpcServer.AliasMethod("eth_unsubscribe", "Filecoin.EthUnsubscribe") + rpcServer.AliasMethod("net_version", "Filecoin.NetVersion") rpcServer.AliasMethod("net_listening", "Filecoin.NetListening")