Skip to content

Commit

Permalink
anura rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindi committed Feb 14, 2025
1 parent eb9d218 commit 0fd6304
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .local.dev
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ WEB3_TOKEN_ADDRESS_=0xa513E6E4b8f2a923D98304ec87F64353C4D5C853
WEB3_USERS_ADDRESS=0x0DCd1Bf9A1b36cE34237eEaFef220932846BCD82
BACALHAU_API_HOST=localhost
BACALHAU_API_PORT=1234
SERVER_RATE_EXEMPTED_IPS=127.0.0.1,::1
ANURA_ADDRESSES=0x1da99b9e884C9e7B15361957577978c1fa66AfBb
1 change: 1 addition & 0 deletions pkg/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type AccessControlOptions struct {
ValidationTokenSecret string
ValidationTokenExpiration int
ValidationTokenKid string
AnuraAddresses []string
}

type ValidationToken struct {
Expand Down
60 changes: 60 additions & 0 deletions pkg/http/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ const X_LILYPAD_SIGNATURE_HEADER = "X-Lilypad-Signature"
// the version run by the client or service
const X_LILYPAD_VERSION_HEADER = "X-Lilypad-Version"

// the signature of the anura server
const X_ANURA_SIGNATURE_HEADER = "X-Anura-Key"

// the address of the anura server
const X_ANURA_SERVER_HEADER = "X-Anura-Server"

// the context name we keep the address
const CONTEXT_ADDRESS = "address"

Expand Down Expand Up @@ -110,6 +116,20 @@ func AddHeaders(
return nil
}

func AddAnuraHeaders(
req *retryablehttp.Request,
privateKey *ecdsa.PrivateKey,
address string,
) error {
serverPayload, serverSignature, err := encodeUserAddress(privateKey, address)
if err != nil {
return err
}
req.Header.Add(X_ANURA_SERVER_HEADER, serverPayload)
req.Header.Add(X_ANURA_SIGNATURE_HEADER, serverSignature)
return nil
}

// Use the client headers to ensure that a message was signed
// by the holder of a private key for a specific address.
// The "X-Lilypad-User" header contains the address.
Expand Down Expand Up @@ -168,6 +188,44 @@ func CheckSignature(req *http.Request) (string, error) {
return signatureAddress, nil
}

func CheckAnuraSignature(req *http.Request, approvedAddresses []string) (string, error) {

serverHeader := req.Header.Get(X_ANURA_SERVER_HEADER)
if serverHeader == "" {
return "", HTTPError{
Message: "missing anura server header",
StatusCode: http.StatusUnauthorized,
}
}

anuraSignature := req.Header.Get(X_ANURA_SIGNATURE_HEADER)
if anuraSignature == "" {
return "", HTTPError{
Message: "missing anura signature header",
StatusCode: http.StatusUnauthorized,
}
}

signatureAddress, err := decodeUserAddress(serverHeader, anuraSignature)
if err != nil {
return "", HTTPError{
Message: fmt.Sprintf("invalid server header or signature %s", err.Error()),
StatusCode: http.StatusUnauthorized,
}
}

for _, addr := range approvedAddresses {
if strings.EqualFold(signatureAddress, addr) {
return signatureAddress, nil
}
}

return "", HTTPError{
Message: "unauthorized anura signature",
StatusCode: http.StatusUnauthorized,
}
}

func GetVersionFromHeaders(req *http.Request) (string, error) {
versionHeader := req.Header.Get(X_LILYPAD_VERSION_HEADER)
if versionHeader == "" {
Expand Down Expand Up @@ -471,3 +529,5 @@ func CanonicalizeIP(ip string) string {
}
return ipv6.Mask(net.CIDRMask(64, 128)).String()
}


6 changes: 3 additions & 3 deletions pkg/options/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ func GetDefaultAccessControlOptions() http.AccessControlOptions {
ValidationTokenSecret: GetDefaultServeOptionString("SERVER_VALIDATION_TOKEN_SECRET", ""),
ValidationTokenExpiration: GetDefaultServeOptionInt("SERVER_VALIDATION_TOKEN_EXPIRATION", 604800), // one week
ValidationTokenKid: GetDefaultServeOptionString("SERVER_VALIDATION_TOKEN_KID", ""),
AnuraAddresses: GetDefaultServeOptionStringArray("ANURA_ADDRESSES", []string{}),
}
}

func GetDefaultRateLimiterOptions() http.RateLimiterOptions {
return http.RateLimiterOptions{
RequestLimit: GetDefaultServeOptionInt("SERVER_RATE_REQUEST_LIMIT", 5),
WindowLength: GetDefaultServeOptionInt("SERVER_RATE_WINDOW_LENGTH", 10),
ExemptedIPs: GetDefaultServeOptionStringArray("SERVER_RATE_EXEMPTED_IPS", []string{}),
}
}

Expand Down Expand Up @@ -77,8 +77,8 @@ func AddServerCliFlags(cmd *cobra.Command, serverOptions *http.ServerOptions) {
`The time window over which to limit in seconds (SERVER_RATE_WINDOW_LENGTH).`,
)
cmd.PersistentFlags().StringArrayVar(
&serverOptions.RateLimiter.ExemptedIPs, "server-rate-exempted-ips", serverOptions.RateLimiter.ExemptedIPs,
`The IPs to exempt from rate limiting (SERVER_RATE_EXEMPTED_IPS).`,
&serverOptions.AccessControl.AnuraAddresses, "server-anura-addresses", serverOptions.AccessControl.AnuraAddresses,
`The Anura wallet addresses that are allowed to access anura endpoints (ANURA_ADDRESSES).`,
)
}

Expand Down
156 changes: 156 additions & 0 deletions pkg/solver/anuraLimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//go:build integration && solver

package solver_test

import (
"context"
"fmt"
"net/http"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/hashicorp/go-retryablehttp"
httputil "github.com/lilypad-tech/lilypad/pkg/http"
)

type anuraRateResult struct {
path string
okCount int
limitedCount int
unauthorizedCount int
}

type anuraTestCase struct {
name string
privateKey string
expectedOK int
expectedLimit int
expectedUnauth int
}

// This test suite sends 10 requests to Anura endpoints. We test both valid and invalid
// signatures. Invalid signatures should be rate limited after 5 attempts.
func TestAnuraRateLimiter(t *testing.T) {
paths := []string{
"/api/v1/anura/job_offers",
}

validKey := "b3994e7660abe5f65f729bb64163c6cd6b7d0b1a8c67881a7346e3e8c7f026f5"
invalidKey := "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"

t.Run("valid anura signature passes", func(t *testing.T) {
tc := anuraTestCase{
name: "valid signature should pass",
privateKey: validKey,
expectedOK: 10, // All 10 requests should succeed
expectedLimit: 0, // No rate limiting
expectedUnauth: 0, // No unauthorized responses
}
runAnuraTest(t, paths, tc)
})

// Wait a bit between tests to let rate limiter reset
time.Sleep(1 * time.Second)

t.Run("invalid anura signature gets rate limited", func(t *testing.T) {
tc := anuraTestCase{
name: "invalid signature should be rate limited",
privateKey: invalidKey,
expectedOK: 0, // No requests should succeed
expectedLimit: 5, // 5 requests should be rate limited
expectedUnauth: 5, // First 5 should be unauthorized
}
runAnuraTest(t, paths, tc)
})
}

func runAnuraTest(t *testing.T, paths []string, tc anuraTestCase) {
var wg sync.WaitGroup
ch := make(chan anuraRateResult, len(paths))

for _, path := range paths {
wg.Add(1)
go func(path string) {
defer wg.Done()
makeAnuraCalls(t, path, ch, tc)
}(path)
}

wg.Wait()
close(ch)

for result := range ch {
if result.okCount != tc.expectedOK {
t.Errorf("%s: Expected %d successful requests, got %d",
result.path, tc.expectedOK, result.okCount)
}
if result.limitedCount != tc.expectedLimit {
t.Errorf("%s: Expected %d rate limited requests, got %d",
result.path, tc.expectedLimit, result.limitedCount)
}
if result.unauthorizedCount != tc.expectedUnauth {
t.Errorf("%s: Expected %d unauthorized requests, got %d",
result.path, tc.expectedUnauth, result.unauthorizedCount)
}
}
}

func makeAnuraCalls(t *testing.T, path string, ch chan anuraRateResult, tc anuraTestCase) {
var okCount, limitedCount, unauthorizedCount int

// Configure client with no retries and silent logger
client := retryablehttp.NewClient()
client.RetryMax = 0
client.Logger = nil // Disable logging
client.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return false, nil // Never retry
}

for i := 0; i < 10; i++ {
req, err := retryablehttp.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s", 8081, path), nil)
if err != nil {
t.Errorf("Failed to create request: %s\n", err)
return
}

privateKey, err := crypto.HexToECDSA(tc.privateKey)
if err != nil {
t.Errorf("Failed to parse private key: %s\n", err)
return
}

err = httputil.AddAnuraHeaders(req, privateKey, crypto.PubkeyToAddress(privateKey.PublicKey).String())
if err != nil {
t.Errorf("Failed to add Anura headers: %s\n", err)
return
}

res, err := client.Do(req)
if err != nil {
t.Errorf("Request failed on %s: %s\n", path, err)
return
}

switch res.StatusCode {
case http.StatusOK:
okCount++
case http.StatusTooManyRequests:
limitedCount++
case http.StatusUnauthorized:
unauthorizedCount++
default:
t.Errorf("Expected 200, 401, or 429 status code, but received %d\n", res.StatusCode)
}

time.Sleep(100 * time.Millisecond)
}

ch <- anuraRateResult{
path: path,
okCount: okCount,
limitedCount: limitedCount,
unauthorizedCount: unauthorizedCount,
}
}
26 changes: 1 addition & 25 deletions pkg/solver/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,6 @@ func TestRateLimiter(t *testing.T) {
{"X-Forwarded-For": "1.2.3.4"},
}

// The running solver is configured to exempt localhost.
// When no headers are set, test using the IP address from
// the underlying connection (also localhost)
// TODO: re-enable exempt IP rate limiting
// exemptHeaders := []map[string]string{
// {"True-Client-IP": "127.0.0.1"},
// {"X-Real-IP": "127.0.0.1"},
// {"X-Forwarded-For": "127.0.0.1"},
// {}, // No headers case - uses RemoteAddr
// }

t.Run("non-exempt IP is rate limited", func(t *testing.T) {
// Select a random header on each test run. Over time we test them all.
headers := nonExemptHeaders[rand.Intn(len(nonExemptHeaders))]
Expand All @@ -68,26 +57,13 @@ func TestRateLimiter(t *testing.T) {
}
runRateLimitTest(t, paths, tc)
})

// TODO: re-enable exempt IP rate limiting
// t.Run("exempt IP is not rate limited", func(t *testing.T) {
// // Select a random header on each test run. Over time we test them all.
// headers := exemptHeaders[rand.Intn(len(exemptHeaders))]
// tc := rateTestCase{
// name: fmt.Sprintf("exempt with headers %v", headers),
// headers: headers,
// expectedOK: 100,
// expectedLimit: 0,
// }
// runRateLimitTest(t, paths, tc)
// })

}

func runRateLimitTest(t *testing.T, paths []string, tc rateTestCase) {
var wg sync.WaitGroup
ch := make(chan rateResult, len(paths))

// Run the calls against paths in parallel
for _, path := range paths {
wg.Add(1)
go func(path string) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/solver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,37 @@ func (solverServer *solverServer) ListenAndServe(ctx context.Context, cm *system

subrouter.HandleFunc("/validation_token", http.GetHandler(solverServer.getValidationToken)).Methods("GET")

//anura subrouter

failedAuthLimiter := httprate.Limit(
5,
60*time.Second,
httprate.WithKeyFuncs(httprate.KeyByRealIP),
)

anuraMiddleware := func(next corehttp.Handler) corehttp.Handler {
return corehttp.HandlerFunc(func(w corehttp.ResponseWriter, r *corehttp.Request) {
_, err := http.CheckAnuraSignature(r, solverServer.options.AccessControl.AnuraAddresses)
if err != nil {
failedAuthLimiter(corehttp.HandlerFunc(func(w corehttp.ResponseWriter, r *corehttp.Request) {
corehttp.Error(w, "Unauthorized", corehttp.StatusUnauthorized)
})).ServeHTTP(w, r)
return
}
next.ServeHTTP(w, r)
})
}

anurarouter := router.PathPrefix(http.API_SUB_PATH + "/anura").Subrouter()
anurarouter.Use(http.CorsMiddleware)
anurarouter.Use(otelmux.Middleware("solver", otelmux.WithTracerProvider(tracerProvider)))
anurarouter.Use(anuraMiddleware)

anurarouter.HandleFunc("/job_offers", http.PostHandler(solverServer.addJobOffer)).Methods("POST")
anurarouter.HandleFunc("/job_offers", http.GetHandler(solverServer.getJobOffers)).Methods("GET")
anurarouter.HandleFunc("/job_offers/{id}", http.GetHandler(solverServer.getJobOffer)).Methods("GET")
anurarouter.HandleFunc("/job_offers/{id}/files", solverServer.jobOfferDownloadFiles).Methods("GET")

// this will fan out to all connected web socket connections
// we read all events coming from inside the solver controller
// and write them to anyone who is connected to us
Expand Down

0 comments on commit 0fd6304

Please sign in to comment.