Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add S3 Downstream Span Pointers #587

Merged
merged 15 commits into from
Dec 6, 2024
18 changes: 16 additions & 2 deletions src/trace/listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Context } from "aws-lambda";

import { patchHttp, unpatchHttp } from "./patch-http";

import { extractTriggerTags, extractHTTPStatusCodeTag } from "./trigger";
import { extractTriggerTags, extractHTTPStatusCodeTag, parseEventSource } from "./trigger";
import { ColdStartTracerConfig, ColdStartTracer } from "./cold-start-tracer";
import { logDebug, tagObject } from "../utils";
import { didFunctionColdStart, isProactiveInitialization } from "../utils/cold-start";
Expand All @@ -17,6 +17,7 @@ import { TraceContext, TraceContextService, TraceSource } from "./trace-context-
import { StepFunctionContext, StepFunctionContextService } from "./step-function-service";
import { XrayService } from "./xray-service";
import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http";
import { getSpanPointerAttributes } from "../utils/span-pointers";
export type TraceExtractor = (event: any, context: Context) => Promise<TraceContext> | TraceContext;

export interface TraceConfig {
Expand Down Expand Up @@ -80,6 +81,7 @@ export class TraceListener {
private wrappedCurrentSpan?: SpanWrapper;
private triggerTags?: { [key: string]: string };
private lambdaSpanParentContext?: SpanContext;
private spanPointerAttributesList: object[] = [];

public get currentTraceHeaders() {
return this.contextService.currentTraceHeaders;
Expand Down Expand Up @@ -131,8 +133,14 @@ export class TraceListener {

this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext;
this.context = context;
this.triggerTags = extractTriggerTags(event, context);
const eventSource = parseEventSource(event);
this.triggerTags = extractTriggerTags(event, context, eventSource);
this.stepFunctionContext = StepFunctionContextService.instance().context;

const result = getSpanPointerAttributes(eventSource, event);
if (result) {
this.spanPointerAttributesList.push(...result);
}
}

/**
Expand Down Expand Up @@ -195,6 +203,12 @@ export class TraceListener {
}
}
}

if (this.wrappedCurrentSpan) {
for (const attributes of this.spanPointerAttributesList) {
this.wrappedCurrentSpan.span.addSpanPointer(attributes);
}
}
return false;
}

Expand Down
9 changes: 6 additions & 3 deletions src/trace/trigger.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,17 @@ describe("parseEventSource", () => {
it("extracts all trigger tags", () => {
for (let event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(event);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
expect(triggerTags).toEqual(event.result);
}
});

it("extracts the status code if API Gateway, ALB, or Function URL, otherwise do nothing, for buffered functions", () => {
for (const event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(event);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
const isResponseStreamingFunction = false;
for (const response of bufferedResponses) {
const statusCode = extractHTTPStatusCodeTag(triggerTags, response.responseBody, isResponseStreamingFunction);
Expand All @@ -198,7 +200,8 @@ describe("parseEventSource", () => {
it("extracts the status code if API Gateway, ALB, or Function URL, otherwise do nothing, for streaming functions", () => {
for (let event of events) {
const eventData = JSON.parse(readFileSync(`./event_samples/${event.file}`, "utf8"));
const triggerTags = extractTriggerTags(eventData, mockContext);
const eventSource = parseEventSource(event);
const triggerTags = extractTriggerTags(eventData, mockContext, eventSource);
const isResponseStreamingFunction = true;
for (const response of streamingResponses) {
const statusCode = extractHTTPStatusCodeTag(triggerTags, response.responseBody, isResponseStreamingFunction);
Expand Down
3 changes: 1 addition & 2 deletions src/trace/trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,8 @@ function extractHTTPTags(event: APIGatewayEvent | APIGatewayProxyEventV2 | ALBEv
/**
* extractTriggerTags parses the trigger event object for tags to be added to the span metadata
*/
export function extractTriggerTags(event: any, context: Context) {
export function extractTriggerTags(event: any, context: Context, eventSource: eventTypes | undefined) {
let triggerTags: { [key: string]: string } = {};
const eventSource = parseEventSource(event);
if (eventSource) {
triggerTags["function_trigger.event_source"] = eventSource;

Expand Down
151 changes: 151 additions & 0 deletions src/utils/span-pointers.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { getSpanPointerAttributes } from "./span-pointers";
import { eventTypes } from "../trace/trigger";
import { SPAN_LINK_KIND, S3_PTR_KIND, SPAN_POINTER_DIRECTION } from "dd-trace/packages/dd-trace/src/span_pointers";
import * as spanPointers from "dd-trace/packages/dd-trace/src/span_pointers";

// Mock the external dependencies
jest.mock("./log", () => ({
logDebug: jest.fn(),
}));

describe("span-pointers utils", () => {
const mockS3PointerHash = "mock-hash-123";

beforeEach(() => {
// Mock the generateS3PointerHash function
jest.spyOn(spanPointers, "generateS3PointerHash").mockReturnValue(mockS3PointerHash);
});

afterEach(() => {
jest.clearAllMocks();
});

describe("getSpanPointerAttributes", () => {
it("returns undefined when eventSource is undefined", () => {
const result = getSpanPointerAttributes(undefined, {});
expect(result).toBeUndefined();
});

it("returns undefined for unsupported event types", () => {
const result = getSpanPointerAttributes("unsupported" as eventTypes, {});
expect(result).toBeUndefined();
});

describe("S3 event processing", () => {
it("processes single S3 record correctly", () => {
const event = {
Records: [
{
s3: {
bucket: { name: "test-bucket" },
object: {
key: "test-key",
eTag: "test-etag",
},
},
},
],
};

const expected = [
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
expect(spanPointers.generateS3PointerHash).toHaveBeenCalledWith("test-bucket", "test-key", "test-etag");
});

it("processes multiple S3 records correctly", () => {
const event = {
Records: [
{
s3: {
bucket: { name: "bucket1" },
object: {
key: "key1",
eTag: "etag1",
},
},
},
{
s3: {
bucket: { name: "bucket2" },
object: {
key: "key2",
eTag: "etag2",
},
},
},
],
};

const expected = [
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
});

it("handles empty Records array", () => {
const event = { Records: [] };
const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual([]);
});

it("handles missing Records property", () => {
const event = {};
const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual([]);
});

it("skips invalid records but processes valid ones", () => {
const event = {
Records: [
{
// Invalid record missing s3 property
},
{
s3: {
bucket: { name: "valid-bucket" },
object: {
key: "valid-key",
eTag: "valid-etag",
},
},
},
],
};

const expected = [
{
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": mockS3PointerHash,
"link.kind": SPAN_LINK_KIND,
},
];

const result = getSpanPointerAttributes(eventTypes.s3, event);
expect(result).toEqual(expected);
});
});
});
});
74 changes: 74 additions & 0 deletions src/utils/span-pointers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { eventTypes } from "../trace/trigger";
import { logDebug } from "./log";
import {
SPAN_LINK_KIND,
S3_PTR_KIND,
SPAN_POINTER_DIRECTION,
generateS3PointerHash,
} from "dd-trace/packages/dd-trace/src/span_pointers";

Check failure on line 8 in src/utils/span-pointers.ts

View workflow job for this annotation

GitHub Actions / unit-test (16.14)

Cannot find module 'dd-trace/packages/dd-trace/src/span_pointers' or its corresponding type declarations.

Check failure on line 8 in src/utils/span-pointers.ts

View workflow job for this annotation

GitHub Actions / unit-test (18.12)

Cannot find module 'dd-trace/packages/dd-trace/src/span_pointers' or its corresponding type declarations.

Check failure on line 8 in src/utils/span-pointers.ts

View workflow job for this annotation

GitHub Actions / unit-test (20.9)

Cannot find module 'dd-trace/packages/dd-trace/src/span_pointers' or its corresponding type declarations.

interface SpanPointerAttributes {
"ptr.kind": string;
"ptr.dir": string;
"ptr.hash": string;
"link.kind": string;
}

/**
* Computes span pointer attributes
*
* @param {eventTypes} eventSource - The type of event being processed (e.g., S3, DynamoDB).
* @param {any} event - The event object containing source-specific data.
* @returns {SpanPointerAttributes[] | undefined} An array of span pointer attribute objects, or undefined if none could be computed.
*/
export function getSpanPointerAttributes(
eventSource: eventTypes | undefined,
event: any,
): SpanPointerAttributes[] | undefined {
if (!eventSource) {
return;
}

switch (eventSource) {
case eventTypes.s3:
return processS3Event(event);
default:
logDebug(`Event type ${eventSource} not supported by span pointers.`);
return;
}
}

function processS3Event(event: any): SpanPointerAttributes[] {
const records = event.Records || [];
const spanPointerAttributesList = [];
const linkKind = SPAN_LINK_KIND;

for (const record of records) {
const eventName = record.eventName;
if (!["ObjectCreated:Put", "ObjectCreated:Copy", "ObjectCreated:CompleteMultipartUpload"].includes(eventName)) {
continue;
}
// Values are stored in the same place, regardless of AWS SDK v2/v3 or the event type.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
const s3Event = record?.s3;
const bucketName = s3Event?.bucket?.name;
const objectKey = s3Event?.object?.key;
const eTag = s3Event?.object?.eTag;

if (!bucketName || !objectKey || !eTag) {
logDebug("Unable to calculate span pointer hash because of missing parameters.");
continue;
}

const pointerHash = generateS3PointerHash(bucketName, objectKey, eTag);
const spanPointerAttributes = {
"ptr.kind": S3_PTR_KIND,
"ptr.dir": SPAN_POINTER_DIRECTION.UPSTREAM,
"ptr.hash": pointerHash,
"link.kind": linkKind,
};
spanPointerAttributesList.push(spanPointerAttributes);
}

return spanPointerAttributesList;
}
Loading