Skip to content

Commit

Permalink
feat: gateway: auto-cleanup of installed filters when ws connection ends
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 9, 2024
1 parent f8981cb commit 1fd00bc
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 50 deletions.
7 changes: 6 additions & 1 deletion gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func Handler(
m := mux.NewRouter()

opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))

serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(opts...)
rpcServer.Register("Filecoin", hnd)
Expand Down Expand Up @@ -106,7 +107,11 @@ type statefulCallHandler struct {
}

func (h statefulCallHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker()))
tracker := newStatefulCallTracker()
defer func() {
go tracker.cleanup()
}()
r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, tracker))
h.next.ServeHTTP(w, r)
}

Expand Down
3 changes: 3 additions & 0 deletions gateway/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
logger "github.com/ipfs/go-log/v2"
"go.opencensus.io/stats"
"golang.org/x/time/rate"

Expand All @@ -31,6 +32,8 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

var log = logger.Logger("gateway")

const (
DefaultLookbackCap = time.Hour * 24
DefaultStateWaitLookbackLimit = abi.ChainEpoch(20)
Expand Down
77 changes: 54 additions & 23 deletions gateway/proxy_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -446,11 +445,8 @@ func (gw *Node) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID
if err != nil {
return nil, xerrors.Errorf("EthGetFilterChanges not supported: %w", err)
}
ft.lk.Lock()
_, ok := ft.userFilters[id]
ft.lk.Unlock()

if !ok {
if !ft.hasFilter(id) {
return nil, filter.ErrFilterNotFound
}

Expand All @@ -466,11 +462,8 @@ func (gw *Node) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID) (
if err != nil {
return nil, xerrors.Errorf("EthGetFilterLogs not supported: %w", err)
}
ft.lk.Lock()
_, ok := ft.userFilters[id]
ft.lk.Unlock()

if !ok {
if !ft.hasFilter(id) {
return nil, nil
}

Expand All @@ -482,7 +475,7 @@ func (gw *Node) EthNewFilter(ctx context.Context, filter *ethtypes.EthFilterSpec
return ethtypes.EthFilterID{}, err
}

return addUserFilterLimited(ctx, "EthNewFilter", func() (ethtypes.EthFilterID, error) {
return gw.addUserFilterLimited(ctx, "EthNewFilter", func() (ethtypes.EthFilterID, error) {
return gw.target.EthNewFilter(ctx, filter)
})
}
Expand All @@ -492,7 +485,7 @@ func (gw *Node) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, er
return ethtypes.EthFilterID{}, err
}

return addUserFilterLimited(ctx, "EthNewBlockFilter", func() (ethtypes.EthFilterID, error) {
return gw.addUserFilterLimited(ctx, "EthNewBlockFilter", func() (ethtypes.EthFilterID, error) {
return gw.target.EthNewBlockFilter(ctx)
})
}
Expand All @@ -502,7 +495,7 @@ func (gw *Node) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.Et
return ethtypes.EthFilterID{}, err
}

return addUserFilterLimited(ctx, "EthNewPendingTransactionFilter", func() (ethtypes.EthFilterID, error) {
return gw.addUserFilterLimited(ctx, "EthNewPendingTransactionFilter", func() (ethtypes.EthFilterID, error) {
return gw.target.EthNewPendingTransactionFilter(ctx)
})
}
Expand All @@ -517,6 +510,7 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID)
if err != nil {
return false, xerrors.Errorf("EthUninstallFilter not supported: %w", err)
}

ft.lk.Lock()
defer ft.lk.Unlock()

Expand All @@ -526,6 +520,7 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID)

ok, err := gw.target.EthUninstallFilter(ctx, id)
if err != nil {
// don't delete the filter, it's "stuck" so should still count towards the limit
return false, err
}

Expand All @@ -545,12 +540,12 @@ func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes
}

if gw.subHnd == nil {
return ethtypes.EthSubscriptionID{}, xerrors.New("subscription support not enabled")
return ethtypes.EthSubscriptionID{}, xerrors.New("EthSubscribe not supported: subscription support not enabled")
}

ethCb, ok := jsonrpc.ExtractReverseClient[api.EthSubscriberMethods](ctx)
if !ok {
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks")
return ethtypes.EthSubscriptionID{}, xerrors.Errorf("EthSubscribe not supported: connection doesn't support callbacks")
}

ft, err := getStatefulTracker(ctx)
Expand Down Expand Up @@ -581,7 +576,11 @@ func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes
return ethtypes.EthSubscriptionID{}, err
}

ft.userSubscriptions[sub] = time.Now()
ft.userSubscriptions[sub] = func() {
if _, err := gw.target.EthUnsubscribe(ctx, sub); err != nil {
log.Warnf("error unsubscribing after connection end: %v", err)
}
}

return sub, err
}
Expand All @@ -605,6 +604,7 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI

ok, err := gw.target.EthUnsubscribe(ctx, id)
if err != nil {
// don't delete the subscription, it's "stuck" so should still count towards the limit
return false, err
}

Expand Down Expand Up @@ -679,31 +679,40 @@ func (gw *Node) EthTraceFilter(ctx context.Context, filter ethtypes.EthTraceFilt

var EthMaxFiltersPerConn = 16 // todo make this configurable

func addUserFilterLimited(ctx context.Context, callName string, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) {
func (gw *Node) addUserFilterLimited(
ctx context.Context,
callName string,
install func() (ethtypes.EthFilterID, error),
) (ethtypes.EthFilterID, error) {
ft, err := getStatefulTracker(ctx)
if err != nil {
return ethtypes.EthFilterID{}, xerrors.Errorf("%s not supported: %w", callName, err)
}

ft.lk.Lock()
defer ft.lk.Unlock()

if len(ft.userFilters) >= EthMaxFiltersPerConn {
return ethtypes.EthFilterID{}, xerrors.New("too many filters")
}

id, err := cb()
id, err := install()
if err != nil {
return id, err
}

ft.userFilters[id] = time.Now()
ft.userFilters[id] = func() {
if _, err := gw.target.EthUninstallFilter(ctx, id); err != nil {
log.Warnf("error uninstalling filter after connection end: %v", err)
}
}

return id, nil
}

func getStatefulTracker(ctx context.Context) (*statefulCallTracker, error) {
if jsonrpc.GetConnectionType(ctx) != jsonrpc.ConnectionTypeWS {
return nil, xerrors.New("stateful tracking is only available for websockets connections")
return nil, xerrors.New("stateful methods are only available on websockets connections")
}

if ct, ok := ctx.Value(statefulCallTrackerKey).(*statefulCallTracker); !ok {
Expand All @@ -713,17 +722,39 @@ func getStatefulTracker(ctx context.Context) (*statefulCallTracker, error) {
}
}

type cleanup func()

type statefulCallTracker struct {
lk sync.Mutex

userFilters map[ethtypes.EthFilterID]time.Time
userSubscriptions map[ethtypes.EthSubscriptionID]time.Time
userFilters map[ethtypes.EthFilterID]cleanup
userSubscriptions map[ethtypes.EthSubscriptionID]cleanup
}

func (ft *statefulCallTracker) cleanup() {
ft.lk.Lock()
defer ft.lk.Unlock()

for _, cleanup := range ft.userFilters {
cleanup()
}
for _, cleanup := range ft.userSubscriptions {
cleanup()
}
}

func (ft *statefulCallTracker) hasFilter(id ethtypes.EthFilterID) bool {
ft.lk.Lock()
defer ft.lk.Unlock()

_, ok := ft.userFilters[id]
return ok
}

// called per request (ws connection)
func newStatefulCallTracker() *statefulCallTracker {
return &statefulCallTracker{
userFilters: make(map[ethtypes.EthFilterID]time.Time),
userSubscriptions: make(map[ethtypes.EthSubscriptionID]time.Time),
userFilters: make(map[ethtypes.EthFilterID]cleanup),
userSubscriptions: make(map[ethtypes.EthSubscriptionID]cleanup),
}
}
Loading

0 comments on commit 1fd00bc

Please sign in to comment.