Skip to content

Commit

Permalink
Add basic itests for ethereum filter api
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Nov 15, 2022
1 parent ade75af commit 02460a9
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 32 deletions.
31 changes: 13 additions & 18 deletions api/eth_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,34 +542,29 @@ func (e *EthHashList) UnmarshalJSON(b []byte) error {
return nil
}

// FilterResult represents the response from executing a filter: a list of bloack hashes, a list of transaction hashes
// FilterResult represents the response from executing a filter: a list of block hashes, a list of transaction hashes
// or a list of logs
// This is a union type. Only one field will be populated.
// The JSON encoding must produce an array of the populated field.
type EthFilterResult struct {
// List of block hashes. Only populated when the filter has been installed via EthNewBlockFilter
NewBlockHashes []EthHash

// List of transaction hashes. Only populated when the filter has been installed via EthNewPendingTransactionFilter
NewTransactionHashes []EthHash

// List of event logs. Only populated when the filter has been installed via EthNewFilter
NewLogs []EthLog
Results []interface{}
}

func (h EthFilterResult) MarshalJSON() ([]byte, error) {
if h.NewBlockHashes != nil {
return json.Marshal(h.NewBlockHashes)
}
if h.NewTransactionHashes != nil {
return json.Marshal(h.NewTransactionHashes)
}
if h.NewLogs != nil {
return json.Marshal(h.NewLogs)
func (h *EthFilterResult) MarshalJSON() ([]byte, error) {
if h.Results != nil {
return json.Marshal(h.Results)
}
return []byte{'[', ']'}, nil
}

func (h *EthFilterResult) UnmarshalJSON(b []byte) error {
if bytes.Equal(b, []byte{'n', 'u', 'l', 'l'}) {
return nil
}
err := json.Unmarshal(b, &h.Results)
return err
}

// EthLog represents the results of an event filter execution.
type EthLog struct {
// Address is the address of the actor that produced the event log.
Expand Down
3 changes: 3 additions & 0 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
}

m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*EventFilter)
}
m.filters[id.String()] = f
m.mu.Unlock()

Expand Down
3 changes: 3 additions & 0 deletions chain/events/filter/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ func (m *MemPoolFilterManager) Install(ctx context.Context) (*MemPoolFilter, err
}

m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*MemPoolFilter)
}
m.filters[id.String()] = f
m.mu.Unlock()

Expand Down
3 changes: 3 additions & 0 deletions chain/events/filter/tipset.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func (m *TipSetFilterManager) Install(ctx context.Context) (*TipSetFilter, error
}

m.mu.Lock()
if m.filters == nil {
m.filters = make(map[string]*TipSetFilter)
}
m.filters[id.String()] = f
m.mu.Unlock()

Expand Down
105 changes: 94 additions & 11 deletions itests/actor_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@ package itests

import (
"context"
"encoding/hex"
"fmt"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
)

func TestActorEventsMpool(t *testing.T) {
func TestEthNewPendingTransactionFilter(t *testing.T) {
ctx := context.Background()

kit.QuietMiningLogs()

client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)

// create a new address where to send funds.
Expand Down Expand Up @@ -92,15 +97,15 @@ func TestActorEventsMpool(t *testing.T) {
require.NoError(t, err)

// expect to have seen iteration number of mpool messages
require.Equal(t, iterations, len(res.NewTransactionHashes))
require.Equal(t, iterations, len(res.Results))
}

func TestActorEventsTipsets(t *testing.T) {
func TestEthNewBlockFilter(t *testing.T) {
ctx := context.Background()

kit.QuietMiningLogs()

client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs())
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)

// create a new address where to send funds.
Expand All @@ -115,7 +120,7 @@ func TestActorEventsTipsets(t *testing.T) {
filterID, err := client.EthNewBlockFilter(ctx)
require.NoError(t, err)

const iterations = 100
const iterations = 30

// we'll send half our balance (saving the other half for gas),
// in `iterations` increments.
Expand All @@ -133,10 +138,8 @@ func TestActorEventsTipsets(t *testing.T) {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply {
msgs, err := client.ChainGetMessagesInTipset(ctx, change.Val.Key())
require.NoError(t, err)
count += len(msgs)
if change.Type == store.HCApply || change.Type == store.HCRevert {
count++
if count == iterations {
waitAllCh <- struct{}{}
}
Expand Down Expand Up @@ -172,5 +175,85 @@ func TestActorEventsTipsets(t *testing.T) {
require.NoError(t, err)

// expect to have seen iteration number of tipsets
require.Equal(t, iterations, len(res.NewBlockHashes))
require.Equal(t, iterations, len(res.Results))
}

func TestEthNewFilterCatchAll(t *testing.T) {
require := require.New(t)

kit.QuietMiningLogs()

blockTime := 100 * time.Millisecond
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.RealTimeFilterAPI())
ens.InterconnectAll().BeginMining(blockTime)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

// install contract
contractHex, err := os.ReadFile("contracts/events.bin")
require.NoError(err)

contract, err := hex.DecodeString(string(contractHex))
require.NoError(err)

fromAddr, err := client.WalletDefaultAddress(ctx)
require.NoError(err)

result := client.EVM().DeployContract(ctx, fromAddr, contract)

idAddr, err := address.NewIDAddress(result.ActorID)
require.NoError(err)
t.Logf("actor ID address is %s", idAddr)

// install filter
filterID, err := client.EthNewFilter(ctx, &api.EthFilterSpec{})
require.NoError(err)

const iterations = 10

waitAllCh := make(chan struct{})
go func() {
headChangeCh, err := client.ChainNotify(ctx)
require.NoError(err)
<-headChangeCh // skip hccurrent

count := 0
for {
select {
case headChanges := <-headChangeCh:
for _, change := range headChanges {
if change.Type == store.HCApply || change.Type == store.HCRevert {
count++
if count == iterations {
waitAllCh <- struct{}{}
}
}
}
}
}
}()

for i := 0; i < iterations; i++ {
// log a four topic event with data
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
// fmt.Println(ret)
// fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
}

select {
case <-waitAllCh:
case <-time.After(time.Minute):
t.Errorf("timeout to wait for pack messages")
}

// collect filter results
res, err := client.EthGetFilterChanges(ctx, filterID)
require.NoError(err)

// expect to have seen iteration number of mpool messages
require.Equal(iterations, len(res.Results))

fmt.Printf("res.Results=%+v\n", res.Results)
}
7 changes: 7 additions & 0 deletions itests/kit/node_opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,10 @@ func SplitstoreMessges() NodeOpt {
return nil
})
}

func RealTimeFilterAPI() NodeOpt {
return WithCfgOpt(func(cfg *config.FullNode) error {
cfg.ActorEvent.EnableRealTimeFilterAPI = true
return nil
})
}
6 changes: 3 additions & 3 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent) (*api.EthFilterResu
return nil, err
}

res.NewLogs = append(res.NewLogs, log)
res.Results = append(res.Results, log)
}

return res, nil
Expand All @@ -1312,7 +1312,7 @@ func ethFilterResultFromTipSets(tsks []types.TipSetKey) (*api.EthFilterResult, e
return nil, err
}

res.NewBlockHashes = append(res.NewBlockHashes, hash)
res.Results = append(res.Results, hash)
}

return res, nil
Expand All @@ -1327,7 +1327,7 @@ func ethFilterResultFromMessages(cs []cid.Cid) (*api.EthFilterResult, error) {
return nil, err
}

res.NewTransactionHashes = append(res.NewTransactionHashes, hash)
res.Results = append(res.Results, hash)
}

return res, nil
Expand Down

0 comments on commit 02460a9

Please sign in to comment.