Skip to content

Commit

Permalink
fix: address issues raised from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 5, 2024
1 parent 2bb1725 commit 07fed9f
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 218 deletions.
18 changes: 12 additions & 6 deletions cmd/lotus-gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ var runCmd = &cli.Command{
&cli.DurationFlag{
Name: "api-max-lookback",
Usage: "maximum duration allowable for tipset lookbacks",
Value: gateway.DefaultLookbackCap,
Value: gateway.DefaultMaxLookbackDuration,
},
&cli.Int64Flag{
Name: "api-wait-lookback-limit",
Usage: "maximum number of blocks to search back through for message inclusion",
Value: int64(gateway.DefaultStateWaitLookbackLimit),
Value: int64(gateway.DefaultMaxMessageLookbackEpochs),
},
&cli.Int64Flag{
Name: "rate-limit",
Expand All @@ -154,7 +154,7 @@ var runCmd = &cli.Command{
},
&cli.Int64Flag{
Name: "conn-per-minute",
Usage: "A hard limit on the number of incomming connections (requests) to accept per remote host per minute. Use 0 to disable",
Usage: "A hard limit on the number of incoming connections (requests) to accept per remote host per minute. Use 0 to disable",
Value: 0,
},
&cli.IntFlag{
Expand Down Expand Up @@ -212,13 +212,19 @@ var runCmd = &cli.Command{
gwapi := gateway.NewNode(
api,
gateway.WithEthSubHandler(subHnd),
gateway.WithLookbackCap(lookbackCap),
gateway.WithStateWaitLookbackLimit(waitLookback),
gateway.WithMaxLookbackDuration(lookbackCap),
gateway.WithMaxMessageLookbackEpochs(waitLookback),
gateway.WithRateLimit(globalRateLimit),
gateway.WithRateLimitTimeout(rateLimitTimeout),
gateway.WithEthMaxFiltersPerConn(maxFiltersPerConn),
)
handler, err := gateway.Handler(gwapi, api, perConnectionRateLimit, perHostConnectionsPerMinute, serverOptions...)
handler, err := gateway.Handler(
gwapi,
api,
gateway.WithPerConnectionAPIRateLimit(perConnectionRateLimit),
gateway.WithPerHostConnectionsPerMinute(perHostConnectionsPerMinute),
gateway.WithJsonrpcServerOptions(serverOptions...),
)
if err != nil {
return xerrors.Errorf("failed to set up gateway HTTP handler")
}
Expand Down
182 changes: 111 additions & 71 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,62 @@ type ShutdownHandler interface {
Shutdown(ctx context.Context) error
}

var _ ShutdownHandler = &statefulCallHandler{}
var _ ShutdownHandler = &RateLimitHandler{}
var _ ShutdownHandler = (*statefulCallHandler)(nil)
var _ ShutdownHandler = (*RateLimitHandler)(nil)

// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is
// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its
// Shutdown method.
// handlerOptions holds the options for the Handler function.
type handlerOptions struct {
perConnectionAPIRateLimit int
perHostConnectionsPerMinute int
jsonrpcServerOptions []jsonrpc.ServerOption
}

// HandlerOption is a functional option for configuring the Handler.
type HandlerOption func(*handlerOptions)

// WithPerConnectionAPIRateLimit sets the per connection API rate limit.
//
// The handler will limit the number of API calls per minute within a single WebSocket connection
// (where API calls are weighted by their relative expense), and the number of connections per
// minute from a single host.
func WithPerConnectionAPIRateLimit(limit int) HandlerOption {
return func(opts *handlerOptions) {
opts.perConnectionAPIRateLimit = limit
}
}

// WithPerHostConnectionsPerMinute sets the per host connections per minute limit.
//
// Connection limiting is a hard limit that will reject requests with a 429 status code if the limit
// is exceeded. API call limiting is a soft limit that will delay requests if the limit is exceeded.
func Handler(
gwapi lapi.Gateway,
api lapi.FullNode,
perConnectionAPIRateLimit int,
perHostConnectionsPerMinute int,
opts ...jsonrpc.ServerOption,
) (ShutdownHandler, error) {
// Connection limiting is a hard limit that will reject requests with a http.StatusTooManyRequests
// status code if the limit is exceeded. API call limiting is a soft limit that will delay requests
// if the limit is exceeded.
func WithPerHostConnectionsPerMinute(limit int) HandlerOption {
return func(opts *handlerOptions) {
opts.perHostConnectionsPerMinute = limit
}
}

m := mux.NewRouter()
// WithJsonrpcServerOptions sets the JSON-RPC server options.
func WithJsonrpcServerOptions(options ...jsonrpc.ServerOption) HandlerOption {
return func(opts *handlerOptions) {
opts.jsonrpcServerOptions = options
}
}

opts = append(opts, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))
// Handler returns a gateway http.Handler, to be mounted as-is on the server. The handler is
// returned as a ShutdownHandler which allows for graceful shutdown of the handler via its
// Shutdown method.
func Handler(gwapi lapi.Gateway, api lapi.FullNode, options ...HandlerOption) (ShutdownHandler, error) {
opts := &handlerOptions{}
for _, option := range options {
option(opts)
}

m := mux.NewRouter()

rpcopts := append(opts.jsonrpcServerOptions, jsonrpc.WithReverseClient[lapi.EthSubscriberMethods]("Filecoin"), jsonrpc.WithServerErrors(lapi.RPCErrors))
serveRpc := func(path string, hnd interface{}) {
rpcServer := jsonrpc.NewServer(opts...)
rpcServer := jsonrpc.NewServer(rpcopts...)
rpcServer.Register("Filecoin", hnd)
rpcServer.AliasMethod("rpc.discover", "Filecoin.Discover")

Expand Down Expand Up @@ -91,11 +120,11 @@ func Handler(
m.PathPrefix("/").Handler(http.DefaultServeMux)

handler := &statefulCallHandler{m}
if perConnectionAPIRateLimit > 0 && perHostConnectionsPerMinute > 0 {
if opts.perConnectionAPIRateLimit > 0 || opts.perHostConnectionsPerMinute > 0 {
return NewRateLimitHandler(
handler,
perConnectionAPIRateLimit,
perHostConnectionsPerMinute,
opts.perConnectionAPIRateLimit,
opts.perHostConnectionsPerMinute,
connectionLimiterCleanupInterval,
), nil
}
Expand Down Expand Up @@ -125,14 +154,15 @@ type hostLimiter struct {
}

type RateLimitHandler struct {
cancelFunc context.CancelFunc
mu sync.Mutex
limiters map[string]*hostLimiter
perConnectionAPILimit rate.Limit
perHostConnectionsPerMinute int
next http.Handler
cleanupInterval time.Duration
expiryDuration time.Duration
cancelFunc context.CancelFunc
limiters map[string]*hostLimiter
limitersLk sync.Mutex
perConnectionAPILimit rate.Limit
perHostConnectionsLimit rate.Limit
perHostConnectionsLimitBurst int
next http.Handler
cleanupInterval time.Duration
expiryDuration time.Duration
}

// NewRateLimitHandler creates a new RateLimitHandler that wraps the
Expand All @@ -149,85 +179,95 @@ func NewRateLimitHandler(

ctx, cancel := context.WithCancel(context.Background())
h := &RateLimitHandler{
cancelFunc: cancel,
limiters: make(map[string]*hostLimiter),
perConnectionAPILimit: rate.Inf,
perHostConnectionsPerMinute: perHostConnectionsPerMinute,
next: next,
cleanupInterval: cleanupInterval,
expiryDuration: 5 * cleanupInterval,
cancelFunc: cancel,
limiters: make(map[string]*hostLimiter),
perConnectionAPILimit: rate.Inf,
perHostConnectionsLimit: rate.Inf,
next: next,
cleanupInterval: cleanupInterval,
expiryDuration: 5 * cleanupInterval,
}
if perConnectionAPIRateLimit > 0 {
h.perConnectionAPILimit = rate.Every(time.Second / time.Duration(perConnectionAPIRateLimit))
}
if perHostConnectionsPerMinute > 0 {
h.perHostConnectionsLimit = rate.Every(time.Minute / time.Duration(perHostConnectionsPerMinute))
h.perHostConnectionsLimitBurst = perHostConnectionsPerMinute
}
go h.cleanupExpiredLimiters(ctx)
return h
}

func (h *RateLimitHandler) getLimits(host string) *hostLimiter {
h.mu.Lock()
defer h.mu.Unlock()

entry, exists := h.limiters[host]
if !exists {
var limiter *rate.Limiter
if h.perHostConnectionsPerMinute > 0 {
requestLimit := rate.Every(time.Minute / time.Duration(h.perHostConnectionsPerMinute))
limiter = rate.NewLimiter(requestLimit, h.perHostConnectionsPerMinute)
}
entry = &hostLimiter{
limiter: limiter,
lastAccess: time.Now(),
func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if h.perHostConnectionsLimit != rate.Inf {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
h.limiters[host] = entry
} else {
entry.lastAccess = time.Now()
}

return entry
}

func (h *RateLimitHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
h.limitersLk.Lock()
entry, exists := h.limiters[host]
if !exists {
entry = &hostLimiter{
limiter: rate.NewLimiter(h.perHostConnectionsLimit, h.perHostConnectionsLimitBurst),
lastAccess: time.Now(),
}
h.limiters[host] = entry
} else {
entry.lastAccess = time.Now()
}
h.limitersLk.Unlock()

limits := h.getLimits(host)
if limits.limiter != nil && !limits.limiter.Allow() {
w.WriteHeader(http.StatusTooManyRequests)
return
if !entry.limiter.Allow() {
w.WriteHeader(http.StatusTooManyRequests)
return
}
}

if h.perConnectionAPILimit != rate.Inf {
// new rate limiter for each connection, to throttle a single WebSockets connection;
// allow for a burst of MaxRateLimitTokens
apiLimiter := rate.NewLimiter(h.perConnectionAPILimit, MaxRateLimitTokens)
r = r.WithContext(context.WithValue(r.Context(), perConnectionAPIRateLimiterKey, apiLimiter))
r = r.WithContext(setPerConnectionAPIRateLimiter(r.Context(), apiLimiter))
}

h.next.ServeHTTP(w, r)
}

// setPerConnectionAPIRateLimiter sets the rate limiter in the context.
func setPerConnectionAPIRateLimiter(ctx context.Context, limiter *rate.Limiter) context.Context {
return context.WithValue(ctx, perConnectionAPIRateLimiterKey, limiter)
}

// getPerConnectionAPIRateLimiter retrieves the rate limiter from the context.
func getPerConnectionAPIRateLimiter(ctx context.Context) (*rate.Limiter, bool) {
limiter, ok := ctx.Value(perConnectionAPIRateLimiterKey).(*rate.Limiter)
return limiter, ok
}

// cleanupExpiredLimiters periodically checks for limiters that have expired and removes them.
func (h *RateLimitHandler) cleanupExpiredLimiters(ctx context.Context) {
if h.cleanupInterval == 0 {
return
}

for {
ticker := time.NewTicker(h.cleanupInterval)
defer ticker.Stop()

for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case <-time.After(h.cleanupInterval):
h.mu.Lock()
case <-ticker.C:
h.limitersLk.Lock()
now := time.Now()
for host, entry := range h.limiters {
if now.Sub(entry.lastAccess) > h.expiryDuration {
delete(h.limiters, host)
}
}
h.mu.Unlock()
h.limitersLk.Unlock()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestRequestRateLimiterHandler(t *testing.T) {
var callCount int
h := gateway.NewRateLimitHandler(
http.HandlerFunc(func(_ http.ResponseWriter, _ *http.Request) {
http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
callCount++
}),
0, // api rate
Expand Down
Loading

0 comments on commit 07fed9f

Please sign in to comment.