Skip to content

Commit

Permalink
feat: Anura Rate Limiter (#513)
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindi authored Feb 20, 2025
1 parent 491c875 commit 8e996a7
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 115 deletions.
2 changes: 1 addition & 1 deletion .local.dev
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ WEB3_TOKEN_ADDRESS_=0xa513E6E4b8f2a923D98304ec87F64353C4D5C853
WEB3_USERS_ADDRESS=0x0DCd1Bf9A1b36cE34237eEaFef220932846BCD82
BACALHAU_API_HOST=localhost
BACALHAU_API_PORT=1234
SERVER_RATE_EXEMPTED_IPS=127.0.0.1,::1
ANURA_ADDRESSES=0x1da99b9e884C9e7B15361957577978c1fa66AfBb
2 changes: 1 addition & 1 deletion pkg/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type AccessControlOptions struct {
ValidationTokenSecret string
ValidationTokenExpiration int
ValidationTokenKid string
AnuraAddresses []string
}

type ValidationToken struct {
Expand All @@ -22,7 +23,6 @@ type ValidationToken struct {
type RateLimiterOptions struct {
RequestLimit int
WindowLength int
ExemptedIPs []string
}

type ClientOptions struct {
Expand Down
88 changes: 59 additions & 29 deletions pkg/http/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"io"
stdlog "log"
"net"
"net/http"
"net/url"
"strings"
Expand All @@ -30,6 +29,12 @@ const X_LILYPAD_SIGNATURE_HEADER = "X-Lilypad-Signature"
// the version run by the client or service
const X_LILYPAD_VERSION_HEADER = "X-Lilypad-Version"

// the signature of the anura server
const X_ANURA_SIGNATURE_HEADER = "X-Anura-Key"

// the address of the anura server
const X_ANURA_SERVER_HEADER = "X-Anura-Server"

// the context name we keep the address
const CONTEXT_ADDRESS = "address"

Expand Down Expand Up @@ -110,6 +115,20 @@ func AddHeaders(
return nil
}

func AddAnuraHeaders(
req *retryablehttp.Request,
privateKey *ecdsa.PrivateKey,
address string,
) error {
serverPayload, serverSignature, err := encodeUserAddress(privateKey, address)
if err != nil {
return err
}
req.Header.Add(X_ANURA_SERVER_HEADER, serverPayload)
req.Header.Add(X_ANURA_SIGNATURE_HEADER, serverSignature)
return nil
}

// Use the client headers to ensure that a message was signed
// by the holder of a private key for a specific address.
// The "X-Lilypad-User" header contains the address.
Expand Down Expand Up @@ -168,6 +187,44 @@ func CheckSignature(req *http.Request) (string, error) {
return signatureAddress, nil
}

func CheckAnuraSignature(req *http.Request, approvedAddresses []string) (string, error) {

serverHeader := req.Header.Get(X_ANURA_SERVER_HEADER)
if serverHeader == "" {
return "", HTTPError{
Message: "missing anura server header",
StatusCode: http.StatusUnauthorized,
}
}

anuraSignature := req.Header.Get(X_ANURA_SIGNATURE_HEADER)
if anuraSignature == "" {
return "", HTTPError{
Message: "missing anura signature header",
StatusCode: http.StatusUnauthorized,
}
}

signatureAddress, err := decodeUserAddress(serverHeader, anuraSignature)
if err != nil {
return "", HTTPError{
Message: fmt.Sprintf("invalid server header or signature %s", err.Error()),
StatusCode: http.StatusUnauthorized,
}
}

for _, addr := range approvedAddresses {
if strings.EqualFold(signatureAddress, addr) {
return signatureAddress, nil
}
}

return "", HTTPError{
Message: "unauthorized anura signature",
StatusCode: http.StatusUnauthorized,
}
}

func GetVersionFromHeaders(req *http.Request) (string, error) {
versionHeader := req.Header.Get(X_LILYPAD_VERSION_HEADER)
if versionHeader == "" {
Expand Down Expand Up @@ -443,31 +500,4 @@ func newRetryClient() *retryablehttp.Client {
return nil, fmt.Errorf("%s %s gave up after %d attempt(s): %s", resp.Request.Method, resp.Request.URL, numTries, string(body))
}
return retryClient
}

func CanonicalizeIP(ip string) string {
isIPv6 := false
// This is how net.ParseIP decides if an address is IPv6
// https://cs.opensource.google/go/go/+/refs/tags/go1.17.7:src/net/ip.go;l=704
for i := 0; !isIPv6 && i < len(ip); i++ {
switch ip[i] {
case '.':
// IPv4
return ip
case ':':
// IPv6
isIPv6 = true
break
}
}
if !isIPv6 {
// Not an IP address at all
return ip
}

ipv6 := net.ParseIP(ip)
if ipv6 == nil {
return ip
}
return ipv6.Mask(net.CIDRMask(64, 128)).String()
}
}
14 changes: 3 additions & 11 deletions pkg/options/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package options

import (
"fmt"
"net"
"github.com/lilypad-tech/lilypad/pkg/http"
"github.com/spf13/cobra"
)
Expand All @@ -24,14 +23,14 @@ func GetDefaultAccessControlOptions() http.AccessControlOptions {
ValidationTokenSecret: GetDefaultServeOptionString("SERVER_VALIDATION_TOKEN_SECRET", ""),
ValidationTokenExpiration: GetDefaultServeOptionInt("SERVER_VALIDATION_TOKEN_EXPIRATION", 604800), // one week
ValidationTokenKid: GetDefaultServeOptionString("SERVER_VALIDATION_TOKEN_KID", ""),
AnuraAddresses: GetDefaultServeOptionStringArray("ANURA_ADDRESSES", []string{}),
}
}

func GetDefaultRateLimiterOptions() http.RateLimiterOptions {
return http.RateLimiterOptions{
RequestLimit: GetDefaultServeOptionInt("SERVER_RATE_REQUEST_LIMIT", 5),
WindowLength: GetDefaultServeOptionInt("SERVER_RATE_WINDOW_LENGTH", 10),
ExemptedIPs: GetDefaultServeOptionStringArray("SERVER_RATE_EXEMPTED_IPS", []string{}),
}
}

Expand Down Expand Up @@ -77,8 +76,8 @@ func AddServerCliFlags(cmd *cobra.Command, serverOptions *http.ServerOptions) {
`The time window over which to limit in seconds (SERVER_RATE_WINDOW_LENGTH).`,
)
cmd.PersistentFlags().StringArrayVar(
&serverOptions.RateLimiter.ExemptedIPs, "server-rate-exempted-ips", serverOptions.RateLimiter.ExemptedIPs,
`The IPs to exempt from rate limiting (SERVER_RATE_EXEMPTED_IPS).`,
&serverOptions.AccessControl.AnuraAddresses, "server-anura-addresses", serverOptions.AccessControl.AnuraAddresses,
`The Anura wallet addresses that are allowed to access anura endpoints (ANURA_ADDRESSES).`,
)
}

Expand All @@ -95,12 +94,5 @@ func CheckServerOptions(options http.ServerOptions, storeType string) error {
if options.AccessControl.ValidationTokenKid == "" {
return fmt.Errorf("SERVER_VALIDATION_TOKEN_KID is required")
}
if len(options.RateLimiter.ExemptedIPs) > 0 {
for _, ip := range options.RateLimiter.ExemptedIPs {
if net.ParseIP(ip) == nil {
return fmt.Errorf("invalid IP address: %s", ip)
}
}
}
return nil
}
89 changes: 89 additions & 0 deletions pkg/solver/anuraLimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//go:build integration && solver

package solver_test

import (
"context"
"fmt"
"net/http"
"testing"
"github.com/ethereum/go-ethereum/crypto"
"github.com/hashicorp/go-retryablehttp"
httputil "github.com/lilypad-tech/lilypad/pkg/http"
)

type anuraTestCase struct {
name string
privateKey string
expectedOK bool
}

// This test suite tests Anura authentication with valid and invalid signatures
func TestAnuraAuth(t *testing.T) {
paths := []string{
"/api/v1/anura/job_offers",
}

validKey := "b3994e7660abe5f65f729bb64163c6cd6b7d0b1a8c67881a7346e3e8c7f026f5"
invalidKey := "1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"

t.Run("valid anura signature passes", func(t *testing.T) {
tc := anuraTestCase{
name: "valid signature should pass",
privateKey: validKey,
expectedOK: true,
}
runAnuraTest(t, paths, tc)
})

t.Run("invalid anura signature fails", func(t *testing.T) {
tc := anuraTestCase{
name: "invalid signature should fail",
privateKey: invalidKey,
expectedOK: false,
}
runAnuraTest(t, paths, tc)
})
}

func runAnuraTest(t *testing.T, paths []string, tc anuraTestCase) {
for _, path := range paths {
client := retryablehttp.NewClient()
client.RetryMax = 0
client.Logger = nil
client.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
return false, nil
}

req, err := retryablehttp.NewRequest("GET", fmt.Sprintf("http://localhost:%d%s", 8081, path), nil)
if err != nil {
t.Errorf("Failed to create request: %s\n", err)
return
}

privateKey, err := crypto.HexToECDSA(tc.privateKey)
if err != nil {
t.Errorf("Failed to parse private key: %s\n", err)
return
}

err = httputil.AddAnuraHeaders(req, privateKey, crypto.PubkeyToAddress(privateKey.PublicKey).String())
if err != nil {
t.Errorf("Failed to add Anura headers: %s\n", err)
return
}

res, err := client.Do(req)
if err != nil {
t.Errorf("Request failed on %s: %s\n", path, err)
return
}

if tc.expectedOK && res.StatusCode != http.StatusOK {
t.Errorf("%s: Expected status code 200, got %d", path, res.StatusCode)
}
if !tc.expectedOK && res.StatusCode != http.StatusUnauthorized {
t.Errorf("%s: Expected status code 401, got %d", path, res.StatusCode)
}
}
}
38 changes: 6 additions & 32 deletions pkg/solver/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ type rateTestCase struct {
}

// This test suite sends 200 requests to three different paths. We send the
// requests in rate limited and exempt test groups. The rate limited group
// should allow 5/100 requests through and the exempt group should allow 100/100.
// requests in rate limited groups. The rate limited group should allow 5/100
// requests through.
//
// We assume the solver uses the default rate limiting settings with
// a request limit of 5 and window length of 10 seconds. In addition, the solver
// should be configured to exempt localhost.
// a request limit of 5 and window length of 10 seconds.
func TestRateLimiter(t *testing.T) {
paths := []string{
"/api/v1/resource_offers",
Expand All @@ -40,26 +39,15 @@ func TestRateLimiter(t *testing.T) {

// The solver should rate limit when forwarded
// headers are set to 1.2.3.4.
nonExemptHeaders := []map[string]string{
forwardedHeaders := []map[string]string{
{"True-Client-IP": "1.2.3.4"},
{"X-Real-IP": "1.2.3.4"},
{"X-Forwarded-For": "1.2.3.4"},
}

// The running solver is configured to exempt localhost.
// When no headers are set, test using the IP address from
// the underlying connection (also localhost)
// TODO: re-enable exempt IP rate limiting
// exemptHeaders := []map[string]string{
// {"True-Client-IP": "127.0.0.1"},
// {"X-Real-IP": "127.0.0.1"},
// {"X-Forwarded-For": "127.0.0.1"},
// {}, // No headers case - uses RemoteAddr
// }

t.Run("non-exempt IP is rate limited", func(t *testing.T) {
t.Run("requests are rate limited", func(t *testing.T) {
// Select a random header on each test run. Over time we test them all.
headers := nonExemptHeaders[rand.Intn(len(nonExemptHeaders))]
headers := forwardedHeaders[rand.Intn(len(forwardedHeaders))]
tc := rateTestCase{
name: fmt.Sprintf("rate limited with headers %v", headers),
headers: headers,
Expand All @@ -68,26 +56,12 @@ func TestRateLimiter(t *testing.T) {
}
runRateLimitTest(t, paths, tc)
})

// TODO: re-enable exempt IP rate limiting
// t.Run("exempt IP is not rate limited", func(t *testing.T) {
// // Select a random header on each test run. Over time we test them all.
// headers := exemptHeaders[rand.Intn(len(exemptHeaders))]
// tc := rateTestCase{
// name: fmt.Sprintf("exempt with headers %v", headers),
// headers: headers,
// expectedOK: 100,
// expectedLimit: 0,
// }
// runRateLimitTest(t, paths, tc)
// })
}

func runRateLimitTest(t *testing.T, paths []string, tc rateTestCase) {
var wg sync.WaitGroup
ch := make(chan rateResult, len(paths))

// Run the calls against paths in parallel
for _, path := range paths {
wg.Add(1)
go func(path string) {
Expand Down
Loading

0 comments on commit 8e996a7

Please sign in to comment.