Skip to content

Commit

Permalink
Merge 59d713d into dadad52
Browse files Browse the repository at this point in the history
  • Loading branch information
duncanista authored Mar 25, 2024
2 parents dadad52 + 59d713d commit 5b2885d
Show file tree
Hide file tree
Showing 12 changed files with 269 additions and 26 deletions.
7 changes: 7 additions & 0 deletions integration_tests/send-metrics.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
const { datadog, sendDistributionMetric } = require("datadog-lambda-js");

sendDistributionMetric(
"serverless.integration_test.outside_handler",
1,
"tagkey:tagvalue",
`eventsource:outside_handler`,
);

async function handle(event, context) {
const responsePayload = { message: "hello, dog!" };

Expand Down
16 changes: 13 additions & 3 deletions integration_tests/snapshots/logs/async-metrics_node16.log
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ START
],
"v": 1
}
{
"e": XXXX,
"m": "serverless.integration_test.outside_handler",
"t": [
"tagkey:tagvalue",
"eventsource:outside_handler",
"dd_lambda_layer:datadog-nodev16.XX.X"
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
{
"e": XXXX,
"m": "serverless.integration_test.execution",
Expand All @@ -25,7 +36,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
START
{
Expand All @@ -43,6 +53,7 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand All @@ -63,9 +74,9 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
END Duration: XXXX ms Memory Used: XXXX MB
START
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "aws.lambda.enhanced.invocations",
Expand Down Expand Up @@ -111,5 +122,4 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
END Duration: XXXX ms Memory Used: XXXX MB
16 changes: 13 additions & 3 deletions integration_tests/snapshots/logs/async-metrics_node18.log
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ START
],
"v": 1
}
{
"e": XXXX,
"m": "serverless.integration_test.outside_handler",
"t": [
"tagkey:tagvalue",
"eventsource:outside_handler",
"dd_lambda_layer:datadog-nodev18.XX.X"
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
{
"e": XXXX,
"m": "serverless.integration_test.execution",
Expand All @@ -25,7 +36,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
START
{
Expand All @@ -43,6 +53,7 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand All @@ -63,7 +74,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
END Duration: XXXX ms Memory Used: XXXX MB
START
{
Expand All @@ -81,6 +91,7 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand Down Expand Up @@ -111,5 +122,4 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
END Duration: XXXX ms Memory Used: XXXX MB
16 changes: 13 additions & 3 deletions integration_tests/snapshots/logs/async-metrics_node20.log
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
{
"e": XXXX,
"m": "serverless.integration_test.outside_handler",
"t": [
"tagkey:tagvalue",
"eventsource:outside_handler",
"dd_lambda_layer:datadog-nodev20.XX.X"
],
"v": 1
}
{
"e": XXXX,
"m": "serverless.integration_test.execution",
Expand All @@ -26,8 +35,10 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGat
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
START
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "aws.lambda.enhanced.invocations",
Expand All @@ -43,7 +54,6 @@ START
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand All @@ -66,7 +76,6 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS re
}
END Duration: XXXX ms Memory Used: XXXX MB
START
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "aws.lambda.enhanced.invocations",
Expand All @@ -82,6 +91,7 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS re
],
"v": 1
}
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
{
"e": XXXX,
"m": "serverless.integration_test.records_processed",
Expand Down
31 changes: 30 additions & 1 deletion src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ import http from "http";
import nock from "nock";

import { Context, Handler } from "aws-lambda";
import { datadog, getTraceHeaders, sendDistributionMetric, sendDistributionMetricWithDate } from "./index";
import {
datadog,
getTraceHeaders,
sendDistributionMetric,
sendDistributionMetricWithDate,
_metricsQueue,
} from "./index";
import { incrementErrorsMetric, incrementInvocationsMetric } from "./metrics/enhanced-metrics";
import { LogLevel, setLogLevel } from "./utils";
import { HANDLER_STREAMING, STREAM_RESPONSE } from "./constants";
Expand Down Expand Up @@ -407,6 +413,7 @@ describe("datadog", () => {
const logger = {
debug: jest.fn(),
error: jest.fn(),
warn: jest.fn(),
};

const wrapped = datadog(handler, { forceWrap: true, logger: logger, debugLogging: true });
Expand Down Expand Up @@ -440,3 +447,25 @@ describe("datadog", () => {
expect(wrapped[HANDLER_STREAMING]).toBe(undefined);
});
});

describe("sendDistributionMetric", () => {
beforeEach(() => {
_metricsQueue.reset();
setLogLevel(LogLevel.NONE);
});
it("enqueues a metric for later processing when metrics listener is not initialized", () => {
sendDistributionMetric("metric", 1, "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
});
});

describe("sendDistributionMetricWithDate", () => {
beforeEach(() => {
_metricsQueue.reset();
setLogLevel(LogLevel.NONE);
});
it("enqueues a metric for later processing when metrics listener is not initialized", () => {
sendDistributionMetricWithDate("metric", 1, new Date(), "first-tag", "second-tag");
expect(_metricsQueue.length).toBe(1);
});
});
43 changes: 29 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,11 @@ import {
KMSService,
MetricsConfig,
MetricsListener,
MetricsQueue,
} from "./metrics";
import { TraceConfig, TraceListener } from "./trace";
import { subscribeToDC } from "./runtime";
import {
logDebug,
logError,
Logger,
LogLevel,
promisifiedHandler,
setSandboxInit,
setLogger,
setLogLevel,
} from "./utils";
import { logDebug, Logger, LogLevel, promisifiedHandler, setSandboxInit, setLogger, setLogLevel } from "./utils";
import { getEnhancedMetricTags } from "./metrics/enhanced-metrics";
import { DatadogTraceHeaders } from "./trace/context/extractor";

Expand Down Expand Up @@ -90,6 +82,8 @@ export const defaultConfig: Config = {
localTesting: false,
} as const;

export const _metricsQueue: MetricsQueue = new MetricsQueue();

let currentMetricsListener: MetricsListener | undefined;
let currentTraceListener: TraceListener | undefined;

Expand Down Expand Up @@ -150,6 +144,8 @@ export function datadog<TEvent, TResult>(
if (finalConfig.enhancedMetrics) {
incrementInvocationsMetric(metricsListener, context);
}

sendQueueMetrics(metricsListener);
} catch (err) {
if (err instanceof Error) {
logDebug("Failed to start listeners", err);
Expand Down Expand Up @@ -259,9 +255,10 @@ export function sendDistributionMetricWithDate(name: string, value: number, metr

if (currentMetricsListener !== undefined) {
currentMetricsListener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
} else {
logError("handler not initialized");
return;
}

_metricsQueue.push({ name, value, metricTime, tags });
}

/**
Expand All @@ -275,8 +272,26 @@ export function sendDistributionMetric(name: string, value: number, ...tags: str

if (currentMetricsListener !== undefined) {
currentMetricsListener.sendDistributionMetric(name, value, false, ...tags);
} else {
logError("handler not initialized");
return;
}

_metricsQueue.push({ name, value, tags });
}

function sendQueueMetrics(listener: MetricsListener) {
// Reverse the queue to send metrics in order.
// This is necessary because the "queue" is a stack,
// and we want to send metrics in the order they were added.
_metricsQueue.reverse();
while (_metricsQueue.length > 0) {
const metric = _metricsQueue.pop()!; // This will always exist.
const { name, value, metricTime, tags } = metric;
if (metricTime !== undefined) {
listener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
continue;
}

listener.sendDistributionMetric(name, value, false, ...tags);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/metrics/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { MetricsConfig, MetricsListener } from "./listener";
export { MetricsQueue } from "./queue";
export { KMSService } from "./kms-service";
export { incrementErrorsMetric, incrementInvocationsMetric } from "./enhanced-metrics";
76 changes: 76 additions & 0 deletions src/metrics/queue.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { LogLevel, logDebug, setLogLevel, setLogger } from "../utils";
import { METRICS_QUEUE_LIMIT, MetricsQueue } from "./queue";

describe("MetricsQueue", () => {
const logger = {
debug: jest.fn(),
error: jest.fn(),
warn: jest.fn(),
};
describe("push", () => {
beforeEach(() => {
setLogLevel(LogLevel.NONE);
setLogger(logger);
});
it("resets metrics queue when its full", () => {
setLogLevel(LogLevel.WARNING);
const queue = new MetricsQueue();
for (let i = 0; i < METRICS_QUEUE_LIMIT + 1; i++) {
queue.push({ name: "metric", tags: [], value: i });
}

// The queue should have been reset and only contain the last metric
expect(queue.length).toBe(1);
expect(logger.warn).toHaveBeenLastCalledWith(
'{"status":"warning","message":"datadog:Metrics queue is full, dropping all metrics."}',
);
});

it("enqueue metric", () => {
setLogLevel(LogLevel.DEBUG);
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
expect(queue.length).toBe(1);
expect(logger.debug).toHaveBeenLastCalledWith(
'{"status":"debug","message":"datadog:Metrics Listener was not initialized. Enqueuing metric for later processing."}',
);
});
});

describe("pop", () => {
it("returns undefined when queue is empty", () => {
const queue = new MetricsQueue();
expect(queue.pop()).toBeUndefined();
});

it("returns the first element in the queue", () => {
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
queue.push({ name: "metric", tags: [], value: 2 });
expect(queue.pop()).toEqual({ name: "metric", tags: [], value: 2 });
});
});

it("reverses the queue", () => {
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
queue.push({ name: "metric", tags: [], value: 2 });
queue.reverse();
expect(queue.pop()).toEqual({ name: "metric", tags: [], value: 1 });
});

it("resets the queue", () => {
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
queue.push({ name: "metric", tags: [], value: 2 });
queue.reset();
expect(queue.length).toBe(0);
});

it("returns the length of the queue", () => {
const queue = new MetricsQueue();
queue.push({ name: "metric", tags: [], value: 1 });
queue.push({ name: "metric", tags: [], value: 2 });
expect(queue.length).toBe(2);
});
});
Loading

0 comments on commit 5b2885d

Please sign in to comment.