
feat: Add rate limiting to the lotus gateway #8517

Merged · 17 commits · Jun 10, 2022
36 changes: 30 additions & 6 deletions cmd/lotus-gateway/main.go
@@ -133,13 +133,33 @@ var runCmd = &cli.Command{
Usage: "maximum number of blocks to search back through for message inclusion",
Value: int64(gateway.DefaultStateWaitLookbackLimit),
},
&cli.Int64Flag{
Name: "rate-limit",
Usage: "rate-limit API calls. Use 0 to disable",
Value: 0,
},
&cli.Int64Flag{
Name: "per-conn-rate-limit",
Usage: "rate-limit API calls per each connection. Use 0 to disable",
Value: 0,
},
&cli.DurationFlag{
Name: "rate-limit-timeout",
Usage: "the maximum time to wait for the rate limter before returning an error to clients",
Value: gateway.DefaultRateLimitTimeout,
},
&cli.Int64Flag{
Name: "conn-per-minute",
Usage: "The number of incomming connections to accept from a single IP per minute. Use 0 to disable",
Value: 0,
},
},
Action: func(cctx *cli.Context) error {
log.Info("Starting lotus gateway")

// Register all metric views
if err := view.Register(
metrics.ChainNodeViews...,
metrics.GatewayNodeViews...,
); err != nil {
log.Fatalf("Cannot register the view: %v", err)
}
@@ -151,9 +171,13 @@ var runCmd = &cli.Command{
defer closer()

var (
- lookbackCap = cctx.Duration("api-max-lookback")
- address = cctx.String("listen")
- waitLookback = abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit"))
+ lookbackCap = cctx.Duration("api-max-lookback")
+ address = cctx.String("listen")
+ waitLookback = abi.ChainEpoch(cctx.Int64("api-wait-lookback-limit"))
+ rateLimit = cctx.Int64("rate-limit")
+ perConnRateLimit = cctx.Int64("per-conn-rate-limit")
+ rateLimitTimeout = cctx.Duration("rate-limit-timeout")
+ connPerMinute = cctx.Int64("conn-per-minute")
)

serverOptions := make([]jsonrpc.ServerOption, 0)
@@ -173,8 +197,8 @@ var runCmd = &cli.Command{
return xerrors.Errorf("failed to convert endpoint address to multiaddr: %w", err)
}

- gwapi := gateway.NewNode(api, lookbackCap, waitLookback)
- h, err := gateway.Handler(gwapi, api, serverOptions...)
+ gwapi := gateway.NewNode(api, lookbackCap, waitLookback, rateLimit, rateLimitTimeout)
+ h, err := gateway.Handler(gwapi, api, perConnRateLimit, connPerMinute, serverOptions...)
if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler")
}
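For reference only, a minimal sketch (not taken from this PR) of the behaviour the rate-limit and rate-limit-timeout flags describe: calls wait on a shared golang.org/x/time/rate limiter and give up with an error once the timeout elapses. The helper name and its placement in the gateway package are assumptions, not code from this change.

package gateway

import (
	"context"
	"time"

	"golang.org/x/time/rate"
)

// waitForToken is a hypothetical helper, not part of this PR: it blocks until
// the shared limiter grants a token, or returns an error once the timeout
// elapses so the caller can reject the request.
func waitForToken(ctx context.Context, limiter *rate.Limiter, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return limiter.Wait(ctx)
}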
103 changes: 101 additions & 2 deletions gateway/handler.go
@@ -1,7 +1,11 @@
package gateway

import (
"context"
"net"
"net/http"
"sync"
"time"

"contrib.go.opencensus.io/exporter/prometheus"
"github.com/filecoin-project/go-jsonrpc"
@@ -12,10 +16,15 @@ import (
"github.com/filecoin-project/lotus/node"
"github.com/gorilla/mux"
promclient "github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"
)

type perConnLimiterKeyType string

const perConnLimiterKey perConnLimiterKeyType = "limiter"

// Handler returns a gateway http.Handler, to be mounted as-is on the server.
- func Handler(gwapi lapi.Gateway, api lapi.FullNode, opts ...jsonrpc.ServerOption) (http.Handler, error) {
+ func Handler(gwapi lapi.Gateway, api lapi.FullNode, rateLimit int64, connPerMinute int64, opts ...jsonrpc.ServerOption) (http.Handler, error) {
m := mux.NewRouter()

serveRpc := func(path string, hnd interface{}) {
@@ -49,5 +58,95 @@ func Handler(gwapi lapi.Gateway, api lapi.FullNode, opts ...jsonrpc.ServerOption
Next: mux.ServeHTTP,
}*/

- return m, nil
+ rlh := NewRateLimiterHandler(m, rateLimit)
+ clh := NewConnectionRateLimiterHandler(rlh, connPerMinute)
+ return clh, nil
}

func NewRateLimiterHandler(handler http.Handler, rateLimit int64) *RateLimiterHandler {
limiter := limiterFromRateLimit(rateLimit)

return &RateLimiterHandler{
handler: handler,
limiter: limiter,
}
}

// RateLimiterHandler adds a rate limiter to the request context for per-connection rate limiting.
type RateLimiterHandler struct {
handler http.Handler
limiter *rate.Limiter
}

func (h RateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r2 := r.WithContext(context.WithValue(r.Context(), perConnLimiterKey, h.limiter))
h.handler.ServeHTTP(w, r2)
}

// NewConnectionRateLimiterHandler blocks new connections from an IP if it has already opened too many within the last minute.
func NewConnectionRateLimiterHandler(handler http.Handler, connPerMinute int64) *ConnectionRateLimiterHandler {
ipmap := make(map[string]int64)
return &ConnectionRateLimiterHandler{
ipmap: ipmap,
connPerMinute: connPerMinute,
handler: handler,
}
}

type ConnectionRateLimiterHandler struct {
mu sync.Mutex
ipmap map[string]int64
connPerMinute int64
handler http.Handler
}

func (h *ConnectionRateLimiterHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h.connPerMinute == 0 {
h.handler.ServeHTTP(w, r)
return
}
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

h.mu.Lock()
seen, ok := h.ipmap[host]
Contributor:

We should make this not abusable with IPv6.

Author:

This works the same way for IPv4 and IPv6: the map is keyed by a plain string, which will be something like "1.2.3.4" for IPv4 or "1111:2222:3333:4444:5555:6666:7777:8888" for IPv6.

This is the third kind of rate limiting in this PR, and I could be convinced that it should be done differently.

The global rate limiter and per-connection limiter protect the backend. The former persists for the lifetime of the process and the latter for the lifetime of a single connection.

The connection rate limiter, however, is intended to prevent abuse from scripts opening several connections in quick succession. The ipmap grows when there are new connections and shrinks again some time later.

Alternatively, we could make this a maximum number of simultaneous connections rather than "connections per minute".

Contributor:

My main worry is that with IPv6 you get a /64 subnet, which is a lot of IPs which are tracked separately; but yes, the other limits should help here.
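Purely illustrative and not part of this PR: one way to address the /64 concern above would be to bucket IPv6 hosts by their /64 prefix before using them as ipmap keys. The helper below is a sketch only.

package gateway

import "net"

// connKey is a hypothetical helper (not in this PR): IPv4 addresses are used
// as-is, while IPv6 addresses are collapsed to their /64 prefix so a single
// subnet cannot inflate the map with distinct entries.
func connKey(host string) string {
	ip := net.ParseIP(host)
	if ip == nil {
		return host // not an IP; fall back to the raw string
	}
	if v4 := ip.To4(); v4 != nil {
		return v4.String()
	}
	return ip.Mask(net.CIDRMask(64, 128)).String()
}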

if !ok {
h.ipmap[host] = 1
h.mu.Unlock()
h.handler.ServeHTTP(w, r)
return
}
// rate limited
if seen > h.connPerMinute {
h.mu.Unlock()
w.WriteHeader(http.StatusTooManyRequests)
return
}
h.ipmap[host] = seen + 1
h.mu.Unlock()
go func() {
select {
case <-time.After(time.Minute):
h.mu.Lock()
defer h.mu.Unlock()
h.ipmap[host] = h.ipmap[host] - 1
if h.ipmap[host] <= 0 {
delete(h.ipmap, host)
}
}
}()
h.handler.ServeHTTP(w, r)
}
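A quick way to exercise the per-IP connection limit above is a test like the sketch below; it assumes the test lives in the same package as the handler and is not part of this PR.

package gateway

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

func TestConnPerMinuteSketch(t *testing.T) {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	h := NewConnectionRateLimiterHandler(ok, 2) // allow 2 connections per minute per IP

	for i := 1; i <= 4; i++ {
		r := httptest.NewRequest(http.MethodGet, "/", nil)
		r.RemoteAddr = "203.0.113.7:1234" // same client IP each time
		w := httptest.NewRecorder()
		h.ServeHTTP(w, r)
		// With the counting above, the fourth request from this IP is the
		// first one to receive http.StatusTooManyRequests.
		t.Logf("request %d -> %d", i, w.Code)
	}
}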

func limiterFromRateLimit(rateLimit int64) *rate.Limiter {
var limit rate.Limit
if rateLimit == 0 {
limit = rate.Inf
} else {
limit = rate.Every(time.Second / time.Duration(rateLimit))
}
return rate.NewLimiter(limit, stateRateLimitTokens)
}
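For context, a sketch of how the limiter stored under perConnLimiterKey might be consumed further down the stack. The actual consumption point is outside this diff, so the helper below is an assumption, not the PR's implementation.

package gateway

import (
	"context"

	"golang.org/x/time/rate"
)

// waitPerConn is a hypothetical helper (not shown in this diff): it retrieves
// the per-connection limiter that RateLimiterHandler stored on the request
// context and blocks until a token is available or the context is cancelled.
func waitPerConn(ctx context.Context) error {
	limiter, ok := ctx.Value(perConnLimiterKey).(*rate.Limiter)
	if !ok {
		return nil // no limiter installed; nothing to wait for
	}
	return limiter.Wait(ctx)
}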