Skip to content

Commit

Permalink
[SVLS-4999] Add Lambda tags to metrics sent via the API (#559)
Browse files Browse the repository at this point in the history
* tags for API metrics

* warn on timestamp

* make pretty

* modify test

* lint

* simplify + fix test

* improve log message + add test

* add test for tags

* fix test

* handle arn tag alias

* convert number into constant

---------

Co-authored-by: jordan gonzález <[email protected]>
  • Loading branch information
DylanLovesCoffee and duncanista authored Jul 16, 2024
1 parent 1273883 commit 211d047
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 13 deletions.
12 changes: 12 additions & 0 deletions src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,18 @@ describe("sendDistributionMetricWithDate", () => {
sendDistributionMetricWithDate("metric", 1, new Date(), "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
});
it("attaches tags from Datadog environment variables to the metric", () => {
process.env.DD_TAGS = "foo:bar,hello:world";
sendDistributionMetricWithDate("metric", 1, new Date(Date.now() - 1 * 60 * 60 * 1000), "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
const metricTags = _metricsQueue.pop()?.tags;
expect(metricTags).toBeDefined();
["first-tag", "second-tag", `dd_lambda_layer:datadog-node${process.version}`, "foo:bar", "hello:world"].forEach(
(tag) => {
expect(metricTags?.indexOf(tag)).toBeGreaterThanOrEqual(0);
},
);
});
});

describe("emitTelemetryOnErrorOutsideHandler", () => {
Expand Down
17 changes: 15 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export function datadog<TEvent, TResult>(

try {
await traceListener.onStartInvocation(event, context);
await metricsListener.onStartInvocation(event);
await metricsListener.onStartInvocation(event, context);
if (finalConfig.enhancedMetrics) {
incrementInvocationsMetric(metricsListener, context);
}
Expand Down Expand Up @@ -267,7 +267,7 @@ export function extractArgs<TEvent>(isResponseStreamFunction: boolean, ...args:
* @param tags The tags associated with the metric. Should be of the format "tag:value".
*/
export function sendDistributionMetricWithDate(name: string, value: number, metricTime: Date, ...tags: string[]) {
tags = [...tags, getRuntimeTag()];
tags = [...tags, getRuntimeTag(), ...getDDTags()];

if (currentMetricsListener !== undefined) {
currentMetricsListener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
Expand Down Expand Up @@ -419,6 +419,19 @@ function getRuntimeTag(): string {
return `dd_lambda_layer:datadog-node${version}`;
}

function getDDTags(): string[] {
const ddTags = getEnvValue("DD_TAGS", "").split(",");
const ddService = getEnvValue("DD_SERVICE", "");
if (ddService.length > 0) {
ddTags.push(`service:${ddService}`);
}
const ddEnv = getEnvValue("DD_ENV", "");
if (ddEnv.length > 0) {
ddTags.push(`env:${ddEnv}`);
}
return ddTags;
}

export async function emitTelemetryOnErrorOutsideHandler(
error: Error,
functionName: string,
Expand Down
4 changes: 1 addition & 3 deletions src/metrics/batcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ export class Batcher {
* Convert batched metrics to a list of api compatible metrics
*/
public toAPIMetrics(): APIMetric[] {
return [...this.metrics.values()]
.map((metric) => metric.toAPIMetrics()) // No flatMap support yet in node 10
.reduce((prev, curr) => prev.concat(curr), []);
return [...this.metrics.values()].flatMap((metric) => metric.toAPIMetrics());
}

private getBatchKey(metric: Metric): string {
Expand Down
39 changes: 34 additions & 5 deletions src/metrics/listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { EXTENSION_URL } from "./extension";

import { MetricsListener } from "./listener";
import StatsDClient from "hot-shots";
import { Context } from "aws-lambda";
jest.mock("hot-shots");

const siteURL = "example.com";
Expand Down Expand Up @@ -143,7 +144,7 @@ describe("MetricsListener", () => {
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});
const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});

const distributionMock = jest.fn();
(StatsDClient as any).mockImplementation(() => {
Expand All @@ -153,8 +154,6 @@ describe("MetricsListener", () => {
};
});

jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);

const metricTimeOneMinuteAgo = new Date(Date.now() - 60000);
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
Expand All @@ -166,8 +165,12 @@ describe("MetricsListener", () => {
localTesting: true,
siteURL,
});
const mockARN = "arn:aws:lambda:us-east-1:123497598159:function:my-test-lambda";
const mockContext = {
invokedFunctionArn: mockARN,
} as any as Context;

await listener.onStartInvocation({});
await listener.onStartInvocation({}, mockContext);
listener.sendDistributionMetricWithDate(
"my-metric-with-a-timestamp",
10,
Expand All @@ -178,11 +181,37 @@ describe("MetricsListener", () => {
);
listener.sendDistributionMetric("my-metric-without-a-timestamp", 10, false, "tag:a", "tag:b");
await listener.onCompleteInvocation();

expect(flushScope.isDone()).toBeTruthy();
expect(nock.isDone()).toBeTruthy();
expect(apiScope.isDone()).toBeTruthy();
expect(distributionMock).toHaveBeenCalledWith("my-metric-without-a-timestamp", 10, undefined, ["tag:a", "tag:b"]);
});

it("does not send historical metrics from over 4 hours ago to the API", async () => {
mock({
"/opt/extensions/datadog-agent": Buffer.from([0]),
});
const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {});

const metricTimeFiveHoursAgo = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 hours ago
const kms = new MockKMS("kms-api-key-decrypted");
const listener = new MetricsListener(kms as any, {
apiKey: "api-key",
apiKeyKMS: "",
enhancedMetrics: false,
logForwarding: false,
shouldRetryMetrics: false,
localTesting: true,
siteURL,
});

await listener.onStartInvocation({});
listener.sendDistributionMetricWithDate("my-metric-with-a-timestamp", 10, metricTimeFiveHoursAgo, false);
await listener.onCompleteInvocation();

expect(apiScope.isDone()).toBeFalsy();
});

it("logs metrics when logForwarding is enabled with custom timestamp", async () => {
const spy = jest.spyOn(process.stdout, "write");
// jest.spyOn(Date, "now").mockImplementation(() => 1487076708000);
Expand Down
34 changes: 31 additions & 3 deletions src/metrics/listener.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { StatsD } from "hot-shots";
import { promisify } from "util";
import { logDebug, logError } from "../utils";
import { logDebug, logError, logWarning } from "../utils";
import { flushExtension, isExtensionRunning } from "./extension";
import { KMSService } from "./kms-service";
import { writeMetricToStdout } from "./metric-log";
import { Distribution } from "./model";
import { Context } from "aws-lambda";
import { getEnhancedMetricTags } from "./enhanced-metrics";

const METRICS_BATCH_SEND_INTERVAL = 10000; // 10 seconds
const HISTORICAL_METRICS_THRESHOLD_HOURS = 4 * 60 * 60 * 1000; // 4 hours

export interface MetricsConfig {
/**
Expand Down Expand Up @@ -58,13 +61,14 @@ export class MetricsListener {
private apiKey: Promise<string>;
private statsDClient?: StatsD;
private isExtensionRunning?: boolean = undefined;
private globalTags?: string[] = [];

constructor(private kmsClient: KMSService, private config: MetricsConfig) {
this.apiKey = this.getAPIKey(config);
this.config = config;
}

public async onStartInvocation(_: any) {
public async onStartInvocation(_: any, context?: Context) {
if (this.isExtensionRunning === undefined) {
this.isExtensionRunning = await isExtensionRunning();
logDebug(`Extension present: ${this.isExtensionRunning}`);
Expand All @@ -73,6 +77,7 @@ export class MetricsListener {
if (this.isExtensionRunning) {
logDebug(`Using StatsD client`);

this.globalTags = this.getGlobalTags(context);
this.statsDClient = new StatsD({ host: "127.0.0.1", closingFlushInterval: 1 });
return;
}
Expand Down Expand Up @@ -138,9 +143,18 @@ export class MetricsListener {
if (this.isExtensionRunning) {
const isMetricTimeValid = Date.parse(metricTime.toString()) > 0;
if (isMetricTimeValid) {
const dateCeiling = new Date(Date.now() - HISTORICAL_METRICS_THRESHOLD_HOURS); // 4 hours ago
if (dateCeiling > metricTime) {
logWarning(`Timestamp ${metricTime.toISOString()} is older than 4 hours, not submitting metric ${name}`);
return;
}
// Only create the processor to submit metrics to the API when a user provides a valid timestamp as
// Dogstatsd does not support timestamps for distributions.
this.currentProcessor = this.createProcessor(this.config, this.apiKey);
// Add global tags to metrics sent to the API
if (this.globalTags !== undefined && this.globalTags.length > 0) {
tags = [...tags, ...this.globalTags];
}
} else {
this.statsDClient?.distribution(name, value, undefined, tags);
return;
Expand Down Expand Up @@ -183,7 +197,7 @@ export class MetricsListener {
const url = `https://api.${config.siteURL}`;
const apiClient = new APIClient(key, url);
const processor = new Processor(apiClient, METRICS_BATCH_SEND_INTERVAL, config.shouldRetryMetrics);
processor.startProcessing();
processor.startProcessing(this.globalTags);
return processor;
}
}
Expand All @@ -202,4 +216,18 @@ export class MetricsListener {
}
return "";
}

private getGlobalTags(context?: Context) {
const tags = getEnhancedMetricTags(context);
if (context?.invokedFunctionArn) {
const splitArn = context.invokedFunctionArn.split(":");
if (splitArn.length > 7) {
// Get rid of the alias
splitArn.pop();
}
const arn = splitArn.join(":");
tags.push(`function_arn:${arn}`);
}
return tags;
}
}

0 comments on commit 211d047

Please sign in to comment.