-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathserverless.go
269 lines (230 loc) · 10.5 KB
/
serverless.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
//nolint:revive // TODO(SERV) Fix revive linter
package serverless
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/DataDog/datadog-agent/pkg/serverless/proc"
json "github.com/json-iterator/go"
"github.com/DataDog/datadog-agent/pkg/serverless/daemon"
"github.com/DataDog/datadog-agent/pkg/serverless/flush"
"github.com/DataDog/datadog-agent/pkg/serverless/invocationlifecycle"
"github.com/DataDog/datadog-agent/pkg/serverless/metrics"
"github.com/DataDog/datadog-agent/pkg/serverless/registration"
"github.com/DataDog/datadog-agent/pkg/serverless/tags"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
const (
	// headerExtID is the HTTP header carrying the extension ID obtained at registration.
	headerExtID string = "Lambda-Extension-Identifier"
	// headerExtErrType is the HTTP header carrying the error type reported to the Extensions API.
	headerExtErrType string = "Lambda-Extension-Function-Error-Type"
	// requestTimeout is the timeout applied to short-lived HTTP requests.
	requestTimeout time.Duration = 5 * time.Second
	// safetyBufferTimeout is subtracted from the invocation deadline so the
	// extension can finish its work before AWS freezes the sandbox.
	safetyBufferTimeout time.Duration = 20 * time.Millisecond
	// FatalNoAPIKey is the error reported to the AWS Extension environment when
	// no API key has been set. Unused until we can report error
	// without stopping the extension.
	FatalNoAPIKey ErrorEnum = "Fatal.NoAPIKey"
	// FatalDogstatsdInit is the error reported to the AWS Extension environment when
	// DogStatsD fails to initialize properly. Unused until we can report error
	// without stopping the extension.
	FatalDogstatsdInit ErrorEnum = "Fatal.DogstatsdInit"
	// FatalBadEndpoint is the error reported to the AWS Extension environment when
	// bad endpoints have been configured. Unused until we can report error
	// without stopping the extension.
	FatalBadEndpoint ErrorEnum = "Fatal.BadEndpoint"
	// FatalConnectFailed is the error reported to the AWS Extension environment when
	// a connection failed.
	FatalConnectFailed ErrorEnum = "Fatal.ConnectFailed"
	// Invoke event
	Invoke RuntimeEvent = "INVOKE"
	// Shutdown event
	Shutdown RuntimeEvent = "SHUTDOWN"
	// Timeout is one of the possible ShutdownReasons
	Timeout ShutdownReason = "timeout"
)
// ShutdownReason is an AWS Shutdown reason, received in the payload of a
// SHUTDOWN event (e.g. "timeout").
type ShutdownReason string

// RuntimeEvent is an AWS Runtime event type (INVOKE or SHUTDOWN).
type RuntimeEvent string

// ErrorEnum are errors reported to the AWS Extension environment.
type ErrorEnum string
// String returns the string value for this ErrorEnum.
func (e ErrorEnum) String() string {
	value := string(e)
	return value
}
// String returns the string value for this ShutdownReason.
func (s ShutdownReason) String() string {
	value := string(s)
	return value
}
// InvocationHandler is the invocation handler signature: it processes one
// invocation for the given ARN/request ID and signals completion by sending
// on doneChannel.
type InvocationHandler func(doneChannel chan bool, daemon *daemon.Daemon, arn string, requestID string)
// Payload is the payload read in the response while subscribing to
// the AWS Extension env.
type Payload struct {
	// EventType is the kind of event received (Invoke or Shutdown).
	EventType RuntimeEvent `json:"eventType"`
	// DeadlineMs is the invocation deadline as epoch milliseconds
	// (compared against wall-clock ms in computeTimeout).
	DeadlineMs int64 `json:"deadlineMs"`
	// InvokedFunctionArn is the ARN of the invoked function, possibly
	// carrying a trailing qualifier (see removeQualifierFromArn).
	InvokedFunctionArn string `json:"invokedFunctionArn"`
	// ShutdownReason is only meaningful for SHUTDOWN events.
	ShutdownReason ShutdownReason `json:"shutdownReason"`
	// RequestID identifies the invocation for INVOKE events.
	RequestID string `json:"requestId"`
}
// FlushableAgent allows flushing of an agent's buffered data.
type FlushableAgent interface {
	Flush()
}
// WaitForNextInvocation makes a blocking HTTP call to receive the next event from AWS.
// Note that for now, we only subscribe to INVOKE and SHUTDOWN events.
// Write into stopCh to stop the main thread of the running program.
func WaitForNextInvocation(stopCh chan struct{}, daemon *daemon.Daemon, id registration.ID) error {
	request, err := http.NewRequest(http.MethodGet, registration.NextUrl(), nil)
	if err != nil {
		return fmt.Errorf("WaitForNextInvocation: can't create the GET request: %v", err)
	}
	request.Header.Set(headerExtID, id.String())

	// make a blocking HTTP call to wait for the next event from AWS
	client := &http.Client{Timeout: 0} // this one should never timeout
	response, err := client.Do(request)
	if err != nil {
		return fmt.Errorf("WaitForNextInvocation: while GET next route: %v", err)
	}
	// Close the body even when reading it fails below; previously the Close
	// was only deferred after a successful ReadAll, leaking the body (and its
	// connection) on a read error.
	defer response.Body.Close()

	// we received an INVOKE or SHUTDOWN event
	daemon.StoreInvocationTime(time.Now())
	body, err := io.ReadAll(response.Body)
	if err != nil {
		return fmt.Errorf("WaitForNextInvocation: can't read the body: %v", err)
	}
	var payload Payload
	if err := json.Unmarshal(body, &payload); err != nil {
		return fmt.Errorf("WaitForNextInvocation: can't unmarshal the payload: %v", err)
	}

	switch payload.EventType {
	case Invoke:
		// Strip any version/alias qualifier so downstream tagging is stable.
		functionArn := removeQualifierFromArn(payload.InvokedFunctionArn)
		callInvocationHandler(daemon, functionArn, payload.DeadlineMs, safetyBufferTimeout, payload.RequestID, handleInvocation)
	case Shutdown:
		// Log collection can be safely called multiple times, so ensure we start log collection during a SHUTDOWN event too in case an INVOKE event is never received
		daemon.StartLogCollection()
		log.Debug("Received shutdown event. Reason: " + payload.ShutdownReason)
		isTimeout := strings.ToLower(payload.ShutdownReason.String()) == Timeout.String()
		if isTimeout {
			// Report the timeout as both a timeout and an error enhanced metric,
			// tagged with the cold-start state of the last known request.
			coldStartTags := daemon.ExecutionContext.GetColdStartTagsForRequestID(daemon.ExecutionContext.LastRequestID())
			metricTags := tags.AddColdStartTag(daemon.ExtraTags.Tags, coldStartTags.IsColdStart, coldStartTags.IsProactiveInit)
			metricTags = tags.AddInitTypeTag(metricTags)
			metrics.SendTimeoutEnhancedMetric(metricTags, daemon.MetricAgent.Demux)
			metrics.SendErrorsEnhancedMetric(metricTags, time.Now(), daemon.MetricAgent.Demux)
			if daemon.IsExecutionSpanIncomplete() {
				finishTimeoutExecutionSpan(daemon, coldStartTags.IsColdStart, coldStartTags.IsProactiveInit)
			}
		}
		// Persist the execution context so state survives the shutdown.
		if err := daemon.ExecutionContext.SaveCurrentExecutionContext(); err != nil {
			log.Warnf("Unable to save the current state. Failed with: %s", err)
		}
		daemon.Stop()
		stopCh <- struct{}{}
	}
	return nil
}
// callInvocationHandler runs invocationHandler in its own goroutine and waits
// until either the handler signals completion or the computed deadline
// (function deadline minus the safety buffer) expires, then emits the system
// enhanced metrics collected around the invocation.
func callInvocationHandler(daemon *daemon.Daemon, arn string, deadlineMs int64, safetyBufferTimeout time.Duration, requestID string, invocationHandler InvocationHandler) {
	// Snapshot /proc counters before the invocation so deltas can be reported after it.
	cpuOffsetData, cpuOffsetErr := proc.GetCPUData()
	uptimeOffset, uptimeOffsetErr := proc.GetUptime()
	networkOffsetData, networkOffsetErr := proc.GetNetworkData()
	sendProcessMetrics := make(chan bool)
	go metrics.SendProcessEnhancedMetrics(sendProcessMetrics, daemon.ExtraTags.Tags, daemon.MetricAgent)
	sendTmpMetrics := make(chan bool)
	go metrics.SendTmpEnhancedMetrics(sendTmpMetrics, daemon.ExtraTags.Tags, daemon.MetricAgent)

	timeout := computeTimeout(time.Now(), deadlineMs, safetyBufferTimeout)
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	doneChannel := make(chan bool)
	daemon.TellDaemonRuntimeStarted()
	go invocationHandler(doneChannel, daemon, arn, requestID)
	// Go's select cases do not fall through, so no explicit break is needed
	// (the originals were redundant).
	select {
	case <-ctx.Done():
		log.Debug("Timeout detected, finishing the current invocation now to allow receiving the SHUTDOWN event")
		// Tell the Daemon that the runtime is done (even though it isn't, because it's timing out) so that we can receive the SHUTDOWN event
		daemon.TellDaemonRuntimeDone()
	case <-doneChannel:
		// The handler finished before the deadline; nothing more to do here.
	}
	sendSystemEnhancedMetrics(daemon, cpuOffsetErr == nil && uptimeOffsetErr == nil, networkOffsetErr == nil, uptimeOffset, cpuOffsetData, networkOffsetData, sendTmpMetrics, sendProcessMetrics)
}
// sendSystemEnhancedMetrics signals the tmp/process metric goroutines to emit
// their values and sends the CPU and network enhanced metrics computed from
// the pre-invocation offsets, when a metric agent is available.
func sendSystemEnhancedMetrics(daemon *daemon.Daemon, emitCPUMetrics, emitNetworkMetrics bool, uptimeOffset float64, cpuOffsetData *proc.CPUData, networkOffsetData *proc.NetworkData, sendTmpMetrics chan bool, sendProcessMetrics chan bool) {
	if daemon.MetricAgent == nil {
		// NOTE(review): this early return leaves sendTmpMetrics and
		// sendProcessMetrics open, so the goroutines reading them are never
		// signaled — confirm whether that is acceptable when no metric agent
		// is configured.
		log.Debug("Could not send system enhanced metrics")
		return
	}
	// Closing the channels tells the tmp/process goroutines to finish up.
	close(sendTmpMetrics)
	close(sendProcessMetrics)

	if !emitCPUMetrics {
		log.Debug("Could not send CPU enhanced metrics")
	} else {
		metrics.SendCPUEnhancedMetrics(cpuOffsetData, uptimeOffset, daemon.ExtraTags.Tags, daemon.MetricAgent.Demux)
	}
	if !emitNetworkMetrics {
		log.Debug("Could not send network enhanced metrics")
	} else {
		metrics.SendNetworkEnhancedMetrics(networkOffsetData, daemon.ExtraTags.Tags, daemon.MetricAgent.Demux)
	}
}
// handleInvocation processes one INVOKE event: it records the invocation in
// the execution context, starts log collection, emits the invocation enhanced
// metric, possibly triggers an early flush, and signals doneChannel when the
// daemon reports the runtime is done.
func handleInvocation(doneChannel chan bool, daemon *daemon.Daemon, arn string, requestID string) {
	log.Debug("Received invocation event...")
	daemon.ExecutionContext.SetFromInvocation(arn, requestID)
	daemon.StartLogCollection()
	csTags := daemon.ExecutionContext.GetColdStartTagsForRequestID(requestID)

	if daemon.MetricAgent == nil {
		log.Error("Could not send the invocation enhanced metric")
	} else {
		enhancedTags := tags.AddInitTypeTag(tags.AddColdStartTag(daemon.ExtraTags.Tags, csTags.IsColdStart, csTags.IsProactiveInit))
		metrics.SendInvocationEnhancedMetric(enhancedTags, daemon.MetricAgent.Demux)
	}

	if csTags.IsColdStart {
		daemon.UpdateStrategy()
	}

	// immediately check if we should flush data
	if daemon.ShouldFlush(flush.Starting) {
		log.Debugf("The flush strategy %s has decided to flush at moment: %s", daemon.GetFlushStrategy(), flush.Starting)
		daemon.TriggerFlush(false)
	} else {
		log.Debugf("The flush strategy %s has decided to not flush at moment: %s", daemon.GetFlushStrategy(), flush.Starting)
	}

	daemon.WaitForDaemon()
	doneChannel <- true
}
func computeTimeout(now time.Time, deadlineMs int64, safetyBuffer time.Duration) time.Duration {
currentTimeInMs := now.UnixNano() / int64(time.Millisecond)
return time.Duration((deadlineMs-currentTimeInMs)*int64(time.Millisecond) - int64(safetyBuffer))
}
// removeQualifierFromArn strips a trailing version/alias qualifier from a
// Lambda function ARN. An unqualified function ARN has 7 colon-separated
// tokens; anything beyond that is treated as a qualifier and dropped.
func removeQualifierFromArn(functionArn string) string {
	const unqualifiedTokenCount = 7
	parts := strings.Split(functionArn, ":")
	if len(parts) <= unqualifiedTokenCount {
		// Already unqualified: return it untouched.
		return functionArn
	}
	return strings.Join(parts[:len(parts)-1], ":")
}
// finishTimeoutExecutionSpan completes the pending execution span after a
// timeout, using the last known execution-context state instead of details
// from the tracer, and clears the incomplete-span flag.
func finishTimeoutExecutionSpan(daemon *daemon.Daemon, isColdStart bool, isProactiveInit bool) {
	state := daemon.ExecutionContext.GetCurrentState()
	log.Debug("Could not complete the execution span due to a timeout. Attempting to finish the span without details from the tracer.")
	endDetails := &invocationlifecycle.InvocationEndDetails{
		RequestID:          state.LastRequestID,
		Runtime:            state.Runtime,
		ColdStart:          isColdStart,
		ProactiveInit:      isProactiveInit,
		EndTime:            time.Now(),
		IsError:            true,
		IsTimeout:          true,
		ResponseRawPayload: nil,
	}
	daemon.InvocationProcessor.OnInvokeEnd(endDetails)
	daemon.SetExecutionSpanIncomplete(false)
}