Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support sending metrics before handler initialization #524

Merged
merged 12 commits into from
Mar 26, 2024
7 changes: 7 additions & 0 deletions integration_tests/send-metrics.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
const { datadog, sendDistributionMetric } = require("datadog-lambda-js");

sendDistributionMetric(
"serverless.integration_test.outside_handler",
1,
"tagkey:tagvalue",
`eventsource:outside_handler`,
);

async function handle(event, context) {
const responsePayload = { message: "hello, dog!" };

Expand Down
16 changes: 13 additions & 3 deletions integration_tests/snapshots/logs/async-metrics_node16.log
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ START
],
"v": 1
}
{
"e": XXXX,
"m": "serverless.integration_test.outside_handler",
"t": [
"tagkey:tagvalue",
"eventsource:outside_handler",
"dd_lambda_layer:datadog-nodev16.XX.X"
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
{
"e": XXXX,
"m": "serverless.integration_test.execution",
Expand All @@ -25,7 +36,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
START
{
Expand All @@ -43,6 +53,7 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand All @@ -63,9 +74,9 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
END Duration: XXXX ms Memory Used: XXXX MB
START
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "aws.lambda.enhanced.invocations",
Expand Down Expand Up @@ -111,5 +122,4 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
END Duration: XXXX ms Memory Used: XXXX MB
16 changes: 13 additions & 3 deletions integration_tests/snapshots/logs/async-metrics_node18.log
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ START
],
"v": 1
}
{
"e": XXXX,
"m": "serverless.integration_test.outside_handler",
"t": [
"tagkey:tagvalue",
"eventsource:outside_handler",
"dd_lambda_layer:datadog-nodev18.XX.X"
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
{
"e": XXXX,
"m": "serverless.integration_test.execution",
Expand All @@ -25,7 +36,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
START
{
Expand All @@ -43,6 +53,7 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand All @@ -63,7 +74,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
END Duration: XXXX ms Memory Used: XXXX MB
START
{
Expand All @@ -81,6 +91,7 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand Down Expand Up @@ -111,5 +122,4 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
END Duration: XXXX ms Memory Used: XXXX MB
16 changes: 13 additions & 3 deletions integration_tests/snapshots/logs/async-metrics_node20.log
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
{
"e": XXXX,
"m": "serverless.integration_test.outside_handler",
"t": [
"tagkey:tagvalue",
"eventsource:outside_handler",
"dd_lambda_layer:datadog-nodev20.XX.X"
],
"v": 1
}
{
"e": XXXX,
"m": "serverless.integration_test.execution",
Expand All @@ -26,8 +35,10 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGat
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
START
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "aws.lambda.enhanced.invocations",
Expand All @@ -43,7 +54,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand All @@ -66,7 +76,6 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS re
}
END Duration: XXXX ms Memory Used: XXXX MB
START
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "aws.lambda.enhanced.invocations",
Expand All @@ -82,6 +91,7 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS re
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand Down
31 changes: 30 additions & 1 deletion src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import http from "http";
import nock from "nock";

import { Context, Handler } from "aws-lambda";
import { datadog, getTraceHeaders, sendDistributionMetric, sendDistributionMetricWithDate } from "./index";
import {
datadog,
getTraceHeaders,
sendDistributionMetric,
sendDistributionMetricWithDate,
_metricsQueue,
} from "./index";
import { incrementErrorsMetric, incrementInvocationsMetric } from "./metrics/enhanced-metrics";
import { LogLevel, setLogLevel } from "./utils";
import { HANDLER_STREAMING, STREAM_RESPONSE } from "./constants";
Expand Down Expand Up @@ -407,6 +413,7 @@ describe("datadog", () => {
const logger = {
debug: jest.fn(),
error: jest.fn(),
warn: jest.fn(),
};

const wrapped = datadog(handler, { forceWrap: true, logger: logger, debugLogging: true });
Expand Down Expand Up @@ -440,3 +447,25 @@ describe("datadog", () => {
expect(wrapped[HANDLER_STREAMING]).toBe(undefined);
});
});

describe("sendDistributionMetric", () => {
beforeEach(() => {
_metricsQueue.reset();
setLogLevel(LogLevel.NONE);
});
it("enqueues a metric for later processing when metrics listener is not initialized", () => {
sendDistributionMetric("metric", 1, "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
});
});

describe("sendDistributionMetricWithDate", () => {
beforeEach(() => {
_metricsQueue.reset();
setLogLevel(LogLevel.NONE);
});
it("enqueues a metric for later processing when metrics listener is not initialized", () => {
sendDistributionMetricWithDate("metric", 1, new Date(), "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
});
});
39 changes: 25 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,11 @@ import {
KMSService,
MetricsConfig,
MetricsListener,
MetricsQueue,
} from "./metrics";
import { TraceConfig, TraceListener } from "./trace";
import { subscribeToDC } from "./runtime";
import {
logDebug,
logError,
Logger,
LogLevel,
promisifiedHandler,
setSandboxInit,
setLogger,
setLogLevel,
} from "./utils";
import { logDebug, Logger, LogLevel, promisifiedHandler, setSandboxInit, setLogger, setLogLevel } from "./utils";
import { getEnhancedMetricTags } from "./metrics/enhanced-metrics";
import { DatadogTraceHeaders } from "./trace/context/extractor";

Expand Down Expand Up @@ -90,6 +82,8 @@ export const defaultConfig: Config = {
localTesting: false,
} as const;

export const _metricsQueue: MetricsQueue = new MetricsQueue();

let currentMetricsListener: MetricsListener | undefined;
let currentTraceListener: TraceListener | undefined;

Expand Down Expand Up @@ -150,6 +144,8 @@ export function datadog<TEvent, TResult>(
if (finalConfig.enhancedMetrics) {
incrementInvocationsMetric(metricsListener, context);
}

sendQueueMetrics(metricsListener);
} catch (err) {
if (err instanceof Error) {
logDebug("Failed to start listeners", err);
Expand Down Expand Up @@ -259,9 +255,10 @@ export function sendDistributionMetricWithDate(name: string, value: number, metr

if (currentMetricsListener !== undefined) {
currentMetricsListener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
} else {
logError("handler not initialized");
return;
}

_metricsQueue.push({ name, value, metricTime, tags });
}

/**
Expand All @@ -275,8 +272,22 @@ export function sendDistributionMetric(name: string, value: number, ...tags: str

if (currentMetricsListener !== undefined) {
currentMetricsListener.sendDistributionMetric(name, value, false, ...tags);
} else {
logError("handler not initialized");
return;
}

_metricsQueue.push({ name, value, tags });
}

function sendQueueMetrics(listener: MetricsListener) {
while (_metricsQueue.length > 0) {
const metric = _metricsQueue.shift()!; // This will always exist.
const { name, value, metricTime, tags } = metric;
if (metricTime !== undefined) {
listener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
return;
}

listener.sendDistributionMetric(name, value, false, ...tags);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/metrics/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { MetricsConfig, MetricsListener } from "./listener";
export { MetricsQueue } from "./queue";
export { KMSService } from "./kms-service";
export { incrementErrorsMetric, incrementInvocationsMetric } from "./enhanced-metrics";
68 changes: 68 additions & 0 deletions src/metrics/queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { LogLevel, logDebug, setLogLevel, setLogger } from "../utils";
import { METRICS_QUEUE_LIMIT, MetricsQueue } from "./queue";

describe("MetricsQueue", () => {
const logger = {
debug: jest.fn(),
error: jest.fn(),
warn: jest.fn(),
};
describe("push", () => {
beforeEach(() => {
setLogLevel(LogLevel.NONE);
setLogger(logger);
});
it("resets metrics queue when its full", () => {
setLogLevel(LogLevel.WARNING);
const queue = new MetricsQueue();
for (let i = 0; i < METRICS_QUEUE_LIMIT + 1; i++) {
queue.push({ name: "metric", tags: [], value: i });
}

// The queue should have been reset and only contain the last metric
expect(queue.length).toBe(1);
expect(logger.warn).toHaveBeenLastCalledWith(
'{"status":"warning","message":"datadog:Metrics queue is full, dropping all metrics."}',
);
});

it("enqueue metric", () => {
setLogLevel(LogLevel.DEBUG);
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
expect(queue.length).toBe(1);
expect(logger.debug).toHaveBeenLastCalledWith(
'{"status":"debug","message":"datadog:Metrics Listener was not initialized. Enqueuing metric for later processing."}',
);
});
});

describe("shift", () => {
it("returns undefined when queue is empty", () => {
const queue = new MetricsQueue();
expect(queue.shift()).toBeUndefined();
});

it("returns the first element in the queue", () => {
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
queue.push({ name: "metric", tags: [], value: 2 });
expect(queue.shift()).toEqual({ name: "metric", tags: [], value: 1 });
});
});

it("resets the queue", () => {
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
queue.push({ name: "metric", tags: [], value: 2 });
queue.reset();
expect(queue.length).toBe(0);
});

it("returns the length of the queue", () => {
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
queue.push({ name: "metric", tags: [], value: 2 });
expect(queue.length).toBe(2);
});
});
Loading
Loading