Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix oversized response handling #33

Merged
merged 4 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/localstack/custom_interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/go-chi/chi"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -117,8 +118,8 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
timeout := int(server.delegate.GetInvokeTimeout().Seconds())
isErr := false
if err != nil {
switch err {
case rapidcore.ErrInvokeTimeout:
switch {
case errors.Is(err, rapidcore.ErrInvokeTimeout):
dominikschubert marked this conversation as resolved.
Show resolved Hide resolved
log.Debugf("Got invoke timeout")
isErr = true
errorResponse := ErrorResponse{
Expand All @@ -137,6 +138,8 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
if err != nil {
log.Fatalln("unable to write to response")
}
case errors.Is(err, rapidcore.ErrInvokeDoneFailed):
// we can actually just continue here, error message is sent below
default:
log.Fatalln(err)
}
Expand Down
11 changes: 11 additions & 0 deletions cmd/localstack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main
import (
"context"
log "github.com/sirupsen/logrus"
"go.amzn.com/lambda/interop"
"go.amzn.com/lambda/rapidcore"
"os"
"runtime/debug"
Expand All @@ -28,6 +29,7 @@ type LsOpts struct {
EdgePort string
EnableXRayTelemetry string
PostInvokeWaitMS string
MaxPayloadSize string
}

func GetEnvOrDie(env string) string {
Expand All @@ -50,6 +52,7 @@ func InitLsOpts() *LsOpts {
User: GetenvWithDefault("LOCALSTACK_USER", "sbx_user1051"),
InitLogLevel: GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "warn"),
EdgePort: GetenvWithDefault("EDGE_PORT", "4566"),
MaxPayloadSize: GetenvWithDefault("LOCALSTACK_MAX_PAYLOAD_SIZE", "6291556"),
dominikschubert marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally no unit _BYTES vs. LAMBDA_LIMITS_MAX_FUNCTION_PAYLOAD_SIZE_BYTES in LocalStack
guess to match with interop.MaxPayloadSize

(LOCALSTACK_POST_INVOKE_WAIT_MS also has a unit)

// optional or empty
CodeArchives: os.Getenv("LOCALSTACK_CODE_ARCHIVES"),
HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","),
Expand Down Expand Up @@ -77,6 +80,7 @@ func UnsetLsEnvs() {
"LOCALSTACK_INIT_LOG_LEVEL",
"LOCALSTACK_POST_INVOKE_WAIT_MS",
"LOCALSTACK_FUNCTION_ACCOUNT_ID",
"LOCALSTACK_MAX_PAYLOAD_SIZE",

// Docker container ID
"HOSTNAME",
Expand Down Expand Up @@ -128,6 +132,13 @@ func main() {
log.Fatal("Invalid value for LOCALSTACK_INIT_LOG_LEVEL")
}

// patch MaxPayloadSize
payloadSize, err := strconv.Atoi(lsOpts.MaxPayloadSize)
if err != nil {
log.Panicln("Please specify a number for LOCALSTACK_MAX_PAYLOAD_SIZE")
}
interop.MaxPayloadSize = payloadSize

// enable dns server
dnsServerContext, stopDnsServer := context.WithCancel(context.Background())
go RunDNSRewriter(lsOpts, dnsServerContext)
Expand Down
2 changes: 1 addition & 1 deletion debugging/Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Golang EOL overview: https://endoflife.date/go
DOCKER_GOLANG_IMAGE ?= golang:1.19
DOCKER_GOLANG_IMAGE ?= golang:1.21.0-bullseye
dominikschubert marked this conversation as resolved.
Show resolved Hide resolved

# On ARM hosts, use: make ARCH=arm64 build-init
# Check host architecture: uname -m
Expand Down
5 changes: 3 additions & 2 deletions lambda/core/directinvoke/directinvoke.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
// LOCALSTACK CHANGES 2024-02-13: casting of MaxPayloadSize

package directinvoke

Expand Down Expand Up @@ -51,7 +52,7 @@ var ResetReasonMap = map[string]fatalerror.ErrorType{
"timeout": fatalerror.SandboxTimeout,
}

var MaxDirectResponseSize int64 = interop.MaxPayloadSize // this is intentionally not a constant so we can configure it via CLI
var MaxDirectResponseSize = int64(interop.MaxPayloadSize) // this is intentionally not a constant so we can configure it via CLI
dominikschubert marked this conversation as resolved.
Show resolved Hide resolved
var ResponseBandwidthRate int64 = interop.ResponseBandwidthRate
var ResponseBandwidthBurstSize int64 = interop.ResponseBandwidthBurstSize

Expand Down Expand Up @@ -104,7 +105,7 @@ func ReceiveDirectInvoke(w http.ResponseWriter, r *http.Request, token interop.T

now := metering.Monotime()

MaxDirectResponseSize = interop.MaxPayloadSize
MaxDirectResponseSize = int64(interop.MaxPayloadSize)
if maxPayloadSize := r.Header.Get(MaxPayloadSizeHeader); maxPayloadSize != "" {
if n, err := strconv.ParseInt(maxPayloadSize, 10, 64); err == nil && n >= -1 {
MaxDirectResponseSize = n
Expand Down
7 changes: 4 additions & 3 deletions lambda/interop/model.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
// LOCALSTACK CHANGES 2024-02-13: adjust error message for ErrorResponseTooLarge to be in parity with what AWS returns; make MaxPayloadSize adjustable

package interop

Expand All @@ -18,10 +19,10 @@ import (
log "github.com/sirupsen/logrus"
)

var MaxPayloadSize int = 6*1024*1024 + 100 // 6 MiB + 100 bytes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

annoying that we cannot use a single type due to mixed usages 😢 (server.go requires an int; not int64)


// MaxPayloadSize max event body size declared as LAMBDA_EVENT_BODY_SIZE
const (
MaxPayloadSize = 6*1024*1024 + 100 // 6 MiB + 100 bytes

ResponseBandwidthRate = 2 * 1024 * 1024 // default average rate of 2 MiB/s
ResponseBandwidthBurstSize = 6 * 1024 * 1024 // default burst size of 6 MiB

Expand Down Expand Up @@ -355,7 +356,7 @@ type ErrorResponseTooLargeDI struct {

// ErrorResponseTooLarge is returned when response provided by Runtime does not fit into shared memory buffer
func (s *ErrorResponseTooLarge) Error() string {
return fmt.Sprintf("Response payload size (%d bytes) exceeded maximum allowed payload size (%d bytes).", s.ResponseSize, s.MaxResponseSize)
return fmt.Sprintf("Response payload size exceeded maximum allowed payload size (%d bytes).", s.MaxResponseSize)
}

// AsErrorResponse generates ErrorInvokeResponse from ErrorResponseTooLarge
Expand Down
3 changes: 2 additions & 1 deletion lambda/rapi/rendering/rendering.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
// LOCALSTACK CHANGES 2024-02-13: casting of MaxPayloadSize

package rendering

Expand Down Expand Up @@ -174,7 +175,7 @@ func (s *InvokeRenderer) bufferInvokeRequest() error {
defer s.requestMutex.Unlock()
var err error = nil
if s.requestBuffer.Len() == 0 {
reader := io.LimitReader(s.invoke.Payload, interop.MaxPayloadSize)
reader := io.LimitReader(s.invoke.Payload, int64(interop.MaxPayloadSize))
start := time.Now()
_, err = s.requestBuffer.ReadFrom(reader)
s.metrics = InvokeRendererMetrics{
Expand Down