Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Step Functions Trace Merging Support #588

Merged
merged 4 commits into from
Nov 18, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 129 additions & 100 deletions src/trace/step-function-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,57 @@ import { SampleMode, TraceSource } from "./trace-context-service";
import { SpanContextWrapper } from "./span-context-wrapper";
import { Sha256 } from "@aws-crypto/sha256-js";

export interface StepFunctionContext {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the context is also used as tags here code line can we clarify why the change of the tags in the PR description?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh i think you explained it by "Also fixed the trigger tags parsing for our use case!" Thanks!

"step_function.execution_name": string;
"step_function.execution_id": string;
"step_function.execution_input": object;
"step_function.execution_role_arn": string;
"step_function.execution_start_time": string;
"step_function.state_machine_name": string;
"step_function.state_machine_arn": string;
"step_function.state_entered_time": string;
"step_function.state_name": string;
"step_function.state_retry_count": number;
interface StepFunctionRootContext {
execution_id: string;
state_entered_time: string;
state_name: string;
root_execution_id: string;
serverless_version: string;
}

interface LambdaRootContext {
execution_id: string;
state_entered_time: string;
state_name: string;
trace_id: string;
dd_p_tid: string;
serverless_version: string;
}

interface LegacyContext {
execution_id: string;
state_entered_time: string;
state_name: string;
}

export type StepFunctionContext = StepFunctionRootContext | LambdaRootContext | LegacyContext;

export const TRACE_ID = "traceId";
export const PARENT_ID = "spanId";
export const DD_P_TID = "_dd.p.tid";

// Type Guard Functions
function isStepFunctionRootContext(obj: any): obj is StepFunctionRootContext {
return typeof obj?.root_execution_id === "string" && typeof obj?.serverless_version === "string";
}

function isLambdaRootContext(obj: any): obj is LambdaRootContext {
return (
typeof obj?.trace_id === "string" &&
typeof obj?.dd_p_tid === "string" &&
typeof obj?.serverless_version === "string"
);
}

function isLegacyContext(obj: any): obj is LegacyContext {
return (
typeof obj?.execution_id === "string" &&
typeof obj?.state_entered_time === "string" &&
typeof obj?.state_name === "string" &&
obj?.serverless_version === undefined
);
}

export class StepFunctionContextService {
private static _instance: StepFunctionContextService;
public context?: StepFunctionContext;
Expand All @@ -41,104 +75,74 @@ export class StepFunctionContextService {
// always triggered by the same event.
if (typeof event !== "object") return;

// Legacy lambda parsing
// Extract Payload if available (Legacy lambda parsing)
if (typeof event.Payload === "object") {
event = event.Payload;
}

// Execution
const execution = event.Execution;
if (typeof execution !== "object") {
logDebug("event.Execution is not an object.");
return;
}
const executionID = execution.Id;
if (typeof executionID !== "string") {
logDebug("event.Execution.Id is not a string.");
return;
}
const executionInput = execution.Input;
const executionName = execution.Name;
if (typeof executionName !== "string") {
logDebug("event.Execution.Name is not a string.");
return;
}
const executionRoleArn = execution.RoleArn;
if (typeof executionRoleArn !== "string") {
logDebug("event.Execution.RoleArn is not a string.");
return;
}
const executionStartTime = execution.StartTime;
if (typeof executionStartTime !== "string") {
logDebug("event.Execution.StartTime is not a string.");
return;
}

// State
const state = event.State;
if (typeof state !== "object") {
logDebug("event.State is not an object.");
return;
}
const stateRetryCount = state.RetryCount;
if (typeof stateRetryCount !== "number") {
logDebug("event.State.RetryCount is not a number.");
return;
}
const stateEnteredTime = state.EnteredTime;
if (typeof stateEnteredTime !== "string") {
logDebug("event.State.EnteredTime is not a string.");
return;
}
const stateName = state.Name;
if (typeof stateName !== "string") {
logDebug("event.State.Name is not a string.");
return;
}

// StateMachine
const stateMachine = event.StateMachine;
if (typeof stateMachine !== "object") {
logDebug("event.StateMachine is not an object.");
return;
}
const stateMachineArn = stateMachine.Id;
if (typeof stateMachineArn !== "string") {
logDebug("event.StateMachine.Id is not a string.");
return;
}
const stateMachineName = stateMachine.Name;
if (typeof stateMachineName !== "string") {
logDebug("event.StateMachine.Name is not a string.");
return;
// Extract _datadog if available (JSONata v1 parsing)
if (typeof event._datadog === "object") {
event = event._datadog;
}

// Extract the common context variables
const stateMachineContext = this.extractStateMachineContext(event);
if (stateMachineContext === null) return;
const { execution_id, state_entered_time, state_name } = stateMachineContext;

if (event.serverless_version === "string" && event.serverless_version == "v1") {
const serverless_version = event.serverless_version;

if (event.RootExecutionId === "string") {
const root_execution_id = event.RootExecutionId;

this.context = {
execution_id,
state_entered_time,
state_name,
root_execution_id,
serverless_version,
} as StepFunctionRootContext;
} else if (event.trace_id === "string" && event.dd_p_tid === "string") {
const trace_id = event.trace_id;
const dd_p_tid = event.dd_p_tid;

this.context = {
execution_id,
state_entered_time,
state_name,
trace_id,
dd_p_tid,
serverless_version,
} as LambdaRootContext;
}
} else {
this.context = { execution_id, state_entered_time, state_name } as LegacyContext;
}

const context = {
"step_function.execution_name": executionName,
"step_function.execution_id": executionID,
"step_function.execution_input": executionInput ?? {},
"step_function.execution_role_arn": executionRoleArn,
"step_function.execution_start_time": executionStartTime,
"step_function.state_entered_time": stateEnteredTime,
"step_function.state_machine_arn": stateMachineArn,
"step_function.state_machine_name": stateMachineName,
"step_function.state_name": stateName,
"step_function.state_retry_count": stateRetryCount,
};

this.context = context;
}

public get spanContext(): SpanContextWrapper | null {
if (this.context === undefined) return null;

const traceId = this.deterministicSha256HashToBigIntString(this.context["step_function.execution_id"], TRACE_ID);
let traceId: string;
let ptid: string;

if (isStepFunctionRootContext(this.context)) {
traceId = this.deterministicSha256HashToBigIntString(this.context.root_execution_id, TRACE_ID);
ptid = this.deterministicSha256HashToBigIntString(this.context.root_execution_id, DD_P_TID);
} else if (isLambdaRootContext(this.context)) {
traceId = this.context["trace_id"];
ptid = this.context["dd_p_tid"];
} else if (isLegacyContext(this.context)) {
traceId = this.deterministicSha256HashToBigIntString(this.context.execution_id, TRACE_ID);
ptid = this.deterministicSha256HashToBigIntString(this.context.execution_id, DD_P_TID);
} else {
logDebug("StepFunctionContext doesn't match any known formats!");
return null;
}

const parentId = this.deterministicSha256HashToBigIntString(
this.context["step_function.execution_id"] +
"#" +
this.context["step_function.state_name"] +
"#" +
this.context["step_function.state_entered_time"],
this.context.execution_id + "#" + this.context.state_name + "#" + this.context.state_entered_time,
PARENT_ID,
);
const sampleMode = SampleMode.AUTO_KEEP;
Expand All @@ -154,7 +158,6 @@ export class StepFunctionContextService {
sampling: { priority: sampleMode.toString(2) },
});

const ptid = this.deterministicSha256HashToBigIntString(this.context["step_function.execution_id"], DD_P_TID);
ddSpanContext._trace.tags["_dd.p.tid"] = id(ptid, 10).toString(16);
if (ddSpanContext === null) return null;

Expand All @@ -175,7 +178,7 @@ export class StepFunctionContextService {
}

private deterministicSha256Hash(s: string, type: string): string {
// returns 128 bits hash unless mostSignificant64Bits options is set to true.
// returns upper or lower 64 bits of the hash

const hash = new Sha256();
hash.update(s);
Expand All @@ -197,4 +200,30 @@ export class StepFunctionContextService {
private numberToBinaryString(num: number): string {
return num.toString(2).padStart(8, "0");
}

private extractStateMachineContext(event: any): {
execution_id: string;
state_entered_time: string;
state_name: string;
} | null {
const execution = event.Execution;
const state = event.State;

if (
typeof execution === "object" &&
typeof execution.Id === "string" &&
typeof state === "object" &&
typeof state.EnteredTime === "string" &&
typeof state.Name === "string"
) {
return {
execution_id: execution.Id,
state_entered_time: state.EnteredTime,
state_name: state.Name,
};
}

logDebug("Cannot extract StateMachine context! Invalid execution or state data.");
return null;
}
}
Loading