diff --git a/src/index.spec.ts b/src/index.spec.ts index a11868d6..eba64684 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -555,6 +555,18 @@ describe("sendDistributionMetricWithDate", () => { sendDistributionMetricWithDate("metric", 1, new Date(), "first-tag", "second-tag"); expect(_metricsQueue.length).toBe(1); }); + it("attaches tags from Datadog environment variables to the metric", () => { + process.env.DD_TAGS = "foo:bar,hello:world"; + sendDistributionMetricWithDate("metric", 1, new Date(Date.now() - 1 * 60 * 60 * 1000), "first-tag", "second-tag"); + expect(_metricsQueue.length).toBe(1); + const metricTags = _metricsQueue.pop()?.tags; + expect(metricTags).toBeDefined(); + ["first-tag", "second-tag", `dd_lambda_layer:datadog-node${process.version}`, "foo:bar", "hello:world"].forEach( + (tag) => { + expect(metricTags?.indexOf(tag)).toBeGreaterThanOrEqual(0); + }, + ); + }); }); describe("emitTelemetryOnErrorOutsideHandler", () => { diff --git a/src/index.ts b/src/index.ts index c3715aa0..5e107006 100644 --- a/src/index.ts +++ b/src/index.ts @@ -153,7 +153,7 @@ export function datadog( try { await traceListener.onStartInvocation(event, context); - await metricsListener.onStartInvocation(event); + await metricsListener.onStartInvocation(event, context); if (finalConfig.enhancedMetrics) { incrementInvocationsMetric(metricsListener, context); } @@ -267,7 +267,7 @@ export function extractArgs(isResponseStreamFunction: boolean, ...args: * @param tags The tags associated with the metric. Should be of the format "tag:value". */ export function sendDistributionMetricWithDate(name: string, value: number, metricTime: Date, ...tags: string[]) { - tags = [...tags, getRuntimeTag()]; + tags = [...tags, getRuntimeTag(), ...getDDTags()]; if (currentMetricsListener !== undefined) { currentMetricsListener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags); @@ -419,6 +419,19 @@ function getRuntimeTag(): string { return `dd_lambda_layer:datadog-node${version}`; } +function getDDTags(): string[] { + const ddTags = getEnvValue("DD_TAGS", "").split(","); + const ddService = getEnvValue("DD_SERVICE", ""); + if (ddService.length > 0) { + ddTags.push(`service:${ddService}`); + } + const ddEnv = getEnvValue("DD_ENV", ""); + if (ddEnv.length > 0) { + ddTags.push(`env:${ddEnv}`); + } + return ddTags; +} + export async function emitTelemetryOnErrorOutsideHandler( error: Error, functionName: string, diff --git a/src/metrics/batcher.ts b/src/metrics/batcher.ts index 2c70c131..4da14e82 100644 --- a/src/metrics/batcher.ts +++ b/src/metrics/batcher.ts @@ -23,9 +23,7 @@ export class Batcher { * Convert batched metrics to a list of api compatible metrics */ public toAPIMetrics(): APIMetric[] { - return [...this.metrics.values()] - .map((metric) => metric.toAPIMetrics()) // No flatMap support yet in node 10 - .reduce((prev, curr) => prev.concat(curr), []); + return [...this.metrics.values()].flatMap((metric) => metric.toAPIMetrics()); } private getBatchKey(metric: Metric): string { diff --git a/src/metrics/listener.spec.ts b/src/metrics/listener.spec.ts index 244b0197..abf6771f 100644 --- a/src/metrics/listener.spec.ts +++ b/src/metrics/listener.spec.ts @@ -6,6 +6,7 @@ import { EXTENSION_URL } from "./extension"; import { MetricsListener } from "./listener"; import StatsDClient from "hot-shots"; +import { Context } from "aws-lambda"; jest.mock("hot-shots"); const siteURL = "example.com"; @@ -143,7 +144,7 @@ describe("MetricsListener", () => { mock({ "/opt/extensions/datadog-agent": Buffer.from([0]), }); - nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {}); + const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {}); const distributionMock = jest.fn(); (StatsDClient as any).mockImplementation(() => { @@ -153,8 +154,6 @@ describe("MetricsListener", () => { }; }); - jest.spyOn(Date, "now").mockImplementation(() => 1487076708000); - const metricTimeOneMinuteAgo = new Date(Date.now() - 60000); const kms = new MockKMS("kms-api-key-decrypted"); const listener = new MetricsListener(kms as any, { @@ -166,8 +165,12 @@ describe("MetricsListener", () => { localTesting: true, siteURL, }); + const mockARN = "arn:aws:lambda:us-east-1:123497598159:function:my-test-lambda"; + const mockContext = { + invokedFunctionArn: mockARN, + } as any as Context; - await listener.onStartInvocation({}); + await listener.onStartInvocation({}, mockContext); listener.sendDistributionMetricWithDate( "my-metric-with-a-timestamp", 10, @@ -178,11 +181,37 @@ describe("MetricsListener", () => { ); listener.sendDistributionMetric("my-metric-without-a-timestamp", 10, false, "tag:a", "tag:b"); await listener.onCompleteInvocation(); + expect(flushScope.isDone()).toBeTruthy(); - expect(nock.isDone()).toBeTruthy(); + expect(apiScope.isDone()).toBeTruthy(); expect(distributionMock).toHaveBeenCalledWith("my-metric-without-a-timestamp", 10, undefined, ["tag:a", "tag:b"]); }); + it("does not send historical metrics from over 4 hours ago to the API", async () => { + mock({ + "/opt/extensions/datadog-agent": Buffer.from([0]), + }); + const apiScope = nock("https://api.example.com").post("/api/v1/distribution_points?api_key=api-key").reply(200, {}); + + const metricTimeFiveHoursAgo = new Date(Date.now() - 5 * 60 * 60 * 1000); // 5 hours ago + const kms = new MockKMS("kms-api-key-decrypted"); + const listener = new MetricsListener(kms as any, { + apiKey: "api-key", + apiKeyKMS: "", + enhancedMetrics: false, + logForwarding: false, + shouldRetryMetrics: false, + localTesting: true, + siteURL, + }); + + await listener.onStartInvocation({}); + listener.sendDistributionMetricWithDate("my-metric-with-a-timestamp", 10, metricTimeFiveHoursAgo, false); + await listener.onCompleteInvocation(); + + expect(apiScope.isDone()).toBeFalsy(); + }); + it("logs metrics when logForwarding is enabled with custom timestamp", async () => { const spy = jest.spyOn(process.stdout, "write"); // jest.spyOn(Date, "now").mockImplementation(() => 1487076708000); diff --git a/src/metrics/listener.ts b/src/metrics/listener.ts index c8e4100c..4a50aecd 100644 --- a/src/metrics/listener.ts +++ b/src/metrics/listener.ts @@ -1,12 +1,15 @@ import { StatsD } from "hot-shots"; import { promisify } from "util"; -import { logDebug, logError } from "../utils"; +import { logDebug, logError, logWarning } from "../utils"; import { flushExtension, isExtensionRunning } from "./extension"; import { KMSService } from "./kms-service"; import { writeMetricToStdout } from "./metric-log"; import { Distribution } from "./model"; +import { Context } from "aws-lambda"; +import { getEnhancedMetricTags } from "./enhanced-metrics"; const METRICS_BATCH_SEND_INTERVAL = 10000; // 10 seconds +const HISTORICAL_METRICS_THRESHOLD_HOURS = 4 * 60 * 60 * 1000; // 4 hours export interface MetricsConfig { /** @@ -58,13 +61,14 @@ export class MetricsListener { private apiKey: Promise; private statsDClient?: StatsD; private isExtensionRunning?: boolean = undefined; + private globalTags?: string[] = []; constructor(private kmsClient: KMSService, private config: MetricsConfig) { this.apiKey = this.getAPIKey(config); this.config = config; } - public async onStartInvocation(_: any) { + public async onStartInvocation(_: any, context?: Context) { if (this.isExtensionRunning === undefined) { this.isExtensionRunning = await isExtensionRunning(); logDebug(`Extension present: ${this.isExtensionRunning}`); @@ -73,6 +77,7 @@ export class MetricsListener { if (this.isExtensionRunning) { logDebug(`Using StatsD client`); + this.globalTags = this.getGlobalTags(context); this.statsDClient = new StatsD({ host: "127.0.0.1", closingFlushInterval: 1 }); return; } @@ -138,9 +143,18 @@ export class MetricsListener { if (this.isExtensionRunning) { const isMetricTimeValid = Date.parse(metricTime.toString()) > 0; if (isMetricTimeValid) { + const dateCeiling = new Date(Date.now() - HISTORICAL_METRICS_THRESHOLD_HOURS); // 4 hours ago + if (dateCeiling > metricTime) { + logWarning(`Timestamp ${metricTime.toISOString()} is older than 4 hours, not submitting metric ${name}`); + return; + } // Only create the processor to submit metrics to the API when a user provides a valid timestamp as // Dogstatsd does not support timestamps for distributions. this.currentProcessor = this.createProcessor(this.config, this.apiKey); + // Add global tags to metrics sent to the API + if (this.globalTags !== undefined && this.globalTags.length > 0) { + tags = [...tags, ...this.globalTags]; + } } else { this.statsDClient?.distribution(name, value, undefined, tags); return; @@ -183,7 +197,7 @@ export class MetricsListener { const url = `https://api.${config.siteURL}`; const apiClient = new APIClient(key, url); const processor = new Processor(apiClient, METRICS_BATCH_SEND_INTERVAL, config.shouldRetryMetrics); - processor.startProcessing(); + processor.startProcessing(this.globalTags); return processor; } } @@ -202,4 +216,18 @@ export class MetricsListener { } return ""; } + + private getGlobalTags(context?: Context) { + const tags = getEnhancedMetricTags(context); + if (context?.invokedFunctionArn) { + const splitArn = context.invokedFunctionArn.split(":"); + if (splitArn.length > 7) { + // Get rid of the alias + splitArn.pop(); + } + const arn = splitArn.join(":"); + tags.push(`function_arn:${arn}`); + } + return tags; + } }