From 6810d0c99d61890452a027781159383ae3b84d44 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?jordan=20gonz=C3=A1lez?= <30836115+duncanista@users.noreply.github.com>
Date: Tue, 26 Mar 2024 11:03:07 -0400
Subject: [PATCH] feat: support sending metrics before handler initialization (#524)

* add `warning` level to logger
* export `logWarning` from `utils`
* add `queue.ts` for `MetricsQueue`
* export `MetricsQueue` from `metrics`
* use `MetricsQueue` as a global metrics queue so metrics sent before the handler is initialized are kept for later processing
* remove unused import
* format and lint
* remove unused import
* format and lint
* add entry to integration tests
* fix `return` to `continue`
* update `shift` to `pop`, reducing the queue drain to O(n) overall (one `reverse` pass, then O(1) `pop`s)
---
 integration_tests/send-metrics.js             |  7 ++
 .../snapshots/logs/async-metrics_node16.log   | 16 +++-
 .../snapshots/logs/async-metrics_node18.log   | 16 +++-
 .../snapshots/logs/async-metrics_node20.log   | 16 +++-
 src/index.spec.ts                             | 31 +++++++-
 src/index.ts                                  | 43 +++++++----
 src/metrics/index.ts                          |  1 +
 src/metrics/queue.spec.ts                     | 76 +++++++++++++++++++
 src/metrics/queue.ts                          | 58 ++++++++++++++
 src/utils/index.ts                            |  2 +-
 src/utils/log.spec.ts                         | 20 ++++-
 src/utils/log.ts                              |  9 +++
 12 files changed, 269 insertions(+), 26 deletions(-)
 create mode 100644 src/metrics/queue.spec.ts
 create mode 100644 src/metrics/queue.ts

diff --git a/integration_tests/send-metrics.js b/integration_tests/send-metrics.js
index 7bee8506..a91d5945 100644
--- a/integration_tests/send-metrics.js
+++ b/integration_tests/send-metrics.js
@@ -1,5 +1,12 @@
 const { datadog, sendDistributionMetric } = require("datadog-lambda-js");
 
+sendDistributionMetric(
+  "serverless.integration_test.outside_handler",
+  1,
+  "tagkey:tagvalue",
+  `eventsource:outside_handler`,
+);
+
 async function handle(event, context) {
   const responsePayload = { message: "hello, dog!" };
 
diff --git a/integration_tests/snapshots/logs/async-metrics_node16.log b/integration_tests/snapshots/logs/async-metrics_node16.log
index ee3c4f6b..061590da 100644
--- a/integration_tests/snapshots/logs/async-metrics_node16.log
+++ b/integration_tests/snapshots/logs/async-metrics_node16.log
@@ -15,6 +15,17 @@ START
   ],
   "v": 1
 }
+{
+  "e": XXXX,
+  "m": "serverless.integration_test.outside_handler",
+  "t": [
+    "tagkey:tagvalue",
+    "eventsource:outside_handler",
+    "dd_lambda_layer:datadog-nodev16.XX.X"
+  ],
+  "v": 1
+}
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
 {
   "e": XXXX,
   "m": "serverless.integration_test.execution",
@@ -25,7 +36,6 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
 END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
 START
 {
@@ -43,6 +53,7 @@ START
   ],
   "v": 1
 }
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
 {
   "e": XXXX,
   "m": "serverless.integration_test.records_processed",
@@ -63,9 +74,9 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
 END Duration: XXXX ms Memory Used: XXXX MB
 START
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
 {
   "e": XXXX,
   "m": "aws.lambda.enhanced.invocations",
@@ -111,5 +122,4 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
 END Duration: XXXX ms Memory Used: XXXX MB
diff --git a/integration_tests/snapshots/logs/async-metrics_node18.log b/integration_tests/snapshots/logs/async-metrics_node18.log
index 2a36fed1..3c476b5c 100644
--- a/integration_tests/snapshots/logs/async-metrics_node18.log
+++ b/integration_tests/snapshots/logs/async-metrics_node18.log
@@ -15,6 +15,17 @@ START
   ],
   "v": 1
 }
+{
+  "e": XXXX,
+  "m": "serverless.integration_test.outside_handler",
+  "t": [
+    "tagkey:tagvalue",
+    "eventsource:outside_handler",
+    "dd_lambda_layer:datadog-nodev18.XX.X"
+  ],
+  "v": 1
+}
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
 {
   "e": XXXX,
   "m": "serverless.integration_test.execution",
@@ -25,7 +36,6 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
 END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
 START
 {
@@ -43,6 +53,7 @@ START
   ],
   "v": 1
 }
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
 {
   "e": XXXX,
   "m": "serverless.integration_test.records_processed",
@@ -63,7 +74,6 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
 END Duration: XXXX ms Memory Used: XXXX MB
 START
 {
@@ -81,6 +91,7 @@ START
   ],
   "v": 1
 }
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
 {
   "e": XXXX,
   "m": "serverless.integration_test.records_processed",
@@ -111,5 +122,4 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
 END Duration: XXXX ms Memory Used: XXXX MB
diff --git a/integration_tests/snapshots/logs/async-metrics_node20.log b/integration_tests/snapshots/logs/async-metrics_node20.log
index dfb99e2a..9da13a4d 100644
--- a/integration_tests/snapshots/logs/async-metrics_node20.log
+++ b/integration_tests/snapshots/logs/async-metrics_node20.log
@@ -15,7 +15,16 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
+{
+  "e": XXXX,
+  "m": "serverless.integration_test.outside_handler",
+  "t": [
+    "tagkey:tagvalue",
+    "eventsource:outside_handler",
+    "dd_lambda_layer:datadog-nodev20.XX.X"
+  ],
+  "v": 1
+}
 {
   "e": XXXX,
   "m": "serverless.integration_test.execution",
@@ -26,8 +35,10 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGat
   ],
   "v": 1
 }
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
 END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
 START
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
 {
   "e": XXXX,
   "m": "aws.lambda.enhanced.invocations",
@@ -43,7 +54,6 @@ START
   ],
   "v": 1
 }
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
 {
   "e": XXXX,
   "m": "serverless.integration_test.records_processed",
@@ -66,7 +76,6 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS re
 }
 END Duration: XXXX ms Memory Used: XXXX MB
 START
-XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
 {
   "e": XXXX,
   "m": "aws.lambda.enhanced.invocations",
@@ -82,6 +91,7 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS re
   ],
   "v": 1
 }
+XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
 {
   "e": XXXX,
   "m": "serverless.integration_test.records_processed",
diff --git a/src/index.spec.ts b/src/index.spec.ts
index 87ba4280..da393684 100644
--- a/src/index.spec.ts
+++ b/src/index.spec.ts
@@ -2,7 +2,13 @@ import http from "http";
 import nock from "nock";
 
 import { Context, Handler } from "aws-lambda";
-import { datadog, getTraceHeaders, sendDistributionMetric, sendDistributionMetricWithDate } from "./index";
+import {
+  datadog,
+  getTraceHeaders,
+  sendDistributionMetric,
+  sendDistributionMetricWithDate,
+  _metricsQueue,
+} from "./index";
 import { incrementErrorsMetric, incrementInvocationsMetric } from "./metrics/enhanced-metrics";
 import { LogLevel, setLogLevel } from "./utils";
 import { HANDLER_STREAMING, STREAM_RESPONSE } from "./constants";
@@ -407,6 +413,7 @@ describe("datadog", () => {
     const logger = {
       debug: jest.fn(),
       error: jest.fn(),
+      warn: jest.fn(),
     };
 
     const wrapped = datadog(handler, { forceWrap: true, logger: logger, debugLogging: true });
@@ -440,3 +447,25 @@ describe("datadog", () => {
     expect(wrapped[HANDLER_STREAMING]).toBe(undefined);
   });
 });
+
+describe("sendDistributionMetric", () => {
+  beforeEach(() => {
+    _metricsQueue.reset();
+    setLogLevel(LogLevel.NONE);
+  });
+  it("enqueues a metric for later processing when metrics listener is not initialized", () => {
+    sendDistributionMetric("metric", 1, "first-tag", "second-tag");
+    expect(_metricsQueue.length).toBe(1);
+  });
+});
+
+describe("sendDistributionMetricWithDate", () => {
+  beforeEach(() => {
+    _metricsQueue.reset();
+    setLogLevel(LogLevel.NONE);
+  });
+  it("enqueues a metric for later processing when metrics listener is not initialized", () => {
+    sendDistributionMetricWithDate("metric", 1, new Date(), "first-tag", "second-tag");
+    expect(_metricsQueue.length).toBe(1);
+  });
+});
diff --git a/src/index.ts b/src/index.ts
index 4b5e98df..cdfce7d3 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -6,19 +6,11 @@ import {
   KMSService,
   MetricsConfig,
   MetricsListener,
+  MetricsQueue,
 } from "./metrics";
 import { TraceConfig, TraceListener } from "./trace";
 import { subscribeToDC } from "./runtime";
-import {
-  logDebug,
-  logError,
-  Logger,
-  LogLevel,
-  promisifiedHandler,
-  setSandboxInit,
-  setLogger,
-  setLogLevel,
-} from "./utils";
+import { logDebug, Logger, LogLevel, promisifiedHandler, setSandboxInit, setLogger, setLogLevel } from "./utils";
 import { getEnhancedMetricTags } from "./metrics/enhanced-metrics";
 import { DatadogTraceHeaders } from "./trace/context/extractor";
 
@@ -90,6 +82,8 @@ export const defaultConfig: Config = {
   localTesting: false,
 } as const;
 
+export const _metricsQueue: MetricsQueue = new MetricsQueue();
+
 let currentMetricsListener: MetricsListener | undefined;
 let currentTraceListener: TraceListener | undefined;
 
@@ -150,6 +144,8 @@ export function datadog(
       if (finalConfig.enhancedMetrics) {
         incrementInvocationsMetric(metricsListener, context);
       }
+
+      sendQueueMetrics(metricsListener);
     } catch (err) {
       if (err instanceof Error) {
         logDebug("Failed to start listeners", err);
@@ -259,9 +255,10 @@ export function sendDistributionMetricWithDate(name: string, value: number, metr
 
   if (currentMetricsListener !== undefined) {
     currentMetricsListener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
-  } else {
-    logError("handler not initialized");
+    return;
   }
+
+  _metricsQueue.push({ name, value, metricTime, tags });
 }
 
 /**
@@ -275,8 +272,26 @@ export function sendDistributionMetric(name: string, value: number, ...tags: str
 
   if (currentMetricsListener !== undefined) {
     currentMetricsListener.sendDistributionMetric(name, value, false, ...tags);
-  } else {
-    logError("handler not initialized");
+    return;
+  }
+
+  _metricsQueue.push({ name, value, tags });
+}
+
+function sendQueueMetrics(listener: MetricsListener) {
+  // Reverse the queue to send metrics in the order they were added:
+  // the queue is drained with pop() (from the end, like a stack),
+  // so reversing once first preserves insertion order.
+  _metricsQueue.reverse();
+  while (_metricsQueue.length > 0) {
+    const metric = _metricsQueue.pop()!; // Always defined: the loop condition guarantees a non-empty queue.
+    const { name, value, metricTime, tags } = metric;
+    if (metricTime !== undefined) {
+      listener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
+      continue;
+    }
+
+    listener.sendDistributionMetric(name, value, false, ...tags);
+  }
 }
diff --git a/src/metrics/index.ts b/src/metrics/index.ts
index 14d3419d..21ea830a 100644
--- a/src/metrics/index.ts
+++ b/src/metrics/index.ts
@@ -1,3 +1,4 @@
 export { MetricsConfig, MetricsListener } from "./listener";
+export { MetricsQueue } from "./queue";
 export { KMSService } from "./kms-service";
 export { incrementErrorsMetric, incrementInvocationsMetric } from "./enhanced-metrics";
diff --git a/src/metrics/queue.spec.ts b/src/metrics/queue.spec.ts
new file mode 100644
index 00000000..75fb5bd7
--- /dev/null
+++ b/src/metrics/queue.spec.ts
@@ -0,0 +1,76 @@
+import { LogLevel, logDebug, setLogLevel, setLogger } from "../utils";
+import { METRICS_QUEUE_LIMIT, MetricsQueue } from "./queue";
+
+describe("MetricsQueue", () => {
+  const logger = {
+    debug: jest.fn(),
+    error: jest.fn(),
+    warn: jest.fn(),
+  };
+  describe("push", () => {
+    beforeEach(() => {
+      setLogLevel(LogLevel.NONE);
+      setLogger(logger);
+    });
+    it("resets the metrics queue when it's full", () => {
+      setLogLevel(LogLevel.WARNING);
+      const queue = new MetricsQueue();
+      for (let i = 0; i < METRICS_QUEUE_LIMIT + 1; i++) {
+        queue.push({ name: "metric", tags: [], value: i });
+      }
+
+      // The queue should have been reset and only contain the last metric
+      expect(queue.length).toBe(1);
+      expect(logger.warn).toHaveBeenLastCalledWith(
+        '{"status":"warning","message":"datadog:Metrics queue is full, dropping all metrics."}',
+      );
+    });
+
+    it("enqueues a metric", () => {
+      setLogLevel(LogLevel.DEBUG);
+      const queue = new MetricsQueue();
+      queue.push({ name: "metric", tags: [], value: 1 });
+      expect(queue.length).toBe(1);
+      expect(logger.debug).toHaveBeenLastCalledWith(
+        '{"status":"debug","message":"datadog:Metrics Listener was not initialized. Enqueuing metric for later processing."}',
+      );
+    });
+  });
+
+  describe("pop", () => {
+    it("returns undefined when queue is empty", () => {
+      const queue = new MetricsQueue();
+      expect(queue.pop()).toBeUndefined();
+    });
+
+    it("returns the last element pushed to the queue", () => {
+      const queue = new MetricsQueue();
+      queue.push({ name: "metric", tags: [], value: 1 });
+      queue.push({ name: "metric", tags: [], value: 2 });
+      expect(queue.pop()).toEqual({ name: "metric", tags: [], value: 2 });
+    });
+  });
+
+  it("reverses the queue", () => {
+    const queue = new MetricsQueue();
+    queue.push({ name: "metric", tags: [], value: 1 });
+    queue.push({ name: "metric", tags: [], value: 2 });
+    queue.reverse();
+    expect(queue.pop()).toEqual({ name: "metric", tags: [], value: 1 });
+  });
+
+  it("resets the queue", () => {
+    const queue = new MetricsQueue();
+    queue.push({ name: "metric", tags: [], value: 1 });
+    queue.push({ name: "metric", tags: [], value: 2 });
+    queue.reset();
+    expect(queue.length).toBe(0);
+  });
+
+  it("returns the length of the queue", () => {
+    const queue = new MetricsQueue();
+    queue.push({ name: "metric", tags: [], value: 1 });
+    queue.push({ name: "metric", tags: [], value: 2 });
+    expect(queue.length).toBe(2);
+  });
+});
diff --git a/src/metrics/queue.ts b/src/metrics/queue.ts
new file mode 100644
index 00000000..26f3a16d
--- /dev/null
+++ b/src/metrics/queue.ts
@@ -0,0 +1,58 @@
+import { logDebug, logWarning } from "../utils";
+
+export type MetricParameters = {
+  value: number;
+  name: string;
+  tags: string[];
+  metricTime?: Date;
+};
+
+export const METRICS_QUEUE_LIMIT = 1024;
+
+/**
+ * MetricsQueue holds metrics that are sent while the MetricsListener is not yet initialized.
+ *
+ * When the MetricsListener is initialized, the queued metrics are sent to the listener for processing.
+ * If the queue is full, all queued metrics are dropped and the new one is enqueued. This might happen in two
+ * scenarios:
+ * 1. The MetricsListener is not initialized for a long time.
+ * 2. More metrics are sent before the MetricsListener initializes than the queue limit allows.
+ */
+export class MetricsQueue {
+  private queue: MetricParameters[] = [];
+
+  /**
+   * Enqueues a metric for later processing.
+   * If the queue is full, all previously queued metrics are dropped, but the new metric is still enqueued.
+   *
+   * @param metric The metric to be enqueued.
+   */
+  public push(metric: MetricParameters) {
+    logDebug("Metrics Listener was not initialized. Enqueuing metric for later processing.");
+    if (this.queue.length >= METRICS_QUEUE_LIMIT) {
+      logWarning("Metrics queue is full, dropping all metrics.");
+      this.reset();
+    }
+    this.queue.push(metric);
+  }
+
+  public pop() {
+    if (this.queue.length > 0) {
+      return this.queue.pop();
+    }
+
+    return undefined;
+  }
+
+  public reverse() {
+    this.queue.reverse();
+  }
+
+  public reset() {
+    this.queue = [];
+  }
+
+  public get length() {
+    return this.queue.length;
+  }
+}
diff --git a/src/utils/index.ts b/src/utils/index.ts
index 9bb311e7..7d4110e7 100644
--- a/src/utils/index.ts
+++ b/src/utils/index.ts
@@ -1,5 +1,5 @@
 export { didFunctionColdStart, getSandboxInitTags, setSandboxInit, isProactiveInitialization } from "./cold-start";
 export { wrap, promisifiedHandler } from "./handler";
 export { Timer } from "./timer";
-export { logError, logDebug, Logger, setLogLevel, setLogger, LogLevel } from "./log";
+export { logWarning, logError, logDebug, Logger, setLogLevel, setLogger, LogLevel } from "./log";
 export { tagObject } from "./tag-object";
diff --git a/src/utils/log.spec.ts b/src/utils/log.spec.ts
index 74498ecb..6d6a6986 100644
--- a/src/utils/log.spec.ts
+++ b/src/utils/log.spec.ts
@@ -1,10 +1,11 @@
-import { setLogger, logError } from "./log";
+import { setLogger, logError, logDebug, logWarning } from "./log";
 
 describe("logger", () => {
   it("log using custom logger", async () => {
     const logger = {
       debug: jest.fn(),
       error: jest.fn(),
+      warn: jest.fn(),
     };
 
     setLogger(logger);
@@ -12,10 +13,12 @@ describe("logger", () => {
     expect(logger.error).toHaveBeenLastCalledWith('{"status":"error","message":"datadog:My Error"}');
   });
+
   it("logs errors correctly", async () => {
     const logger = {
       debug: jest.fn(),
       error: jest.fn(),
+      warn: jest.fn(),
     };
 
     setLogger(logger);
@@ -23,10 +26,12 @@ describe("logger", () => {
     const lastErrorCall = logger.error.mock.calls[0][0];
     expect(lastErrorCall).toContain('{"status":"error","message":"Oh no!","name":"Error","stack":"Error: Oh no!');
   });
+
   it("logs errors and metadata correctly", async () => {
     const logger = {
       debug: jest.fn(),
       error: jest.fn(),
+      warn: jest.fn(),
     };
 
     setLogger(logger);
@@ -37,4 +42,17 @@ describe("logger", () => {
       '{"status":"error","message":"Oh no 2!","foo":"bar","baz":"2","name":"Error","stack":"Error: Oh no 2!',
     );
   });
+
+  it("logs warnings correctly", async () => {
+    const logger = {
+      debug: jest.fn(),
+      error: jest.fn(),
+      warn: jest.fn(),
+    };
+
+    setLogger(logger);
+    logWarning("This is a warning");
+    const lastWarning = logger.warn.mock.calls[0][0];
+    expect(lastWarning).toContain('{"status":"warning","message":"datadog:This is a warning"}');
+  });
 });
diff --git a/src/utils/log.ts b/src/utils/log.ts
index 712fb600..71598be5 100644
--- a/src/utils/log.ts
+++ b/src/utils/log.ts
@@ -3,12 +3,14 @@ import { serializeError } from "serialize-error";
 export enum LogLevel {
   DEBUG = 0,
   ERROR,
+  WARNING,
   NONE,
 }
 
 export interface Logger {
   debug(message: string): void;
   error(message: string): void;
+  warn(message: string): void;
 }
 
 let logger: Logger = console;
@@ -40,6 +42,13 @@ export function logError(message: string, metadata?: Error | object, error?: Err
   emitLog(logger.error, "error", message, metadata, error);
 }
 
+export function logWarning(message: string, metadata?: Error | object, error?: Error) {
+  if (logLevel > LogLevel.WARNING) {
+    return;
+  }
+  emitLog(logger.warn, "warning", message, metadata, error);
+}
+
 function emitLog(
   outputter: (a: string) => any,
   status: string,
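
Illustrative usage sketch (not part of the patch, appended for review context): with this change, a distribution metric sent at module load time, before the datadog() wrapper has initialized the metrics listener, is queued and flushed on the first invocation instead of being dropped with a "handler not initialized" error. Only datadog() and sendDistributionMetric() come from datadog-lambda-js; the handler, metric names, and tags below are hypothetical.

    // hypothetical handler module; mirrors integration_tests/send-metrics.js
    const { datadog, sendDistributionMetric } = require("datadog-lambda-js");

    // Runs during cold-start initialization, before any invocation: queued by MetricsQueue.
    sendDistributionMetric("my_app.module_loaded", 1, "stage:init");

    async function handler(event, context) {
      // Runs per invocation: the listener is initialized by now, so this is sent directly.
      sendDistributionMetric("my_app.request_handled", 1, "stage:invoke");
      return { statusCode: 200, body: "hello, dog!" };
    }

    // datadog() starts the metrics listener and drains the queued metrics on invocation.
    module.exports.handler = datadog(handler);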
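Illustrative sketch (not part of the patch) of why sendQueueMetrics reverses the queue and then pops, as noted in the "update `shift` to `pop`" commit bullet: Array.prototype.shift() re-indexes the remaining elements, so it is O(n) per call and draining n queued metrics with shift() is O(n^2). A single reverse() is O(n) and each pop() is O(1), so reverse-then-pop drains the queue in linear time while still emitting metrics in the order they were enqueued. The drainInOrder helper below is hypothetical and only demonstrates the ordering; it is not the library's implementation.

    // hypothetical standalone helper, not part of datadog-lambda-js
    function drainInOrder(queue, send) {
      queue.reverse(); // the oldest element is now at the end
      while (queue.length > 0) {
        send(queue.pop()); // O(1) per element, insertion order preserved
      }
    }

    // Example: drainInOrder([1, 2, 3], (v) => console.log(v)) prints 1, 2, 3.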