From 381f812ab8ce5617ac6a0119b877a06aae319bde Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 11 Dec 2023 14:01:31 -0500 Subject: [PATCH 01/20] create a Lambda span on timeouts --- pkg/serverless/appsec/httpsec/proxy.go | 5 + pkg/serverless/daemon/daemon.go | 11 ++ pkg/serverless/daemon/routes.go | 2 + pkg/serverless/daemon/routes_test.go | 44 +++++++ .../invocation_processor.go | 2 + .../invocationlifecycle/lifecycle.go | 61 ++++++---- .../invocationlifecycle/lifecycle_test.go | 103 ++++++++++++++++ pkg/serverless/invocationlifecycle/trace.go | 111 +++++++++++++----- pkg/serverless/proxy/proxy_test.go | 12 ++ pkg/serverless/serverless.go | 13 ++ 10 files changed, 316 insertions(+), 48 deletions(-) diff --git a/pkg/serverless/appsec/httpsec/proxy.go b/pkg/serverless/appsec/httpsec/proxy.go index 57812c6289778d..3e72a6a696bdb6 100644 --- a/pkg/serverless/appsec/httpsec/proxy.go +++ b/pkg/serverless/appsec/httpsec/proxy.go @@ -46,6 +46,11 @@ func (lp *ProxyLifecycleProcessor) GetExecutionInfo() *invocationlifecycle.Execu return nil // not used in the runtime api proxy case } +// OnTimeoutInvokeEnd completes an unfinished execution span during a timeout +func (lp *ProxyLifecycleProcessor) OnTimeoutInvokeEnd(timeoutCtx *invocationlifecycle.TimeoutExecutionInfo) { + // not used in the runtime api proxy case +} + // OnInvokeStart is the hook triggered when an invocation has started func (lp *ProxyLifecycleProcessor) OnInvokeStart(startDetails *invocationlifecycle.InvocationStartDetails) { log.Debugf("appsec: proxy-lifecycle: invocation started with raw payload `%s`", startDetails.InvokeEventRawPayload) diff --git a/pkg/serverless/daemon/daemon.go b/pkg/serverless/daemon/daemon.go index 575fec745937d7..84327ab11560a4 100644 --- a/pkg/serverless/daemon/daemon.go +++ b/pkg/serverless/daemon/daemon.go @@ -66,6 +66,12 @@ type Daemon struct { // LambdaLibraryDetected represents whether the Datadog Lambda Library was detected in the environment LambdaLibraryDetected bool + 
// hitOnStart indicates whether the serverless.StartInvocation route has been hit + hitOnStart bool + + // hitOnEnd indicates whether the serverless.EndInvocation route has been hit + hitOnEnd bool + // runtimeStateMutex is used to ensure that modifying the state of the runtime is thread-safe runtimeStateMutex sync.Mutex @@ -435,3 +441,8 @@ func (d *Daemon) setTraceTags(tagMap map[string]string) bool { } return false } + +// IsExecutionSpanComplete checks if the execution span was finished during a timeout +func (d *Daemon) IsExecutionSpanComplete() bool { + return !d.LambdaLibraryDetected && d.hitOnStart && d.hitOnEnd +} diff --git a/pkg/serverless/daemon/routes.go b/pkg/serverless/daemon/routes.go index 1b2379d8e18222..f7df550633e25a 100644 --- a/pkg/serverless/daemon/routes.go +++ b/pkg/serverless/daemon/routes.go @@ -53,6 +53,7 @@ type StartInvocation struct { func (s *StartInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.StartInvocation route.") + s.daemon.hitOnStart = true startTime := time.Now() reqBody, err := io.ReadAll(r.Body) if err != nil { @@ -86,6 +87,7 @@ type EndInvocation struct { func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.EndInvocation route.") + e.daemon.hitOnEnd = true endTime := time.Now() ecs := e.daemon.ExecutionContext.GetCurrentState() coldStartTags := e.daemon.ExecutionContext.GetColdStartTagsForRequestID(ecs.LastRequestID) diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index ca7a54ea623576..31a328db73e6e5 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -48,6 +48,12 @@ func (m *mockLifecycleProcessor) OnInvokeEnd(endDetails *invocationlifecycle.Inv m.lastEndDetails = endDetails } +func (m *mockLifecycleProcessor) OnTimeoutInvokeEnd(timeoutCtx *invocationlifecycle.TimeoutExecutionInfo) { + m.OnInvokeEndCalled = false + m.isError = 
true + m.lastEndDetails = nil +} + func TestStartInvocation(t *testing.T) { assert := assert.New(t) port := testutil.FreeTCPPort(t) @@ -127,6 +133,44 @@ func TestEndInvocationWithError(t *testing.T) { assert.True(m.isError) } +func TestTimeoutInvocation(t *testing.T) { + assert := assert.New(t) + port := testutil.FreeTCPPort(t) + d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) + time.Sleep(100 * time.Millisecond) + defer d.Stop() + + m := &mockLifecycleProcessor{} + d.InvocationProcessor = m + client := &http.Client{Timeout: 1 * time.Second} + startURL := fmt.Sprintf("http://127.0.0.1:%d/lambda/start-invocation", port) + + body := bytes.NewBuffer([]byte(`{}`)) + startReq, err := http.NewRequest(http.MethodPost, startURL, body) + assert.Nil(err) + res, err := client.Do(startReq) + assert.Nil(err) + if res != nil { + assert.Equal(res.StatusCode, 200) + res.Body.Close() + } + + d.InvocationProcessor.OnTimeoutInvokeEnd( + &invocationlifecycle.TimeoutExecutionInfo{ + RequestId: "123abc", + Runtime: "custom", + IsColdStart: true, + IsProactiveInit: true, + }) + + assert.True(m.OnInvokeStartCalled) + assert.True(d.hitOnStart) + assert.False(m.OnInvokeEndCalled) + assert.False(d.hitOnEnd) + assert.True(m.isError) + assert.Nil(m.lastEndDetails) +} + func TestTraceContext(t *testing.T) { assert := assert.New(t) diff --git a/pkg/serverless/invocationlifecycle/invocation_processor.go b/pkg/serverless/invocationlifecycle/invocation_processor.go index 181344131a8014..a4bc1a223d6ac1 100644 --- a/pkg/serverless/invocationlifecycle/invocation_processor.go +++ b/pkg/serverless/invocationlifecycle/invocation_processor.go @@ -13,6 +13,8 @@ type InvocationProcessor interface { OnInvokeEnd(endDetails *InvocationEndDetails) // GetExecutionInfo returns the current execution start information GetExecutionInfo() *ExecutionStartInfo + // OnTimeoutInvokeEnd completes an unfinished execution span during a timeout + OnTimeoutInvokeEnd(timeoutCtx *TimeoutExecutionInfo) } // 
InvocationSubProcessor is the interface to implement to receive invocation lifecycle hooks along with the diff --git a/pkg/serverless/invocationlifecycle/lifecycle.go b/pkg/serverless/invocationlifecycle/lifecycle.go index 8a2290cc5b9dcc..d47dacc72e149c 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle.go +++ b/pkg/serverless/invocationlifecycle/lifecycle.go @@ -281,27 +281,7 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) { spans = append(spans, span) if lp.InferredSpansEnabled { - log.Debug("[lifecycle] Attempting to complete the inferred span") - log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span) - if lp.GetInferredSpan().Span.Start != 0 { - span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1] - if span1 != nil { - log.Debug("[lifecycle] Completing a secondary inferred span") - lp.setParentIDForMultipleInferredSpans() - span1.AddTagToInferredSpan("http.status_code", statusCode) - span1.AddTagToInferredSpan("peer.service", lp.GetServiceName()) - span := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), endDetails.IsError) - spans = append(spans, span) - log.Debug("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1]) - } - span0.AddTagToInferredSpan("http.status_code", statusCode) - span0.AddTagToInferredSpan("peer.service", lp.GetServiceName()) - span := lp.completeInferredSpan(span0, endDetails.EndTime, endDetails.IsError) - spans = append(spans, span) - log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan()) - } else { - log.Debug("[lifecyle] Failed to complete inferred span due to a missing start time. 
Please check that the event payload was received with the appropriate data") - } + spans = lp.endInferredSpan(spans, statusCode, endDetails.EndTime, endDetails.IsError) } lp.processTrace(spans) } @@ -313,6 +293,19 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) { } } +// OnTimeoutInvokeEnd completes an unfinished execution span during a timeout +func (lp *LifecycleProcessor) OnTimeoutInvokeEnd(timeoutContext *TimeoutExecutionInfo) { + spans := make([]*pb.Span, 0, 3) + span := lp.endExecutionSpanOnTimeout(timeoutContext) + spans = append(spans, span) + + if lp.InferredSpansEnabled { + // No response status code can be retrieved in a timeout so we pass an empty string + spans = lp.endInferredSpan(spans, "", time.Now(), true) + } + lp.processTrace(spans) +} + // GetTags returns the tagset of the currently executing lambda function func (lp *LifecycleProcessor) GetTags() map[string]string { return lp.requestHandler.triggerTags @@ -385,3 +378,29 @@ func (lp *LifecycleProcessor) setParentIDForMultipleInferredSpans() { lp.requestHandler.inferredSpans[1].Span.ParentID = lp.requestHandler.inferredSpans[0].Span.ParentID lp.requestHandler.inferredSpans[0].Span.ParentID = lp.requestHandler.inferredSpans[1].Span.SpanID } + +// endInferredSpan attempts to complete any inferred spans and send them to intake +func (lp *LifecycleProcessor) endInferredSpan(spans []*pb.Span, statusCode string, endTime time.Time, isError bool) []*pb.Span { + log.Debug("[lifecycle] Attempting to complete the inferred span") + log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span) + if lp.GetInferredSpan().Span.Start != 0 { + span0, span1 := lp.requestHandler.inferredSpans[0], lp.requestHandler.inferredSpans[1] + if span1 != nil { + log.Debug("[lifecycle] Completing a secondary inferred span") + lp.setParentIDForMultipleInferredSpans() + span1.AddTagToInferredSpan("http.status_code", statusCode) + span1.AddTagToInferredSpan("peer.service", 
lp.GetServiceName()) + span := lp.completeInferredSpan(span1, lp.getInferredSpanStart(), isError) + spans = append(spans, span) + log.Debug("[lifecycle] The secondary inferred span attributes are %v", lp.requestHandler.inferredSpans[1]) + } + span0.AddTagToInferredSpan("http.status_code", statusCode) + span0.AddTagToInferredSpan("peer.service", lp.GetServiceName()) + span := lp.completeInferredSpan(span0, endTime, isError) + spans = append(spans, span) + log.Debugf("[lifecycle] The inferred span attributes are: %v", lp.GetInferredSpan()) + } else { + log.Debug("[lifecyle] Failed to complete inferred span due to a missing start time. Please check that the event payload was received with the appropriate data") + } + return spans +} diff --git a/pkg/serverless/invocationlifecycle/lifecycle_test.go b/pkg/serverless/invocationlifecycle/lifecycle_test.go index 2bdb8cd48ca475..876d5c7dc04e1c 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle_test.go +++ b/pkg/serverless/invocationlifecycle/lifecycle_test.go @@ -385,6 +385,109 @@ func TestCompleteInferredSpanWithOutStartTime(t *testing.T) { completedInferredSpan := tracePayload.TracerPayload.Chunks[0].Spans[0] assert.Equal(t, startInvocationTime.UnixNano(), completedInferredSpan.Start) } + +func TestTimeoutExecutionSpan(t *testing.T) { + t.Setenv(functionNameEnvVar, "my-function") + t.Setenv("DD_SERVICE", "mock-lambda-service") + + extraTags := &logs.Tags{ + Tags: []string{"functionname:test-function"}, + } + log := fxutil.Test[log.Component](t, logimpl.MockModule) + demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(log, time.Hour) + mockDetectLambdaLibrary := func() bool { return false } + + var tracePayload *api.Payload + mockProcessTrace := func(payload *api.Payload) { + tracePayload = payload + } + + testProcessor := LifecycleProcessor{ + ExtraTags: extraTags, + ProcessTrace: mockProcessTrace, + DetectLambdaLibrary: mockDetectLambdaLibrary, + Demux: demux, + InferredSpansEnabled: true, + } + 
startDetails := InvocationStartDetails{ + StartTime: time.Now(), + InvokeEventRawPayload: []byte(`{}`), + InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function", + } + testProcessor.OnInvokeStart(&startDetails) + + timeoutCtx := &TimeoutExecutionInfo{ + RequestId: "test-request-id", + Runtime: "java11", + IsColdStart: false, + IsProactiveInit: false, + } + testProcessor.OnTimeoutInvokeEnd(timeoutCtx) + + spans := tracePayload.TracerPayload.Chunks[0].Spans + assert.Equal(t, 1, len(spans)) + // No trace context passed + assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().TraceID) + assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID) + assert.Equal(t, tracePayload.TracerPayload.Chunks[0].Priority, int32(-128)) + // New trace ID and span ID has been created + assert.NotEqual(t, uint64(0), spans[0].TraceID) + assert.NotEqual(t, uint64(0), spans[0].SpanID) + assert.Equal(t, spans[0].Error, int32(1)) + assert.Equal(t, spans[0].GetMeta()["request_id"], "test-request-id") + assert.Equal(t, spans[0].GetMeta()["language"], "java") +} + +func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) { + t.Setenv(functionNameEnvVar, "my-function") + t.Setenv("DD_SERVICE", "mock-lambda-service") + + extraTags := &logs.Tags{ + Tags: []string{"functionname:test-function"}, + } + log := fxutil.Test[log.Component](t, logimpl.MockModule) + demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(log, time.Hour) + mockDetectLambdaLibrary := func() bool { return false } + + var tracePayload *api.Payload + mockProcessTrace := func(payload *api.Payload) { + tracePayload = payload + } + + testProcessor := LifecycleProcessor{ + ExtraTags: extraTags, + ProcessTrace: mockProcessTrace, + DetectLambdaLibrary: mockDetectLambdaLibrary, + Demux: demux, + InferredSpansEnabled: true, + } + eventPayload := 
`a5a{"resource":"/users/create","path":"/users/create","httpMethod":"GET","headers":{"Accept":"*/*","Accept-Encoding":"gzip","x-datadog-parent-id":"1480558859903409531","x-datadog-sampling-priority":"1","x-datadog-trace-id":"5736943178450432258"}}0` + startDetails := InvocationStartDetails{ + StartTime: time.Now(), + InvokeEventRawPayload: []byte(eventPayload), + InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function", + } + testProcessor.OnInvokeStart(&startDetails) + + timeoutCtx := &TimeoutExecutionInfo{ + RequestId: "test-request-id", + Runtime: "java11", + IsColdStart: false, + IsProactiveInit: false, + } + testProcessor.OnTimeoutInvokeEnd(timeoutCtx) + + spans := tracePayload.TracerPayload.Chunks[0].Spans + assert.Equal(t, 1, len(spans)) + // Trace context received + assert.Equal(t, spans[0].GetTraceID(), testProcessor.GetExecutionInfo().TraceID) + assert.Equal(t, spans[0].GetParentID(), testProcessor.GetExecutionInfo().parentID) + assert.Equal(t, tracePayload.TracerPayload.Chunks[0].Priority, int32(testProcessor.GetExecutionInfo().SamplingPriority)) + assert.Equal(t, spans[0].Error, int32(1)) + assert.Equal(t, spans[0].GetMeta()["request_id"], "test-request-id") + assert.Equal(t, spans[0].GetMeta()["language"], "java") +} + func TestTriggerTypesLifecycleEventForAPIGatewayRest(t *testing.T) { startDetails := &InvocationStartDetails{ InvokeEventRawPayload: getEventFromFile("api-gateway.json"), diff --git a/pkg/serverless/invocationlifecycle/trace.go b/pkg/serverless/invocationlifecycle/trace.go index efd35e511e1840..c501ecec78af2d 100644 --- a/pkg/serverless/invocationlifecycle/trace.go +++ b/pkg/serverless/invocationlifecycle/trace.go @@ -17,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/config" pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace" + "github.com/DataDog/datadog-agent/pkg/serverless/random" "github.com/DataDog/datadog-agent/pkg/serverless/trace/inferredspan" 
"github.com/DataDog/datadog-agent/pkg/trace/api" "github.com/DataDog/datadog-agent/pkg/trace/info" @@ -40,6 +41,14 @@ type ExecutionStartInfo struct { SamplingPriority sampler.SamplingPriority } +// TimeoutExecutionInfo is the information needed to complete an execution span during a timeout +type TimeoutExecutionInfo struct { + RequestId string + Runtime string + IsColdStart bool + IsProactiveInit bool +} + // startExecutionSpan records information from the start of the invocation. // It should be called at the start of the invocation. func (lp *LifecycleProcessor) startExecutionSpan(event interface{}, rawPayload []byte, startDetails *InvocationStartDetails) { @@ -90,33 +99,8 @@ func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) Meta: lp.requestHandler.triggerTags, Metrics: lp.requestHandler.triggerMetrics, } - executionSpan.Meta["request_id"] = endDetails.RequestID - executionSpan.Meta["cold_start"] = fmt.Sprintf("%t", endDetails.ColdStart) - if endDetails.ProactiveInit { - executionSpan.Meta["proactive_initialization"] = fmt.Sprintf("%t", endDetails.ProactiveInit) - } - langMatches := runtimeRegex.FindStringSubmatch(endDetails.Runtime) - if len(langMatches) >= 2 { - executionSpan.Meta["language"] = langMatches[1] - } - captureLambdaPayloadEnabled := config.Datadog.GetBool("capture_lambda_payload") - if captureLambdaPayloadEnabled { - capturePayloadMaxDepth := config.Datadog.GetInt("capture_lambda_payload_max_depth") - requestPayloadJSON := make(map[string]interface{}) - if err := json.Unmarshal(executionContext.requestPayload, &requestPayloadJSON); err != nil { - log.Debugf("[lifecycle] Failed to parse request payload: %v", err) - executionSpan.Meta["function.request"] = string(executionContext.requestPayload) - } else { - capturePayloadAsTags(requestPayloadJSON, executionSpan, "function.request", 0, capturePayloadMaxDepth) - } - responsePayloadJSON := make(map[string]interface{}) - if err := 
json.Unmarshal(endDetails.ResponseRawPayload, &responsePayloadJSON); err != nil { - log.Debugf("[lifecycle] Failed to parse response payload: %v", err) - executionSpan.Meta["function.response"] = string(endDetails.ResponseRawPayload) - } else { - capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) - } - } + setExecutionSpanTags(executionSpan, endDetails.RequestID, endDetails.ColdStart, endDetails.ProactiveInit, endDetails.Runtime) + captureLambdaPayload(executionContext, executionSpan, endDetails.ResponseRawPayload) if endDetails.IsError { executionSpan.Error = 1 @@ -134,6 +118,46 @@ func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) return executionSpan } +// endExecutionSpanOnTimeout attempts to finish the execution span during a timeout. +// It should only be called when the execution span has been started but not finished during a timeout +func (lp *LifecycleProcessor) endExecutionSpanOnTimeout(timeoutCtx *TimeoutExecutionInfo) *pb.Span { + executionContext := lp.GetExecutionInfo() + start := executionContext.startTime.UnixNano() + duration := time.Now().UnixNano() - start + + // In a timeout we do not receive the trace and span IDs from the tracer so we must do it here + traceId := executionContext.TraceID + if traceId == 0 { + traceId = random.Random.Uint64() + } + spanId := executionContext.SpanID + if spanId == 0 { + spanId = random.Random.Uint64() + } + + executionSpan := &pb.Span{ + Service: "aws.lambda", // will be replaced by the span processor + Name: "aws.lambda", + Resource: os.Getenv(functionNameEnvVar), + Type: "serverless", + TraceID: traceId, + SpanID: spanId, + ParentID: executionContext.parentID, + Start: start, + Duration: duration, + Meta: lp.requestHandler.triggerTags, + Metrics: lp.requestHandler.triggerMetrics, + } + setExecutionSpanTags(executionSpan, timeoutCtx.RequestId, timeoutCtx.IsColdStart, timeoutCtx.IsProactiveInit, timeoutCtx.Runtime) + // In a 
timeout the tracer is unable to send the response payload so it must be excluded + captureLambdaPayload(executionContext, executionSpan, []byte{}) + + // Always mark it as an error since this is a timeout span + executionSpan.Error = 1 + + return executionSpan +} + // completeInferredSpan finishes the inferred span and passes it // as an API payload to be processed by the trace agent func (lp *LifecycleProcessor) completeInferredSpan(inferredSpan *inferredspan.InferredSpan, endTime time.Time, isError bool) *pb.Span { @@ -216,6 +240,27 @@ func InjectSpanID(executionContext *ExecutionStartInfo, headers http.Header) { } } +func captureLambdaPayload(executionContext *ExecutionStartInfo, executionSpan *pb.Span, responsePayload []byte) { + if captureLambdaPayloadEnabled := config.Datadog.GetBool("capture_lambda_payload"); !captureLambdaPayloadEnabled { + return + } + capturePayloadMaxDepth := config.Datadog.GetInt("capture_lambda_payload_max_depth") + requestPayloadJSON := make(map[string]interface{}) + if err := json.Unmarshal(executionContext.requestPayload, &requestPayloadJSON); err != nil { + log.Debugf("[lifecycle] Failed to parse request payload: %v", err) + executionSpan.Meta["function.request"] = string(executionContext.requestPayload) + } else { + capturePayloadAsTags(requestPayloadJSON, executionSpan, "function.request", 0, capturePayloadMaxDepth) + } + responsePayloadJSON := make(map[string]interface{}) + if err := json.Unmarshal(responsePayload, &responsePayloadJSON); err != nil { + log.Debugf("[lifecycle] Failed to parse response payload: %v", err) + executionSpan.Meta["function.response"] = string(responsePayload) + } else { + capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) + } +} + func capturePayloadAsTags(value interface{}, targetSpan *pb.Span, key string, depth int, maxDepth int) { if key == "" { return @@ -278,3 +323,15 @@ func convertJSONToString(payloadJSON interface{}) string { } return 
string(jsonData) } + +func setExecutionSpanTags(executionSpan *pb.Span, requestId string, isColdStart bool, isProactiveInit bool, runtime string) { + executionSpan.Meta["request_id"] = requestId + executionSpan.Meta["cold_start"] = fmt.Sprintf("%t", isColdStart) + if isProactiveInit { + executionSpan.Meta["proactive_initialization"] = fmt.Sprintf("%t", isProactiveInit) + } + langMatches := runtimeRegex.FindStringSubmatch(runtime) + if len(langMatches) >= 2 { + executionSpan.Meta["language"] = langMatches[1] + } +} diff --git a/pkg/serverless/proxy/proxy_test.go b/pkg/serverless/proxy/proxy_test.go index c8169a67d5b6c0..9b689fd9b768c9 100644 --- a/pkg/serverless/proxy/proxy_test.go +++ b/pkg/serverless/proxy/proxy_test.go @@ -44,6 +44,12 @@ func (tp *testProcessorResponseValid) OnInvokeEnd(endDetails *invocationlifecycl } } +func (tp *testProcessorResponseValid) OnTimeoutInvokeEnd(timeoutContext *invocationlifecycle.TimeoutExecutionInfo) { + if len(timeoutContext.RequestId) == 0 { + panic("requestId") + } +} + type testProcessorResponseError struct{} func (tp *testProcessorResponseError) OnInvokeStart(startDetails *invocationlifecycle.InvocationStartDetails) { @@ -65,6 +71,12 @@ func (tp *testProcessorResponseError) GetExecutionInfo() *invocationlifecycle.Ex return nil } +func (tp *testProcessorResponseError) OnTimeoutInvokeEnd(timeoutContext *invocationlifecycle.TimeoutExecutionInfo) { + if len(timeoutContext.RequestId) == 0 { + panic("requestId") + } +} + func TestProxyResponseValid(t *testing.T) { // fake the runtime API running on 5001 l, err := net.Listen("tcp", "127.0.0.1:5001") diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index f98c34522dd8e8..5a944c8b238d60 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -17,6 +17,7 @@ import ( "github.com/DataDog/datadog-agent/pkg/serverless/daemon" "github.com/DataDog/datadog-agent/pkg/serverless/flush" + 
"github.com/DataDog/datadog-agent/pkg/serverless/invocationlifecycle" "github.com/DataDog/datadog-agent/pkg/serverless/metrics" "github.com/DataDog/datadog-agent/pkg/serverless/registration" "github.com/DataDog/datadog-agent/pkg/serverless/tags" @@ -138,6 +139,18 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id regis metricTags = tags.AddInitTypeTag(metricTags) metrics.SendTimeoutEnhancedMetric(metricTags, daemon.MetricAgent.Demux) metrics.SendErrorsEnhancedMetric(metricTags, time.Now(), daemon.MetricAgent.Demux) + + if ok := daemon.IsExecutionSpanComplete(); !ok { + ecs := daemon.ExecutionContext.GetCurrentState() + log.Debug("No hit on serverless.EndInvocation route. Attempting to finish Lambda execution span.") + timeoutContext := &invocationlifecycle.TimeoutExecutionInfo{ + RequestId: ecs.LastRequestID, + Runtime: ecs.Runtime, + IsColdStart: coldStartTags.IsColdStart, + IsProactiveInit: coldStartTags.IsProactiveInit, + } + daemon.InvocationProcessor.OnTimeoutInvokeEnd(timeoutContext) + } } err := daemon.ExecutionContext.SaveCurrentExecutionContext() if err != nil { From 831dab7d87984329c078d8c916af65eddfb8fd67 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 11 Dec 2023 14:03:14 -0500 Subject: [PATCH 02/20] don't create a cold start span when the runtime restarts during timeouts --- cmd/serverless/main.go | 1 + pkg/serverless/executioncontext/executioncontext.go | 1 + pkg/serverless/trace/cold_start_span_creator.go | 5 +++-- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/serverless/main.go b/cmd/serverless/main.go index a280a0b6832298..90ad5d0a20b104 100644 --- a/cmd/serverless/main.go +++ b/cmd/serverless/main.go @@ -271,6 +271,7 @@ func runAgent() { TraceAgent: serverlessDaemon.TraceAgent, StopChan: make(chan struct{}), ColdStartSpanId: coldStartSpanId, + WasColdStart: serverlessDaemon.ExecutionContext.GetCurrentState().WasColdStart, } log.Debug("Starting ColdStartSpanCreator") diff --git 
a/pkg/serverless/executioncontext/executioncontext.go b/pkg/serverless/executioncontext/executioncontext.go index 1647ae1b749980..d35549905c45a6 100644 --- a/pkg/serverless/executioncontext/executioncontext.go +++ b/pkg/serverless/executioncontext/executioncontext.go @@ -218,6 +218,7 @@ func (ec *ExecutionContext) RestoreCurrentStateFromFile() error { ec.lastLogRequestID = restoredExecutionContextState.LastLogRequestID ec.lastOOMRequestID = restoredExecutionContextState.LastOOMRequestID ec.coldstartRequestID = restoredExecutionContextState.ColdstartRequestID + ec.wasColdStart = restoredExecutionContextState.WasColdStart ec.startTime = restoredExecutionContextState.StartTime ec.endTime = restoredExecutionContextState.EndTime return nil diff --git a/pkg/serverless/trace/cold_start_span_creator.go b/pkg/serverless/trace/cold_start_span_creator.go index ef46416d497d2a..c43d26dcfa40a1 100644 --- a/pkg/serverless/trace/cold_start_span_creator.go +++ b/pkg/serverless/trace/cold_start_span_creator.go @@ -46,6 +46,7 @@ type ColdStartSpanCreator struct { initDuration float64 StopChan chan struct{} initStartTime time.Time + WasColdStart bool } //nolint:revive // TODO(SERV) Fix revive linter @@ -114,8 +115,8 @@ func (c *ColdStartSpanCreator) createIfReady() { } func (c *ColdStartSpanCreator) create() { - // Prevent infinite loop from SpanModifier call - if c.lambdaSpan.Name == spanName { + // Prevent infinite loop from SpanModifier call and duplicates from timeout restarts + if c.lambdaSpan.Name == spanName || c.WasColdStart { return } From 2200b766e8591f771d132802084fda09ffbb140f Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 11 Dec 2023 14:53:05 -0500 Subject: [PATCH 03/20] fix linting --- pkg/serverless/appsec/httpsec/proxy.go | 2 +- pkg/serverless/daemon/routes_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/serverless/appsec/httpsec/proxy.go b/pkg/serverless/appsec/httpsec/proxy.go index 3e72a6a696bdb6..b72fbab811f23c 100644 --- 
a/pkg/serverless/appsec/httpsec/proxy.go +++ b/pkg/serverless/appsec/httpsec/proxy.go @@ -47,7 +47,7 @@ func (lp *ProxyLifecycleProcessor) GetExecutionInfo() *invocationlifecycle.Execu } // OnTimeoutInvokeEnd completes an unfinished execution span during a timeout -func (lp *ProxyLifecycleProcessor) OnTimeoutInvokeEnd(timeoutCtx *invocationlifecycle.TimeoutExecutionInfo) { +func (lp *ProxyLifecycleProcessor) OnTimeoutInvokeEnd(*invocationlifecycle.TimeoutExecutionInfo) { // not used in the runtime api proxy case } diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index 31a328db73e6e5..938ba3b32a824c 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -48,7 +48,7 @@ func (m *mockLifecycleProcessor) OnInvokeEnd(endDetails *invocationlifecycle.Inv m.lastEndDetails = endDetails } -func (m *mockLifecycleProcessor) OnTimeoutInvokeEnd(timeoutCtx *invocationlifecycle.TimeoutExecutionInfo) { +func (m *mockLifecycleProcessor) OnTimeoutInvokeEnd(*invocationlifecycle.TimeoutExecutionInfo) { m.OnInvokeEndCalled = false m.isError = true m.lastEndDetails = nil From a9ec6a7cdda21f637ec900857d1eca5d5486f13e Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 11 Dec 2023 15:29:10 -0500 Subject: [PATCH 04/20] fix test --- pkg/serverless/invocationlifecycle/lifecycle_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/serverless/invocationlifecycle/lifecycle_test.go b/pkg/serverless/invocationlifecycle/lifecycle_test.go index 10906cecf28ba9..bec6ce552eb779 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle_test.go +++ b/pkg/serverless/invocationlifecycle/lifecycle_test.go @@ -393,7 +393,7 @@ func TestTimeoutExecutionSpan(t *testing.T) { extraTags := &logs.Tags{ Tags: []string{"functionname:test-function"}, } - log := fxutil.Test[log.Component](t, logimpl.MockModule) + log := fxutil.Test[log.Component](t, logimpl.MockModule()) demux := 
aggregator.InitTestAgentDemultiplexerWithFlushInterval(log, time.Hour) mockDetectLambdaLibrary := func() bool { return false } @@ -445,7 +445,7 @@ func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) { extraTags := &logs.Tags{ Tags: []string{"functionname:test-function"}, } - log := fxutil.Test[log.Component](t, logimpl.MockModule) + log := fxutil.Test[log.Component](t, logimpl.MockModule()) demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(log, time.Hour) mockDetectLambdaLibrary := func() bool { return false } From 71ee55f6482cf7f9f74cbe92ff7c6b8a7afd1eae Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 11 Dec 2023 15:54:51 -0500 Subject: [PATCH 05/20] lint: rename name variables --- pkg/serverless/daemon/routes_test.go | 2 +- .../invocationlifecycle/lifecycle_test.go | 4 ++-- pkg/serverless/invocationlifecycle/trace.go | 20 +++++++++---------- pkg/serverless/proxy/proxy_test.go | 4 ++-- pkg/serverless/serverless.go | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index 938ba3b32a824c..0d9fa6ee1c0641 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -157,7 +157,7 @@ func TestTimeoutInvocation(t *testing.T) { d.InvocationProcessor.OnTimeoutInvokeEnd( &invocationlifecycle.TimeoutExecutionInfo{ - RequestId: "123abc", + RequestID: "123abc", Runtime: "custom", IsColdStart: true, IsProactiveInit: true, diff --git a/pkg/serverless/invocationlifecycle/lifecycle_test.go b/pkg/serverless/invocationlifecycle/lifecycle_test.go index bec6ce552eb779..84fba17548c75b 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle_test.go +++ b/pkg/serverless/invocationlifecycle/lifecycle_test.go @@ -417,7 +417,7 @@ func TestTimeoutExecutionSpan(t *testing.T) { testProcessor.OnInvokeStart(&startDetails) timeoutCtx := &TimeoutExecutionInfo{ - RequestId: "test-request-id", + RequestID: "test-request-id", Runtime: "java11", 
IsColdStart: false, IsProactiveInit: false, @@ -470,7 +470,7 @@ func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) { testProcessor.OnInvokeStart(&startDetails) timeoutCtx := &TimeoutExecutionInfo{ - RequestId: "test-request-id", + RequestID: "test-request-id", Runtime: "java11", IsColdStart: false, IsProactiveInit: false, diff --git a/pkg/serverless/invocationlifecycle/trace.go b/pkg/serverless/invocationlifecycle/trace.go index c501ecec78af2d..c1a58df80744e6 100644 --- a/pkg/serverless/invocationlifecycle/trace.go +++ b/pkg/serverless/invocationlifecycle/trace.go @@ -43,7 +43,7 @@ type ExecutionStartInfo struct { // TimeoutExecutionInfo is the information needed to complete an execution span during a timeout type TimeoutExecutionInfo struct { - RequestId string + RequestID string Runtime string IsColdStart bool IsProactiveInit bool @@ -126,13 +126,13 @@ func (lp *LifecycleProcessor) endExecutionSpanOnTimeout(timeoutCtx *TimeoutExecu duration := time.Now().UnixNano() - start // In a timeout we do not receive the trace and span IDs from the tracer so we must do it here - traceId := executionContext.TraceID - if traceId == 0 { - traceId = random.Random.Uint64() + traceID := executionContext.TraceID + if traceID == 0 { + traceID = random.Random.Uint64() } - spanId := executionContext.SpanID - if spanId == 0 { - spanId = random.Random.Uint64() + spanID := executionContext.SpanID + if spanID == 0 { + spanID = random.Random.Uint64() } executionSpan := &pb.Span{ @@ -140,15 +140,15 @@ func (lp *LifecycleProcessor) endExecutionSpanOnTimeout(timeoutCtx *TimeoutExecu Name: "aws.lambda", Resource: os.Getenv(functionNameEnvVar), Type: "serverless", - TraceID: traceId, - SpanID: spanId, + TraceID: traceID, + SpanID: spanID, ParentID: executionContext.parentID, Start: start, Duration: duration, Meta: lp.requestHandler.triggerTags, Metrics: lp.requestHandler.triggerMetrics, } - setExecutionSpanTags(executionSpan, timeoutCtx.RequestId, timeoutCtx.IsColdStart, 
timeoutCtx.IsProactiveInit, timeoutCtx.Runtime) + setExecutionSpanTags(executionSpan, timeoutCtx.RequestID, timeoutCtx.IsColdStart, timeoutCtx.IsProactiveInit, timeoutCtx.Runtime) // In a timeout the tracer is unable to send the response payload so it must be excluded captureLambdaPayload(executionContext, executionSpan, []byte{}) diff --git a/pkg/serverless/proxy/proxy_test.go b/pkg/serverless/proxy/proxy_test.go index 9b689fd9b768c9..24e974f98faaa3 100644 --- a/pkg/serverless/proxy/proxy_test.go +++ b/pkg/serverless/proxy/proxy_test.go @@ -45,7 +45,7 @@ func (tp *testProcessorResponseValid) OnInvokeEnd(endDetails *invocationlifecycl } func (tp *testProcessorResponseValid) OnTimeoutInvokeEnd(timeoutContext *invocationlifecycle.TimeoutExecutionInfo) { - if len(timeoutContext.RequestId) == 0 { + if len(timeoutContext.RequestID) == 0 { panic("requestId") } } @@ -72,7 +72,7 @@ func (tp *testProcessorResponseError) GetExecutionInfo() *invocationlifecycle.Ex } func (tp *testProcessorResponseError) OnTimeoutInvokeEnd(timeoutContext *invocationlifecycle.TimeoutExecutionInfo) { - if len(timeoutContext.RequestId) == 0 { + if len(timeoutContext.RequestID) == 0 { panic("requestId") } } diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index 5a944c8b238d60..8c34768ff6e7de 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -144,7 +144,7 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id regis ecs := daemon.ExecutionContext.GetCurrentState() log.Debug("No hit on serverless.EndInvocation route. 
Attempting to finish Lambda execution span.") timeoutContext := &invocationlifecycle.TimeoutExecutionInfo{ - RequestId: ecs.LastRequestID, + RequestID: ecs.LastRequestID, Runtime: ecs.Runtime, IsColdStart: coldStartTags.IsColdStart, IsProactiveInit: coldStartTags.IsProactiveInit, From eb04b12bfa0cfc2d08560ea38651500644ae83ac Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 11 Dec 2023 16:44:19 -0500 Subject: [PATCH 06/20] lint again --- pkg/serverless/invocationlifecycle/trace.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/serverless/invocationlifecycle/trace.go b/pkg/serverless/invocationlifecycle/trace.go index c1a58df80744e6..96a6417e093bfb 100644 --- a/pkg/serverless/invocationlifecycle/trace.go +++ b/pkg/serverless/invocationlifecycle/trace.go @@ -324,8 +324,8 @@ func convertJSONToString(payloadJSON interface{}) string { return string(jsonData) } -func setExecutionSpanTags(executionSpan *pb.Span, requestId string, isColdStart bool, isProactiveInit bool, runtime string) { - executionSpan.Meta["request_id"] = requestId +func setExecutionSpanTags(executionSpan *pb.Span, requestID string, isColdStart bool, isProactiveInit bool, runtime string) { + executionSpan.Meta["request_id"] = requestID executionSpan.Meta["cold_start"] = fmt.Sprintf("%t", isColdStart) if isProactiveInit { executionSpan.Meta["proactive_initialization"] = fmt.Sprintf("%t", isProactiveInit) From 68632e36da6fe28b1fd5c53297d5f999bf5fdaca Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 13 Dec 2023 15:08:33 -0500 Subject: [PATCH 07/20] small fixes --- pkg/serverless/invocationlifecycle/lifecycle.go | 9 ++++++--- pkg/serverless/invocationlifecycle/trace.go | 16 +++++++++------- pkg/serverless/serverless.go | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/serverless/invocationlifecycle/lifecycle.go b/pkg/serverless/invocationlifecycle/lifecycle.go index d47dacc72e149c..ff40348e59ca02 100644 --- 
a/pkg/serverless/invocationlifecycle/lifecycle.go +++ b/pkg/serverless/invocationlifecycle/lifecycle.go @@ -281,7 +281,8 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) { spans = append(spans, span) if lp.InferredSpansEnabled { - spans = lp.endInferredSpan(spans, statusCode, endDetails.EndTime, endDetails.IsError) + inferredSpans := lp.endInferredSpan(statusCode, endDetails.EndTime, endDetails.IsError) + spans = append(spans, inferredSpans...) } lp.processTrace(spans) } @@ -301,7 +302,8 @@ func (lp *LifecycleProcessor) OnTimeoutInvokeEnd(timeoutContext *TimeoutExecutio if lp.InferredSpansEnabled { // No response status code can be retrieved in a timeout so we pass an empty string - spans = lp.endInferredSpan(spans, "", time.Now(), true) + inferredSpans := lp.endInferredSpan("", time.Now(), true) + spans = append(spans, inferredSpans...) } lp.processTrace(spans) } @@ -380,7 +382,8 @@ func (lp *LifecycleProcessor) setParentIDForMultipleInferredSpans() { } // endInferredSpan attempts to complete any inferred spans and send them to intake -func (lp *LifecycleProcessor) endInferredSpan(spans []*pb.Span, statusCode string, endTime time.Time, isError bool) []*pb.Span { +func (lp *LifecycleProcessor) endInferredSpan(statusCode string, endTime time.Time, isError bool) []*pb.Span { + spans := make([]*pb.Span, 0, 2) log.Debug("[lifecycle] Attempting to complete the inferred span") log.Debugf("[lifecycle] Inferred span context: %+v", lp.GetInferredSpan().Span) if lp.GetInferredSpan().Span.Start != 0 { diff --git a/pkg/serverless/invocationlifecycle/trace.go b/pkg/serverless/invocationlifecycle/trace.go index 96a6417e093bfb..b09dfbe1551565 100644 --- a/pkg/serverless/invocationlifecycle/trace.go +++ b/pkg/serverless/invocationlifecycle/trace.go @@ -150,7 +150,7 @@ func (lp *LifecycleProcessor) endExecutionSpanOnTimeout(timeoutCtx *TimeoutExecu } setExecutionSpanTags(executionSpan, timeoutCtx.RequestID, timeoutCtx.IsColdStart, 
timeoutCtx.IsProactiveInit, timeoutCtx.Runtime) // In a timeout the tracer is unable to send the response payload so it must be excluded - captureLambdaPayload(executionContext, executionSpan, []byte{}) + captureLambdaPayload(executionContext, executionSpan, nil) // Always mark it as an error since this is a timeout span executionSpan.Error = 1 @@ -252,12 +252,14 @@ func captureLambdaPayload(executionContext *ExecutionStartInfo, executionSpan *p } else { capturePayloadAsTags(requestPayloadJSON, executionSpan, "function.request", 0, capturePayloadMaxDepth) } - responsePayloadJSON := make(map[string]interface{}) - if err := json.Unmarshal(responsePayload, &responsePayloadJSON); err != nil { - log.Debugf("[lifecycle] Failed to parse response payload: %v", err) - executionSpan.Meta["function.response"] = string(responsePayload) - } else { - capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) + if responsePayload != nil { + responsePayloadJSON := make(map[string]interface{}) + if err := json.Unmarshal(responsePayload, &responsePayloadJSON); err != nil { + log.Debugf("[lifecycle] Failed to parse response payload: %v", err) + executionSpan.Meta["function.response"] = string(responsePayload) + } else { + capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) + } } } diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index 8c34768ff6e7de..5d7dfd0191ef98 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -140,7 +140,7 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id regis metrics.SendTimeoutEnhancedMetric(metricTags, daemon.MetricAgent.Demux) metrics.SendErrorsEnhancedMetric(metricTags, time.Now(), daemon.MetricAgent.Demux) - if ok := daemon.IsExecutionSpanComplete(); !ok { + if !daemon.IsExecutionSpanComplete() { ecs := daemon.ExecutionContext.GetCurrentState() log.Debug("No hit on 
serverless.EndInvocation route. Attempting to finish Lambda execution span.") timeoutContext := &invocationlifecycle.TimeoutExecutionInfo{ From 6c6ce1c0fd21ddccf847f1015da6adbf6e79fa39 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Thu, 21 Dec 2023 10:26:30 -0500 Subject: [PATCH 08/20] refactor timeout span logic --- pkg/serverless/appsec/httpsec/proxy.go | 5 -- pkg/serverless/daemon/routes_test.go | 44 ----------- .../invocationlifecycle/invocation_details.go | 1 + .../invocation_processor.go | 2 - .../invocationlifecycle/lifecycle.go | 16 +--- .../invocationlifecycle/lifecycle_test.go | 46 ++++++++---- pkg/serverless/invocationlifecycle/trace.go | 75 ++++++------------- pkg/serverless/proxy/proxy_test.go | 12 --- pkg/serverless/serverless.go | 17 +++-- 9 files changed, 64 insertions(+), 154 deletions(-) diff --git a/pkg/serverless/appsec/httpsec/proxy.go b/pkg/serverless/appsec/httpsec/proxy.go index b72fbab811f23c..57812c6289778d 100644 --- a/pkg/serverless/appsec/httpsec/proxy.go +++ b/pkg/serverless/appsec/httpsec/proxy.go @@ -46,11 +46,6 @@ func (lp *ProxyLifecycleProcessor) GetExecutionInfo() *invocationlifecycle.Execu return nil // not used in the runtime api proxy case } -// OnTimeoutInvokeEnd completes an unfinished execution span during a timeout -func (lp *ProxyLifecycleProcessor) OnTimeoutInvokeEnd(*invocationlifecycle.TimeoutExecutionInfo) { - // not used in the runtime api proxy case -} - // OnInvokeStart is the hook triggered when an invocation has started func (lp *ProxyLifecycleProcessor) OnInvokeStart(startDetails *invocationlifecycle.InvocationStartDetails) { log.Debugf("appsec: proxy-lifecycle: invocation started with raw payload `%s`", startDetails.InvokeEventRawPayload) diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index 0d9fa6ee1c0641..ca7a54ea623576 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -48,12 +48,6 @@ func (m *mockLifecycleProcessor) 
OnInvokeEnd(endDetails *invocationlifecycle.Inv m.lastEndDetails = endDetails } -func (m *mockLifecycleProcessor) OnTimeoutInvokeEnd(*invocationlifecycle.TimeoutExecutionInfo) { - m.OnInvokeEndCalled = false - m.isError = true - m.lastEndDetails = nil -} - func TestStartInvocation(t *testing.T) { assert := assert.New(t) port := testutil.FreeTCPPort(t) @@ -133,44 +127,6 @@ func TestEndInvocationWithError(t *testing.T) { assert.True(m.isError) } -func TestTimeoutInvocation(t *testing.T) { - assert := assert.New(t) - port := testutil.FreeTCPPort(t) - d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) - time.Sleep(100 * time.Millisecond) - defer d.Stop() - - m := &mockLifecycleProcessor{} - d.InvocationProcessor = m - client := &http.Client{Timeout: 1 * time.Second} - startURL := fmt.Sprintf("http://127.0.0.1:%d/lambda/start-invocation", port) - - body := bytes.NewBuffer([]byte(`{}`)) - startReq, err := http.NewRequest(http.MethodPost, startURL, body) - assert.Nil(err) - res, err := client.Do(startReq) - assert.Nil(err) - if res != nil { - assert.Equal(res.StatusCode, 200) - res.Body.Close() - } - - d.InvocationProcessor.OnTimeoutInvokeEnd( - &invocationlifecycle.TimeoutExecutionInfo{ - RequestID: "123abc", - Runtime: "custom", - IsColdStart: true, - IsProactiveInit: true, - }) - - assert.True(m.OnInvokeStartCalled) - assert.True(d.hitOnStart) - assert.False(m.OnInvokeEndCalled) - assert.False(d.hitOnEnd) - assert.True(m.isError) - assert.Nil(m.lastEndDetails) -} - func TestTraceContext(t *testing.T) { assert := assert.New(t) diff --git a/pkg/serverless/invocationlifecycle/invocation_details.go b/pkg/serverless/invocationlifecycle/invocation_details.go index bd0e285f8d377b..0ad7d0a98b8ea8 100644 --- a/pkg/serverless/invocationlifecycle/invocation_details.go +++ b/pkg/serverless/invocationlifecycle/invocation_details.go @@ -27,6 +27,7 @@ type InvocationStartDetails struct { type InvocationEndDetails struct { EndTime time.Time IsError bool + IsTimeout bool RequestID 
string ResponseRawPayload []byte ColdStart bool diff --git a/pkg/serverless/invocationlifecycle/invocation_processor.go b/pkg/serverless/invocationlifecycle/invocation_processor.go index a4bc1a223d6ac1..181344131a8014 100644 --- a/pkg/serverless/invocationlifecycle/invocation_processor.go +++ b/pkg/serverless/invocationlifecycle/invocation_processor.go @@ -13,8 +13,6 @@ type InvocationProcessor interface { OnInvokeEnd(endDetails *InvocationEndDetails) // GetExecutionInfo returns the current execution start information GetExecutionInfo() *ExecutionStartInfo - // OnTimeoutInvokeEnd completes an unfinished execution span during a timeout - OnTimeoutInvokeEnd(timeoutCtx *TimeoutExecutionInfo) } // InvocationSubProcessor is the interface to implement to receive invocation lifecycle hooks along with the diff --git a/pkg/serverless/invocationlifecycle/lifecycle.go b/pkg/serverless/invocationlifecycle/lifecycle.go index ff40348e59ca02..f321f7cb7b4719 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle.go +++ b/pkg/serverless/invocationlifecycle/lifecycle.go @@ -287,27 +287,13 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) { lp.processTrace(spans) } - if endDetails.IsError { + if endDetails.IsError && !endDetails.IsTimeout { serverlessMetrics.SendErrorsEnhancedMetric( lp.ExtraTags.Tags, endDetails.EndTime, lp.Demux, ) } } -// OnTimeoutInvokeEnd completes an unfinished execution span during a timeout -func (lp *LifecycleProcessor) OnTimeoutInvokeEnd(timeoutContext *TimeoutExecutionInfo) { - spans := make([]*pb.Span, 0, 3) - span := lp.endExecutionSpanOnTimeout(timeoutContext) - spans = append(spans, span) - - if lp.InferredSpansEnabled { - // No response status code can be retrieved in a timeout so we pass an empty string - inferredSpans := lp.endInferredSpan("", time.Now(), true) - spans = append(spans, inferredSpans...) 
- } - lp.processTrace(spans) -} - // GetTags returns the tagset of the currently executing lambda function func (lp *LifecycleProcessor) GetTags() map[string]string { return lp.requestHandler.triggerTags diff --git a/pkg/serverless/invocationlifecycle/lifecycle_test.go b/pkg/serverless/invocationlifecycle/lifecycle_test.go index 84fba17548c75b..f281ad9cb3af82 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle_test.go +++ b/pkg/serverless/invocationlifecycle/lifecycle_test.go @@ -409,6 +409,9 @@ func TestTimeoutExecutionSpan(t *testing.T) { Demux: demux, InferredSpansEnabled: true, } + startTime := time.Now() + duration := 1 * time.Second + endTime := startTime.Add(duration) startDetails := InvocationStartDetails{ StartTime: time.Now(), InvokeEventRawPayload: []byte(`{}`), @@ -416,23 +419,28 @@ func TestTimeoutExecutionSpan(t *testing.T) { } testProcessor.OnInvokeStart(&startDetails) - timeoutCtx := &TimeoutExecutionInfo{ - RequestID: "test-request-id", - Runtime: "java11", - IsColdStart: false, - IsProactiveInit: false, + timeoutCtx := &InvocationEndDetails{ + RequestID: "test-request-id", + Runtime: "java11", + ColdStart: false, + ProactiveInit: false, + EndTime: endTime, + IsError: true, + IsTimeout: true, + ResponseRawPayload: nil, } - testProcessor.OnTimeoutInvokeEnd(timeoutCtx) + testProcessor.OnInvokeEnd(timeoutCtx) spans := tracePayload.TracerPayload.Chunks[0].Spans assert.Equal(t, 1, len(spans)) // No trace context passed - assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().TraceID) + assert.NotZero(t, testProcessor.GetExecutionInfo().TraceID) assert.Equal(t, uint64(0), testProcessor.GetExecutionInfo().SpanID) - assert.Equal(t, tracePayload.TracerPayload.Chunks[0].Priority, int32(-128)) + assert.Equal(t, int32(-128), tracePayload.TracerPayload.Chunks[0].Priority) // New trace ID and span ID has been created assert.NotEqual(t, uint64(0), spans[0].TraceID) assert.NotEqual(t, uint64(0), spans[0].SpanID) + assert.Equal(t, spans[0].TraceID, 
testProcessor.GetExecutionInfo().TraceID) assert.Equal(t, spans[0].Error, int32(1)) assert.Equal(t, spans[0].GetMeta()["request_id"], "test-request-id") assert.Equal(t, spans[0].GetMeta()["language"], "java") @@ -462,20 +470,26 @@ func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) { InferredSpansEnabled: true, } eventPayload := `a5a{"resource":"/users/create","path":"/users/create","httpMethod":"GET","headers":{"Accept":"*/*","Accept-Encoding":"gzip","x-datadog-parent-id":"1480558859903409531","x-datadog-sampling-priority":"1","x-datadog-trace-id":"5736943178450432258"}}0` + startTime := time.Now() + duration := 1 * time.Second + endTime := startTime.Add(duration) startDetails := InvocationStartDetails{ - StartTime: time.Now(), + StartTime: startTime, InvokeEventRawPayload: []byte(eventPayload), InvokedFunctionARN: "arn:aws:lambda:us-east-1:123456789012:function:my-function", } testProcessor.OnInvokeStart(&startDetails) - - timeoutCtx := &TimeoutExecutionInfo{ - RequestID: "test-request-id", - Runtime: "java11", - IsColdStart: false, - IsProactiveInit: false, + timeoutCtx := &InvocationEndDetails{ + RequestID: "test-request-id", + Runtime: "java11", + ColdStart: false, + ProactiveInit: false, + EndTime: endTime, + IsError: true, + IsTimeout: true, + ResponseRawPayload: nil, } - testProcessor.OnTimeoutInvokeEnd(timeoutCtx) + testProcessor.OnInvokeEnd(timeoutCtx) spans := tracePayload.TracerPayload.Chunks[0].Spans assert.Equal(t, 1, len(spans)) diff --git a/pkg/serverless/invocationlifecycle/trace.go b/pkg/serverless/invocationlifecycle/trace.go index b09dfbe1551565..80d56ffd5ec666 100644 --- a/pkg/serverless/invocationlifecycle/trace.go +++ b/pkg/serverless/invocationlifecycle/trace.go @@ -41,14 +41,6 @@ type ExecutionStartInfo struct { SamplingPriority sampler.SamplingPriority } -// TimeoutExecutionInfo is the information needed to complete an execution span during a timeout -type TimeoutExecutionInfo struct { - RequestID string - Runtime string - 
IsColdStart bool - IsProactiveInit bool -} - // startExecutionSpan records information from the start of the invocation. // It should be called at the start of the invocation. func (lp *LifecycleProcessor) startExecutionSpan(event interface{}, rawPayload []byte, startDetails *InvocationStartDetails) { @@ -84,26 +76,38 @@ func (lp *LifecycleProcessor) startExecutionSpan(event interface{}, rawPayload [ // It should be called at the end of the invocation. func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) *pb.Span { executionContext := lp.GetExecutionInfo() - duration := endDetails.EndTime.UnixNano() - executionContext.startTime.UnixNano() + start := executionContext.startTime.UnixNano() + + traceID := executionContext.TraceID + spanID := executionContext.SpanID + // If we fail to receive the trace and span IDs from the tracer during a timeout we create it ourselves + if endDetails.IsTimeout && traceID == 0 { + traceID = random.Random.Uint64() + lp.requestHandler.executionInfo.TraceID = traceID + } + if endDetails.IsTimeout && spanID == 0 { + spanID = random.Random.Uint64() + } executionSpan := &pb.Span{ Service: "aws.lambda", // will be replaced by the span processor Name: "aws.lambda", Resource: os.Getenv(functionNameEnvVar), Type: "serverless", - TraceID: executionContext.TraceID, - SpanID: executionContext.SpanID, + TraceID: traceID, + SpanID: spanID, ParentID: executionContext.parentID, - Start: executionContext.startTime.UnixNano(), - Duration: duration, + Start: start, + Duration: endDetails.EndTime.UnixNano() - start, Meta: lp.requestHandler.triggerTags, Metrics: lp.requestHandler.triggerMetrics, } + setExecutionSpanTags(executionSpan, endDetails.RequestID, endDetails.ColdStart, endDetails.ProactiveInit, endDetails.Runtime) captureLambdaPayload(executionContext, executionSpan, endDetails.ResponseRawPayload) - if endDetails.IsError { executionSpan.Error = 1 + if len(endDetails.ErrorMsg) > 0 { executionSpan.Meta["error.msg"] = 
endDetails.ErrorMsg } @@ -113,47 +117,12 @@ func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) if len(endDetails.ErrorStack) > 0 { executionSpan.Meta["error.stack"] = endDetails.ErrorStack } - } - return executionSpan -} - -// endExecutionSpanOnTimeout attempts to finish the execution span during a timeout. -// It should only be called when the execution span has been started but not finished during a timeout -func (lp *LifecycleProcessor) endExecutionSpanOnTimeout(timeoutCtx *TimeoutExecutionInfo) *pb.Span { - executionContext := lp.GetExecutionInfo() - start := executionContext.startTime.UnixNano() - duration := time.Now().UnixNano() - start - - // In a timeout we do not receive the trace and span IDs from the tracer so we must do it here - traceID := executionContext.TraceID - if traceID == 0 { - traceID = random.Random.Uint64() - } - spanID := executionContext.SpanID - if spanID == 0 { - spanID = random.Random.Uint64() - } - - executionSpan := &pb.Span{ - Service: "aws.lambda", // will be replaced by the span processor - Name: "aws.lambda", - Resource: os.Getenv(functionNameEnvVar), - Type: "serverless", - TraceID: traceID, - SpanID: spanID, - ParentID: executionContext.parentID, - Start: start, - Duration: duration, - Meta: lp.requestHandler.triggerTags, - Metrics: lp.requestHandler.triggerMetrics, + if endDetails.IsTimeout { + executionSpan.Meta["error.type"] = "Impending Timeout" + executionSpan.Meta["error.msg"] = "Datadog detected an Impending Timeout" + } } - setExecutionSpanTags(executionSpan, timeoutCtx.RequestID, timeoutCtx.IsColdStart, timeoutCtx.IsProactiveInit, timeoutCtx.Runtime) - // In a timeout the tracer is unable to send the response payload so it must be excluded - captureLambdaPayload(executionContext, executionSpan, nil) - - // Always mark it as an error since this is a timeout span - executionSpan.Error = 1 return executionSpan } diff --git a/pkg/serverless/proxy/proxy_test.go 
b/pkg/serverless/proxy/proxy_test.go index 24e974f98faaa3..c8169a67d5b6c0 100644 --- a/pkg/serverless/proxy/proxy_test.go +++ b/pkg/serverless/proxy/proxy_test.go @@ -44,12 +44,6 @@ func (tp *testProcessorResponseValid) OnInvokeEnd(endDetails *invocationlifecycl } } -func (tp *testProcessorResponseValid) OnTimeoutInvokeEnd(timeoutContext *invocationlifecycle.TimeoutExecutionInfo) { - if len(timeoutContext.RequestID) == 0 { - panic("requestId") - } -} - type testProcessorResponseError struct{} func (tp *testProcessorResponseError) OnInvokeStart(startDetails *invocationlifecycle.InvocationStartDetails) { @@ -71,12 +65,6 @@ func (tp *testProcessorResponseError) GetExecutionInfo() *invocationlifecycle.Ex return nil } -func (tp *testProcessorResponseError) OnTimeoutInvokeEnd(timeoutContext *invocationlifecycle.TimeoutExecutionInfo) { - if len(timeoutContext.RequestID) == 0 { - panic("requestId") - } -} - func TestProxyResponseValid(t *testing.T) { // fake the runtime API running on 5001 l, err := net.Listen("tcp", "127.0.0.1:5001") diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index 5d7dfd0191ef98..dc331177cb11be 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -142,14 +142,17 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id regis if !daemon.IsExecutionSpanComplete() { ecs := daemon.ExecutionContext.GetCurrentState() - log.Debug("No hit on serverless.EndInvocation route. 
Attempting to finish Lambda execution span.") - timeoutContext := &invocationlifecycle.TimeoutExecutionInfo{ - RequestID: ecs.LastRequestID, - Runtime: ecs.Runtime, - IsColdStart: coldStartTags.IsColdStart, - IsProactiveInit: coldStartTags.IsProactiveInit, + timeoutDetails := &invocationlifecycle.InvocationEndDetails{ + RequestID: ecs.LastRequestID, + Runtime: ecs.Runtime, + ColdStart: coldStartTags.IsColdStart, + ProactiveInit: coldStartTags.IsProactiveInit, + EndTime: time.Now(), + IsError: true, + IsTimeout: true, + ResponseRawPayload: nil, } - daemon.InvocationProcessor.OnTimeoutInvokeEnd(timeoutContext) + daemon.InvocationProcessor.OnInvokeEnd(timeoutDetails) } } err := daemon.ExecutionContext.SaveCurrentExecutionContext() From bf2379017eddefc45085430df45bbf3fb6d1fee5 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Tue, 2 Jan 2024 17:17:02 -0500 Subject: [PATCH 09/20] add mutexes --- cmd/serverless/main.go | 2 +- pkg/serverless/daemon/daemon.go | 19 +++++++++++++++++-- pkg/serverless/daemon/routes.go | 6 ++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/cmd/serverless/main.go b/cmd/serverless/main.go index 90ad5d0a20b104..c39427f7213d7a 100644 --- a/cmd/serverless/main.go +++ b/cmd/serverless/main.go @@ -290,7 +290,7 @@ func runAgent() { ExtraTags: serverlessDaemon.ExtraTags, Demux: serverlessDaemon.MetricAgent.Demux, ProcessTrace: ta.Process, - DetectLambdaLibrary: func() bool { return serverlessDaemon.LambdaLibraryDetected }, + DetectLambdaLibrary: serverlessDaemon.IsLambdaLibraryDetected, InferredSpansEnabled: inferredspan.IsInferredSpansEnabled(), } diff --git a/pkg/serverless/daemon/daemon.go b/pkg/serverless/daemon/daemon.go index 84327ab11560a4..37f43ea36a3d3d 100644 --- a/pkg/serverless/daemon/daemon.go +++ b/pkg/serverless/daemon/daemon.go @@ -66,12 +66,18 @@ type Daemon struct { // LambdaLibraryDetected represents whether the Datadog Lambda Library was detected in the environment LambdaLibraryDetected bool + // 
LambdaLibraryStateLock keeps track of whether the Datadog Lambda Library was detected in the environment + LambdaLibraryStateLock sync.RWMutex + // hitOnStart indicates whether the the serverless.StartInvocation route has been hit hitOnStart bool // hitOnEnd indicates whether the the serverless.EndInvocation route has been hit hitOnEnd bool + // ExecutionSpanStateLock keeps track of whether the serverless Invocation routes have been hit to complete the execution span + ExecutionSpanStateLock sync.Mutex + // runtimeStateMutex is used to ensure that modifying the state of the runtime is thread-safe runtimeStateMutex sync.Mutex @@ -442,7 +448,16 @@ func (d *Daemon) setTraceTags(tagMap map[string]string) bool { return false } -// IsExecutionSpanComplete checks if the execution span was finished during a timeout +// IsLambdaLibraryDetected returns if the Lambda Library is in use +func (d *Daemon) IsLambdaLibraryDetected() bool { + d.LambdaLibraryStateLock.RLock() + defer d.LambdaLibraryStateLock.RUnlock() + return d.LambdaLibraryDetected +} + +// IsExecutionSpanComplete checks if the execution span was finished func (d *Daemon) IsExecutionSpanComplete() bool { - return !d.LambdaLibraryDetected && d.hitOnStart && d.hitOnEnd + d.ExecutionSpanStateLock.Lock() + defer d.ExecutionSpanStateLock.Unlock() + return d.hitOnStart && d.hitOnEnd } diff --git a/pkg/serverless/daemon/routes.go b/pkg/serverless/daemon/routes.go index f7df550633e25a..861f0bb71a12a5 100644 --- a/pkg/serverless/daemon/routes.go +++ b/pkg/serverless/daemon/routes.go @@ -26,6 +26,8 @@ type Hello struct { //nolint:revive // TODO(SERV) Fix revive linter func (h *Hello) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.Hello route.") + h.daemon.LambdaLibraryStateLock.Lock() + defer h.daemon.LambdaLibraryStateLock.Unlock() h.daemon.LambdaLibraryDetected = true } @@ -53,6 +55,8 @@ type StartInvocation struct { func (s *StartInvocation) ServeHTTP(w http.ResponseWriter, r 
*http.Request) { log.Debug("Hit on the serverless.StartInvocation route.") + s.daemon.ExecutionSpanStateLock.Lock() + defer s.daemon.ExecutionSpanStateLock.Unlock() s.daemon.hitOnStart = true startTime := time.Now() reqBody, err := io.ReadAll(r.Body) @@ -87,6 +91,8 @@ type EndInvocation struct { func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.EndInvocation route.") + e.daemon.ExecutionSpanStateLock.Lock() + defer e.daemon.ExecutionSpanStateLock.Unlock() e.daemon.hitOnEnd = true endTime := time.Now() ecs := e.daemon.ExecutionContext.GetCurrentState() From 65e7c208cbe67800035d78306c880fd3ffdc7611 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 8 Jan 2024 15:15:07 -0500 Subject: [PATCH 10/20] fix span completed check --- pkg/serverless/daemon/daemon.go | 18 +++++++++++------- pkg/serverless/daemon/routes.go | 8 ++------ pkg/serverless/serverless.go | 1 + 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/pkg/serverless/daemon/daemon.go b/pkg/serverless/daemon/daemon.go index 37f43ea36a3d3d..5e60f500ce51e1 100644 --- a/pkg/serverless/daemon/daemon.go +++ b/pkg/serverless/daemon/daemon.go @@ -69,11 +69,8 @@ type Daemon struct { // LambdaLibraryStateLock keeps track of whether the Datadog Lambda Library was detected in the environment LambdaLibraryStateLock sync.RWMutex - // hitOnStart indicates whether the the serverless.StartInvocation route has been hit - hitOnStart bool - - // hitOnEnd indicates whether the the serverless.EndInvocation route has been hit - hitOnEnd bool + // executionSpanComplete indicates whether the Lambda span has been completed by the Extension + executionSpanComplete bool // ExecutionSpanStateLock keeps track of whether the serverless Invocation routes have been hit to complete the execution span ExecutionSpanStateLock sync.Mutex @@ -455,9 +452,16 @@ func (d *Daemon) IsLambdaLibraryDetected() bool { return d.LambdaLibraryDetected } -// IsExecutionSpanComplete checks 
if the execution span was finished +// IsExecutionSpanComplete checks if the Lambda execution span was finished func (d *Daemon) IsExecutionSpanComplete() bool { d.ExecutionSpanStateLock.Lock() defer d.ExecutionSpanStateLock.Unlock() - return d.hitOnStart && d.hitOnEnd + return d.executionSpanComplete +} + +// SetExecutionSpanComplete keeps track of whether the Extension completed the Lambda execution span +func (d *Daemon) SetExecutionSpanComplete(spanComplete bool) { + d.ExecutionSpanStateLock.Lock() + defer d.ExecutionSpanStateLock.Unlock() + d.executionSpanComplete = spanComplete } diff --git a/pkg/serverless/daemon/routes.go b/pkg/serverless/daemon/routes.go index 861f0bb71a12a5..ea3f35493f7bc1 100644 --- a/pkg/serverless/daemon/routes.go +++ b/pkg/serverless/daemon/routes.go @@ -55,9 +55,7 @@ type StartInvocation struct { func (s *StartInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.StartInvocation route.") - s.daemon.ExecutionSpanStateLock.Lock() - defer s.daemon.ExecutionSpanStateLock.Unlock() - s.daemon.hitOnStart = true + s.daemon.SetExecutionSpanComplete(false) startTime := time.Now() reqBody, err := io.ReadAll(r.Body) if err != nil { @@ -91,9 +89,7 @@ type EndInvocation struct { func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.EndInvocation route.") - e.daemon.ExecutionSpanStateLock.Lock() - defer e.daemon.ExecutionSpanStateLock.Unlock() - e.daemon.hitOnEnd = true + e.daemon.SetExecutionSpanComplete(true) endTime := time.Now() ecs := e.daemon.ExecutionContext.GetCurrentState() coldStartTags := e.daemon.ExecutionContext.GetColdStartTagsForRequestID(ecs.LastRequestID) diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index b89166121eff04..8131f72484e469 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -154,6 +154,7 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id 
regis ResponseRawPayload: nil, } daemon.InvocationProcessor.OnInvokeEnd(timeoutDetails) + daemon.SetExecutionSpanComplete(true) } } err := daemon.ExecutionContext.SaveCurrentExecutionContext() From f1e28572d5f9d22b7ac111c3611939f4dd8d216f Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 8 Jan 2024 16:06:32 -0500 Subject: [PATCH 11/20] revert refactor --- pkg/serverless/invocationlifecycle/trace.go | 67 +++++++++------------ 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/pkg/serverless/invocationlifecycle/trace.go b/pkg/serverless/invocationlifecycle/trace.go index aca43f8035a026..cfd545ed144f44 100644 --- a/pkg/serverless/invocationlifecycle/trace.go +++ b/pkg/serverless/invocationlifecycle/trace.go @@ -103,9 +103,35 @@ func (lp *LifecycleProcessor) endExecutionSpan(endDetails *InvocationEndDetails) Meta: lp.requestHandler.triggerTags, Metrics: lp.requestHandler.triggerMetrics, } - - setExecutionSpanTags(executionSpan, endDetails.RequestID, endDetails.ColdStart, endDetails.ProactiveInit, endDetails.Runtime) - captureLambdaPayload(executionContext, executionSpan, endDetails.ResponseRawPayload) + executionSpan.Meta["request_id"] = endDetails.RequestID + executionSpan.Meta["cold_start"] = fmt.Sprintf("%t", endDetails.ColdStart) + if endDetails.ProactiveInit { + executionSpan.Meta["proactive_initialization"] = fmt.Sprintf("%t", endDetails.ProactiveInit) + } + langMatches := runtimeRegex.FindStringSubmatch(endDetails.Runtime) + if len(langMatches) >= 2 { + executionSpan.Meta["language"] = langMatches[1] + } + captureLambdaPayloadEnabled := config.Datadog.GetBool("capture_lambda_payload") + if captureLambdaPayloadEnabled { + capturePayloadMaxDepth := config.Datadog.GetInt("capture_lambda_payload_max_depth") + requestPayloadJSON := make(map[string]interface{}) + if err := json.Unmarshal(executionContext.requestPayload, &requestPayloadJSON); err != nil { + log.Debugf("[lifecycle] Failed to parse request payload: %v", err) + 
executionSpan.Meta["function.request"] = string(executionContext.requestPayload) + } else { + capturePayloadAsTags(requestPayloadJSON, executionSpan, "function.request", 0, capturePayloadMaxDepth) + } + if endDetails.ResponseRawPayload != nil { + responsePayloadJSON := make(map[string]interface{}) + if err := json.Unmarshal(endDetails.ResponseRawPayload, &responsePayloadJSON); err != nil { + log.Debugf("[lifecycle] Failed to parse response payload: %v", err) + executionSpan.Meta["function.response"] = string(endDetails.ResponseRawPayload) + } else { + capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) + } + } + } if endDetails.IsError { executionSpan.Error = 1 @@ -210,29 +236,6 @@ func InjectSpanID(executionContext *ExecutionStartInfo, headers http.Header) { } } -func captureLambdaPayload(executionContext *ExecutionStartInfo, executionSpan *pb.Span, responsePayload []byte) { - if captureLambdaPayloadEnabled := config.Datadog.GetBool("capture_lambda_payload"); !captureLambdaPayloadEnabled { - return - } - capturePayloadMaxDepth := config.Datadog.GetInt("capture_lambda_payload_max_depth") - requestPayloadJSON := make(map[string]interface{}) - if err := json.Unmarshal(executionContext.requestPayload, &requestPayloadJSON); err != nil { - log.Debugf("[lifecycle] Failed to parse request payload: %v", err) - executionSpan.Meta["function.request"] = string(executionContext.requestPayload) - } else { - capturePayloadAsTags(requestPayloadJSON, executionSpan, "function.request", 0, capturePayloadMaxDepth) - } - if responsePayload != nil { - responsePayloadJSON := make(map[string]interface{}) - if err := json.Unmarshal(responsePayload, &responsePayloadJSON); err != nil { - log.Debugf("[lifecycle] Failed to parse response payload: %v", err) - executionSpan.Meta["function.response"] = string(responsePayload) - } else { - capturePayloadAsTags(responsePayloadJSON, executionSpan, "function.response", 0, capturePayloadMaxDepth) - 
} - } -} - func capturePayloadAsTags(value interface{}, targetSpan *pb.Span, key string, depth int, maxDepth int) { if key == "" { return @@ -295,15 +298,3 @@ func convertJSONToString(payloadJSON interface{}) string { } return string(jsonData) } - -func setExecutionSpanTags(executionSpan *pb.Span, requestID string, isColdStart bool, isProactiveInit bool, runtime string) { - executionSpan.Meta["request_id"] = requestID - executionSpan.Meta["cold_start"] = fmt.Sprintf("%t", isColdStart) - if isProactiveInit { - executionSpan.Meta["proactive_initialization"] = fmt.Sprintf("%t", isProactiveInit) - } - langMatches := runtimeRegex.FindStringSubmatch(runtime) - if len(langMatches) >= 2 { - executionSpan.Meta["language"] = langMatches[1] - } -} From 12eb913783a200a11ee7fd074fc2973d86b63cc9 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Tue, 9 Jan 2024 11:00:42 -0500 Subject: [PATCH 12/20] remove cold start span changes --- cmd/serverless/main.go | 1 - pkg/serverless/executioncontext/executioncontext.go | 1 - pkg/serverless/trace/cold_start_span_creator.go | 5 ++--- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/cmd/serverless/main.go b/cmd/serverless/main.go index c39427f7213d7a..8cd58fbecde75e 100644 --- a/cmd/serverless/main.go +++ b/cmd/serverless/main.go @@ -271,7 +271,6 @@ func runAgent() { TraceAgent: serverlessDaemon.TraceAgent, StopChan: make(chan struct{}), ColdStartSpanId: coldStartSpanId, - WasColdStart: serverlessDaemon.ExecutionContext.GetCurrentState().WasColdStart, } log.Debug("Starting ColdStartSpanCreator") diff --git a/pkg/serverless/executioncontext/executioncontext.go b/pkg/serverless/executioncontext/executioncontext.go index 775e534fe51e34..d27929c63a3fc1 100644 --- a/pkg/serverless/executioncontext/executioncontext.go +++ b/pkg/serverless/executioncontext/executioncontext.go @@ -219,7 +219,6 @@ func (ec *ExecutionContext) RestoreCurrentStateFromFile() error { ec.lastLogRequestID = restoredExecutionContextState.LastLogRequestID 
ec.lastOOMRequestID = restoredExecutionContextState.LastOOMRequestID ec.coldstartRequestID = restoredExecutionContextState.ColdstartRequestID - ec.wasColdStart = restoredExecutionContextState.WasColdStart ec.startTime = restoredExecutionContextState.StartTime ec.endTime = restoredExecutionContextState.EndTime return nil diff --git a/pkg/serverless/trace/cold_start_span_creator.go b/pkg/serverless/trace/cold_start_span_creator.go index c43d26dcfa40a1..ef46416d497d2a 100644 --- a/pkg/serverless/trace/cold_start_span_creator.go +++ b/pkg/serverless/trace/cold_start_span_creator.go @@ -46,7 +46,6 @@ type ColdStartSpanCreator struct { initDuration float64 StopChan chan struct{} initStartTime time.Time - WasColdStart bool } //nolint:revive // TODO(SERV) Fix revive linter @@ -115,8 +114,8 @@ func (c *ColdStartSpanCreator) createIfReady() { } func (c *ColdStartSpanCreator) create() { - // Prevent infinite loop from SpanModifier call and duplicates from timeout restarts - if c.lambdaSpan.Name == spanName || c.WasColdStart { + // Prevent infinite loop from SpanModifier call + if c.lambdaSpan.Name == spanName { return } From 4f0543f5b33490002b1e56f6af4fad56f59f2e84 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Tue, 9 Jan 2024 15:55:48 -0500 Subject: [PATCH 13/20] use mutex over rwmutex --- pkg/serverless/daemon/daemon.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/serverless/daemon/daemon.go b/pkg/serverless/daemon/daemon.go index 5e60f500ce51e1..e34d7e24b590a0 100644 --- a/pkg/serverless/daemon/daemon.go +++ b/pkg/serverless/daemon/daemon.go @@ -67,7 +67,7 @@ type Daemon struct { LambdaLibraryDetected bool // LambdaLibraryStateLock keeps track of whether the Datadog Lambda Library was detected in the environment - LambdaLibraryStateLock sync.RWMutex + LambdaLibraryStateLock sync.Mutex // executionSpanComplete indicates whether the Lambda span has been completed by the Extension executionSpanComplete bool @@ -447,8 +447,8 @@ func (d *Daemon) 
setTraceTags(tagMap map[string]string) bool { // IsLambdaLibraryDetected returns if the Lambda Library is in use func (d *Daemon) IsLambdaLibraryDetected() bool { - d.LambdaLibraryStateLock.RLock() - defer d.LambdaLibraryStateLock.RUnlock() + d.LambdaLibraryStateLock.Lock() + defer d.LambdaLibraryStateLock.Unlock() return d.LambdaLibraryDetected } From 24712f07aabf17baf9ccb6f8dc5780cad9a02a7b Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Tue, 9 Jan 2024 17:48:38 -0500 Subject: [PATCH 14/20] test routes --- pkg/serverless/daemon/routes_test.go | 30 ++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index ef617475cc1280..ca2dba60cdea31 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -330,6 +330,36 @@ func TestStartEndInvocationSpanParenting(t *testing.T) { } } +func TestStartEndInvocationIsExecutionSpanComplete(t *testing.T) { + assert := assert.New(t) + port := testutil.FreeTCPPort(t) + d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) + time.Sleep(100 * time.Millisecond) + defer d.Stop() + + m := &mockLifecycleProcessor{} + d.InvocationProcessor = m + + client := &http.Client{} + body := bytes.NewBuffer([]byte(`{"key": "value"}`)) + startReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/start-invocation", port), body) + assert.Nil(err) + startResp, err := client.Do(startReq) + assert.Nil(err) + startResp.Body.Close() + assert.True(m.OnInvokeStartCalled) + assert.False(d.IsExecutionSpanComplete()) + + body = bytes.NewBuffer([]byte(`{}`)) + endReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/end-invocation", port), body) + assert.Nil(err) + endResp, err := client.Do(endReq) + assert.Nil(err) + endResp.Body.Close() + assert.True(m.OnInvokeEndCalled) + assert.True(d.IsExecutionSpanComplete()) +} + // Helper function for reading test file func 
getEventFromFile(filename string) string { event, err := os.ReadFile("../trace/testdata/event_samples/" + filename) From 05a63920b7cc26707452b029c8736623b8134b29 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 10 Jan 2024 13:40:55 -0500 Subject: [PATCH 15/20] add comment + update tests --- pkg/serverless/invocationlifecycle/lifecycle.go | 1 + pkg/serverless/invocationlifecycle/lifecycle_test.go | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/serverless/invocationlifecycle/lifecycle.go b/pkg/serverless/invocationlifecycle/lifecycle.go index 5d1878abcb2b35..7464f9c35cc4dd 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle.go +++ b/pkg/serverless/invocationlifecycle/lifecycle.go @@ -287,6 +287,7 @@ func (lp *LifecycleProcessor) OnInvokeEnd(endDetails *InvocationEndDetails) { lp.processTrace(spans) } + // We don't submit an error metric on timeouts since it should have already been submitted when the Extension receives a SHUTDOWN event if endDetails.IsError && !endDetails.IsTimeout { serverlessMetrics.SendErrorsEnhancedMetric( lp.ExtraTags.Tags, endDetails.EndTime, lp.Demux, diff --git a/pkg/serverless/invocationlifecycle/lifecycle_test.go b/pkg/serverless/invocationlifecycle/lifecycle_test.go index 14742fb4d3bbe8..0a687053314bd1 100644 --- a/pkg/serverless/invocationlifecycle/lifecycle_test.go +++ b/pkg/serverless/invocationlifecycle/lifecycle_test.go @@ -385,8 +385,8 @@ func TestTimeoutExecutionSpan(t *testing.T) { extraTags := &logs.Tags{ Tags: []string{"functionname:test-function"}, } - log := fxutil.Test[log.Component](t, logimpl.MockModule()) - demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(log, time.Hour) + demux := createDemultiplexer(t) + defer demux.Stop(false) mockDetectLambdaLibrary := func() bool { return false } var tracePayload *api.Payload @@ -445,8 +445,8 @@ func TestTimeoutExecutionSpanWithTraceContext(t *testing.T) { extraTags := &logs.Tags{ Tags: []string{"functionname:test-function"}, } - 
log := fxutil.Test[log.Component](t, logimpl.MockModule()) - demux := aggregator.InitTestAgentDemultiplexerWithFlushInterval(log, time.Hour) + demux := createDemultiplexer(t) + defer demux.Stop(false) mockDetectLambdaLibrary := func() bool { return false } var tracePayload *api.Payload From 2b48fe17b14360e4295534ad6e5847c5735aadfc Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 10 Jan 2024 15:08:30 -0500 Subject: [PATCH 16/20] test endExecutionSpan --- .../invocationlifecycle/trace_test.go | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/pkg/serverless/invocationlifecycle/trace_test.go b/pkg/serverless/invocationlifecycle/trace_test.go index 0b925f9a25be6b..6b45d32755165e 100644 --- a/pkg/serverless/invocationlifecycle/trace_test.go +++ b/pkg/serverless/invocationlifecycle/trace_test.go @@ -649,6 +649,54 @@ func TestEndExecutionSpanWithError(t *testing.T) { assert.Equal(t, executionSpan.Error, int32(1)) } +func TestEndExecutionSpanWithTimeout(t *testing.T) { + t.Setenv(functionNameEnvVar, "TestFunction") + currentExecutionInfo := &ExecutionStartInfo{} + lp := &LifecycleProcessor{ + requestHandler: &RequestHandler{ + executionInfo: currentExecutionInfo, + triggerTags: make(map[string]string), + }, + } + + startTime := time.Now() + startDetails := &InvocationStartDetails{ + StartTime: startTime, + InvokeEventHeaders: http.Header{}, + } + lp.startExecutionSpan(nil, []byte("[]"), startDetails) + + assert.Zero(t, currentExecutionInfo.TraceID) + assert.Zero(t, currentExecutionInfo.SpanID) + + duration := 1 * time.Second + endTime := startTime.Add(duration) + + endDetails := &InvocationEndDetails{ + EndTime: endTime, + IsError: true, + IsTimeout: true, + RequestID: "test-request-id", + ResponseRawPayload: nil, + ColdStart: true, + ProactiveInit: false, + Runtime: "dotnet6", + } + executionSpan := lp.endExecutionSpan(endDetails) + assert.Equal(t, "aws.lambda", executionSpan.Name) + assert.Equal(t, "aws.lambda", executionSpan.Service) + 
assert.Equal(t, "TestFunction", executionSpan.Resource) + assert.Equal(t, "serverless", executionSpan.Type) + assert.Equal(t, "dotnet", executionSpan.Meta["language"]) + assert.Equal(t, lp.requestHandler.executionInfo.TraceID, executionSpan.TraceID) + assert.NotZero(t, executionSpan.TraceID) + assert.NotZero(t, executionSpan.SpanID) + assert.Equal(t, startTime.UnixNano(), executionSpan.Start) + assert.Equal(t, duration.Nanoseconds(), executionSpan.Duration) + assert.Equal(t, "Impending Timeout", executionSpan.Meta["error.type"]) + assert.Equal(t, "Datadog detected an Impending Timeout", executionSpan.Meta["error.msg"]) +} + func TestParseLambdaPayload(t *testing.T) { assert.Equal(t, []byte(""), ParseLambdaPayload([]byte(""))) assert.Equal(t, []byte("{}"), ParseLambdaPayload([]byte("{}"))) From 5d27996bade18e9b410a27c10be27e60e8b3ae8a Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 10 Jan 2024 16:52:24 -0500 Subject: [PATCH 17/20] add serverless.go test --- pkg/serverless/serverless.go | 31 ++++++++++++++----------- pkg/serverless/serverless_test.go | 38 +++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index 8131f72484e469..8d62338e81341d 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -142,19 +142,7 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id regis metrics.SendErrorsEnhancedMetric(metricTags, time.Now(), daemon.MetricAgent.Demux) if !daemon.IsExecutionSpanComplete() { - ecs := daemon.ExecutionContext.GetCurrentState() - timeoutDetails := &invocationlifecycle.InvocationEndDetails{ - RequestID: ecs.LastRequestID, - Runtime: ecs.Runtime, - ColdStart: coldStartTags.IsColdStart, - ProactiveInit: coldStartTags.IsProactiveInit, - EndTime: time.Now(), - IsError: true, - IsTimeout: true, - ResponseRawPayload: nil, - } - daemon.InvocationProcessor.OnInvokeEnd(timeoutDetails) - 
daemon.SetExecutionSpanComplete(true) + finishTimeoutExecutionSpan(daemon, coldStartTags.IsColdStart, coldStartTags.IsProactiveInit) } } err := daemon.ExecutionContext.SaveCurrentExecutionContext() @@ -231,3 +219,20 @@ func removeQualifierFromArn(functionArn string) string { } return functionArn } + +func finishTimeoutExecutionSpan(daemon *daemon.Daemon, isColdStart bool, isProactiveInit bool) { + ecs := daemon.ExecutionContext.GetCurrentState() + timeoutDetails := &invocationlifecycle.InvocationEndDetails{ + RequestID: ecs.LastRequestID, + Runtime: ecs.Runtime, + ColdStart: isColdStart, + ProactiveInit: isProactiveInit, + EndTime: time.Now(), + IsError: true, + IsTimeout: true, + ResponseRawPayload: nil, + } + log.Debug("Could not complete the execution span due to a time out. Attempting to finish the span without details from the tracer.") + daemon.InvocationProcessor.OnInvokeEnd(timeoutDetails) + daemon.SetExecutionSpanComplete(true) +} diff --git a/pkg/serverless/serverless_test.go b/pkg/serverless/serverless_test.go index ccd144ea939bdb..58ccad5a1e02c7 100644 --- a/pkg/serverless/serverless_test.go +++ b/pkg/serverless/serverless_test.go @@ -15,6 +15,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/DataDog/datadog-agent/pkg/serverless/daemon" + "github.com/DataDog/datadog-agent/pkg/serverless/invocationlifecycle" + "github.com/DataDog/datadog-agent/pkg/serverless/trace" + "github.com/DataDog/datadog-agent/pkg/trace/testutil" ) func TestMain(m *testing.M) { @@ -69,3 +72,38 @@ func TestRemoveQualifierFromArnWithoutAlias(t *testing.T) { functionArn := removeQualifierFromArn(invokedFunctionArn) assert.Equal(t, functionArn, invokedFunctionArn) } + +type mockLifecycleProcessor struct { + isError bool + isTimeout bool + isColdStart bool + isProactiveInit bool +} + +func (m *mockLifecycleProcessor) GetExecutionInfo() *invocationlifecycle.ExecutionStartInfo { + return &invocationlifecycle.ExecutionStartInfo{} +} +func (m *mockLifecycleProcessor) 
OnInvokeStart(*invocationlifecycle.InvocationStartDetails) {} +func (m *mockLifecycleProcessor) OnInvokeEnd(endDetails *invocationlifecycle.InvocationEndDetails) { + m.isError = endDetails.IsError + m.isTimeout = endDetails.IsTimeout + m.isColdStart = endDetails.ColdStart + m.isProactiveInit = endDetails.ProactiveInit +} + +func TestFinishTimeoutExecutionSpan(t *testing.T) { + port := testutil.FreeTCPPort(t) + d := daemon.StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) + d.TraceAgent = &trace.ServerlessTraceAgent{} + mock := &mockLifecycleProcessor{} + d.InvocationProcessor = mock + defer d.Stop() + + assert.False(t, d.IsExecutionSpanComplete()) + finishTimeoutExecutionSpan(d, true, true) + assert.True(t, d.IsExecutionSpanComplete()) + assert.True(t, mock.isError) + assert.True(t, mock.isTimeout) + assert.True(t, mock.isColdStart) + assert.True(t, mock.isProactiveInit) +} From 70cbf8333c76f7af907db34544bf4427703ea7c2 Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Wed, 10 Jan 2024 17:17:54 -0500 Subject: [PATCH 18/20] add test /hello for route --- pkg/serverless/daemon/routes_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index ca2dba60cdea31..32cfdf1987694e 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -159,6 +159,30 @@ func TestTraceContext(t *testing.T) { } } +func TestHello(t *testing.T) { + assert := assert.New(t) + + port := testutil.FreeTCPPort(t) + d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) + time.Sleep(100 * time.Millisecond) + defer d.Stop() + d.InvocationProcessor = &invocationlifecycle.LifecycleProcessor{ + ExtraTags: d.ExtraTags, + Demux: nil, + ProcessTrace: nil, + DetectLambdaLibrary: d.IsLambdaLibraryDetected, + } + client := &http.Client{} + body := bytes.NewBuffer([]byte(`{}`)) + request, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/hello", 
port), body) + assert.Nil(err) + assert.False(d.IsLambdaLibraryDetected()) + response, err := client.Do(request) + assert.Nil(err) + response.Body.Close() + assert.True(d.IsLambdaLibraryDetected()) +} + func TestStartEndInvocationSpanParenting(t *testing.T) { port := testutil.FreeTCPPort(t) d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) From 5292f399df9f513cc74cc293d3da0c612af9aadd Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 8 Apr 2024 17:22:32 -0400 Subject: [PATCH 19/20] only set span incomplete when /startInvocation has been hit --- pkg/serverless/daemon/daemon.go | 16 ++++++++-------- pkg/serverless/daemon/routes.go | 4 ++-- pkg/serverless/daemon/routes_test.go | 6 +++--- pkg/serverless/serverless.go | 4 ++-- pkg/serverless/serverless_test.go | 6 ++++-- 5 files changed, 19 insertions(+), 17 deletions(-) diff --git a/pkg/serverless/daemon/daemon.go b/pkg/serverless/daemon/daemon.go index a5d4e57747a9a5..58bc1ac85190e3 100644 --- a/pkg/serverless/daemon/daemon.go +++ b/pkg/serverless/daemon/daemon.go @@ -69,8 +69,8 @@ type Daemon struct { // LambdaLibraryStateLock keeps track of whether the Datadog Lambda Library was detected in the environment LambdaLibraryStateLock sync.Mutex - // executionSpanComplete indicates whether the Lambda span has been completed by the Extension - executionSpanComplete bool + // executionSpanIncomplete indicates whether the Lambda execution span has been started but not yet completed by the Extension + executionSpanIncomplete bool // ExecutionSpanStateLock keeps track of whether the serverless Invocation routes have been hit to complete the execution span ExecutionSpanStateLock sync.Mutex @@ -452,16 +452,16 @@ func (d *Daemon) IsLambdaLibraryDetected() bool { return d.LambdaLibraryDetected } -// IsExecutionSpanComplete checks if the Lambda execution span was finished -func (d *Daemon) IsExecutionSpanComplete() bool { +// IsExecutionSpanIncomplete reports whether the Lambda execution span has been started but not yet finished +func (d *Daemon) IsExecutionSpanIncomplete() bool { 
d.ExecutionSpanStateLock.Lock() defer d.ExecutionSpanStateLock.Unlock() - return d.executionSpanComplete + return d.executionSpanIncomplete } -// SetExecutionSpanComplete keeps track of whether the Extension completed the Lambda execution span -func (d *Daemon) SetExecutionSpanComplete(spanComplete bool) { +// SetExecutionSpanIncomplete records whether the Lambda execution span has been started but still needs to be completed by the Extension +func (d *Daemon) SetExecutionSpanIncomplete(spanIncomplete bool) { d.ExecutionSpanStateLock.Lock() defer d.ExecutionSpanStateLock.Unlock() - d.executionSpanComplete = spanComplete + d.executionSpanIncomplete = spanIncomplete } diff --git a/pkg/serverless/daemon/routes.go b/pkg/serverless/daemon/routes.go index ea3f35493f7bc1..93e113782dbb88 100644 --- a/pkg/serverless/daemon/routes.go +++ b/pkg/serverless/daemon/routes.go @@ -55,7 +55,7 @@ type StartInvocation struct { func (s *StartInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.StartInvocation route.") - s.daemon.SetExecutionSpanComplete(false) + s.daemon.SetExecutionSpanIncomplete(true) startTime := time.Now() reqBody, err := io.ReadAll(r.Body) if err != nil { @@ -89,7 +89,7 @@ type EndInvocation struct { func (e *EndInvocation) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Debug("Hit on the serverless.EndInvocation route.") - e.daemon.SetExecutionSpanComplete(true) + e.daemon.SetExecutionSpanIncomplete(false) endTime := time.Now() ecs := e.daemon.ExecutionContext.GetCurrentState() coldStartTags := e.daemon.ExecutionContext.GetColdStartTagsForRequestID(ecs.LastRequestID) diff --git a/pkg/serverless/daemon/routes_test.go b/pkg/serverless/daemon/routes_test.go index 831687fd4bab0b..0cdae0c594057d 100644 --- a/pkg/serverless/daemon/routes_test.go +++ b/pkg/serverless/daemon/routes_test.go @@ -356,7 +356,7 @@ func TestStartEndInvocationSpanParenting(t *testing.T) { } } -func TestStartEndInvocationIsExecutionSpanComplete(t *testing.T) { +func 
TestStartEndInvocationIsExecutionSpanIncomplete(t *testing.T) { assert := assert.New(t) port := testutil.FreeTCPPort(t) d := StartDaemon(fmt.Sprintf("127.0.0.1:%d", port)) @@ -374,7 +374,7 @@ func TestStartEndInvocationIsExecutionSpanComplete(t *testing.T) { assert.Nil(err) startResp.Body.Close() assert.True(m.OnInvokeStartCalled) - assert.False(d.IsExecutionSpanComplete()) + assert.True(d.IsExecutionSpanIncomplete()) body = bytes.NewBuffer([]byte(`{}`)) endReq, err := http.NewRequest(http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/lambda/end-invocation", port), body) @@ -383,7 +383,7 @@ func TestStartEndInvocationIsExecutionSpanComplete(t *testing.T) { assert.Nil(err) endResp.Body.Close() assert.True(m.OnInvokeEndCalled) - assert.True(d.IsExecutionSpanComplete()) + assert.False(d.IsExecutionSpanIncomplete()) } // Helper function for reading test file diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index 8d62338e81341d..f8408c2fd35d69 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -141,7 +141,7 @@ func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id regis metrics.SendTimeoutEnhancedMetric(metricTags, daemon.MetricAgent.Demux) metrics.SendErrorsEnhancedMetric(metricTags, time.Now(), daemon.MetricAgent.Demux) - if !daemon.IsExecutionSpanComplete() { + if daemon.IsExecutionSpanIncomplete() { finishTimeoutExecutionSpan(daemon, coldStartTags.IsColdStart, coldStartTags.IsProactiveInit) } } @@ -234,5 +234,5 @@ func finishTimeoutExecutionSpan(daemon *daemon.Daemon, isColdStart bool, isProac } log.Debug("Could not complete the execution span due to a time out. 
Attempting to finish the span without details from the tracer.") daemon.InvocationProcessor.OnInvokeEnd(timeoutDetails) - daemon.SetExecutionSpanComplete(true) + daemon.SetExecutionSpanIncomplete(false) } diff --git a/pkg/serverless/serverless_test.go b/pkg/serverless/serverless_test.go index 58ccad5a1e02c7..14bd868ab65482 100644 --- a/pkg/serverless/serverless_test.go +++ b/pkg/serverless/serverless_test.go @@ -99,9 +99,11 @@ func TestFinishTimeoutExecutionSpan(t *testing.T) { d.InvocationProcessor = mock defer d.Stop() - assert.False(t, d.IsExecutionSpanComplete()) + assert.False(t, d.IsExecutionSpanIncomplete()) + d.SetExecutionSpanIncomplete(true) + assert.True(t, d.IsExecutionSpanIncomplete()) finishTimeoutExecutionSpan(d, true, true) - assert.True(t, d.IsExecutionSpanComplete()) + assert.False(t, d.IsExecutionSpanIncomplete()) assert.True(t, mock.isError) assert.True(t, mock.isTimeout) assert.True(t, mock.isColdStart) From 7148b7d6cb52ee15401bdc66697565e4358c1a7c Mon Sep 17 00:00:00 2001 From: Dylan Yang Date: Mon, 8 Apr 2024 17:26:20 -0400 Subject: [PATCH 20/20] time out -> timeout Co-authored-by: Duncan Harvey <35278470+duncanpharvey@users.noreply.github.com> --- pkg/serverless/serverless.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/serverless/serverless.go b/pkg/serverless/serverless.go index f8408c2fd35d69..24c04e22a08ad6 100644 --- a/pkg/serverless/serverless.go +++ b/pkg/serverless/serverless.go @@ -232,7 +232,7 @@ func finishTimeoutExecutionSpan(daemon *daemon.Daemon, isColdStart bool, isProac IsTimeout: true, ResponseRawPayload: nil, } - log.Debug("Could not complete the execution span due to a time out. Attempting to finish the span without details from the tracer.") + log.Debug("Could not complete the execution span due to a timeout. Attempting to finish the span without details from the tracer.") daemon.InvocationProcessor.OnInvokeEnd(timeoutDetails) daemon.SetExecutionSpanIncomplete(false) }