From 259a2ccbb0e596101e6ca975a5f9f7d3fce41740 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Fri, 26 Jul 2024 23:27:22 +1000 Subject: [PATCH 1/7] fix!: gateway: fix rate limiting, general cleanup Minor API changes: * gateway.NewRateLimiterHandler and gateway.NewConnectionRateLimiterHandler have been replaced with gateway.NewRateLimitHandler. * The handlers returned by both gateway.NewRateLimitHandler and the primary gateway.Handler return an http.Handler augmented with a Shutdown(ctx) method to be used for graceful cleanup of resources. Fix: * --per-conn-rate-limit was previously applied as a global rate limiter, effectively making it have the same impact as --rate-limit. This change fixes the behaviour such that --per-conn-rate-limit is applied as an API call limiter within a single connection (i.e. a WebSocket connection). The rate is specified as tokens-per-second, where tokens are relative to the expense of the API call being made. --- CHANGELOG.md | 7 ++ cmd/lotus-gateway/main.go | 47 ++++---- gateway/handler.go | 226 +++++++++++++++++++++++++------------- gateway/handler_test.go | 42 +++++++ gateway/node.go | 23 ++-- gateway/node_test.go | 34 ++++-- itests/gateway_test.go | 188 ++++++++++++++++++++++++------- 7 files changed, 417 insertions(+), 150 deletions(-) create mode 100644 gateway/handler_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e9dad67f3d..60def96bbc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,13 @@ - fix: add datacap balance to circ supply internal accounting as unCirc #12348 +## Improvements + +- fix!: gateway: fix rate limiting, general cleanup ([filecoin-project/lotus#12315](https://github.com/filecoin-project/lotus/pull/12315)). + - CLI usage documentation has been improved for `lotus-gateway` + - `--per-conn-rate-limit` now works as advertised. + - Some APIs have changed which may impact users consuming Lotus Gateway code as a library. + # v1.28.1 / 2024-07-24 This is the MANDATORY Lotus v1.28.1 release, which will deliver the Filecoin network version 23, codenamed Waffle 🧇. v1.28.1 is also the minimal version that supports nv23. diff --git a/cmd/lotus-gateway/main.go b/cmd/lotus-gateway/main.go index 2c5279ed448..a2c46c6b26c 100644 --- a/cmd/lotus-gateway/main.go +++ b/cmd/lotus-gateway/main.go @@ -132,23 +132,29 @@ var runCmd = &cli.Command{ Value: int64(gateway.DefaultStateWaitLookbackLimit), }, &cli.Int64Flag{ - Name: "rate-limit", - Usage: "rate-limit API calls. Use 0 to disable", + Name: "rate-limit", + Usage: fmt.Sprintf( + "Global API call throttling rate limit (per second), weighted by relative expense of the call, with the most expensive calls counting for %d. Use 0 to disable", + gateway.MaxRateLimitTokens, + ), Value: 0, }, &cli.Int64Flag{ - Name: "per-conn-rate-limit", - Usage: "rate-limit API calls per each connection. Use 0 to disable", + Name: "per-conn-rate-limit", + Usage: fmt.Sprintf( + "API call throttling rate limit (per second) per WebSocket connection, weighted by relative expense of the call, with the most expensive calls counting for %d. 
Use 0 to disable", + gateway.MaxRateLimitTokens, + ), Value: 0, }, &cli.DurationFlag{ Name: "rate-limit-timeout", - Usage: "the maximum time to wait for the rate limiter before returning an error to clients", + Usage: "The maximum time to wait for the API call throttling rate limiter before returning an error to clients", Value: gateway.DefaultRateLimitTimeout, }, &cli.Int64Flag{ Name: "conn-per-minute", - Usage: "The number of incomming connections to accept from a single IP per minute. Use 0 to disable", + Usage: "A hard limit on the number of incomming connections (requests) to accept per remote host per minute. Use 0 to disable", Value: 0, }, }, @@ -171,13 +177,13 @@ var runCmd = &cli.Command{ defer closer() var ( - lookbackCap = cctx.Duration("api-max-lookback") - address = cctx.String("listen") - waitLookback = abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit")) - rateLimit = cctx.Int64("rate-limit") - perConnRateLimit = cctx.Int64("per-conn-rate-limit") - rateLimitTimeout = cctx.Duration("rate-limit-timeout") - connPerMinute = cctx.Int64("conn-per-minute") + lookbackCap = cctx.Duration("api-max-lookback") + address = cctx.String("listen") + waitLookback = abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit")) + globalRateLimit = cctx.Int("rate-limit") + perConnectionRateLimit = cctx.Int("per-conn-rate-limit") + rateLimitTimeout = cctx.Duration("rate-limit-timeout") + perHostConnectionsPerMinute = cctx.Int("conn-per-minute") ) serverOptions := make([]jsonrpc.ServerOption, 0) @@ -197,21 +203,22 @@ var runCmd = &cli.Command{ return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err) } - gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, rateLimit, rateLimitTimeout) - h, err := gateway.Handler(gwapi, api, perConnRateLimit, connPerMinute, serverOptions...) + gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, int64(globalRateLimit), rateLimitTimeout) + handler, err := gateway.Handler(gwapi, api, perConnectionRateLimit, perHostConnectionsPerMinute, serverOptions...) if err != nil { return xerrors.Errorf("failed to set up gateway HTTP handler") } - stopFunc, err := node.ServeRPC(h, "lotus-gateway", maddr) + stopFunc, err := node.ServeRPC(handler, "lotus-gateway", maddr) if err != nil { return xerrors.Errorf("failed to serve rpc endpoint: %w", err) } - <-node.MonitorShutdown(nil, node.ShutdownHandler{ - Component: "rpc", - StopFunc: stopFunc, - }) + <-node.MonitorShutdown( + nil, + node.ShutdownHandler{Component: "rpc", StopFunc: stopFunc}, + node.ShutdownHandler{Component: "rpc-handler", StopFunc: handler.Shutdown}, + ) return nil }, } diff --git a/gateway/handler.go b/gateway/handler.go index 2a9ee20807f..d98b9594cf2 100644 --- a/gateway/handler.go +++ b/gateway/handler.go @@ -21,20 +21,48 @@ import ( "github.com/filecoin-project/lotus/node" ) -type perConnLimiterKeyType string +type perConnectionAPIRateLimiterKeyType string +type filterTrackerKeyType string -const perConnLimiterKey perConnLimiterKeyType = "limiter" +const ( + perConnectionAPIRateLimiterKey perConnectionAPIRateLimiterKeyType = "limiter" + statefulCallTrackerKey filterTrackerKeyType = "statefulCallTracker" + connectionLimiterCleanupInterval = 30 * time.Second +) -type filterTrackerKeyType string +// ShutdownHandler is an http.Handler that can be gracefully shutdown. 
+type ShutdownHandler interface { + http.Handler + + Shutdown(ctx context.Context) error +} -const statefulCallTrackerKey filterTrackerKeyType = "statefulCallTracker" +var _ ShutdownHandler = &statefulCallHandler{} +var _ ShutdownHandler = &RateLimitHandler{} + +// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is +// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its +// Shutdown method. +// +// The handler will limit the number of API calls per minute within a single WebSocket connection +// (where API calls are weighted by their relative expense), and the number of connections per +// minute from a single host. +// +// Connection limiting is a hard limit that will reject requests with a 429 status code if the limit +// is exceeded. API call limiting is a soft limit that will delay requests if the limit is exceeded. +func Handler( + gwapi lapi.Gateway, + api lapi.FullNode, + perConnectionAPIRateLimit int, + perHostConnectionsPerMinute int, + opts ...jsonrpc.ServerOption, +) (ShutdownHandler, error) { -// Handler returns a gateway http.Handler, to be mounted as-is on the server. -func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) { m := mux.NewRouter() + opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors)) serveRpc := func(path string, hnd interface{}) { - rpcServer := jsonrpc.NewServer(append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))...) + rpcServer := jsonrpc.NewServer(opts...) rpcServer.Register("Filecoin", hnd) rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") @@ -61,104 +89,152 @@ func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinu m.Handle("/health/readyz", node.NewReadyHandler(api)) m.PathPrefix("/").Handler(http.DefaultServeMux) - /*ah := &auth.Handler{ - Verify: nodeApi.AuthVerify, - Next: mux.ServeHTTP, - }*/ - - rlh := NewRateLimiterHandler(m, rateLimit) - clh := NewConnectionRateLimiterHandler(rlh, connPerMinute) - return clh, nil + handler := &statefulCallHandler{m} + if perConnectionAPIRateLimit > 0 && perHostConnectionsPerMinute > 0 { + return NewRateLimitHandler( + handler, + perConnectionAPIRateLimit, + perHostConnectionsPerMinute, + connectionLimiterCleanupInterval, + ), nil + } + return handler, nil } -func NewRateLimiterHandler(handler http.Handler, rateLimit int64) *RateLimiterHandler { - limiter := limiterFromRateLimit(rateLimit) - - return &RateLimiterHandler{ - handler: handler, - limiter: limiter, - } +type statefulCallHandler struct { + next http.Handler } -// RateLimiterHandler adds a rate limiter to the request context for per-connection rate limiting -type RateLimiterHandler struct { - handler http.Handler - limiter *rate.Limiter +func (h statefulCallHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker())) + h.next.ServeHTTP(w, r) } -func (h RateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - r = r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter)) +func (h statefulCallHandler) Shutdown(ctx context.Context) error { + return shutdown(ctx, h.next) +} - // also add a filter tracker to the context - r = r.WithContext(context.WithValue(r.Context(), 
statefulCallTrackerKey, newStatefulCallTracker())) +type hostLimiter struct { + limiter *rate.Limiter + lastAccess time.Time +} - h.handler.ServeHTTP(w, r) +type RateLimitHandler struct { + cancelFunc context.CancelFunc + mu sync.Mutex + limiters map[string]*hostLimiter + perConnectionAPILimit rate.Limit + perHostConnectionsPerMinute int + next http.Handler + cleanupInterval time.Duration + expiryDuration time.Duration } -// NewConnectionRateLimiterHandler blocks new connections if there have already been too many. -func NewConnectionRateLimiterHandler(handler http.Handler, connPerMinute int64) *ConnectionRateLimiterHandler { - ipmap := make(map[string]int64) - return &ConnectionRateLimiterHandler{ - ipmap: ipmap, - connPerMinute: connPerMinute, - handler: handler, +// NewRateLimitHandler creates a new RateLimitHandler that wraps the +// provided handler and limits the number of API calls per minute within a single WebSocket +// connection (where API calls are weighted by their relative expense), and the number of +// connections per minute from a single host. +// The cleanupInterval determines how often the handler will check for unused limiters to clean up. +func NewRateLimitHandler( + next http.Handler, + perConnectionAPIRateLimit int, + perHostConnectionsPerMinute int, + cleanupInterval time.Duration, +) *RateLimitHandler { + + ctx, cancel := context.WithCancel(context.Background()) + h := &RateLimitHandler{ + cancelFunc: cancel, + limiters: make(map[string]*hostLimiter), + perConnectionAPILimit: rate.Inf, + perHostConnectionsPerMinute: perHostConnectionsPerMinute, + next: next, + cleanupInterval: cleanupInterval, + expiryDuration: 5 * cleanupInterval, + } + if perConnectionAPIRateLimit > 0 { + h.perConnectionAPILimit = rate.Every(time.Second / time.Duration(perConnectionAPIRateLimit)) } + go h.cleanupExpiredLimiters(ctx) + return h } -type ConnectionRateLimiterHandler struct { - mu sync.Mutex - ipmap map[string]int64 - connPerMinute int64 - handler http.Handler +func (h *RateLimitHandler) getLimits(host string) *hostLimiter { + h.mu.Lock() + defer h.mu.Unlock() + + entry, exists := h.limiters[host] + if !exists { + var limiter *rate.Limiter + if h.perHostConnectionsPerMinute > 0 { + requestLimit := rate.Every(time.Minute / time.Duration(h.perHostConnectionsPerMinute)) + limiter = rate.NewLimiter(requestLimit, h.perHostConnectionsPerMinute) + } + entry = &hostLimiter{ + limiter: limiter, + lastAccess: time.Now(), + } + h.limiters[host] = entry + } else { + entry.lastAccess = time.Now() + } + + return entry } -func (h *ConnectionRateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if h.connPerMinute == 0 { - h.handler.ServeHTTP(w, r) - return - } +func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { host, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { w.WriteHeader(http.StatusInternalServerError) return } - h.mu.Lock() - seen, ok := h.ipmap[host] - if !ok { - h.ipmap[host] = 1 - h.mu.Unlock() - h.handler.ServeHTTP(w, r) + limits := h.getLimits(host) + if limits.limiter != nil && !limits.limiter.Allow() { + w.WriteHeader(http.StatusTooManyRequests) return } - // rate limited - if seen > h.connPerMinute { - h.mu.Unlock() - w.WriteHeader(http.StatusTooManyRequests) + + if h.perConnectionAPILimit != rate.Inf { + // new rate limiter for each connection, to throttle a single WebSockets connection; + // allow for a burst of MaxRateLimitTokens + apiLimiter := rate.NewLimiter(h.perConnectionAPILimit, MaxRateLimitTokens) + r = 
r.WithContext(context.WithValue(r.Context(), perConnectionAPIRateLimiterKey, apiLimiter)) + } + + h.next.ServeHTTP(w, r) +} + +func (h *RateLimitHandler) cleanupExpiredLimiters(ctx context.Context) { + if h.cleanupInterval == 0 { return } - h.ipmap[host] = seen + 1 - h.mu.Unlock() - go func() { + + for { select { - case <-time.After(time.Minute): + case <-ctx.Done(): + return + case <-time.After(h.cleanupInterval): h.mu.Lock() - defer h.mu.Unlock() - h.ipmap[host] = h.ipmap[host] - 1 - if h.ipmap[host] <= 0 { - delete(h.ipmap, host) + now := time.Now() + for host, entry := range h.limiters { + if now.Sub(entry.lastAccess) > h.expiryDuration { + delete(h.limiters, host) + } } + h.mu.Unlock() } - }() - h.handler.ServeHTTP(w, r) + } } -func limiterFromRateLimit(rateLimit int64) *rate.Limiter { - var limit rate.Limit - if rateLimit == 0 { - limit = rate.Inf - } else { - limit = rate.Every(time.Second / time.Duration(rateLimit)) +func (h *RateLimitHandler) Shutdown(ctx context.Context) error { + h.cancelFunc() + return shutdown(ctx, h.next) +} + +func shutdown(ctx context.Context, handler http.Handler) error { + if sh, ok := handler.(ShutdownHandler); ok { + return sh.Shutdown(ctx) } - return rate.NewLimiter(limit, stateRateLimitTokens) + return nil } diff --git a/gateway/handler_test.go b/gateway/handler_test.go new file mode 100644 index 00000000000..65e56836c0a --- /dev/null +++ b/gateway/handler_test.go @@ -0,0 +1,42 @@ +package gateway_test + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/gateway" +) + +func TestRequestRateLimiterHandler(t *testing.T) { + var callCount int + h := gateway.NewRateLimitHandler( + http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + callCount++ + }), + 0, // api rate + 2, // request rate (per minute) + 0, // cleanup interval + ) + + runRequest := func(host string, expectedStatus, expectedCallCount int) { + req := httptest.NewRequest("GET", "/", nil) + req.RemoteAddr = host + ":1234" + w := httptest.NewRecorder() + h.ServeHTTP(w, req) + + require.Equal(t, expectedStatus, w.Code, "expected status %v, got %v", expectedStatus, w.Code) + require.Equal(t, expectedCallCount, callCount, "expected callCount to be %v, got %v", expectedCallCount, callCount) + } + + // Test that the handler allows up to 2 requests per minute per host. + runRequest("boop", http.StatusOK, 1) + runRequest("boop", http.StatusOK, 2) + runRequest("beep", http.StatusOK, 3) + runRequest("boop", http.StatusTooManyRequests, 3) + runRequest("beep", http.StatusOK, 4) + runRequest("boop", http.StatusTooManyRequests, 4) + runRequest("beep", http.StatusTooManyRequests, 4) +} diff --git a/gateway/node.go b/gateway/node.go index facf7f3f7b0..0b15918a96b 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -39,6 +39,9 @@ const ( walletRateLimitTokens = 1 chainRateLimitTokens = 2 stateRateLimitTokens = 3 + + // MaxRateLimitTokens is the number of tokens consumed for the most expensive types of operations + MaxRateLimitTokens = stateRateLimitTokens ) // TargetAPI defines the API methods that the Node depends on @@ -175,11 +178,17 @@ var ( ) // NewNode creates a new gateway node. 
-func NewNode(api TargetAPI, sHnd *EthSubHandler, lookbackCap time.Duration, stateWaitLookbackLimit abi.ChainEpoch, rateLimit int64, rateLimitTimeout time.Duration) *Node { - var limit rate.Limit - if rateLimit == 0 { - limit = rate.Inf - } else { +func NewNode( + api TargetAPI, + sHnd *EthSubHandler, + lookbackCap time.Duration, + stateWaitLookbackLimit abi.ChainEpoch, + rateLimit int64, + rateLimitTimeout time.Duration, +) *Node { + + limit := rate.Inf + if rateLimit > 0 { limit = rate.Every(time.Second / time.Duration(rateLimit)) } return &Node{ @@ -187,7 +196,7 @@ func NewNode(api TargetAPI, sHnd *EthSubHandler, lookbackCap time.Duration, stat subHnd: sHnd, lookbackCap: lookbackCap, stateWaitLookbackLimit: stateWaitLookbackLimit, - rateLimiter: rate.NewLimiter(limit, stateRateLimitTokens), + rateLimiter: rate.NewLimiter(limit, MaxRateLimitTokens), // allow for a burst of MaxRateLimitTokens rateLimitTimeout: rateLimitTimeout, errLookback: fmt.Errorf("lookbacks of more than %s are disallowed", lookbackCap), } @@ -238,7 +247,7 @@ func (gw *Node) checkTimestamp(at time.Time) error { func (gw *Node) limit(ctx context.Context, tokens int) error { ctx2, cancel := context.WithTimeout(ctx, gw.rateLimitTimeout) defer cancel() - if perConnLimiter, ok := ctx2.Value(perConnLimiterKey).(*rate.Limiter); ok { + if perConnLimiter, ok := ctx2.Value(perConnectionAPIRateLimiterKey).(*rate.Limiter); ok { err := perConnLimiter.WaitN(ctx2, tokens) if err != nil { return fmt.Errorf("connection limited. %w", err) diff --git a/gateway/node_test.go b/gateway/node_test.go index 3b801e19d84..0c60679d012 100644 --- a/gateway/node_test.go +++ b/gateway/node_test.go @@ -260,18 +260,32 @@ func TestGatewayLimitTokensAvailable(t *testing.T) { require.NoError(t, a.limit(ctx, tokens), "requests should not be limited when there are enough tokens available") } -func TestGatewayLimitTokensNotAvailable(t *testing.T) { +func TestGatewayLimitTokensRate(t *testing.T) { ctx := context.Background() mock := &mockGatewayDepsAPI{} tokens := 3 - a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, int64(1), time.Millisecond) - var err error - // try to be rate limited - for i := 0; i <= 1000; i++ { - err = a.limit(ctx, tokens) - if err != nil { - break - } + var rateLimit int64 = 200 + rateLimitTimeout := time.Second / time.Duration(rateLimit/3) // large enough to not be hit + a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, rateLimit, rateLimitTimeout) + + start := time.Now() + calls := 10 + for i := 0; i < calls; i++ { + require.NoError(t, a.limit(ctx, tokens)) } - require.Error(t, err, "requiests should be rate limited when they hit limits") + // We should be slowed down by the rate limit, but not hard limited because the timeout is + // large; the duration should be roughly the rate limit (per second) times the number of calls, + // with one extra free call because the first one can use up the burst tokens. We'll also add a + // couple more to account for slow test runs. + delayPerToken := time.Second / time.Duration(rateLimit) + expectedDuration := delayPerToken * time.Duration((calls-1)*tokens) + expectedEnd := start.Add(expectedDuration) + require.WithinDuration(t, expectedEnd, time.Now(), delayPerToken*time.Duration(2*tokens), "API calls should be rate limited when they hit limits") + + // In this case our timeout is too short to allow for the rate limit, so we should hit the + // hard rate limit. 
+ rateLimitTimeout = time.Second / time.Duration(rateLimit) + a = NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, rateLimit, rateLimitTimeout) + require.NoError(t, a.limit(ctx, tokens)) + require.ErrorContains(t, a.limit(ctx, tokens), "server busy", "API calls should be hard rate limited when they hit limits") } diff --git a/itests/gateway_test.go b/itests/gateway_test.go index b994d6de3c8..b223fbdc7ab 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -4,9 +4,12 @@ package itests import ( "bytes" "context" + "encoding/json" "fmt" + "io" "math" "net" + "net/http" "testing" "time" @@ -46,9 +49,8 @@ func TestGatewayWalletMsig(t *testing.T) { //stm: @CHAIN_INCOMING_HANDLE_INCOMING_BLOCKS_001, @CHAIN_INCOMING_VALIDATE_BLOCK_PUBSUB_001, @CHAIN_INCOMING_VALIDATE_MESSAGE_PUBSUB_001 kit.QuietMiningLogs() - blocktime := 5 * time.Millisecond ctx := context.Background() - nodes := startNodes(ctx, t, blocktime, maxLookbackCap, maxStateWaitLookbackLimit) + nodes := startNodes(ctx, t) lite := nodes.lite full := nodes.full @@ -185,51 +187,72 @@ func TestGatewayMsigCLI(t *testing.T) { //stm: @CHAIN_SYNCER_NEW_PEER_HEAD_001, @CHAIN_SYNCER_VALIDATE_MESSAGE_META_001, @CHAIN_SYNCER_STOP_001 kit.QuietMiningLogs() - blocktime := 5 * time.Millisecond ctx := context.Background() - nodes := startNodesWithFunds(ctx, t, blocktime, maxLookbackCap, maxStateWaitLookbackLimit) + nodes := startNodes(ctx, t, withFunds()) lite := nodes.lite multisig.RunMultisigTests(t, lite) } type testNodes struct { - lite *kit.TestFullNode - full *kit.TestFullNode - miner *kit.TestMiner + lite *kit.TestFullNode + full *kit.TestFullNode + miner *kit.TestMiner + gatewayAddr string } -func startNodesWithFunds( - ctx context.Context, - t *testing.T, - blocktime time.Duration, - lookbackCap time.Duration, - stateWaitLookbackLimit abi.ChainEpoch, -) *testNodes { - nodes := startNodes(ctx, t, blocktime, lookbackCap, stateWaitLookbackLimit) +type startOptions struct { + blocktime time.Duration + lookbackCap time.Duration + stateWaitLookbackLimit abi.ChainEpoch + fund bool + perConnectionAPIRateLimit int + perHostRequestsPerMinute int + nodeOpts []kit.NodeOpt +} - // The full node starts with a wallet - fullWalletAddr, err := nodes.full.WalletDefaultAddress(ctx) - require.NoError(t, err) +type startOption func(*startOptions) - // Get the lite node default wallet address. 
- liteWalletAddr, err := nodes.lite.WalletDefaultAddress(ctx) - require.NoError(t, err) +func applyStartOptions(opts ...startOption) startOptions { + o := startOptions{ + blocktime: 5 * time.Millisecond, + lookbackCap: maxLookbackCap, + stateWaitLookbackLimit: maxStateWaitLookbackLimit, + fund: false, + } + for _, opt := range opts { + opt(&o) + } + return o +} - // Send some funds from the full node to the lite node - err = sendFunds(ctx, nodes.full, fullWalletAddr, liteWalletAddr, types.NewInt(1e18)) - require.NoError(t, err) +func withFunds() startOption { + return func(opts *startOptions) { + opts.fund = true + } +} - return nodes +func withPerConnectionAPIRateLimit(rateLimit int) startOption { + return func(opts *startOptions) { + opts.perConnectionAPIRateLimit = rateLimit + } +} + +func withPerHostRequestsPerMinute(rateLimit int) startOption { + return func(opts *startOptions) { + opts.perHostRequestsPerMinute = rateLimit + } } -func startNodes( - ctx context.Context, - t *testing.T, - blocktime time.Duration, - lookbackCap time.Duration, - stateWaitLookbackLimit abi.ChainEpoch, -) *testNodes { +func withNodeOpts(nodeOpts ...kit.NodeOpt) startOption { + return func(opts *startOptions) { + opts.nodeOpts = nodeOpts + } +} + +func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNodes { + options := applyStartOptions(opts...) + var closer jsonrpc.ClientCloser var ( @@ -246,11 +269,13 @@ func startNodes( // create the full node and the miner. var ens *kit.Ensemble full, miner, ens = kit.EnsembleMinimal(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(blocktime) + ens.InterconnectAll().BeginMining(options.blocktime) + api.RunningNodeType = api.NodeFull // Create a gateway server in front of the full node - gwapi := gateway.NewNode(full, nil, lookbackCap, stateWaitLookbackLimit, 0, time.Minute) - handler, err := gateway.Handler(gwapi, full, 0, 0) + gwapi := gateway.NewNode(full, nil, options.lookbackCap, options.stateWaitLookbackLimit, 0, time.Minute) + handler, err := gateway.Handler(gwapi, full, options.perConnectionAPIRateLimit, options.perHostRequestsPerMinute) + t.Cleanup(func() { _ = handler.Shutdown(ctx) }) require.NoError(t, err) l, err := net.Listen("tcp", "127.0.0.1:0") @@ -264,15 +289,37 @@ func startNodes( require.NoError(t, err) t.Cleanup(closer) - ens.FullNode(&lite, + nodeOpts := append([]kit.NodeOpt{ kit.LiteNode(), kit.ThroughRPC(), kit.ConstructorOpts( node.Override(new(api.Gateway), gapi), ), - ).Start().InterconnectAll() + }, options.nodeOpts...) + ens.FullNode(&lite, nodeOpts...).Start().InterconnectAll() + + nodes := &testNodes{ + lite: &lite, + full: full, + miner: miner, + gatewayAddr: srv.Listener.Addr().String(), + } + + if options.fund { + // The full node starts with a wallet + fullWalletAddr, err := nodes.full.WalletDefaultAddress(ctx) + require.NoError(t, err) + + // Get the lite node default wallet address. 
+ liteWalletAddr, err := nodes.lite.WalletDefaultAddress(ctx) + require.NoError(t, err) - return &testNodes{lite: &lite, full: full, miner: miner} + // Send some funds from the full node to the lite node + err = sendFunds(ctx, nodes.full, fullWalletAddr, liteWalletAddr, types.NewInt(1e18)) + require.NoError(t, err) + } + + return nodes } func sendFunds(ctx context.Context, fromNode *kit.TestFullNode, fromAddr address.Address, toAddr address.Address, amt types.BigInt) error { @@ -297,3 +344,68 @@ func sendFunds(ctx context.Context, fromNode *kit.TestFullNode, fromAddr address return nil } + +func TestGatewayRateLimits(t *testing.T) { + req := require.New(t) + + kit.QuietMiningLogs() + ctx := context.Background() + tokensPerSecond := 10 + requestsPerMinute := 30 // http requests + nodes := startNodes(ctx, t, + withNodeOpts(kit.DisableEthRPC()), + withPerConnectionAPIRateLimit(tokensPerSecond), + withPerHostRequestsPerMinute(requestsPerMinute), + ) + + time.Sleep(time.Second) + + // ChainHead uses chainRateLimitTokens=2. + // But we're also competing with the paymentChannelSettler which listens to the chain uses + // ChainGetBlockMessages on each change, which also uses chainRateLimitTokens=2. + // So each loop should be 4 tokens. + loops := 10 + tokensPerLoop := 4 + start := time.Now() + for i := 0; i < loops; i++ { + _, err := nodes.lite.ChainHead(ctx) + req.NoError(err) + } + tokensUsed := loops * tokensPerLoop + expectedEnd := start.Add(time.Duration(float64(tokensUsed) / float64(tokensPerSecond) * float64(time.Second))) + allowPad := time.Duration(float64(tokensPerLoop) / float64(tokensPerSecond) * float64(time.Second)) // add padding to account for slow test runs + t.Logf("expected end: %s, now: %s, allowPad: %s, actual delta: %s", expectedEnd, time.Now(), allowPad, time.Since(expectedEnd)) + req.WithinDuration(expectedEnd, time.Now(), allowPad) + + client := &http.Client{} + url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) + jsonPayload := []byte(`{"method":"Filecoin.ChainHead","params":[],"id":1,"jsonrpc":"2.0"}`) + var failed bool + for i := 0; i < requestsPerMinute*2 && !failed; i++ { + func() { + request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload)) + req.NoError(err) + request.Header.Set("Content-Type", "application/json") + response, err := client.Do(request) + req.NoError(err) + defer func() { _ = response.Body.Close() }() + req.NoError(err) + if http.StatusOK == response.StatusCode { + body, err := io.ReadAll(response.Body) + req.NoError(err) + result := map[string]interface{}{} + req.NoError(json.Unmarshal(body, &result)) + req.NoError(err) + req.NotNil(result["result"]) + height, ok := result["result"].(map[string]interface{})["Height"].(float64) + req.True(ok) + req.Greater(int(height), 0) + } else { + req.Equal(http.StatusTooManyRequests, response.StatusCode) + req.LessOrEqual(i, requestsPerMinute+1) + failed = true + } + }() + } + req.True(failed, "expected requests to fail due to rate limiting") +} From d65e2b0dcd66544226ca97e34b47b834dcbe159a Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 31 Jul 2024 17:05:11 +1000 Subject: [PATCH 2/7] fix!: gateway: limit stateful calls to websockets Ref: https://github.com/filecoin-project/go-jsonrpc/pull/118 Ref: https://github.com/filecoin-project/lotus/issues/11589 Ref: https://github.com/filecoin-project/lotus/issues/11153 --- gateway/proxy_eth.go | 64 +++++++++++++++++++++++++++++------------- go.mod | 2 +- go.sum | 4 +-- itests/gateway_test.go | 39 ++++++++++++++++++++++++- 4 files 
changed, 85 insertions(+), 24 deletions(-) diff --git a/gateway/proxy_eth.go b/gateway/proxy_eth.go index fb33a0ecae9..3d70d19a694 100644 --- a/gateway/proxy_eth.go +++ b/gateway/proxy_eth.go @@ -104,7 +104,7 @@ func (gw *Node) checkEthBlockParam(ctx context.Context, blkParam ethtypes.EthBlo var num ethtypes.EthUint64 if blkParam.PredefinedBlock != nil { if *blkParam.PredefinedBlock == "earliest" { - return fmt.Errorf("block param \"earliest\" is not supported") + return xerrors.New("block param \"earliest\" is not supported") } else if *blkParam.PredefinedBlock == "pending" || *blkParam.PredefinedBlock == "latest" { // Head is always ok. if lookback == 0 { @@ -127,13 +127,13 @@ func (gw *Node) checkEthBlockParam(ctx context.Context, blkParam ethtypes.EthBlo return gw.checkBlkHash(ctx, *blkParam.BlockHash) } - return fmt.Errorf("invalid block param") + return xerrors.New("invalid block param") } func (gw *Node) checkBlkParam(ctx context.Context, blkParam string, lookback ethtypes.EthUint64) error { if blkParam == "earliest" { // also not supported in node impl - return fmt.Errorf("block param \"earliest\" is not supported") + return xerrors.New("block param \"earliest\" is not supported") } head, err := gw.target.ChainHead(ctx) @@ -363,7 +363,7 @@ func (gw *Node) EthFeeHistory(ctx context.Context, p jsonrpc.RawParams) (ethtype } if params.BlkCount > ethtypes.EthUint64(EthFeeHistoryMaxBlockCount) { - return ethtypes.EthFeeHistory{}, fmt.Errorf("block count too high") + return ethtypes.EthFeeHistory{}, xerrors.New("block count too high") } return gw.target.EthFeeHistory(ctx, p) @@ -437,14 +437,15 @@ func (gw *Node) EthGetLogs(ctx context.Context, filter *ethtypes.EthFilterSpec) return gw.target.EthGetLogs(ctx, filter) } -/* FILTERS: Those are stateful.. figure out how to properly either bind them to users, or time out? 
*/ - func (gw *Node) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID) (*ethtypes.EthFilterResult, error) { if err := gw.limit(ctx, stateRateLimitTokens); err != nil { return nil, err } - ft := statefulCallFromContext(ctx) + ft, err := getStatefulTracker(ctx) + if err != nil { + return nil, xerrors.Errorf("EthGetFilterChanges not supported: %w", err) + } ft.lk.Lock() _, ok := ft.userFilters[id] ft.lk.Unlock() @@ -461,7 +462,10 @@ func (gw *Node) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID) ( return nil, err } - ft := statefulCallFromContext(ctx) + ft, err := getStatefulTracker(ctx) + if err != nil { + return nil, xerrors.Errorf("EthGetFilterLogs not supported: %w", err) + } ft.lk.Lock() _, ok := ft.userFilters[id] ft.lk.Unlock() @@ -478,7 +482,7 @@ func (gw *Node) EthNewFilter(ctx context.Context, filter *ethtypes.EthFilterSpec return ethtypes.EthFilterID{}, err } - return addUserFilterLimited(ctx, func() (ethtypes.EthFilterID, error) { + return addUserFilterLimited(ctx, "EthNewFilter", func() (ethtypes.EthFilterID, error) { return gw.target.EthNewFilter(ctx, filter) }) } @@ -488,7 +492,7 @@ func (gw *Node) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, er return ethtypes.EthFilterID{}, err } - return addUserFilterLimited(ctx, func() (ethtypes.EthFilterID, error) { + return addUserFilterLimited(ctx, "EthNewBlockFilter", func() (ethtypes.EthFilterID, error) { return gw.target.EthNewBlockFilter(ctx) }) } @@ -498,7 +502,7 @@ func (gw *Node) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.Et return ethtypes.EthFilterID{}, err } - return addUserFilterLimited(ctx, func() (ethtypes.EthFilterID, error) { + return addUserFilterLimited(ctx, "EthNewPendingTransactionFilter", func() (ethtypes.EthFilterID, error) { return gw.target.EthNewPendingTransactionFilter(ctx) }) } @@ -509,7 +513,10 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) } // check if the filter belongs to this connection - ft := statefulCallFromContext(ctx) + ft, err := getStatefulTracker(ctx) + if err != nil { + return false, xerrors.Errorf("EthUninstallFilter not supported: %w", err) + } ft.lk.Lock() defer ft.lk.Unlock() @@ -546,12 +553,15 @@ func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") } - ft := statefulCallFromContext(ctx) + ft, err := getStatefulTracker(ctx) + if err != nil { + return ethtypes.EthSubscriptionID{}, xerrors.Errorf("EthSubscribe not supported: %w", err) + } ft.lk.Lock() defer ft.lk.Unlock() if len(ft.userSubscriptions) >= EthMaxFiltersPerConn { - return ethtypes.EthSubscriptionID{}, fmt.Errorf("too many subscriptions") + return ethtypes.EthSubscriptionID{}, xerrors.New("too many subscriptions") } sub, err := gw.target.EthSubscribe(ctx, p) @@ -582,7 +592,10 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI } // check if the filter belongs to this connection - ft := statefulCallFromContext(ctx) + ft, err := getStatefulTracker(ctx) + if err != nil { + return false, xerrors.Errorf("EthUnsubscribe not supported: %w", err) + } ft.lk.Lock() defer ft.lk.Unlock() @@ -666,13 +679,16 @@ func (gw *Node) EthTraceFilter(ctx context.Context, filter ethtypes.EthTraceFilt var EthMaxFiltersPerConn = 16 // todo make this configurable -func addUserFilterLimited(ctx context.Context, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) { - ft := 
statefulCallFromContext(ctx) +func addUserFilterLimited(ctx context.Context, callName string, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) { + ft, err := getStatefulTracker(ctx) + if err != nil { + return ethtypes.EthFilterID{}, xerrors.Errorf("%s not supported: %w", callName, err) + } ft.lk.Lock() defer ft.lk.Unlock() if len(ft.userFilters) >= EthMaxFiltersPerConn { - return ethtypes.EthFilterID{}, fmt.Errorf("too many filters") + return ethtypes.EthFilterID{}, xerrors.New("too many filters") } id, err := cb() @@ -685,8 +701,16 @@ func addUserFilterLimited(ctx context.Context, cb func() (ethtypes.EthFilterID, return id, nil } -func statefulCallFromContext(ctx context.Context) *statefulCallTracker { - return ctx.Value(statefulCallTrackerKey).(*statefulCallTracker) +func getStatefulTracker(ctx context.Context) (*statefulCallTracker, error) { + if jsonrpc.GetConnectionType(ctx) != jsonrpc.ConnectionTypeWS { + return nil, xerrors.New("stateful tracking is only available for websockets connections") + } + + if ct, ok := ctx.Value(statefulCallTrackerKey).(*statefulCallTracker); !ok { + return nil, xerrors.New("stateful tracking is not available for this call") + } else { + return ct, nil + } } type statefulCallTracker struct { diff --git a/go.mod b/go.mod index 98eafb3526a..8aabc3163dc 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,7 @@ require ( github.com/filecoin-project/go-f3 v0.0.7 github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 - github.com/filecoin-project/go-jsonrpc v0.3.2 + github.com/filecoin-project/go-jsonrpc v0.6.0 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 github.com/filecoin-project/go-state-types v0.14.0 diff --git a/go.sum b/go.sum index f8a2b698194..52d825635fd 100644 --- a/go.sum +++ b/go.sum @@ -281,8 +281,8 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGy github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0 h1:nYs6OPUF8KbZ3E8o9p9HJnQaE8iugjHR5WYVMcicDJc= github.com/filecoin-project/go-hamt-ipld/v3 v3.4.0/go.mod h1:s0qiHRhFyrgW0SvdQMSJFQxNa4xEIG5XvqCBZUEgcbc= -github.com/filecoin-project/go-jsonrpc v0.3.2 h1:uuAWTZe6B3AUUta+O26HlycGoej/yiaI1fXp3Du+D3I= -github.com/filecoin-project/go-jsonrpc v0.3.2/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= +github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY= +github.com/filecoin-project/go-jsonrpc v0.6.0/go.mod h1:/n/niXcS4ZQua6i37LcVbY1TmlJR0UIK9mDFQq2ICek= github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20/go.mod h1:mPn+LRRd5gEKNAtc+r3ScpW2JRU/pj4NBKdADYWHiak= github.com/filecoin-project/go-padreader v0.0.1 h1:8h2tVy5HpoNbr2gBRr+WD6zV6VD6XHig+ynSGJg8ZOs= github.com/filecoin-project/go-padreader v0.0.1/go.mod h1:VYVPJqwpsfmtoHnAmPx6MUwmrK6HIcDqZJiuZhtmfLQ= diff --git a/itests/gateway_test.go b/itests/gateway_test.go index b223fbdc7ab..7c57ae62125 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/chain/types/ethtypes" "github.com/filecoin-project/lotus/gateway" 
"github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/itests/multisig" @@ -389,7 +390,6 @@ func TestGatewayRateLimits(t *testing.T) { response, err := client.Do(request) req.NoError(err) defer func() { _ = response.Body.Close() }() - req.NoError(err) if http.StatusOK == response.StatusCode { body, err := io.ReadAll(response.Body) req.NoError(err) @@ -409,3 +409,40 @@ func TestGatewayRateLimits(t *testing.T) { } req.True(failed, "expected requests to fail due to rate limiting") } + +func TestStatefulCallHandling(t *testing.T) { + req := require.New(t) + + kit.QuietMiningLogs() + ctx := context.Background() + nodes := startNodes(ctx, t) + + // not available over plain http + client := &http.Client{} + url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) + jsonPayload := []byte(`{"method":"Filecoin.EthNewBlockFilter","params":[],"id":1,"jsonrpc":"2.0"}`) + request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload)) + req.NoError(err) + request.Header.Set("Content-Type", "application/json") + response, err := client.Do(request) + req.NoError(err) + body, err := io.ReadAll(response.Body) + req.NoError(err) + defer func() { _ = response.Body.Close() }() + req.Equal(http.StatusOK, response.StatusCode) + req.Contains( + string(body), + `{"error":{"code":1,"message":"EthNewBlockFilter not supported: stateful tracking is only available for websockets connections"},"id":1,"jsonrpc":"2.0"}`, + ) + + // available over websocket + for i := 0; i < gateway.EthMaxFiltersPerConn; i++ { + _, err := nodes.lite.EthNewBlockFilter(ctx) + req.NoError(err) + } + // but only up to max + _, err = nodes.lite.EthNewBlockFilter(ctx) + require.ErrorContains(t, err, "too many filters") + _, err = nodes.lite.EthNewFilter(ctx, ðtypes.EthFilterSpec{}) + require.ErrorContains(t, err, "too many filters") +} From 0cef0448288318c8393c139e0602c1ba7b22cb8c Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 1 Aug 2024 11:40:35 +1000 Subject: [PATCH 3/7] feat: gateway: auto-cleanup of installed filters when ws connection ends --- gateway/handler.go | 7 +- gateway/node.go | 3 + gateway/proxy_eth.go | 77 +++++++++++++------ itests/gateway_test.go | 163 ++++++++++++++++++++++++++++++++++------- 4 files changed, 200 insertions(+), 50 deletions(-) diff --git a/gateway/handler.go b/gateway/handler.go index d98b9594cf2..eef8473288f 100644 --- a/gateway/handler.go +++ b/gateway/handler.go @@ -61,6 +61,7 @@ func Handler( m := mux.NewRouter() opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors)) + serveRpc := func(path string, hnd interface{}) { rpcServer := jsonrpc.NewServer(opts...) 
rpcServer.Register("Filecoin", hnd) @@ -106,7 +107,11 @@ type statefulCallHandler struct { } func (h statefulCallHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, newStatefulCallTracker())) + tracker := newStatefulCallTracker() + defer func() { + go tracker.cleanup() + }() + r = r.WithContext(context.WithValue(r.Context(), statefulCallTrackerKey, tracker)) h.next.ServeHTTP(w, r) } diff --git a/gateway/node.go b/gateway/node.go index 0b15918a96b..2ee4b7555ab 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -7,6 +7,7 @@ import ( blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + logger "github.com/ipfs/go-log/v2" "go.opencensus.io/stats" "golang.org/x/time/rate" @@ -31,6 +32,8 @@ import ( "github.com/filecoin-project/lotus/node/modules/dtypes" ) +var log = logger.Logger("gateway") + const ( DefaultLookbackCap = time.Hour * 24 DefaultStateWaitLookbackLimit = abi.ChainEpoch(20) diff --git a/gateway/proxy_eth.go b/gateway/proxy_eth.go index 3d70d19a694..24f9ba08665 100644 --- a/gateway/proxy_eth.go +++ b/gateway/proxy_eth.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "sync" - "time" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -446,11 +445,8 @@ func (gw *Node) EthGetFilterChanges(ctx context.Context, id ethtypes.EthFilterID if err != nil { return nil, xerrors.Errorf("EthGetFilterChanges not supported: %w", err) } - ft.lk.Lock() - _, ok := ft.userFilters[id] - ft.lk.Unlock() - if !ok { + if !ft.hasFilter(id) { return nil, filter.ErrFilterNotFound } @@ -466,11 +462,8 @@ func (gw *Node) EthGetFilterLogs(ctx context.Context, id ethtypes.EthFilterID) ( if err != nil { return nil, xerrors.Errorf("EthGetFilterLogs not supported: %w", err) } - ft.lk.Lock() - _, ok := ft.userFilters[id] - ft.lk.Unlock() - if !ok { + if !ft.hasFilter(id) { return nil, nil } @@ -482,7 +475,7 @@ func (gw *Node) EthNewFilter(ctx context.Context, filter *ethtypes.EthFilterSpec return ethtypes.EthFilterID{}, err } - return addUserFilterLimited(ctx, "EthNewFilter", func() (ethtypes.EthFilterID, error) { + return gw.addUserFilterLimited(ctx, "EthNewFilter", func() (ethtypes.EthFilterID, error) { return gw.target.EthNewFilter(ctx, filter) }) } @@ -492,7 +485,7 @@ func (gw *Node) EthNewBlockFilter(ctx context.Context) (ethtypes.EthFilterID, er return ethtypes.EthFilterID{}, err } - return addUserFilterLimited(ctx, "EthNewBlockFilter", func() (ethtypes.EthFilterID, error) { + return gw.addUserFilterLimited(ctx, "EthNewBlockFilter", func() (ethtypes.EthFilterID, error) { return gw.target.EthNewBlockFilter(ctx) }) } @@ -502,7 +495,7 @@ func (gw *Node) EthNewPendingTransactionFilter(ctx context.Context) (ethtypes.Et return ethtypes.EthFilterID{}, err } - return addUserFilterLimited(ctx, "EthNewPendingTransactionFilter", func() (ethtypes.EthFilterID, error) { + return gw.addUserFilterLimited(ctx, "EthNewPendingTransactionFilter", func() (ethtypes.EthFilterID, error) { return gw.target.EthNewPendingTransactionFilter(ctx) }) } @@ -517,6 +510,7 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) if err != nil { return false, xerrors.Errorf("EthUninstallFilter not supported: %w", err) } + ft.lk.Lock() defer ft.lk.Unlock() @@ -526,6 +520,7 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) ok, err := gw.target.EthUninstallFilter(ctx, id) if err != nil { + // don't delete the filter, it's "stuck" so should still count towards the limit return 
false, err } @@ -545,12 +540,12 @@ func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes } if gw.subHnd == nil { - return ethtypes.EthSubscriptionID{}, xerrors.New("subscription support not enabled") + return ethtypes.EthSubscriptionID{}, xerrors.New("EthSubscribe not supported: subscription support not enabled") } ethCb, ok := jsonrpc.ExtractReverseClient[api.EthSubscriberMethods](ctx) if !ok { - return ethtypes.EthSubscriptionID{}, xerrors.Errorf("connection doesn't support callbacks") + return ethtypes.EthSubscriptionID{}, xerrors.Errorf("EthSubscribe not supported: connection doesn't support callbacks") } ft, err := getStatefulTracker(ctx) @@ -581,7 +576,11 @@ func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes return ethtypes.EthSubscriptionID{}, err } - ft.userSubscriptions[sub] = time.Now() + ft.userSubscriptions[sub] = func() { + if _, err := gw.target.EthUnsubscribe(ctx, sub); err != nil { + log.Warnf("error unsubscribing after connection end: %v", err) + } + } return sub, err } @@ -605,6 +604,7 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI ok, err := gw.target.EthUnsubscribe(ctx, id) if err != nil { + // don't delete the subscription, it's "stuck" so should still count towards the limit return false, err } @@ -679,11 +679,16 @@ func (gw *Node) EthTraceFilter(ctx context.Context, filter ethtypes.EthTraceFilt var EthMaxFiltersPerConn = 16 // todo make this configurable -func addUserFilterLimited(ctx context.Context, callName string, cb func() (ethtypes.EthFilterID, error)) (ethtypes.EthFilterID, error) { +func (gw *Node) addUserFilterLimited( + ctx context.Context, + callName string, + install func() (ethtypes.EthFilterID, error), +) (ethtypes.EthFilterID, error) { ft, err := getStatefulTracker(ctx) if err != nil { return ethtypes.EthFilterID{}, xerrors.Errorf("%s not supported: %w", callName, err) } + ft.lk.Lock() defer ft.lk.Unlock() @@ -691,19 +696,23 @@ func addUserFilterLimited(ctx context.Context, callName string, cb func() (ethty return ethtypes.EthFilterID{}, xerrors.New("too many filters") } - id, err := cb() + id, err := install() if err != nil { return id, err } - ft.userFilters[id] = time.Now() + ft.userFilters[id] = func() { + if _, err := gw.target.EthUninstallFilter(ctx, id); err != nil { + log.Warnf("error uninstalling filter after connection end: %v", err) + } + } return id, nil } func getStatefulTracker(ctx context.Context) (*statefulCallTracker, error) { if jsonrpc.GetConnectionType(ctx) != jsonrpc.ConnectionTypeWS { - return nil, xerrors.New("stateful tracking is only available for websockets connections") + return nil, xerrors.New("stateful methods are only available on websockets connections") } if ct, ok := ctx.Value(statefulCallTrackerKey).(*statefulCallTracker); !ok { @@ -713,17 +722,39 @@ func getStatefulTracker(ctx context.Context) (*statefulCallTracker, error) { } } +type cleanup func() + type statefulCallTracker struct { lk sync.Mutex - userFilters map[ethtypes.EthFilterID]time.Time - userSubscriptions map[ethtypes.EthSubscriptionID]time.Time + userFilters map[ethtypes.EthFilterID]cleanup + userSubscriptions map[ethtypes.EthSubscriptionID]cleanup +} + +func (ft *statefulCallTracker) cleanup() { + ft.lk.Lock() + defer ft.lk.Unlock() + + for _, cleanup := range ft.userFilters { + cleanup() + } + for _, cleanup := range ft.userSubscriptions { + cleanup() + } +} + +func (ft *statefulCallTracker) hasFilter(id ethtypes.EthFilterID) bool { + ft.lk.Lock() + defer 
ft.lk.Unlock() + + _, ok := ft.userFilters[id] + return ok } // called per request (ws connection) func newStatefulCallTracker() *statefulCallTracker { return &statefulCallTracker{ - userFilters: make(map[ethtypes.EthFilterID]time.Time), - userSubscriptions: make(map[ethtypes.EthSubscriptionID]time.Time), + userFilters: make(map[ethtypes.EthFilterID]cleanup), + userSubscriptions: make(map[ethtypes.EthSubscriptionID]cleanup), } } diff --git a/itests/gateway_test.go b/itests/gateway_test.go index 7c57ae62125..88b8c53fc0b 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -10,6 +10,7 @@ import ( "math" "net" "net/http" + "sync" "testing" "time" @@ -31,6 +32,7 @@ import ( "github.com/filecoin-project/lotus/gateway" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/itests/multisig" + res "github.com/filecoin-project/lotus/lib/result" "github.com/filecoin-project/lotus/node" ) @@ -200,6 +202,7 @@ type testNodes struct { full *kit.TestFullNode miner *kit.TestMiner gatewayAddr string + rpcCloser jsonrpc.ClientCloser } type startOptions struct { @@ -254,8 +257,6 @@ func withNodeOpts(nodeOpts ...kit.NodeOpt) startOption { func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNodes { options := applyStartOptions(opts...) - var closer jsonrpc.ClientCloser - var ( full *kit.TestFullNode miner *kit.TestMiner @@ -274,7 +275,8 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod api.RunningNodeType = api.NodeFull // Create a gateway server in front of the full node - gwapi := gateway.NewNode(full, nil, options.lookbackCap, options.stateWaitLookbackLimit, 0, time.Minute) + ethSubHandler := gateway.NewEthSubHandler() + gwapi := gateway.NewNode(full, ethSubHandler, options.lookbackCap, options.stateWaitLookbackLimit, 0, time.Minute) handler, err := gateway.Handler(gwapi, full, options.perConnectionAPIRateLimit, options.perHostRequestsPerMinute) t.Cleanup(func() { _ = handler.Shutdown(ctx) }) require.NoError(t, err) @@ -286,8 +288,14 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod // Create a gateway client API that connects to the gateway server var gapi api.Gateway - gapi, closer, err = client.NewGatewayRPCV1(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil) + var _closer jsonrpc.ClientCloser + gapi, _closer, err = client.NewGatewayRPCV1(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, + jsonrpc.WithClientHandler("Filecoin", ethSubHandler), + jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"), + ) require.NoError(t, err) + var closeOnce sync.Once + closer := func() { closeOnce.Do(_closer) } t.Cleanup(closer) nodeOpts := append([]kit.NodeOpt{ @@ -304,6 +312,7 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod full: full, miner: miner, gatewayAddr: srv.Listener.Addr().String(), + rpcCloser: closer, } if options.fund { @@ -417,32 +426,134 @@ func TestStatefulCallHandling(t *testing.T) { ctx := context.Background() nodes := startNodes(ctx, t) - // not available over plain http - client := &http.Client{} - url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) - jsonPayload := []byte(`{"method":"Filecoin.EthNewBlockFilter","params":[],"id":1,"jsonrpc":"2.0"}`) - request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload)) - req.NoError(err) - request.Header.Set("Content-Type", "application/json") - response, err := client.Do(request) - req.NoError(err) - body, err := 
io.ReadAll(response.Body) - req.NoError(err) - defer func() { _ = response.Body.Close() }() - req.Equal(http.StatusOK, response.StatusCode) - req.Contains( - string(body), - `{"error":{"code":1,"message":"EthNewBlockFilter not supported: stateful tracking is only available for websockets connections"},"id":1,"jsonrpc":"2.0"}`, - ) + httpReq := func(payload string) (int, string) { + // not available over plain http + client := &http.Client{} + url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) + request, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(payload))) + req.NoError(err) + request.Header.Set("Content-Type", "application/json") + response, err := client.Do(request) + req.NoError(err) + defer func() { _ = response.Body.Close() }() + body, err := io.ReadAll(response.Body) + req.NoError(err) + return response.StatusCode, string(body) + } + + t.Logf("Testing stateful call handling rejection via plain http") + for _, typ := range []string{ + "EthNewBlockFilter", + "EthNewPendingTransactionFilter", + "EthNewFilter", + "EthGetFilterChanges", + "EthGetFilterLogs", + "EthUninstallFilter", + "EthSubscribe", + "EthUnsubscribe", + } { + params := "" + expErr := typ + " not supported: stateful methods are only available on websockets connections" + + switch typ { + case "EthNewFilter": + params = "{}" + case "EthGetFilterChanges", "EthGetFilterLogs", "EthUninstallFilter", "EthUnsubscribe": + params = `"0x0000000000000000000000000000000000000000000000000000000000000000"` + case "EthSubscribe": + params = `"newHeads"` + expErr = "EthSubscribe not supported: connection doesn't support callbacks" + } + + status, body := httpReq(`{"method":"Filecoin.` + typ + `","params":[` + params + `],"id":1,"jsonrpc":"2.0"}`) + + req.Equal(http.StatusOK, status, "not ok for "+typ) + req.Contains(body, `{"error":{"code":1,"message":"`+expErr+`"},"id":1,"jsonrpc":"2.0"}`, "unexpected response for "+typ) + } - // available over websocket - for i := 0; i < gateway.EthMaxFiltersPerConn; i++ { - _, err := nodes.lite.EthNewBlockFilter(ctx) + t.Logf("Installing a stateful filters via ws") + // install the variety of stateful filters we have, but only up to the max total + var ( + blockFilterIds = make([]ethtypes.EthFilterID, gateway.EthMaxFiltersPerConn/3) + pendingFilterIds = make([]ethtypes.EthFilterID, gateway.EthMaxFiltersPerConn/3) + matchFilterIds = make([]ethtypes.EthFilterID, gateway.EthMaxFiltersPerConn-len(blockFilterIds)-len(pendingFilterIds)) + ) + for i := 0; i < len(blockFilterIds); i++ { + fid, err := nodes.lite.EthNewBlockFilter(ctx) req.NoError(err) + blockFilterIds[i] = fid } - // but only up to max - _, err = nodes.lite.EthNewBlockFilter(ctx) + for i := 0; i < len(pendingFilterIds); i++ { + fid, err := nodes.lite.EthNewPendingTransactionFilter(ctx) + req.NoError(err) + pendingFilterIds[i] = fid + } + for i := 0; i < len(matchFilterIds); i++ { + fid, err := nodes.lite.EthNewFilter(ctx, ðtypes.EthFilterSpec{}) + req.NoError(err) + matchFilterIds[i] = fid + } + + // sanity check we're actually doing something + req.Greater(len(blockFilterIds), 0) + req.Greater(len(pendingFilterIds), 0) + req.Greater(len(matchFilterIds), 0) + + t.Logf("Testing 'too many filters' rejection") + _, err := nodes.lite.EthNewBlockFilter(ctx) + require.ErrorContains(t, err, "too many filters") + _, err = nodes.lite.EthNewPendingTransactionFilter(ctx) require.ErrorContains(t, err, "too many filters") _, err = nodes.lite.EthNewFilter(ctx, ðtypes.EthFilterSpec{}) require.ErrorContains(t, err, "too many filters") + 
+ t.Logf("Testing subscriptions") + // subscribe twice, so we can unsub one over ws to check unsub works, then unsub after ws close to + // check that auto-cleanup happned + subId1, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) + req.NoError(err) + err = nodes.lite.EthSubRouter.AddSub(ctx, subId1, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + t.Logf("Received subscription response (sub1): %v", resp) + return nil + }) + req.NoError(err) + subId2, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) + req.NoError(err) + err = nodes.lite.EthSubRouter.AddSub(ctx, subId2, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + t.Logf("Received subscription response (sub2): %v", resp) + return nil + }) + req.NoError(err) + + ok, err := nodes.lite.EthUnsubscribe(ctx, subId1) // unsub on lite node, should work + req.NoError(err) + req.True(ok) + ok, err = nodes.full.EthUnsubscribe(ctx, subId1) // unsub on full node, already done + req.NoError(err) + req.False(ok) + + t.Logf("Shutting down the lite node") + req.NoError(nodes.lite.Stop(ctx)) + nodes.rpcCloser() // once the websocket connection is closed, the server should clean up the filters for us + + time.Sleep(time.Second) // unfortunately we have no other way to check for completeness of shutdown and cleanup + + t.Logf("Checking that all filters and subs were cleared up by directly talking to full node") + + ok, err = nodes.full.EthUnsubscribe(ctx, subId2) // unsub on full node, already done + req.NoError(err) + req.False(ok) // already unsubbed because of auto-cleanup + + for _, fid := range blockFilterIds { + _, err = nodes.full.EthGetFilterChanges(ctx, fid) + req.ErrorContains(err, "filter not found") + } + for _, fid := range pendingFilterIds { + _, err = nodes.full.EthGetFilterChanges(ctx, fid) + req.ErrorContains(err, "filter not found") + } + for _, fid := range matchFilterIds { + _, err = nodes.full.EthGetFilterChanges(ctx, fid) + req.ErrorContains(err, "filter not found") + } } From 612c2529a62a3362c02afecffe02ce11ed295f02 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 1 Aug 2024 17:28:53 +1000 Subject: [PATCH 4/7] feat!: make max-filters configurable, count combined subs+filters * CLI takes --eth-max-filters-per-conn * gateway.NewNode() now takes the API plus an optional list of Options --- cmd/lotus-gateway/main.go | 16 ++++++- gateway/node.go | 92 ++++++++++++++++++++++++++++++++------- gateway/node_test.go | 12 ++--- gateway/proxy_eth.go | 15 ++++--- itests/gateway_test.go | 78 ++++++++++++++++++--------------- 5 files changed, 149 insertions(+), 64 deletions(-) diff --git a/cmd/lotus-gateway/main.go b/cmd/lotus-gateway/main.go index a2c46c6b26c..92194d16a55 100644 --- a/cmd/lotus-gateway/main.go +++ b/cmd/lotus-gateway/main.go @@ -157,6 +157,11 @@ var runCmd = &cli.Command{ Usage: "A hard limit on the number of incomming connections (requests) to accept per remote host per minute. 
Use 0 to disable", Value: 0, }, + &cli.IntFlag{ + Name: "eth-max-filters-per-conn", + Usage: "The maximum number of filters plus subscriptions that a single websocket connection can maintain", + Value: gateway.DefaultEthMaxFiltersPerConn, + }, }, Action: func(cctx *cli.Context) error { log.Info("Starting lotus gateway") @@ -184,6 +189,7 @@ var runCmd = &cli.Command{ perConnectionRateLimit = cctx.Int("per-conn-rate-limit") rateLimitTimeout = cctx.Duration("rate-limit-timeout") perHostConnectionsPerMinute = cctx.Int("conn-per-minute") + maxFiltersPerConn = cctx.Int("eth-max-filters-per-conn") ) serverOptions := make([]jsonrpc.ServerOption, 0) @@ -203,7 +209,15 @@ var runCmd = &cli.Command{ return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err) } - gwapi := gateway.NewNode(api, subHnd, lookbackCap, waitLookback, int64(globalRateLimit), rateLimitTimeout) + gwapi := gateway.NewNode( + api, + gateway.WithEthSubHandler(subHnd), + gateway.WithLookbackCap(lookbackCap), + gateway.WithStateWaitLookbackLimit(waitLookback), + gateway.WithRateLimit(globalRateLimit), + gateway.WithRateLimitTimeout(rateLimitTimeout), + gateway.WithEthMaxFiltersPerConn(maxFiltersPerConn), + ) handler, err := gateway.Handler(gwapi, api, perConnectionRateLimit, perHostConnectionsPerMinute, serverOptions...) if err != nil { return xerrors.Errorf("failed to set up gateway HTTP handler") diff --git a/gateway/node.go b/gateway/node.go index 2ee4b7555ab..0fd0fa57ea7 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -38,6 +38,7 @@ const ( DefaultLookbackCap = time.Hour * 24 DefaultStateWaitLookbackLimit = abi.ChainEpoch(20) DefaultRateLimitTimeout = time.Second * 5 + DefaultEthMaxFiltersPerConn = 16 basicRateLimitTokens = 1 walletRateLimitTokens = 1 chainRateLimitTokens = 2 @@ -169,6 +170,7 @@ type Node struct { stateWaitLookbackLimit abi.ChainEpoch rateLimiter *rate.Limiter rateLimitTimeout time.Duration + ethMaxFiltersPerConn int errLookback error } @@ -180,28 +182,88 @@ var ( _ full.StateModuleAPI = (*Node)(nil) ) +type Options struct { + subHandler *EthSubHandler + lookbackCap time.Duration + stateWaitLookbackLimit abi.ChainEpoch + rateLimit int + rateLimitTimeout time.Duration + ethMaxFiltersPerConn int +} + +type Option func(*Options) + +// WithEthSubHandler sets the Ethereum subscription handler for the gateway node. This is used for +// the RPC reverse handler for EthSubscribe calls. +func WithEthSubHandler(subHandler *EthSubHandler) Option { + return func(opts *Options) { + opts.subHandler = subHandler + } +} + +// WithLookbackCap sets the maximum lookback duration (time) for state queries. +func WithLookbackCap(lookbackCap time.Duration) Option { + return func(opts *Options) { + opts.lookbackCap = lookbackCap + } +} + +// WithStateWaitLookbackLimit sets the maximum lookback (epochs) for state queries. +func WithStateWaitLookbackLimit(stateWaitLookbackLimit abi.ChainEpoch) Option { + return func(opts *Options) { + opts.stateWaitLookbackLimit = stateWaitLookbackLimit + } +} + +// WithRateLimit sets the maximum number of requests per second globally that will be allowed +// before the gateway starts to rate limit requests. +func WithRateLimit(rateLimit int) Option { + return func(opts *Options) { + opts.rateLimit = rateLimit + } +} + +// WithRateLimitTimeout sets the timeout for rate limiting requests such that when rate limiting is +// being applied, if the timeout is reached the request will be allowed. 
+func WithRateLimitTimeout(rateLimitTimeout time.Duration) Option { + return func(opts *Options) { + opts.rateLimitTimeout = rateLimitTimeout + } +} + +// WithEthMaxFiltersPerConn sets the maximum number of Ethereum filters and subscriptions that can +// be maintained per websocket connection. +func WithEthMaxFiltersPerConn(ethMaxFiltersPerConn int) Option { + return func(opts *Options) { + opts.ethMaxFiltersPerConn = ethMaxFiltersPerConn + } +} + // NewNode creates a new gateway node. -func NewNode( - api TargetAPI, - sHnd *EthSubHandler, - lookbackCap time.Duration, - stateWaitLookbackLimit abi.ChainEpoch, - rateLimit int64, - rateLimitTimeout time.Duration, -) *Node { +func NewNode(api TargetAPI, opts ...Option) *Node { + options := &Options{ + lookbackCap: DefaultLookbackCap, + stateWaitLookbackLimit: DefaultStateWaitLookbackLimit, + rateLimitTimeout: DefaultRateLimitTimeout, + ethMaxFiltersPerConn: DefaultEthMaxFiltersPerConn, + } + for _, opt := range opts { + opt(options) + } limit := rate.Inf - if rateLimit > 0 { - limit = rate.Every(time.Second / time.Duration(rateLimit)) + if options.rateLimit > 0 { + limit = rate.Every(time.Second / time.Duration(options.rateLimit)) } return &Node{ target: api, - subHnd: sHnd, - lookbackCap: lookbackCap, - stateWaitLookbackLimit: stateWaitLookbackLimit, + subHnd: options.subHandler, + lookbackCap: options.lookbackCap, + stateWaitLookbackLimit: options.stateWaitLookbackLimit, rateLimiter: rate.NewLimiter(limit, MaxRateLimitTokens), // allow for a burst of MaxRateLimitTokens - rateLimitTimeout: rateLimitTimeout, - errLookback: fmt.Errorf("lookbacks of more than %s are disallowed", lookbackCap), + rateLimitTimeout: options.rateLimitTimeout, + errLookback: fmt.Errorf("lookbacks of more than %s are disallowed", options.lookbackCap), + ethMaxFiltersPerConn: options.ethMaxFiltersPerConn, } } diff --git a/gateway/node_test.go b/gateway/node_test.go index 0c60679d012..99c46d8aa8d 100644 --- a/gateway/node_test.go +++ b/gateway/node_test.go @@ -89,7 +89,7 @@ func TestGatewayAPIChainGetTipSetByHeight(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { mock := &mockGatewayDepsAPI{} - a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, 0, time.Minute) + a := NewNode(mock) // Create tipsets from genesis up to tskh and return the highest ts := mock.createTipSets(tt.args.tskh, tt.args.genesisTS) @@ -245,7 +245,7 @@ func TestGatewayVersion(t *testing.T) { //stm: @GATEWAY_NODE_GET_VERSION_001 ctx := context.Background() mock := &mockGatewayDepsAPI{} - a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, 0, time.Minute) + a := NewNode(mock) v, err := a.Version(ctx) require.NoError(t, err) @@ -256,7 +256,7 @@ func TestGatewayLimitTokensAvailable(t *testing.T) { ctx := context.Background() mock := &mockGatewayDepsAPI{} tokens := 3 - a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, int64(tokens), time.Minute) + a := NewNode(mock, WithRateLimit(tokens)) require.NoError(t, a.limit(ctx, tokens), "requests should not be limited when there are enough tokens available") } @@ -264,9 +264,9 @@ func TestGatewayLimitTokensRate(t *testing.T) { ctx := context.Background() mock := &mockGatewayDepsAPI{} tokens := 3 - var rateLimit int64 = 200 + var rateLimit = 200 rateLimitTimeout := time.Second / time.Duration(rateLimit/3) // large enough to not be hit - a := NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, rateLimit, rateLimitTimeout) + a := NewNode(mock, 
WithRateLimit(rateLimit), WithRateLimitTimeout(rateLimitTimeout)) start := time.Now() calls := 10 @@ -285,7 +285,7 @@ func TestGatewayLimitTokensRate(t *testing.T) { // In this case our timeout is too short to allow for the rate limit, so we should hit the // hard rate limit. rateLimitTimeout = time.Second / time.Duration(rateLimit) - a = NewNode(mock, nil, DefaultLookbackCap, DefaultStateWaitLookbackLimit, rateLimit, rateLimitTimeout) + a = NewNode(mock, WithRateLimit(rateLimit), WithRateLimitTimeout(rateLimitTimeout)) require.NoError(t, a.limit(ctx, tokens)) require.ErrorContains(t, a.limit(ctx, tokens), "server busy", "API calls should be hard rate limited when they hit limits") } diff --git a/gateway/proxy_eth.go b/gateway/proxy_eth.go index 24f9ba08665..323499deae0 100644 --- a/gateway/proxy_eth.go +++ b/gateway/proxy_eth.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "sync" @@ -22,6 +23,8 @@ import ( "github.com/filecoin-project/lotus/chain/types/ethtypes" ) +var ErrTooManyFilters = errors.New("too many subscriptions and filters for this connection") + func (gw *Node) EthAccounts(ctx context.Context) ([]ethtypes.EthAddress, error) { // gateway provides public API, so it can't hold user accounts return []ethtypes.EthAddress{}, nil @@ -555,8 +558,8 @@ func (gw *Node) EthSubscribe(ctx context.Context, p jsonrpc.RawParams) (ethtypes ft.lk.Lock() defer ft.lk.Unlock() - if len(ft.userSubscriptions) >= EthMaxFiltersPerConn { - return ethtypes.EthSubscriptionID{}, xerrors.New("too many subscriptions") + if len(ft.userSubscriptions)+len(ft.userFilters) >= gw.ethMaxFiltersPerConn { + return ethtypes.EthSubscriptionID{}, ErrTooManyFilters } sub, err := gw.target.EthSubscribe(ctx, p) @@ -677,8 +680,6 @@ func (gw *Node) EthTraceFilter(ctx context.Context, filter ethtypes.EthTraceFilt return gw.target.EthTraceFilter(ctx, filter) } -var EthMaxFiltersPerConn = 16 // todo make this configurable - func (gw *Node) addUserFilterLimited( ctx context.Context, callName string, @@ -692,8 +693,8 @@ func (gw *Node) addUserFilterLimited( ft.lk.Lock() defer ft.lk.Unlock() - if len(ft.userFilters) >= EthMaxFiltersPerConn { - return ethtypes.EthFilterID{}, xerrors.New("too many filters") + if len(ft.userSubscriptions)+len(ft.userFilters) >= gw.ethMaxFiltersPerConn { + return ethtypes.EthFilterID{}, ErrTooManyFilters } id, err := install() @@ -712,7 +713,7 @@ func (gw *Node) addUserFilterLimited( func getStatefulTracker(ctx context.Context) (*statefulCallTracker, error) { if jsonrpc.GetConnectionType(ctx) != jsonrpc.ConnectionTypeWS { - return nil, xerrors.New("stateful methods are only available on websockets connections") + return nil, xerrors.New("stateful methods are only available on websocket connections") } if ct, ok := ctx.Value(statefulCallTrackerKey).(*statefulCallTracker); !ok { diff --git a/itests/gateway_test.go b/itests/gateway_test.go index 88b8c53fc0b..340546ca902 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -276,7 +276,12 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod // Create a gateway server in front of the full node ethSubHandler := gateway.NewEthSubHandler() - gwapi := gateway.NewNode(full, ethSubHandler, options.lookbackCap, options.stateWaitLookbackLimit, 0, time.Minute) + gwapi := gateway.NewNode( + full, + gateway.WithEthSubHandler(ethSubHandler), + gateway.WithLookbackCap(options.lookbackCap), + gateway.WithStateWaitLookbackLimit(options.stateWaitLookbackLimit), + ) handler, err := 
gateway.Handler(gwapi, full, options.perConnectionAPIRateLimit, options.perHostRequestsPerMinute) t.Cleanup(func() { _ = handler.Shutdown(ctx) }) require.NoError(t, err) @@ -453,7 +458,7 @@ func TestStatefulCallHandling(t *testing.T) { "EthUnsubscribe", } { params := "" - expErr := typ + " not supported: stateful methods are only available on websockets connections" + expErr := typ + " not supported: stateful methods are only available on websocket connections" switch typ { case "EthNewFilter": @@ -471,12 +476,38 @@ func TestStatefulCallHandling(t *testing.T) { req.Contains(body, `{"error":{"code":1,"message":"`+expErr+`"},"id":1,"jsonrpc":"2.0"}`, "unexpected response for "+typ) } + t.Logf("Testing subscriptions") + // subscribe twice, so we can unsub one over ws to check unsub works, then unsub after ws close to + // check that auto-cleanup happned + subId1, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) + req.NoError(err) + err = nodes.lite.EthSubRouter.AddSub(ctx, subId1, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + t.Logf("Received subscription response (sub1): %v", resp) + return nil + }) + req.NoError(err) + subId2, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) + req.NoError(err) + err = nodes.lite.EthSubRouter.AddSub(ctx, subId2, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + t.Logf("Received subscription response (sub2): %v", resp) + return nil + }) + req.NoError(err) + + ok, err := nodes.lite.EthUnsubscribe(ctx, subId1) // unsub on lite node, should work + req.NoError(err) + req.True(ok) + ok, err = nodes.full.EthUnsubscribe(ctx, subId1) // unsub on full node, already done + req.NoError(err) + req.False(ok) + t.Logf("Installing a stateful filters via ws") // install the variety of stateful filters we have, but only up to the max total var ( - blockFilterIds = make([]ethtypes.EthFilterID, gateway.EthMaxFiltersPerConn/3) - pendingFilterIds = make([]ethtypes.EthFilterID, gateway.EthMaxFiltersPerConn/3) - matchFilterIds = make([]ethtypes.EthFilterID, gateway.EthMaxFiltersPerConn-len(blockFilterIds)-len(pendingFilterIds)) + blockFilterIds = make([]ethtypes.EthFilterID, gateway.DefaultEthMaxFiltersPerConn/3) + pendingFilterIds = make([]ethtypes.EthFilterID, gateway.DefaultEthMaxFiltersPerConn/3) + // matchFilterIds takes up the remainder, minus 1 because we still have 1 live subscription that counts + matchFilterIds = make([]ethtypes.EthFilterID, gateway.DefaultEthMaxFiltersPerConn-len(blockFilterIds)-len(pendingFilterIds)-1) ) for i := 0; i < len(blockFilterIds); i++ { fid, err := nodes.lite.EthNewBlockFilter(ctx) @@ -500,41 +531,18 @@ func TestStatefulCallHandling(t *testing.T) { req.Greater(len(matchFilterIds), 0) t.Logf("Testing 'too many filters' rejection") - _, err := nodes.lite.EthNewBlockFilter(ctx) - require.ErrorContains(t, err, "too many filters") + _, err = nodes.lite.EthNewBlockFilter(ctx) + require.ErrorContains(t, err, "too many subscriptions and filters for this connection") _, err = nodes.lite.EthNewPendingTransactionFilter(ctx) - require.ErrorContains(t, err, "too many filters") + require.ErrorContains(t, err, "too many subscriptions and filters for this connection") _, err = nodes.lite.EthNewFilter(ctx, ðtypes.EthFilterSpec{}) - require.ErrorContains(t, err, "too many filters") - - t.Logf("Testing 
subscriptions") - // subscribe twice, so we can unsub one over ws to check unsub works, then unsub after ws close to - // check that auto-cleanup happned - subId1, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) - req.NoError(err) - err = nodes.lite.EthSubRouter.AddSub(ctx, subId1, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { - t.Logf("Received subscription response (sub1): %v", resp) - return nil - }) - req.NoError(err) - subId2, err := nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) - req.NoError(err) - err = nodes.lite.EthSubRouter.AddSub(ctx, subId2, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { - t.Logf("Received subscription response (sub2): %v", resp) - return nil - }) - req.NoError(err) - - ok, err := nodes.lite.EthUnsubscribe(ctx, subId1) // unsub on lite node, should work - req.NoError(err) - req.True(ok) - ok, err = nodes.full.EthUnsubscribe(ctx, subId1) // unsub on full node, already done - req.NoError(err) - req.False(ok) + require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + _, err = nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) + require.ErrorContains(t, err, "too many subscriptions and filters for this connection") t.Logf("Shutting down the lite node") req.NoError(nodes.lite.Stop(ctx)) - nodes.rpcCloser() // once the websocket connection is closed, the server should clean up the filters for us + nodes.rpcCloser() // once the websocke connection is closed, the server should clean up the filters for us time.Sleep(time.Second) // unfortunately we have no other way to check for completeness of shutdown and cleanup From fa068eec7728cc78bb750a8a0dca534df6e1cdc0 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 1 Aug 2024 17:43:31 +1000 Subject: [PATCH 5/7] chore: config: reduce FilterTTL default to 1 hour --- documentation/en/default-lotus-config.toml | 8 ++++++-- node/config/def.go | 2 +- node/config/doc_gen.go | 8 ++++++-- node/config/types.go | 6 +++++- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml index 2113c9e1ab7..40715b341a3 100644 --- a/documentation/en/default-lotus-config.toml +++ b/documentation/en/default-lotus-config.toml @@ -268,13 +268,17 @@ #EnableActorEventsAPI = false # FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than - # this time become eligible for automatic deletion. + # this time become eligible for automatic deletion. Filters consume resources, so if they are unused they + # should not be retained. # # type: Duration # env var: LOTUS_EVENTS_FILTERTTL - #FilterTTL = "24h0m0s" + #FilterTTL = "1h0m0s" # MaxFilters specifies the maximum number of filters that may exist at any one time. + # Multi-tenant environments may want to increase this value to serve a larger number of clients. If using + # lotus-gateway, this global limit can be coupled with --eth-max-filters-per-conn which limits the number + # of filters per connection. 
# # type: int # env var: LOTUS_EVENTS_MAXFILTERS diff --git a/node/config/def.go b/node/config/def.go index dfa205623ca..b212f72d589 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -91,7 +91,7 @@ func DefaultFullNode() *FullNode { DisableRealTimeFilterAPI: false, DisableHistoricFilterAPI: false, EnableActorEventsAPI: false, - FilterTTL: Duration(time.Hour * 24), + FilterTTL: Duration(time.Hour * 1), MaxFilters: 100, MaxFilterResults: 10000, MaxFilterHeightRange: 2880, // conservative limit of one day diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 780d5d073fb..a683152a04d 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -143,13 +143,17 @@ disabled by setting their respective Disable* options.`, Type: "Duration", Comment: `FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than -this time become eligible for automatic deletion.`, +this time become eligible for automatic deletion. Filters consume resources, so if they are unused they +should not be retained.`, }, { Name: "MaxFilters", Type: "int", - Comment: `MaxFilters specifies the maximum number of filters that may exist at any one time.`, + Comment: `MaxFilters specifies the maximum number of filters that may exist at any one time. +Multi-tenant environments may want to increase this value to serve a larger number of clients. If using +lotus-gateway, this global limit can be coupled with --eth-max-filters-per-conn which limits the number +of filters per connection.`, }, { Name: "MaxFilterResults", diff --git a/node/config/types.go b/node/config/types.go index aa977285795..ee40c98b5f2 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -592,10 +592,14 @@ type EventsConfig struct { EnableActorEventsAPI bool // FilterTTL specifies the time to live for actor event filters. Filters that haven't been accessed longer than - // this time become eligible for automatic deletion. + // this time become eligible for automatic deletion. Filters consume resources, so if they are unused they + // should not be retained. FilterTTL Duration // MaxFilters specifies the maximum number of filters that may exist at any one time. + // Multi-tenant environments may want to increase this value to serve a larger number of clients. If using + // lotus-gateway, this global limit can be coupled with --eth-max-filters-per-conn which limits the number + // of filters per connection. MaxFilters int // MaxFilterResults specifies the maximum number of results that can be accumulated by an actor event filter. From a816dd039d0201884bca3c188016523d42d9ae40 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 1 Aug 2024 17:43:59 +1000 Subject: [PATCH 6/7] docs: gateway: changelog entry for new changes --- CHANGELOG.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 60def96bbc8..ff3cbbd26e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ - https://github.com/filecoin-project/lotus/pull/12332: fix: ETH RPC: receipts: use correct txtype in receipts - https://github.com/filecoin-project/lotus/pull/12335: fix: lotus-shed: store processed tipset after backfilling events +## ☢️ Upgrade Warnings ☢️ + +- lotus-gateway behaviour, CLI arguments and APIs have received minor changes. See the improvements section below. + ## New features - feat: Add trace filter API supporting RPC method `trace_filter` ([filecoin-project/lotus#12123](https://github.com/filecoin-project/lotus/pull/12123)). 
Configuring `EthTraceFilterMaxResults` sets a limit on how many results are returned in any individual `trace_filter` RPC API call. @@ -32,10 +36,15 @@ ## Improvements -- fix!: gateway: fix rate limiting, general cleanup ([filecoin-project/lotus#12315](https://github.com/filecoin-project/lotus/pull/12315)). +- feat!: gateway: fix rate limiting, better stateful handling ([filecoin-project/lotus#12315](https://github.com/filecoin-project/lotus/pull/12315)). - CLI usage documentation has been improved for `lotus-gateway` - `--per-conn-rate-limit` now works as advertised. + - `--eth-max-filters-per-conn` is new and allows you to set the maximum number of filters and subscriptions per connection; it defaults to 16. + - Previously, this limit was set to `16` and applied separately to filters and subscriptions. This limit is now unified and applies to both filters and subscriptions. + - Stateful Ethereum APIs (those involving filters and subscriptions) are now disabled for plain HTTP connections. A client must be using websockets to access these APIs. + - Filters and subscriptions created through these APIs are also now automatically removed from the node by the gateway when the client disconnects. - Some APIs have changed which may impact users consuming Lotus Gateway code as a library. + - The default value for the `Events.FilterTTL` config option has been reduced from 24h to 1h. This means that filters will expire on a Lotus node after 1 hour of not being accessed by the client. # v1.28.1 / 2024-07-24 From fcd600029185d1c3d0af7e6f4aeb186c43b75c62 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Mon, 5 Aug 2024 18:10:40 +1000 Subject: [PATCH 7/7] fix: address issues raised from code review --- cmd/lotus-gateway/main.go | 18 ++-- gateway/handler.go | 182 +++++++++++++++++++++++--------- gateway/handler_test.go | 2 +- gateway/node.go | 109 ++++++++++++----------- gateway/node_test.go | 4 +- gateway/proxy_eth.go | 16 ++-- gateway/proxy_fil.go | 12 +-- itests/gateway_test.go | 157 ++++++++++++++++---------------- 8 files changed, 278 insertions(+), 222 deletions(-) diff --git a/cmd/lotus-gateway/main.go b/cmd/lotus-gateway/main.go index 92194d16a55..2e7a1efb431 100644 --- a/cmd/lotus-gateway/main.go +++ b/cmd/lotus-gateway/main.go @@ -124,12 +124,12 @@ var runCmd = &cli.Command{ &cli.DurationFlag{ Name: "api-max-lookback", Usage: "maximum duration allowable for tipset lookbacks", - Value: gateway.DefaultLookbackCap, + Value: gateway.DefaultMaxLookbackDuration, }, &cli.Int64Flag{ Name: "api-wait-lookback-limit", Usage: "maximum number of blocks to search back through for message inclusion", - Value: int64(gateway.DefaultStateWaitLookbackLimit), + Value: int64(gateway.DefaultMaxMessageLookbackEpochs), }, &cli.Int64Flag{ Name: "rate-limit", @@ -154,7 +154,7 @@ var runCmd = &cli.Command{ }, &cli.Int64Flag{ Name: "conn-per-minute", - Usage: "A hard limit on the number of incomming connections (requests) to accept per remote host per minute. Use 0 to disable", + Usage: "A hard limit on the number of incoming connections (requests) to accept per remote host per minute.
Use 0 to disable", Value: 0, }, &cli.IntFlag{ @@ -212,13 +212,19 @@ var runCmd = &cli.Command{ gwapi := gateway.NewNode( api, gateway.WithEthSubHandler(subHnd), - gateway.WithLookbackCap(lookbackCap), - gateway.WithStateWaitLookbackLimit(waitLookback), + gateway.WithMaxLookbackDuration(lookbackCap), + gateway.WithMaxMessageLookbackEpochs(waitLookback), gateway.WithRateLimit(globalRateLimit), gateway.WithRateLimitTimeout(rateLimitTimeout), gateway.WithEthMaxFiltersPerConn(maxFiltersPerConn), ) - handler, err := gateway.Handler(gwapi, api, perConnectionRateLimit, perHostConnectionsPerMinute, serverOptions...) + handler, err := gateway.Handler( + gwapi, + api, + gateway.WithPerConnectionAPIRateLimit(perConnectionRateLimit), + gateway.WithPerHostConnectionsPerMinute(perHostConnectionsPerMinute), + gateway.WithJsonrpcServerOptions(serverOptions...), + ) if err != nil { return xerrors.Errorf("failed to set up gateway HTTP handler") } diff --git a/gateway/handler.go b/gateway/handler.go index eef8473288f..78310edb4fd 100644 --- a/gateway/handler.go +++ b/gateway/handler.go @@ -37,33 +37,62 @@ type ShutdownHandler interface { Shutdown(ctx context.Context) error } -var _ ShutdownHandler = &statefulCallHandler{} -var _ ShutdownHandler = &RateLimitHandler{} +var _ ShutdownHandler = (*statefulCallHandler)(nil) +var _ ShutdownHandler = (*RateLimitHandler)(nil) -// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is -// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its -// Shutdown method. +// handlerOptions holds the options for the Handler function. +type handlerOptions struct { + perConnectionAPIRateLimit int + perHostConnectionsPerMinute int + jsonrpcServerOptions []jsonrpc.ServerOption +} + +// HandlerOption is a functional option for configuring the Handler. +type HandlerOption func(*handlerOptions) + +// WithPerConnectionAPIRateLimit sets the per connection API rate limit. // // The handler will limit the number of API calls per minute within a single WebSocket connection // (where API calls are weighted by their relative expense), and the number of connections per // minute from a single host. +func WithPerConnectionAPIRateLimit(limit int) HandlerOption { + return func(opts *handlerOptions) { + opts.perConnectionAPIRateLimit = limit + } +} + +// WithPerHostConnectionsPerMinute sets the per host connections per minute limit. // -// Connection limiting is a hard limit that will reject requests with a 429 status code if the limit -// is exceeded. API call limiting is a soft limit that will delay requests if the limit is exceeded. -func Handler( - gwapi lapi.Gateway, - api lapi.FullNode, - perConnectionAPIRateLimit int, - perHostConnectionsPerMinute int, - opts ...jsonrpc.ServerOption, -) (ShutdownHandler, error) { +// Connection limiting is a hard limit that will reject requests with a http.StatusTooManyRequests +// status code if the limit is exceeded. API call limiting is a soft limit that will delay requests +// if the limit is exceeded. +func WithPerHostConnectionsPerMinute(limit int) HandlerOption { + return func(opts *handlerOptions) { + opts.perHostConnectionsPerMinute = limit + } +} - m := mux.NewRouter() +// WithJsonrpcServerOptions sets the JSON-RPC server options. 
+func WithJsonrpcServerOptions(options ...jsonrpc.ServerOption) HandlerOption { + return func(opts *handlerOptions) { + opts.jsonrpcServerOptions = options + } +} - opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors)) +// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is +// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its +// Shutdown method. +func Handler(gwapi lapi.Gateway, api lapi.FullNode, options ...HandlerOption) (ShutdownHandler, error) { + opts := &handlerOptions{} + for _, option := range options { + option(opts) + } + + m := mux.NewRouter() + rpcopts := append(opts.jsonrpcServerOptions, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors)) serveRpc := func(path string, hnd interface{}) { - rpcServer := jsonrpc.NewServer(opts...) + rpcServer := jsonrpc.NewServer(rpcopts...) rpcServer.Register("Filecoin", hnd) rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover") @@ -91,11 +120,11 @@ func Handler( m.PathPrefix("/").Handler(http.DefaultServeMux) handler := &statefulCallHandler{m} - if perConnectionAPIRateLimit > 0 && perHostConnectionsPerMinute > 0 { + if opts.perConnectionAPIRateLimit > 0 || opts.perHostConnectionsPerMinute > 0 { return NewRateLimitHandler( handler, - perConnectionAPIRateLimit, - perHostConnectionsPerMinute, + opts.perConnectionAPIRateLimit, + opts.perHostConnectionsPerMinute, connectionLimiterCleanupInterval, ), nil } @@ -125,14 +154,15 @@ type hostLimiter struct { } type RateLimitHandler struct { - cancelFunc context.CancelFunc - mu sync.Mutex - limiters map[string]*hostLimiter - perConnectionAPILimit rate.Limit - perHostConnectionsPerMinute int - next http.Handler - cleanupInterval time.Duration - expiryDuration time.Duration + cancelFunc context.CancelFunc + limiters map[string]*hostLimiter + limitersLk sync.Mutex + perConnectionAPILimit rate.Limit + perHostConnectionsLimit rate.Limit + perHostConnectionsLimitBurst int + next http.Handler + cleanupInterval time.Duration + expiryDuration time.Duration } // NewRateLimitHandler creates a new RateLimitHandler that wraps the @@ -149,85 +179,95 @@ func NewRateLimitHandler( ctx, cancel := context.WithCancel(context.Background()) h := &RateLimitHandler{ - cancelFunc: cancel, - limiters: make(map[string]*hostLimiter), - perConnectionAPILimit: rate.Inf, - perHostConnectionsPerMinute: perHostConnectionsPerMinute, - next: next, - cleanupInterval: cleanupInterval, - expiryDuration: 5 * cleanupInterval, + cancelFunc: cancel, + limiters: make(map[string]*hostLimiter), + perConnectionAPILimit: rate.Inf, + perHostConnectionsLimit: rate.Inf, + next: next, + cleanupInterval: cleanupInterval, + expiryDuration: 5 * cleanupInterval, } if perConnectionAPIRateLimit > 0 { h.perConnectionAPILimit = rate.Every(time.Second / time.Duration(perConnectionAPIRateLimit)) } + if perHostConnectionsPerMinute > 0 { + h.perHostConnectionsLimit = rate.Every(time.Minute / time.Duration(perHostConnectionsPerMinute)) + h.perHostConnectionsLimitBurst = perHostConnectionsPerMinute + } go h.cleanupExpiredLimiters(ctx) return h } -func (h *RateLimitHandler) getLimits(host string) *hostLimiter { - h.mu.Lock() - defer h.mu.Unlock() - - entry, exists := h.limiters[host] - if !exists { - var limiter *rate.Limiter - if h.perHostConnectionsPerMinute > 0 { - requestLimit := rate.Every(time.Minute / time.Duration(h.perHostConnectionsPerMinute)) - 
limiter = rate.NewLimiter(requestLimit, h.perHostConnectionsPerMinute) - } - entry = &hostLimiter{ - limiter: limiter, - lastAccess: time.Now(), +func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if h.perHostConnectionsLimit != rate.Inf { + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return } - h.limiters[host] = entry - } else { - entry.lastAccess = time.Now() - } - - return entry -} -func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - host, _, err := net.SplitHostPort(r.RemoteAddr) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } + h.limitersLk.Lock() + entry, exists := h.limiters[host] + if !exists { + entry = &hostLimiter{ + limiter: rate.NewLimiter(h.perHostConnectionsLimit, h.perHostConnectionsLimitBurst), + lastAccess: time.Now(), + } + h.limiters[host] = entry + } else { + entry.lastAccess = time.Now() + } + h.limitersLk.Unlock() - limits := h.getLimits(host) - if limits.limiter != nil && !limits.limiter.Allow() { - w.WriteHeader(http.StatusTooManyRequests) - return + if !entry.limiter.Allow() { + w.WriteHeader(http.StatusTooManyRequests) + return + } } if h.perConnectionAPILimit != rate.Inf { // new rate limiter for each connection, to throttle a single WebSockets connection; // allow for a burst of MaxRateLimitTokens apiLimiter := rate.NewLimiter(h.perConnectionAPILimit, MaxRateLimitTokens) - r = r.WithContext(context.WithValue(r.Context(), perConnectionAPIRateLimiterKey, apiLimiter)) + r = r.WithContext(setPerConnectionAPIRateLimiter(r.Context(), apiLimiter)) } h.next.ServeHTTP(w, r) } +// setPerConnectionAPIRateLimiter sets the rate limiter in the context. +func setPerConnectionAPIRateLimiter(ctx context.Context, limiter *rate.Limiter) context.Context { + return context.WithValue(ctx, perConnectionAPIRateLimiterKey, limiter) +} + +// getPerConnectionAPIRateLimiter retrieves the rate limiter from the context. +func getPerConnectionAPIRateLimiter(ctx context.Context) (*rate.Limiter, bool) { + limiter, ok := ctx.Value(perConnectionAPIRateLimiterKey).(*rate.Limiter) + return limiter, ok +} + +// cleanupExpiredLimiters periodically checks for limiters that have expired and removes them. 
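+// A host's limiter counts as expired once it has not been accessed for expiryDuration (five times the +// cleanup interval), which keeps the per-host limiter map from growing without bound.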
func (h *RateLimitHandler) cleanupExpiredLimiters(ctx context.Context) { if h.cleanupInterval == 0 { return } - for { + ticker := time.NewTicker(h.cleanupInterval) + defer ticker.Stop() + + for ctx.Err() == nil { select { case <-ctx.Done(): return - case <-time.After(h.cleanupInterval): - h.mu.Lock() + case <-ticker.C: + h.limitersLk.Lock() now := time.Now() for host, entry := range h.limiters { if now.Sub(entry.lastAccess) > h.expiryDuration { delete(h.limiters, host) } } - h.mu.Unlock() + h.limitersLk.Unlock() } } } diff --git a/gateway/handler_test.go b/gateway/handler_test.go index 65e56836c0a..4599df96f8f 100644 --- a/gateway/handler_test.go +++ b/gateway/handler_test.go @@ -13,7 +13,7 @@ import ( func TestRequestRateLimiterHandler(t *testing.T) { var callCount int h := gateway.NewRateLimitHandler( - http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) { + http.HandlerFunc(func(http.ResponseWriter, *http.Request) { callCount++ }), 0, // api rate diff --git a/gateway/node.go b/gateway/node.go index 0fd0fa57ea7..e3e2c6a4715 100644 --- a/gateway/node.go +++ b/gateway/node.go @@ -35,17 +35,17 @@ import ( var log = logger.Logger("gateway") const ( - DefaultLookbackCap = time.Hour * 24 - DefaultStateWaitLookbackLimit = abi.ChainEpoch(20) - DefaultRateLimitTimeout = time.Second * 5 - DefaultEthMaxFiltersPerConn = 16 - basicRateLimitTokens = 1 - walletRateLimitTokens = 1 - chainRateLimitTokens = 2 - stateRateLimitTokens = 3 - - // MaxRateLimitTokens is the number of tokens consumed for the most expensive types of operations - MaxRateLimitTokens = stateRateLimitTokens + DefaultMaxLookbackDuration = time.Hour * 24 // Default duration that a gateway request can look back in chain history + DefaultMaxMessageLookbackEpochs = abi.ChainEpoch(20) // Default number of epochs that a gateway message lookup can look back in chain history + DefaultRateLimitTimeout = time.Second * 5 // Default timeout for rate limiting requests; where a request would take longer to wait than this value, it will be rejected + DefaultEthMaxFiltersPerConn = 16 // Default maximum number of ETH filters and subscriptions per websocket connection + + basicRateLimitTokens = 1 + walletRateLimitTokens = 1 + chainRateLimitTokens = 2 + stateRateLimitTokens = 3 + + MaxRateLimitTokens = stateRateLimitTokens // Number of tokens consumed for the most expensive types of operations ) // TargetAPI defines the API methods that the Node depends on @@ -164,14 +164,14 @@ type TargetAPI interface { var _ TargetAPI = *new(api.FullNode) // gateway depends on latest type Node struct { - target TargetAPI - subHnd *EthSubHandler - lookbackCap time.Duration - stateWaitLookbackLimit abi.ChainEpoch - rateLimiter *rate.Limiter - rateLimitTimeout time.Duration - ethMaxFiltersPerConn int - errLookback error + target TargetAPI + subHnd *EthSubHandler + maxLookbackDuration time.Duration + maxMessageLookbackEpochs abi.ChainEpoch + rateLimiter *rate.Limiter + rateLimitTimeout time.Duration + ethMaxFiltersPerConn int + errLookback error } var ( @@ -182,43 +182,43 @@ var ( _ full.StateModuleAPI = (*Node)(nil) ) -type Options struct { - subHandler *EthSubHandler - lookbackCap time.Duration - stateWaitLookbackLimit abi.ChainEpoch - rateLimit int - rateLimitTimeout time.Duration - ethMaxFiltersPerConn int +type options struct { + subHandler *EthSubHandler + maxLookbackDuration time.Duration + maxMessageLookbackEpochs abi.ChainEpoch + rateLimit int + rateLimitTimeout time.Duration + ethMaxFiltersPerConn int } -type Option func(*Options) +type Option
func(*options) // WithEthSubHandler sets the Ethereum subscription handler for the gateway node. This is used for // the RPC reverse handler for EthSubscribe calls. func WithEthSubHandler(subHandler *EthSubHandler) Option { - return func(opts *Options) { + return func(opts *options) { opts.subHandler = subHandler } } -// WithLookbackCap sets the maximum lookback duration (time) for state queries. -func WithLookbackCap(lookbackCap time.Duration) Option { - return func(opts *Options) { - opts.lookbackCap = lookbackCap +// WithMaxLookbackDuration sets the maximum lookback duration (time) for state queries. +func WithMaxLookbackDuration(maxLookbackDuration time.Duration) Option { + return func(opts *options) { + opts.maxLookbackDuration = maxLookbackDuration } } -// WithStateWaitLookbackLimit sets the maximum lookback (epochs) for state queries. -func WithStateWaitLookbackLimit(stateWaitLookbackLimit abi.ChainEpoch) Option { - return func(opts *Options) { - opts.stateWaitLookbackLimit = stateWaitLookbackLimit +// WithMaxMessageLookbackEpochs sets the maximum lookback (epochs) for state queries. +func WithMaxMessageLookbackEpochs(maxMessageLookbackEpochs abi.ChainEpoch) Option { + return func(opts *options) { + opts.maxMessageLookbackEpochs = maxMessageLookbackEpochs } } // WithRateLimit sets the maximum number of requests per second globally that will be allowed // before the gateway starts to rate limit requests. func WithRateLimit(rateLimit int) Option { - return func(opts *Options) { + return func(opts *options) { opts.rateLimit = rateLimit } } @@ -226,7 +226,7 @@ func WithRateLimit(rateLimit int) Option { // WithRateLimitTimeout sets the timeout for rate limiting requests such that when rate limiting is // being applied, if the timeout is reached the request will be rejected. func WithRateLimitTimeout(rateLimitTimeout time.Duration) Option { - return func(opts *Options) { + return func(opts *options) { opts.rateLimitTimeout = rateLimitTimeout } } @@ -234,18 +234,18 @@ func WithRateLimitTimeout(rateLimitTimeout time.Duration) Option { // WithEthMaxFiltersPerConn sets the maximum number of Ethereum filters and subscriptions that can // be maintained per websocket connection. func WithEthMaxFiltersPerConn(ethMaxFiltersPerConn int) Option { - return func(opts *Options) { + return func(opts *options) { opts.ethMaxFiltersPerConn = ethMaxFiltersPerConn } } // NewNode creates a new gateway node.
func NewNode(api TargetAPI, opts ...Option) *Node { - options := &Options{ - lookbackCap: DefaultLookbackCap, - stateWaitLookbackLimit: DefaultStateWaitLookbackLimit, - rateLimitTimeout: DefaultRateLimitTimeout, - ethMaxFiltersPerConn: DefaultEthMaxFiltersPerConn, + options := &options{ + maxLookbackDuration: DefaultMaxLookbackDuration, + maxMessageLookbackEpochs: DefaultMaxMessageLookbackEpochs, + rateLimitTimeout: DefaultRateLimitTimeout, + ethMaxFiltersPerConn: DefaultEthMaxFiltersPerConn, } for _, opt := range opts { opt(options) @@ -256,14 +256,14 @@ func NewNode(api TargetAPI, opts ...Option) *Node { limit = rate.Every(time.Second / time.Duration(options.rateLimit)) } return &Node{ - target: api, - subHnd: options.subHandler, - lookbackCap: options.lookbackCap, - stateWaitLookbackLimit: options.stateWaitLookbackLimit, - rateLimiter: rate.NewLimiter(limit, MaxRateLimitTokens), // allow for a burst of MaxRateLimitTokens - rateLimitTimeout: options.rateLimitTimeout, - errLookback: fmt.Errorf("lookbacks of more than %s are disallowed", options.lookbackCap), - ethMaxFiltersPerConn: options.ethMaxFiltersPerConn, + target: api, + subHnd: options.subHandler, + maxLookbackDuration: options.maxLookbackDuration, + maxMessageLookbackEpochs: options.maxMessageLookbackEpochs, + rateLimiter: rate.NewLimiter(limit, MaxRateLimitTokens), // allow for a burst of MaxRateLimitTokens + rateLimitTimeout: options.rateLimitTimeout, + errLookback: fmt.Errorf("lookbacks of more than %s are disallowed", options.maxLookbackDuration), + ethMaxFiltersPerConn: options.ethMaxFiltersPerConn, } } @@ -303,7 +303,7 @@ func (gw *Node) checkTipsetHeight(ts *types.TipSet, h abi.ChainEpoch) error { } func (gw *Node) checkTimestamp(at time.Time) error { - if time.Since(at) > gw.lookbackCap { + if time.Since(at) > gw.maxLookbackDuration { return gw.errLookback } return nil @@ -312,7 +312,8 @@ func (gw *Node) checkTimestamp(at time.Time) error { func (gw *Node) limit(ctx context.Context, tokens int) error { ctx2, cancel := context.WithTimeout(ctx, gw.rateLimitTimeout) defer cancel() - if perConnLimiter, ok := ctx2.Value(perConnectionAPIRateLimiterKey).(*rate.Limiter); ok { + + if perConnLimiter, ok := getPerConnectionAPIRateLimiter(ctx); ok { err := perConnLimiter.WaitN(ctx2, tokens) if err != nil { return fmt.Errorf("connection limited. 
%w", err) diff --git a/gateway/node_test.go b/gateway/node_test.go index 99c46d8aa8d..fd70f8f214b 100644 --- a/gateway/node_test.go +++ b/gateway/node_test.go @@ -23,7 +23,7 @@ import ( func TestGatewayAPIChainGetTipSetByHeight(t *testing.T) { ctx := context.Background() - lookbackTimestamp := uint64(time.Now().Unix()) - uint64(DefaultLookbackCap.Seconds()) + lookbackTimestamp := uint64(time.Now().Unix()) - uint64(DefaultMaxLookbackDuration.Seconds()) type args struct { h abi.ChainEpoch tskh abi.ChainEpoch @@ -264,7 +264,7 @@ func TestGatewayLimitTokensRate(t *testing.T) { ctx := context.Background() mock := &mockGatewayDepsAPI{} tokens := 3 - var rateLimit = 200 + rateLimit := 200 rateLimitTimeout := time.Second / time.Duration(rateLimit/3) // large enough to not be hit a := NewNode(mock, WithRateLimit(rateLimit), WithRateLimitTimeout(rateLimitTimeout)) diff --git a/gateway/proxy_eth.go b/gateway/proxy_eth.go index 323499deae0..5d823b2c010 100644 --- a/gateway/proxy_eth.go +++ b/gateway/proxy_eth.go @@ -23,7 +23,7 @@ import ( "github.com/filecoin-project/lotus/chain/types/ethtypes" ) -var ErrTooManyFilters = errors.New("too many subscriptions and filters for this connection") +var ErrTooManyFilters = errors.New("too many subscriptions and filters per connection") func (gw *Node) EthAccounts(ctx context.Context) ([]ethtypes.EthAddress, error) { // gateway provides public API, so it can't hold user accounts @@ -209,10 +209,10 @@ func (gw *Node) EthGetTransactionByHashLimited(ctx context.Context, txHash *etht return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } return gw.target.EthGetTransactionByHashLimited(ctx, txHash, limit) @@ -255,10 +255,10 @@ func (gw *Node) EthGetTransactionReceiptLimited(ctx context.Context, txHash etht return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } return gw.target.EthGetTransactionReceiptLimited(ctx, txHash, limit) @@ -524,6 +524,7 @@ func (gw *Node) EthUninstallFilter(ctx context.Context, id ethtypes.EthFilterID) ok, err := gw.target.EthUninstallFilter(ctx, id) if err != nil { // don't delete the filter, it's "stuck" so should still count towards the limit + log.Warnf("error uninstalling filter: %v", err) return false, err } @@ -608,6 +609,7 @@ func (gw *Node) EthUnsubscribe(ctx context.Context, id ethtypes.EthSubscriptionI ok, err := gw.target.EthUnsubscribe(ctx, id) if err != nil { // don't delete the subscription, it's "stuck" so should still count towards the limit + log.Warnf("error unsubscribing: %v", err) return false, err } diff --git a/gateway/proxy_fil.go b/gateway/proxy_fil.go index 9daa0796d3a..59fa511b506 100644 --- a/gateway/proxy_fil.go +++ b/gateway/proxy_fil.go @@ -413,10 +413,10 @@ func (gw *Node) StateSearchMsg(ctx context.Context, from types.TipSetKey, msg ci return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = 
gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } if err := gw.checkTipsetKey(ctx, from); err != nil { return nil, err @@ -429,10 +429,10 @@ func (gw *Node) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64 return nil, err } if limit == api.LookbackNoLimit { - limit = gw.stateWaitLookbackLimit + limit = gw.maxMessageLookbackEpochs } - if gw.stateWaitLookbackLimit != api.LookbackNoLimit && limit > gw.stateWaitLookbackLimit { - limit = gw.stateWaitLookbackLimit + if gw.maxMessageLookbackEpochs != api.LookbackNoLimit && limit > gw.maxMessageLookbackEpochs { + limit = gw.maxMessageLookbackEpochs } return gw.target.StateWaitMsg(ctx, msg, confidence, limit, allowReplaced) } diff --git a/itests/gateway_test.go b/itests/gateway_test.go index 340546ca902..3972f91157d 100644 --- a/itests/gateway_test.go +++ b/itests/gateway_test.go @@ -37,8 +37,8 @@ import ( ) const ( - maxLookbackCap = time.Duration(math.MaxInt64) - maxStateWaitLookbackLimit = stmgr.LookbackNoLimit + maxLookbackCap = time.Duration(math.MaxInt64) + maxMessageLookbackEpochs = stmgr.LookbackNoLimit ) // TestGatewayWalletMsig tests that API calls to wallet and msig can be made on a lite @@ -206,23 +206,23 @@ type testNodes struct { } type startOptions struct { - blocktime time.Duration - lookbackCap time.Duration - stateWaitLookbackLimit abi.ChainEpoch - fund bool - perConnectionAPIRateLimit int - perHostRequestsPerMinute int - nodeOpts []kit.NodeOpt + blockTime time.Duration + lookbackCap time.Duration + maxMessageLookbackEpochs abi.ChainEpoch + fund bool + perConnectionAPIRateLimit int + perHostConnectionsPerMinute int + nodeOpts []kit.NodeOpt } type startOption func(*startOptions) -func applyStartOptions(opts ...startOption) startOptions { +func newStartOptions(opts ...startOption) startOptions { o := startOptions{ - blocktime: 5 * time.Millisecond, - lookbackCap: maxLookbackCap, - stateWaitLookbackLimit: maxStateWaitLookbackLimit, - fund: false, + blockTime: 5 * time.Millisecond, + lookbackCap: maxLookbackCap, + maxMessageLookbackEpochs: maxMessageLookbackEpochs, + fund: false, } for _, opt := range opts { opt(&o) @@ -244,7 +244,7 @@ func withPerConnectionAPIRateLimit(rateLimit int) startOption { func withPerHostRequestsPerMinute(rateLimit int) startOption { return func(opts *startOptions) { - opts.perHostRequestsPerMinute = rateLimit + opts.perHostConnectionsPerMinute = rateLimit } } @@ -255,7 +255,7 @@ func withNodeOpts(nodeOpts ...kit.NodeOpt) startOption { } func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNodes { - options := applyStartOptions(opts...) + options := newStartOptions(opts...) var ( full *kit.TestFullNode @@ -271,7 +271,7 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod // create the full node and the miner. 
var ens *kit.Ensemble full, miner, ens = kit.EnsembleMinimal(t, kit.MockProofs()) - ens.InterconnectAll().BeginMining(options.blocktime) + ens.InterconnectAll().BeginMining(options.blockTime) api.RunningNodeType = api.NodeFull // Create a gateway server in front of the full node @@ -279,10 +279,15 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod gwapi := gateway.NewNode( full, gateway.WithEthSubHandler(ethSubHandler), - gateway.WithLookbackCap(options.lookbackCap), - gateway.WithStateWaitLookbackLimit(options.stateWaitLookbackLimit), + gateway.WithMaxLookbackDuration(options.lookbackCap), + gateway.WithMaxMessageLookbackEpochs(options.maxMessageLookbackEpochs), + ) + handler, err := gateway.Handler( + gwapi, + full, + gateway.WithPerConnectionAPIRateLimit(options.perConnectionAPIRateLimit), + gateway.WithPerHostConnectionsPerMinute(options.perHostConnectionsPerMinute), ) - handler, err := gateway.Handler(gwapi, full, options.perConnectionAPIRateLimit, options.perHostRequestsPerMinute) t.Cleanup(func() { _ = handler.Shutdown(ctx) }) require.NoError(t, err) @@ -293,14 +298,14 @@ func startNodes(ctx context.Context, t *testing.T, opts ...startOption) *testNod // Create a gateway client API that connects to the gateway server var gapi api.Gateway - var _closer jsonrpc.ClientCloser - gapi, _closer, err = client.NewGatewayRPCV1(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, + var rpcCloser jsonrpc.ClientCloser + gapi, rpcCloser, err = client.NewGatewayRPCV1(ctx, "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil, jsonrpc.WithClientHandler("Filecoin", ethSubHandler), jsonrpc.WithClientHandlerAlias("eth_subscription", "Filecoin.EthSubscription"), ) require.NoError(t, err) var closeOnce sync.Once - closer := func() { closeOnce.Do(_closer) } + closer := func() { closeOnce.Do(rpcCloser) } t.Cleanup(closer) nodeOpts := append([]kit.NodeOpt{ @@ -373,13 +378,14 @@ func TestGatewayRateLimits(t *testing.T) { withPerHostRequestsPerMinute(requestsPerMinute), ) - time.Sleep(time.Second) + nodes.full.WaitTillChain(ctx, kit.HeightAtLeast(10)) // let's get going first // ChainHead uses chainRateLimitTokens=2. // But we're also competing with the paymentChannelSettler which listens to the chain uses - // ChainGetBlockMessages on each change, which also uses chainRateLimitTokens=2. - // So each loop should be 4 tokens. - loops := 10 + // ChainGetBlockMessages on each change, which also uses chainRateLimitTokens=2. But because we're + // rate limiting, it only gets a ChainGetBlockMessages in between our ChainHead calls, not on each + // 5ms block (which it wants). So each loop should be 4 tokens. 
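+ // The elapsed time for the loop below should therefore be close to (loops * tokensPerLoop) / tokensPerSecond + // seconds, which is what the WithinDuration assertion checks (with some padding for slow test runs).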
+ loops := 25 tokensPerLoop := 4 start := time.Now() for i := 0; i < loops; i++ { @@ -388,42 +394,45 @@ func TestGatewayRateLimits(t *testing.T) { } tokensUsed := loops * tokensPerLoop expectedEnd := start.Add(time.Duration(float64(tokensUsed) / float64(tokensPerSecond) * float64(time.Second))) - allowPad := time.Duration(float64(tokensPerLoop) / float64(tokensPerSecond) * float64(time.Second)) // add padding to account for slow test runs + allowPad := time.Duration(float64(tokensPerLoop*2) / float64(tokensPerSecond) * float64(time.Second)) // add padding to account for slow test runs t.Logf("expected end: %s, now: %s, allowPad: %s, actual delta: %s", expectedEnd, time.Now(), allowPad, time.Since(expectedEnd)) req.WithinDuration(expectedEnd, time.Now(), allowPad) client := &http.Client{} - url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) jsonPayload := []byte(`{"method":"Filecoin.ChainHead","params":[],"id":1,"jsonrpc":"2.0"}`) var failed bool for i := 0; i < requestsPerMinute*2 && !failed; i++ { - func() { - request, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonPayload)) - req.NoError(err) - request.Header.Set("Content-Type", "application/json") - response, err := client.Do(request) - req.NoError(err) - defer func() { _ = response.Body.Close() }() - if http.StatusOK == response.StatusCode { - body, err := io.ReadAll(response.Body) - req.NoError(err) - result := map[string]interface{}{} - req.NoError(json.Unmarshal(body, &result)) - req.NoError(err) - req.NotNil(result["result"]) - height, ok := result["result"].(map[string]interface{})["Height"].(float64) - req.True(ok) - req.Greater(int(height), 0) - } else { - req.Equal(http.StatusTooManyRequests, response.StatusCode) - req.LessOrEqual(i, requestsPerMinute+1) - failed = true - } - }() + status, body := makeManualRpcCall(t, client, nodes.gatewayAddr, string(jsonPayload)) + if http.StatusOK == status { + result := map[string]interface{}{} + req.NoError(json.Unmarshal([]byte(body), &result)) + req.NotNil(result["result"]) + height, ok := result["result"].(map[string]interface{})["Height"].(float64) + req.True(ok) + req.Greater(int(height), 0) + } else { + req.Equal(http.StatusTooManyRequests, status) + req.LessOrEqual(i, requestsPerMinute+1) + failed = true + } } req.True(failed, "expected requests to fail due to rate limiting") } +func makeManualRpcCall(t *testing.T, client *http.Client, gatewayAddr, payload string) (int, string) { + // not available over plain http + url := fmt.Sprintf("http://%s/rpc/v1", gatewayAddr) + request, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(payload))) + require.NoError(t, err) + request.Header.Set("Content-Type", "application/json") + response, err := client.Do(request) + require.NoError(t, err) + defer func() { _ = response.Body.Close() }() + body, err := io.ReadAll(response.Body) + require.NoError(t, err) + return response.StatusCode, string(body) +} + func TestStatefulCallHandling(t *testing.T) { req := require.New(t) @@ -431,21 +440,6 @@ func TestStatefulCallHandling(t *testing.T) { ctx := context.Background() nodes := startNodes(ctx, t) - httpReq := func(payload string) (int, string) { - // not available over plain http - client := &http.Client{} - url := fmt.Sprintf("http://%s/rpc/v1", nodes.gatewayAddr) - request, err := http.NewRequest("POST", url, bytes.NewBuffer([]byte(payload))) - req.NoError(err) - request.Header.Set("Content-Type", "application/json") - response, err := client.Do(request) - req.NoError(err) - defer func() { _ = response.Body.Close() }() - 
body, err := io.ReadAll(response.Body) - req.NoError(err) - return response.StatusCode, string(body) - } - t.Logf("Testing stateful call handling rejection via plain http") for _, typ := range []string{ "EthNewBlockFilter", @@ -470,7 +464,12 @@ func TestStatefulCallHandling(t *testing.T) { expErr = "EthSubscribe not supported: connection doesn't support callbacks" } - status, body := httpReq(`{"method":"Filecoin.` + typ + `","params":[` + params + `],"id":1,"jsonrpc":"2.0"}`) + status, body := makeManualRpcCall( + t, + &http.Client{}, + nodes.gatewayAddr, + `{"method":"Filecoin.`+typ+`","params":[`+params+`],"id":1,"jsonrpc":"2.0"}`, + ) req.Equal(http.StatusOK, status, "not ok for "+typ) req.Contains(body, `{"error":{"code":1,"message":"`+expErr+`"},"id":1,"jsonrpc":"2.0"}`, "unexpected response for "+typ) @@ -532,25 +531,33 @@ func TestStatefulCallHandling(t *testing.T) { t.Logf("Testing 'too many filters' rejection") _, err = nodes.lite.EthNewBlockFilter(ctx) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") _, err = nodes.lite.EthNewPendingTransactionFilter(ctx) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") _, err = nodes.lite.EthNewFilter(ctx, ðtypes.EthFilterSpec{}) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") _, err = nodes.lite.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(req.NoError)) - require.ErrorContains(t, err, "too many subscriptions and filters for this connection") + require.ErrorContains(t, err, "too many subscriptions and filters per connection") t.Logf("Shutting down the lite node") req.NoError(nodes.lite.Stop(ctx)) - nodes.rpcCloser() // once the websocke connection is closed, the server should clean up the filters for us - time.Sleep(time.Second) // unfortunately we have no other way to check for completeness of shutdown and cleanup + nodes.rpcCloser() + // Once the websocket connection is closed, the server should clean up the filters for us. + // Unfortunately we have no other way to check for completeness of shutdown and cleanup. + // * Asynchronously the rpcCloser() call will end the client websockets connection to the gateway. + // * When the gateway recognises the end of the HTTP connection, it will asynchronously make calls + // to the fullnode to clean up the filters. + // * The fullnode will then uninstall the filters and we can finally move on to check it directly + // that this has happened. + // This should happen quickly, but we have no way to synchronously check for it. So we just wait a bit. + time.Sleep(time.Second) t.Logf("Checking that all filters and subs were cleared up by directly talking to full node") ok, err = nodes.full.EthUnsubscribe(ctx, subId2) // unsub on full node, already done req.NoError(err) - req.False(ok) // already unsubbed because of auto-cleanup + req.False(ok) // already unsubscribed because of auto-cleanup for _, fid := range blockFilterIds { _, err = nodes.full.EthGetFilterChanges(ctx, fid)