Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SVLS-4999] Add Lambda tags to metrics sent via the API #559

Merged
merged 12 commits into from
Jul 16, 2024
12 changes: 12 additions & 0 deletions src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,18 @@ describe("sendDistributionMetricWithDate", () => {
sendDistributionMetricWithDate("metric", 1, new Date(), "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
});
it("attaches tags from Datadog environment variables to the metric", () => {
process.env.DD_TAGS = "foo:bar,hello:world";
sendDistributionMetricWithDate("metric", 1, new Date(Date.now() - 1 * 60 * 60 * 1000), "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
const metricTags = _metricsQueue.pop()?.tags;
expect(metricTags).toBeDefined();
["first-tag", "second-tag", `dd_lambda_layer:datadog-node${process.version}`, "foo:bar", "hello:world"].forEach(
(tag) => {
expect(metricTags?.indexOf(tag)).toBeGreaterThanOrEqual(0);
},
);
});
});

describe("emitTelemetryOnErrorOutsideHandler", () => {
Expand Down
17 changes: 15 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export function datadog<TEvent, TResult>(

try {
await traceListener.onStartInvocation(event, context);
await metricsListener.onStartInvocation(event);
await metricsListener.onStartInvocation(event, context);
if (finalConfig.enhancedMetrics) {
incrementInvocationsMetric(metricsListener, context);
}
Expand Down Expand Up @@ -267,7 +267,7 @@ export function extractArgs<TEvent>(isResponseStreamFunction: boolean, ...args:
* @param tags The tags associated with the metric. Should be of the format "tag:value".
*/
export function sendDistributionMetricWithDate(name: string, value: number, metricTime: Date, ...tags: string[]) {
tags = [...tags, getRuntimeTag()];
tags = [...tags, getRuntimeTag(), ...getDDTags()];

if (currentMetricsListener !== undefined) {
currentMetricsListener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
Expand Down Expand Up @@ -419,6 +419,19 @@ function getRuntimeTag(): string {
return `dd_lambda_layer:datadog-node${version}`;
}

function getDDTags(): string[] {
const ddTags = getEnvValue("DD_TAGS", "").split(",");
const ddService = getEnvValue("DD_SERVICE", "");
if (ddService.length > 0) {
ddTags.push(`service:${ddService}`);
}
const ddEnv = getEnvValue("DD_ENV", "");
if (ddEnv.length > 0) {
ddTags.push(`env:${ddEnv}`);
}
return ddTags;
}

export async function emitTelemetryOnErrorOutsideHandler(
error: Error,
functionName: string,
Expand Down
4 changes: 1 addition & 3 deletions src/metrics/batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ export class Batcher {
* Convert batched metrics to a list of api compatible metrics
*/
public toAPIMetrics(): APIMetric[] {
return [...this.metrics.values()]
.map((metric) => metric.toAPIMetrics()) // No flatMap support yet in node 10
.reduce((prev, curr) => prev.concat(curr), []);
return [...this.metrics.values()].flatMap((metric) => metric.toAPIMetrics());
}

private getBatchKey(metric: Metric): string {
Expand Down
39 changes: 34 additions & 5 deletions src/metrics/listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EXTENSION_URL } from "./extension";

import { MetricsListener } from "./listener";
import StatsDClient from "hot-shots";
import { Context } from "aws-lambda";
jest.mock("hot-shots");

const siteURL = "example.com";
Expand Down Expand Up @@ -143,7 +144,7 @@ describe("MetricsListener", () => {
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});
const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});

const distributionMock = jest.fn();
(StatsDClient as any).mockImplementation(() => {
Expand All @@ -153,8 +154,6 @@ describe("MetricsListener", () => {
};
});

jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);

const metricTimeOneMinuteAgo = new Date(Date.now() - 60000);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
Expand All @@ -166,8 +165,12 @@ describe("MetricsListener", () => {
localTesting: true,
siteURL,
});
const mockARN = "arn:aws:lambda:us-east-1:123497598159:function:my-test-lambda";
const mockContext = {
invokedFunctionArn: mockARN,
} as any as Context;

await listener.onStartInvocation({});
await listener.onStartInvocation({}, mockContext);
listener.sendDistributionMetricWithDate(
"my-metric-with-a-timestamp",
10,
Expand All @@ -178,11 +181,37 @@ describe("MetricsListener", () => {
);
listener.sendDistributionMetric("my-metric-without-a-timestamp", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();

expect(flushScope.isDone()).toBeTruthy();
expect(nock.isDone()).toBeTruthy();
expect(apiScope.isDone()).toBeTruthy();
expect(distributionMock).toHaveBeenCalledWith("my-metric-without-a-timestamp", 10, undefined, ["tag:a", "tag:b"]);
});

it("does not send historical metrics from over 4 hours ago to the API", async () => {
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});

const metricTimeFiveHoursAgo = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 hours ago
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
apiKeyKMS: "",
enhancedMetrics: false,
logForwarding: false,
shouldRetryMetrics: false,
localTesting: true,
siteURL,
});

await listener.onStartInvocation({});
listener.sendDistributionMetricWithDate("my-metric-with-a-timestamp", 10, metricTimeFiveHoursAgo, false);
await listener.onCompleteInvocation();

expect(apiScope.isDone()).toBeFalsy();
});

it("logs metrics when logForwarding is enabled with custom timestamp", async () => {
const spy = jest.spyOn(process.stdout, "write");
// jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
Expand Down
33 changes: 30 additions & 3 deletions src/metrics/listener.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { StatsD } from "hot-shots";
import { promisify } from "util";
import { logDebug, logError } from "../utils";
import { logDebug, logError, logWarning } from "../utils";
import { flushExtension, isExtensionRunning } from "./extension";
import { KMSService } from "./kms-service";
import { writeMetricToStdout } from "./metric-log";
import { Distribution } from "./model";
import { Context } from "aws-lambda";
import { getEnhancedMetricTags } from "./enhanced-metrics";

const METRICS_BATCH_SEND_INTERVAL = 10000; // 10 seconds

Expand Down Expand Up @@ -58,13 +60,14 @@ export class MetricsListener {
private apiKey: Promise<string>;
private statsDClient?: StatsD;
private isExtensionRunning?: boolean = undefined;
private globalTags?: string[] = [];

constructor(private kmsClient: KMSService, private config: MetricsConfig) {
this.apiKey = this.getAPIKey(config);
this.config = config;
}

public async onStartInvocation(_: any) {
public async onStartInvocation(_: any, context?: Context) {
if (this.isExtensionRunning === undefined) {
this.isExtensionRunning = await isExtensionRunning();
logDebug(`Extension present: ${this.isExtensionRunning}`);
Expand All @@ -73,6 +76,7 @@ export class MetricsListener {
if (this.isExtensionRunning) {
logDebug(`Using StatsD client`);

this.globalTags = this.getGlobalTags(context);
this.statsDClient = new StatsD({ host: "127.0.0.1", closingFlushInterval: 1 });
return;
}
Expand Down Expand Up @@ -138,9 +142,18 @@ export class MetricsListener {
if (this.isExtensionRunning) {
const isMetricTimeValid = Date.parse(metricTime.toString()) > 0;
if (isMetricTimeValid) {
const dateCeiling = new Date(Date.now() - 4 * 60 * 60 * 1000); // 4 hours ago
if (dateCeiling > metricTime) {
logWarning(`Timestamp ${metricTime.toISOString()} is older than 4 hours, not submitting metric ${name}`);
return;
}
// Only create the processor to submit metrics to the API when a user provides a valid timestamp as
// Dogstatsd does not support timestamps for distributions.
this.currentProcessor = this.createProcessor(this.config, this.apiKey);
// Add global tags to metrics sent to the API
if (this.globalTags !== undefined && this.globalTags.length > 0) {
tags = [...tags, ...this.globalTags];
}
} else {
this.statsDClient?.distribution(name, value, undefined, tags);
return;
Expand Down Expand Up @@ -183,7 +196,7 @@ export class MetricsListener {
const url = `https://api.${config.siteURL}`;
const apiClient = new APIClient(key, url);
const processor = new Processor(apiClient, METRICS_BATCH_SEND_INTERVAL, config.shouldRetryMetrics);
processor.startProcessing();
processor.startProcessing(this.globalTags);
return processor;
}
}
Expand All @@ -202,4 +215,18 @@ export class MetricsListener {
}
return "";
}

private getGlobalTags(context?: Context) {
const tags = getEnhancedMetricTags(context);
if (context?.invokedFunctionArn) {
const splitArn = context.invokedFunctionArn.split(":");
if (splitArn.length > 7) {
// Get rid of the alias
splitArn.pop();
}
const arn = splitArn.join(":");
tags.push(`function_arn:${arn}`);
}
return tags;
}
}
Loading