From 6319b1d1cbc4545f8309e85e40cfeaa388a941ab Mon Sep 17 00:00:00 2001 From: John Chrostek Date: Sun, 29 Mar 2026 08:29:51 -0400 Subject: [PATCH] feat: Add automatic trace propagation for Lambda Durable Functions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This adds automatic tracing support for AWS Lambda Durable Functions: - Auto-patches @aws/durable-execution-sdk-js when datadog-lambda-js loads - Extracts trace context from preserved InputPayload (HTTP headers, SNS/SQS attributes) - Falls back to deterministic trace ID from execution ARN when no parent exists - Creates child spans for each operation (step, wait, invoke, parallel, map) - Detects replay operations to avoid duplicate spans Zero user code changes required - existing code with datadog() wrapper works automatically. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/index.ts | 4 + src/trace/context/extractor.ts | 11 + .../extractors/durable-function.spec.ts | 130 +++++++ .../context/extractors/durable-function.ts | 19 + src/trace/context/extractors/index.ts | 1 + .../durable-function-context-wrapper.spec.ts | 330 +++++++++++++++++ src/trace/durable-function-context-wrapper.ts | 339 ++++++++++++++++++ src/trace/durable-function-patch.spec.ts | 149 ++++++++ src/trace/durable-function-patch.ts | 91 +++++ src/trace/durable-function-service.spec.ts | 328 +++++++++++++++++ src/trace/durable-function-service.ts | 250 +++++++++++++ src/trace/listener.ts | 2 + 12 files changed, 1654 insertions(+) create mode 100644 src/trace/context/extractors/durable-function.spec.ts create mode 100644 src/trace/context/extractors/durable-function.ts create mode 100644 src/trace/durable-function-context-wrapper.spec.ts create mode 100644 src/trace/durable-function-context-wrapper.ts create mode 100644 src/trace/durable-function-patch.spec.ts create mode 100644 src/trace/durable-function-patch.ts create mode 100644 src/trace/durable-function-service.spec.ts create mode 100644 src/trace/durable-function-service.ts diff --git a/src/index.ts b/src/index.ts index deb8245e4..f7141d705 100644 --- a/src/index.ts +++ b/src/index.ts @@ -27,6 +27,7 @@ import { getEnhancedMetricTags } from "./metrics/enhanced-metrics"; import { DatadogTraceHeaders } from "./trace/context/extractor"; import { SpanWrapper } from "./trace/span-wrapper"; import { SpanOptions, TracerWrapper } from "./trace/tracer-wrapper"; +import { initDurableFunctionTracing } from "./trace/durable-function-patch"; // Backwards-compatible export, TODO deprecate in next major export { DatadogTraceHeaders as TraceHeaders } from "./trace/context/extractor"; @@ -113,6 +114,9 @@ if (getEnvValue(coldStartTracingEnvVar, "true").toLowerCase() === "true" && !isM subscribeToDC(); } +// Initialize durable function tracing if SDK is present +initDurableFunctionTracing(); + const initTime = Date.now(); /** diff --git a/src/trace/context/extractor.ts b/src/trace/context/extractor.ts index 31fafa8f3..4d884d92c 100644 --- a/src/trace/context/extractor.ts +++ b/src/trace/context/extractor.ts @@ -14,8 +14,10 @@ import { SNSSQSEventTraceExtractor, SQSEventTraceExtractor, StepFunctionEventTraceExtractor, + DurableFunctionEventTraceExtractor, } from "./extractors"; import { StepFunctionContextService } from "../step-function-service"; +import { DurableFunctionContextService } from "../durable-function-service"; import { EventValidator } from "../../utils/event-validator"; import { TracerWrapper } from "../tracer-wrapper"; import { SpanContextWrapper } from "../span-context-wrapper"; @@ -37,6 +39,7 @@ export interface DatadogTraceHeaders { export class TraceContextExtractor { private xrayService: XrayService; private stepFunctionContextService?: StepFunctionContextService; + private durableFunctionContextService?: DurableFunctionContextService; constructor(private tracerWrapper: TracerWrapper, private config: TraceConfig) { this.xrayService = new XrayService(); @@ -64,6 +67,14 @@ export class TraceContextExtractor { } } + if (spanContext === null) { + this.durableFunctionContextService = DurableFunctionContextService.instance(event); + if (this.durableFunctionContextService?.context) { + const extractor = new DurableFunctionEventTraceExtractor(); + spanContext = extractor.extract(event); + } + } + if (spanContext === null) { const contextExtractor = new LambdaContextTraceExtractor(this.tracerWrapper); spanContext = contextExtractor.extract(context); diff --git a/src/trace/context/extractors/durable-function.spec.ts b/src/trace/context/extractors/durable-function.spec.ts new file mode 100644 index 000000000..93fc50124 --- /dev/null +++ b/src/trace/context/extractors/durable-function.spec.ts @@ -0,0 +1,130 @@ +import { DurableFunctionEventTraceExtractor } from "./durable-function"; +import { DurableFunctionContextService } from "../../durable-function-service"; + +describe("DurableFunctionEventTraceExtractor", () => { + const validDurableExecutionEvent = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-123/550e8400-e29b-41d4-a716-446655440001", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + headers: { + "x-datadog-trace-id": "12345678901234567890", + "x-datadog-parent-id": "9876543210987654321", + }, + }), + }, + }, + ], + }, + }; + + const regularLambdaEvent = { + body: '{"key": "value"}', + headers: { + "Content-Type": "application/json", + }, + }; + + beforeEach(() => { + DurableFunctionContextService.reset(); + }); + + describe("extract", () => { + it("extracts span context from valid durable execution event", () => { + const extractor = new DurableFunctionEventTraceExtractor(); + + const spanContext = extractor.extract(validDurableExecutionEvent); + + expect(spanContext).not.toBeNull(); + expect(spanContext?.toTraceId()).toBe("12345678901234567890"); + expect(spanContext?.toSpanId()).toBe("9876543210987654321"); + expect(spanContext?.source).toBe("event"); + }); + + it("returns null for regular Lambda event", () => { + const extractor = new DurableFunctionEventTraceExtractor(); + + const spanContext = extractor.extract(regularLambdaEvent); + + expect(spanContext).toBeNull(); + }); + + it("returns null for null event", () => { + const extractor = new DurableFunctionEventTraceExtractor(); + + const spanContext = extractor.extract(null); + + expect(spanContext).toBeNull(); + }); + + it("returns null for undefined event", () => { + const extractor = new DurableFunctionEventTraceExtractor(); + + const spanContext = extractor.extract(undefined); + + expect(spanContext).toBeNull(); + }); + + it("returns null for event with invalid DurableExecutionArn", () => { + const extractor = new DurableFunctionEventTraceExtractor(); + + const spanContext = extractor.extract({ + DurableExecutionArn: "invalid-arn", + }); + + expect(spanContext).toBeNull(); + }); + + it("extracts deterministic span context when no preserved trace headers", () => { + const eventWithoutPreservedTrace = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-456/650e8400-e29b-41d4-a716-446655440002", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + body: '{"key": "value"}', + }), + }, + }, + ], + }, + }; + + const extractor = new DurableFunctionEventTraceExtractor(); + + const spanContext = extractor.extract(eventWithoutPreservedTrace); + + expect(spanContext).not.toBeNull(); + // Should have deterministic IDs based on execution ID + expect(spanContext?.toTraceId()).toBeDefined(); + expect(spanContext?.toSpanId()).toBeDefined(); + }); + + it("uses singleton service instance", () => { + const extractor = new DurableFunctionEventTraceExtractor(); + + // First extraction + extractor.extract(validDurableExecutionEvent); + const instance1 = DurableFunctionContextService.instance(); + + // Second extraction with same event + extractor.extract(validDurableExecutionEvent); + const instance2 = DurableFunctionContextService.instance(); + + expect(instance1).toBe(instance2); + }); + }); +}); diff --git a/src/trace/context/extractors/durable-function.ts b/src/trace/context/extractors/durable-function.ts new file mode 100644 index 000000000..3b227d973 --- /dev/null +++ b/src/trace/context/extractors/durable-function.ts @@ -0,0 +1,19 @@ +import { SpanContextWrapper } from "../../span-context-wrapper"; +import { DurableFunctionContextService } from "../../durable-function-service"; +import { EventTraceExtractor } from "../extractor"; + +export class DurableFunctionEventTraceExtractor implements EventTraceExtractor { + extract(event: any): SpanContextWrapper | null { + const durableFunctionInstance = DurableFunctionContextService.instance(event); + const durableFunctionContext = durableFunctionInstance.context; + + if (durableFunctionContext !== undefined) { + const spanContext = durableFunctionInstance.spanContext; + if (spanContext !== null) { + return spanContext; + } + } + + return null; + } +} diff --git a/src/trace/context/extractors/index.ts b/src/trace/context/extractors/index.ts index 6bd690713..17839f91c 100644 --- a/src/trace/context/extractors/index.ts +++ b/src/trace/context/extractors/index.ts @@ -7,5 +7,6 @@ export { SQSEventTraceExtractor } from "./sqs"; export { SNSEventTraceExtractor } from "./sns"; export { SNSSQSEventTraceExtractor } from "./sns-sqs"; export { StepFunctionEventTraceExtractor } from "./step-function"; +export { DurableFunctionEventTraceExtractor } from "./durable-function"; export { LambdaContextTraceExtractor } from "./lambda-context"; export { CustomTraceExtractor } from "./custom"; diff --git a/src/trace/durable-function-context-wrapper.spec.ts b/src/trace/durable-function-context-wrapper.spec.ts new file mode 100644 index 000000000..145b58fd3 --- /dev/null +++ b/src/trace/durable-function-context-wrapper.spec.ts @@ -0,0 +1,330 @@ +import { wrapDurableContext, DurableExecutionContext, OperationState } from "./durable-function-context-wrapper"; + +// Mock dd-trace +const mockFinish = jest.fn(); +const mockSetTag = jest.fn(); +const mockStartSpan = jest.fn().mockReturnValue({ + finish: mockFinish, + setTag: mockSetTag, +}); + +jest.mock("dd-trace", () => ({ + startSpan: mockStartSpan, +})); + +describe("wrapDurableContext", () => { + let mockContext: DurableExecutionContext; + + beforeEach(() => { + jest.clearAllMocks(); + + mockContext = { + step: jest.fn().mockResolvedValue("step-result"), + wait: jest.fn().mockResolvedValue(undefined), + invoke: jest.fn().mockResolvedValue("invoke-result"), + waitForCallback: jest.fn().mockResolvedValue("callback-result"), + parallel: jest.fn().mockResolvedValue(["result1", "result2"]), + map: jest.fn().mockResolvedValue(["mapped1", "mapped2"]), + runInChildContext: jest.fn().mockImplementation(async (name, fn) => { + return fn(mockContext); + }), + }; + }); + + describe("step", () => { + it("creates span for new execution", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + const result = await wrappedCtx.step("fetch-user", async () => "user-data"); + + expect(mockStartSpan).toHaveBeenCalledWith("aws.lambda.durable.step", { + tags: { + "durable.operation.type": "step", + "durable.operation.name": "fetch-user", + "resource.name": "fetch-user", + }, + }); + expect(mockFinish).toHaveBeenCalled(); + expect(mockContext.step).toHaveBeenCalledWith("fetch-user", expect.any(Function), undefined); + }); + + it("skips span for replay (Status=SUCCEEDED)", async () => { + const event = { + InitialExecutionState: { + Operations: [ + { + Id: "step-fetch-user-1", + Type: "STEP", + Name: "fetch-user", + Status: "SUCCEEDED" as const, + }, + ], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.step("fetch-user", async () => "user-data"); + + expect(mockStartSpan).not.toHaveBeenCalled(); + expect(mockContext.step).toHaveBeenCalled(); + }); + + it("captures errors in span", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + (mockContext.step as jest.Mock).mockRejectedValue(new Error("Step failed")); + + const wrappedCtx = wrapDurableContext(mockContext, event); + + await expect(wrappedCtx.step("failing-step", async () => "data")).rejects.toThrow("Step failed"); + + expect(mockSetTag).toHaveBeenCalledWith("error", true); + expect(mockSetTag).toHaveBeenCalledWith("error.message", "Step failed"); + expect(mockSetTag).toHaveBeenCalledWith("error.type", "Error"); + expect(mockFinish).toHaveBeenCalled(); + }); + }); + + describe("wait", () => { + it("creates span for new wait", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.wait("delay-5min", { minutes: 5 }); + + expect(mockStartSpan).toHaveBeenCalledWith("aws.lambda.durable.wait", { + tags: { + "durable.operation.type": "wait", + "durable.operation.name": "delay-5min", + "resource.name": "delay-5min", + "durable.wait.duration": '{"minutes":5}', + }, + }); + expect(mockFinish).toHaveBeenCalled(); + }); + + it("skips span for replay", async () => { + const event = { + InitialExecutionState: { + Operations: [ + { + Id: "wait-delay-5min-1", + Type: "WAIT", + Name: "delay-5min", + Status: "SUCCEEDED" as const, + }, + ], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.wait("delay-5min", { minutes: 5 }); + + expect(mockStartSpan).not.toHaveBeenCalled(); + }); + }); + + describe("invoke", () => { + it("creates span with function name tag", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.invoke("call-processor", "processor-function", { data: "test" }); + + expect(mockStartSpan).toHaveBeenCalledWith("aws.lambda.durable.invoke", { + tags: { + "durable.operation.type": "invoke", + "durable.operation.name": "call-processor", + "resource.name": "call-processor", + "durable.invoke.function_name": "processor-function", + }, + }); + expect(mockFinish).toHaveBeenCalled(); + }); + + it("skips span for replay", async () => { + const event = { + InitialExecutionState: { + Operations: [ + { + Id: "invoke-call-processor-1", + Type: "INVOKE", + Name: "call-processor", + Status: "SUCCEEDED" as const, + }, + ], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.invoke("call-processor", "processor-function", { data: "test" }); + + expect(mockStartSpan).not.toHaveBeenCalled(); + }); + }); + + describe("waitForCallback", () => { + it("creates span for new callback", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.waitForCallback("approval", async () => {}); + + expect(mockStartSpan).toHaveBeenCalledWith("aws.lambda.durable.callback", { + tags: { + "durable.operation.type": "callback", + "durable.operation.name": "approval", + "resource.name": "approval", + }, + }); + }); + }); + + describe("parallel", () => { + it("creates span with branch count tag", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const branches = [async () => "a", async () => "b", async () => "c"]; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.parallel("process-all", branches); + + expect(mockStartSpan).toHaveBeenCalledWith("aws.lambda.durable.parallel", { + tags: { + "durable.operation.type": "parallel", + "durable.operation.name": "process-all", + "resource.name": "process-all", + "durable.parallel.branch_count": "3", + }, + }); + }); + }); + + describe("map", () => { + it("creates span with item count tag", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const items = [1, 2, 3, 4, 5]; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.map("process-items", items, async (item) => item * 2); + + expect(mockStartSpan).toHaveBeenCalledWith("aws.lambda.durable.map", { + tags: { + "durable.operation.type": "map", + "durable.operation.name": "process-items", + "resource.name": "process-items", + "durable.map.item_count": "5", + }, + }); + }); + }); + + describe("preserves original context behavior", () => { + it("returns result from original step function", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + (mockContext.step as jest.Mock).mockResolvedValue("custom-result"); + + const wrappedCtx = wrapDurableContext(mockContext, event); + const result = await wrappedCtx.step("my-step", async () => "data"); + + expect(result).toBe("custom-result"); + }); + + it("passes through options to original step", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const options = { retryPolicy: { maxAttempts: 3 } }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.step("my-step", async () => "data", options); + + expect(mockContext.step).toHaveBeenCalledWith("my-step", expect.any(Function), options); + }); + }); + + describe("runInChildContext", () => { + it("wraps child context recursively", async () => { + const event = { + InitialExecutionState: { + Operations: [] as OperationState[], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + await wrappedCtx.runInChildContext("child-workflow", async (childCtx) => { + // Child context should also be wrapped + await childCtx.step("child-step", async () => "child-data"); + return "child-result"; + }); + + // Should create spans for both child_context and the nested step + expect(mockStartSpan).toHaveBeenCalledWith("aws.lambda.durable.child_context", expect.any(Object)); + }); + }); + + describe("operation counter tracking", () => { + it("tracks operation IDs correctly for multiple same-name operations", async () => { + const event = { + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Name: "fetch", + Status: "SUCCEEDED" as const, + }, + ] as OperationState[], + }, + }; + + const wrappedCtx = wrapDurableContext(mockContext, event); + + // First call - should be replay (matches Name="fetch" and Type="STEP") + await wrappedCtx.step("fetch", async () => "data1"); + expect(mockStartSpan).not.toHaveBeenCalled(); + + // Second call - should create span (the first operation is consumed) + await wrappedCtx.step("fetch", async () => "data2"); + expect(mockStartSpan).toHaveBeenCalled(); + }); + }); +}); diff --git a/src/trace/durable-function-context-wrapper.ts b/src/trace/durable-function-context-wrapper.ts new file mode 100644 index 000000000..798459743 --- /dev/null +++ b/src/trace/durable-function-context-wrapper.ts @@ -0,0 +1,339 @@ +import { Span, Tracer } from "dd-trace"; +import { logDebug } from "../utils"; + +// Types from AWS Durable Execution SDK +export interface DurableExecutionContext { + step(name: string, fn: () => Promise, options?: StepOptions): Promise; + wait(name: string, duration: WaitDuration): Promise; + invoke(name: string, functionName: string, payload?: any): Promise; + waitForCallback(name: string, fn: (callbackId: string) => Promise, options?: CallbackOptions): Promise; + parallel(name: string, branches: (() => Promise)[]): Promise; + map(name: string, items: T[], fn: (item: T) => Promise): Promise; + runInChildContext(name: string, fn: (childCtx: DurableExecutionContext) => Promise): Promise; +} + +export interface StepOptions { + retryPolicy?: RetryPolicy; +} + +export interface WaitDuration { + seconds?: number; + minutes?: number; + hours?: number; + days?: number; +} + +export interface CallbackOptions { + timeout?: WaitDuration; +} + +export interface RetryPolicy { + maxAttempts?: number; + initialDelay?: WaitDuration; + backoffRate?: number; +} + +// Operation state from the durable execution event +export interface OperationState { + Id: string; + Type: string; + SubType?: string; + Name?: string; + Status: "STARTED" | "SUCCEEDED" | "FAILED"; + StartTimestamp?: string; + EndTimestamp?: string; +} + +/** + * Wraps a DurableExecutionContext to automatically create Datadog spans + * for each operation (step, wait, invoke, etc.). + * + * Usage: + * ```typescript + * import { wrapDurableContext } from 'datadog-lambda-js'; + * + * export const handler = datadog( + * withDurableExecution(async (event, ctx) => { + * const tracedCtx = wrapDurableContext(ctx, event); + * await tracedCtx.step('fetch-user', async () => fetchUser()); + * }) + * ); + * ``` + */ +export function wrapDurableContext(ctx: DurableExecutionContext, event: any): DurableExecutionContext { + let tracer: Tracer; + try { + tracer = require("dd-trace"); + } catch { + logDebug("dd-trace not available, returning unwrapped context"); + return ctx; + } + + const operations = extractOperations(event); + + // Track which operations have been "consumed" to avoid double-matching + const consumedOperationIds = new Set(); + + /** + * Check if an operation is a replay (already completed in previous invocation). + * + * The SDK uses: + * - `Id`: Auto-generated format like "step-1", "wait-1" + * - `Name`: User-provided name like "fetch-user" + * - `Type`: Operation type like "STEP", "WAIT", "INVOKE" + * + * We match by Name (user-provided) and Type since these are stable across replays. + */ + const isReplay = (operationName: string, operationType: string): boolean => { + // Find operation by name and type that hasn't been consumed yet + const op = operations.find((o) => { + if (consumedOperationIds.has(o.Id)) { + return false; + } + // Match by name (what user provides) and type + return o.Name === operationName && o.Type === operationType.toUpperCase(); + }); + + if (op && (op.Status === "SUCCEEDED" || op.Status === "FAILED")) { + // Mark this operation as consumed so it won't match again + consumedOperationIds.add(op.Id); + return true; + } + return false; + }; + + const createOperationSpan = (type: string, name: string, additionalTags?: Record): Span | null => { + try { + const span = tracer.startSpan(`aws.lambda.durable.${type}`, { + tags: { + "durable.operation.type": type, + "durable.operation.name": name, + "resource.name": name, + ...additionalTags, + }, + }); + return span; + } catch (error) { + logDebug(`Failed to create span for ${type}:${name}`, { error }); + return null; + } + }; + + const originalStep = ctx.step.bind(ctx); + const wrappedStep = async (name: string, fn: () => Promise, options?: StepOptions): Promise => { + if (isReplay(name, "step")) { + logDebug(`Replay detected for step: ${name}, skipping span`); + return originalStep(name, fn, options); + } + + const span = createOperationSpan("step", name); + if (!span) { + return originalStep(name, fn, options); + } + + try { + const result = await originalStep(name, fn, options); + span.finish(); + return result; + } catch (error) { + span.setTag("error", true); + if (error instanceof Error) { + span.setTag("error.message", error.message); + span.setTag("error.type", error.name); + } + span.finish(); + throw error; + } + }; + + const originalWait = ctx.wait.bind(ctx); + const wrappedWait = async (name: string, duration: WaitDuration): Promise => { + if (isReplay(name, "wait")) { + logDebug(`Replay detected for wait: ${name}, skipping span`); + return originalWait(name, duration); + } + + const span = createOperationSpan("wait", name, { + "durable.wait.duration": JSON.stringify(duration), + }); + if (!span) { + return originalWait(name, duration); + } + + try { + // Note: If this operation causes Lambda to exit (PENDING state), this span will not be + // finished. This is expected - the span represents the operation attempt, not completion. + // On replay, no span is created since the operation already completed (isReplay returns true). + const result = await originalWait(name, duration); + span.finish(); + return result; + } catch (error) { + span.setTag("error", true); + span.finish(); + throw error; + } + }; + + const originalInvoke = ctx.invoke.bind(ctx); + const wrappedInvoke = async (name: string, functionName: string, payload?: any): Promise => { + if (isReplay(name, "invoke")) { + logDebug(`Replay detected for invoke: ${name}, skipping span`); + return originalInvoke(name, functionName, payload); + } + + const span = createOperationSpan("invoke", name, { + "durable.invoke.function_name": functionName, + }); + if (!span) { + return originalInvoke(name, functionName, payload); + } + + try { + // Note: If this operation causes Lambda to exit (PENDING state), this span will not be + // finished. This is expected - the span represents the operation attempt, not completion. + // On replay, no span is created since the operation already completed (isReplay returns true). + const result = await originalInvoke(name, functionName, payload); + span.finish(); + return result; + } catch (error) { + span.setTag("error", true); + span.finish(); + throw error; + } + }; + + const originalWaitForCallback = ctx.waitForCallback.bind(ctx); + const wrappedWaitForCallback = async ( + name: string, + fn: (callbackId: string) => Promise, + options?: CallbackOptions, + ): Promise => { + if (isReplay(name, "callback")) { + logDebug(`Replay detected for waitForCallback: ${name}, skipping span`); + return originalWaitForCallback(name, fn, options); + } + + const span = createOperationSpan("callback", name); + if (!span) { + return originalWaitForCallback(name, fn, options); + } + + try { + // Note: If this operation causes Lambda to exit (PENDING state), this span will not be + // finished. This is expected - the span represents the operation attempt, not completion. + // On replay, no span is created since the operation already completed (isReplay returns true). + const result = await originalWaitForCallback(name, fn, options); + span.finish(); + return result; + } catch (error) { + span.setTag("error", true); + span.finish(); + throw error; + } + }; + + const originalParallel = ctx.parallel.bind(ctx); + const wrappedParallel = async (name: string, branches: (() => Promise)[]): Promise => { + if (isReplay(name, "parallel")) { + logDebug(`Replay detected for parallel: ${name}, skipping span`); + return originalParallel(name, branches); + } + + const span = createOperationSpan("parallel", name, { + "durable.parallel.branch_count": branches.length.toString(), + }); + if (!span) { + return originalParallel(name, branches); + } + + try { + const result = await originalParallel(name, branches); + span.finish(); + return result; + } catch (error) { + span.setTag("error", true); + span.finish(); + throw error; + } + }; + + const originalMap = ctx.map.bind(ctx); + const wrappedMap = async (name: string, items: T[], fn: (item: T) => Promise): Promise => { + if (isReplay(name, "map")) { + logDebug(`Replay detected for map: ${name}, skipping span`); + return originalMap(name, items, fn); + } + + const span = createOperationSpan("map", name, { + "durable.map.item_count": items.length.toString(), + }); + if (!span) { + return originalMap(name, items, fn); + } + + try { + const result = await originalMap(name, items, fn); + span.finish(); + return result; + } catch (error) { + span.setTag("error", true); + span.finish(); + throw error; + } + }; + + const originalRunInChildContext = ctx.runInChildContext?.bind(ctx); + const wrappedRunInChildContext = originalRunInChildContext + ? async (name: string, fn: (childCtx: DurableExecutionContext) => Promise): Promise => { + if (isReplay(name, "child_context")) { + logDebug(`Replay detected for runInChildContext: ${name}, skipping span`); + return originalRunInChildContext(name, fn); + } + + const span = createOperationSpan("child_context", name); + if (!span) { + return originalRunInChildContext(name, fn); + } + + try { + // Wrap the child context recursively + const result = await originalRunInChildContext(name, (childCtx) => { + const wrappedChildCtx = wrapDurableContext(childCtx, event); + return fn(wrappedChildCtx); + }); + span.finish(); + return result; + } catch (error) { + span.setTag("error", true); + span.finish(); + throw error; + } + } + : undefined; + + return { + ...ctx, + step: wrappedStep, + wait: wrappedWait, + invoke: wrappedInvoke, + waitForCallback: wrappedWaitForCallback, + parallel: wrappedParallel, + map: wrappedMap, + ...(wrappedRunInChildContext && { runInChildContext: wrappedRunInChildContext }), + } as DurableExecutionContext; +} + +/** + * Extract operations array from the durable execution event. + */ +function extractOperations(event: any): OperationState[] { + try { + const operations = event?.InitialExecutionState?.Operations; + if (Array.isArray(operations)) { + return operations; + } + return []; + } catch { + return []; + } +} diff --git a/src/trace/durable-function-patch.spec.ts b/src/trace/durable-function-patch.spec.ts new file mode 100644 index 000000000..f344d0d01 --- /dev/null +++ b/src/trace/durable-function-patch.spec.ts @@ -0,0 +1,149 @@ +// Mock the durable execution SDK +const mockOriginalWithDurableExecution = jest.fn((handler) => { + return async (event: any, context: any) => { + // Simulate the SDK's behavior of passing ctx + const mockCtx = { + step: jest.fn().mockResolvedValue("step-result"), + wait: jest.fn().mockResolvedValue(undefined), + invoke: jest.fn().mockResolvedValue("invoke-result"), + waitForCallback: jest.fn().mockResolvedValue("callback-result"), + parallel: jest.fn().mockResolvedValue([]), + map: jest.fn().mockResolvedValue([]), + }; + return handler(event, mockCtx); + }; +}); + +// Create the mock SDK object that will be modified by the patching +const mockDurableSDK = { + withDurableExecution: mockOriginalWithDurableExecution, +}; + +// Set up the mock before any imports that might trigger require() +jest.mock("@aws/durable-execution-sdk-js", () => mockDurableSDK, { virtual: true }); + +import { + initDurableFunctionTracing, + isDurableFunctionTracingEnabled, + resetDurableFunctionPatch, +} from "./durable-function-patch"; + +describe("durable-function-patch", () => { + const originalEnv = process.env; + + beforeEach(() => { + resetDurableFunctionPatch(); + process.env = { ...originalEnv }; + + // Reset the mock SDK function to original + mockDurableSDK.withDurableExecution = mockOriginalWithDurableExecution; + }); + + afterEach(() => { + process.env = originalEnv; + jest.restoreAllMocks(); + }); + + describe("initDurableFunctionTracing", () => { + it("patches withDurableExecution when SDK is present", () => { + initDurableFunctionTracing(); + + expect(isDurableFunctionTracingEnabled()).toBe(true); + // The original function should be replaced + expect(mockDurableSDK.withDurableExecution).not.toBe(mockOriginalWithDurableExecution); + }); + + it("respects DD_DISABLE_DURABLE_FUNCTION_TRACING env var", () => { + process.env.DD_DISABLE_DURABLE_FUNCTION_TRACING = "true"; + + initDurableFunctionTracing(); + + expect(isDurableFunctionTracingEnabled()).toBe(false); + }); + + it("only patches once (idempotent)", () => { + initDurableFunctionTracing(); + const firstPatchedFunction = mockDurableSDK.withDurableExecution; + + initDurableFunctionTracing(); + const secondPatchedFunction = mockDurableSDK.withDurableExecution; + + // Should be the same patched function + expect(firstPatchedFunction).toBe(secondPatchedFunction); + expect(isDurableFunctionTracingEnabled()).toBe(true); + }); + }); + + describe("wrapped handler behavior", () => { + it("wrapped handler receives traced context", async () => { + initDurableFunctionTracing(); + + // Create a handler using the patched SDK + const userHandler = jest.fn().mockResolvedValue("handler-result"); + const wrappedHandler = mockDurableSDK.withDurableExecution(userHandler); + + // Invoke the wrapped handler + const event = { test: "event" }; + const result = await wrappedHandler(event, {}); + + // User handler should be called with the event and a context + expect(userHandler).toHaveBeenCalled(); + const [receivedEvent, receivedCtx] = userHandler.mock.calls[0]; + expect(receivedEvent).toEqual(event); + // Context should have wrapped methods (step, wait, etc.) + expect(receivedCtx.step).toBeDefined(); + expect(receivedCtx.wait).toBeDefined(); + expect(receivedCtx.invoke).toBeDefined(); + }); + + it("original handler behavior preserved", async () => { + initDurableFunctionTracing(); + + const userHandler = jest.fn().mockResolvedValue("expected-result"); + const wrappedHandler = mockDurableSDK.withDurableExecution(userHandler); + + const result = await wrappedHandler({ test: "event" }, {}); + + expect(result).toBe("expected-result"); + }); + + it("preserves options passed to withDurableExecution", () => { + // Track if original was called with options + const originalMock = jest.fn((handler: any, options?: any) => { + return async () => handler({}, {}); + }); + (mockDurableSDK as any).withDurableExecution = originalMock; + + initDurableFunctionTracing(); + + const userHandler = jest.fn(); + const options = { timeout: 30000 }; + + (mockDurableSDK as any).withDurableExecution(userHandler, options); + + expect(originalMock).toHaveBeenCalledWith(expect.any(Function), options); + }); + }); + + describe("isDurableFunctionTracingEnabled", () => { + it("returns false before initialization", () => { + expect(isDurableFunctionTracingEnabled()).toBe(false); + }); + + it("returns true after successful initialization", () => { + initDurableFunctionTracing(); + + expect(isDurableFunctionTracingEnabled()).toBe(true); + }); + }); + + describe("resetDurableFunctionPatch", () => { + it("resets patch state for testing", () => { + initDurableFunctionTracing(); + expect(isDurableFunctionTracingEnabled()).toBe(true); + + resetDurableFunctionPatch(); + expect(isDurableFunctionTracingEnabled()).toBe(false); + }); + }); +}); diff --git a/src/trace/durable-function-patch.ts b/src/trace/durable-function-patch.ts new file mode 100644 index 000000000..179d6836d --- /dev/null +++ b/src/trace/durable-function-patch.ts @@ -0,0 +1,91 @@ +import { logDebug } from "../utils"; +import { wrapDurableContext } from "./durable-function-context-wrapper"; + +let patchApplied = false; +let originalWithDurableExecution: ((...args: any[]) => any) | null = null; +let patchedSDK: any = null; + +/** + * Initialize automatic durable function tracing. + * Call this when datadog-lambda-js loads. + * + * If @aws/durable-execution-sdk-js is installed, this patches + * withDurableExecution to automatically wrap the context for tracing. + */ +export function initDurableFunctionTracing(): void { + // Only patch once + if (patchApplied) { + return; + } + + // Allow disabling via env var + if (process.env.DD_DISABLE_DURABLE_FUNCTION_TRACING === "true") { + logDebug("Durable function tracing disabled via DD_DISABLE_DURABLE_FUNCTION_TRACING"); + return; + } + + try { + // Try to require the SDK - if not installed, this throws + const durableSDK = require("@aws/durable-execution-sdk-js"); + + if (typeof durableSDK.withDurableExecution !== "function") { + logDebug("Durable SDK found but withDurableExecution is not a function"); + return; + } + + patchWithDurableExecution(durableSDK); + patchApplied = true; + logDebug("Durable function tracing enabled"); + } catch { + // SDK not installed - this is expected for non-durable functions + logDebug("Durable function tracing not enabled (SDK not found)"); + } +} + +/** + * Patch withDurableExecution to wrap context for automatic tracing. + */ +function patchWithDurableExecution(durableSDK: any): void { + // Store original function and SDK reference for reset + const originalFn = durableSDK.withDurableExecution; + originalWithDurableExecution = originalFn; + patchedSDK = durableSDK; + + // tslint:disable-next-line:ban-types + durableSDK.withDurableExecution = function patchedWithDurableExecution( + userHandler: (...args: any[]) => any, + options?: any, + ) { + // Create a traced handler that wraps the user's handler + const tracedHandler = async (event: any, ctx: any) => { + // Wrap the context to enable operation tracing + const tracedCtx = wrapDurableContext(ctx, event); + + // Call user's handler with wrapped context + return userHandler(event, tracedCtx); + }; + + // Call original withDurableExecution with our traced handler + return originalFn.call(durableSDK, tracedHandler, options); + }; +} + +/** + * Check if durable function tracing is active. + */ +export function isDurableFunctionTracingEnabled(): boolean { + return patchApplied; +} + +/** + * Reset the patch state (for testing purposes). + * Restores the original SDK function if it was patched. + */ +export function resetDurableFunctionPatch(): void { + if (originalWithDurableExecution && patchedSDK) { + patchedSDK.withDurableExecution = originalWithDurableExecution; + originalWithDurableExecution = null; + patchedSDK = null; + } + patchApplied = false; +} diff --git a/src/trace/durable-function-service.spec.ts b/src/trace/durable-function-service.spec.ts new file mode 100644 index 000000000..d24df4225 --- /dev/null +++ b/src/trace/durable-function-service.spec.ts @@ -0,0 +1,328 @@ +import { DurableFunctionContextService, PARENT_ID, TRACE_ID } from "./durable-function-service"; + +describe("DurableFunctionContextService", () => { + // Use valid trace IDs that fit within 64-bit BigInt constraints (max: 18446744073709551615) + const validDurableExecutionEvent = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-123/550e8400-e29b-41d4-a716-446655440001", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Name: "fetch-user", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + headers: { + "x-datadog-trace-id": "1234567890123456789", + "x-datadog-parent-id": "9876543210987654321", + "x-datadog-sampling-priority": "1", + }, + body: '{"key": "value"}', + }), + }, + }, + ], + }, + }; + + const durableEventWithoutPreservedTrace = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-456/650e8400-e29b-41d4-a716-446655440002", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Name: "fetch-user", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + body: '{"key": "value"}', + }), + }, + }, + ], + }, + }; + + const durableEventWithDatadogObject = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-789/750e8400-e29b-41d4-a716-446655440003", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Name: "fetch-user", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + _datadog: { + "x-datadog-trace-id": "1111111111111111111", + "x-datadog-parent-id": "2222222222222222222", + "x-datadog-sampling-priority": "2", + }, + }), + }, + }, + ], + }, + }; + + const durableEventWithSNS = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-sns/850e8400-e29b-41d4-a716-446655440004", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + Records: [ + { + Sns: { + MessageAttributes: { + "_datadog.trace-id": { Value: "3333333333333333333" }, + "_datadog.parent-id": { Value: "4444444444444444444" }, + }, + }, + }, + ], + }), + }, + }, + ], + }, + }; + + const durableEventWithSQS = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-sqs/950e8400-e29b-41d4-a716-446655440005", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + Records: [ + { + messageAttributes: { + _datadog_trace_id: { stringValue: "5555555555555555555" }, + _datadog_parent_id: { stringValue: "6666666666666666666" }, + }, + }, + ], + }), + }, + }, + ], + }, + }; + + const durableEventWithEventBridge = { + DurableExecutionArn: + "arn:aws:lambda:us-east-1:123456789012:function:my-func:$LATEST/durable-execution/order-eb/a50e8400-e29b-41d4-a716-446655440006", + CheckpointToken: "some-token", + InitialExecutionState: { + Operations: [ + { + Id: "step-1", + Type: "STEP", + Status: "SUCCEEDED", + ExecutionDetails: { + InputPayload: JSON.stringify({ + detail: { + _datadog: { + "x-datadog-trace-id": "7777777777777777777", + "x-datadog-parent-id": "8888888888888888888", + "x-datadog-sampling-priority": "1", + }, + }, + }), + }, + }, + ], + }, + }; + + describe("instance", () => { + beforeEach(() => { + DurableFunctionContextService.reset(); + }); + + it("returns the same instance every time", () => { + const instance1 = DurableFunctionContextService.instance(validDurableExecutionEvent); + const instance2 = DurableFunctionContextService.instance(); + + expect(instance1).toBe(instance2); + }); + + it("returns undefined context for non-durable events", () => { + const instance = DurableFunctionContextService.instance({ + body: '{"key": "value"}', + headers: {}, + }); + + expect(instance.context).toBeUndefined(); + }); + }); + + describe("context extraction", () => { + beforeEach(() => { + DurableFunctionContextService.reset(); + }); + + it("extracts context from valid durable execution event", () => { + const instance = DurableFunctionContextService.instance(validDurableExecutionEvent); + + expect(instance.context).toEqual({ + durable_function_execution_name: "order-123", + durable_function_execution_id: "550e8400-e29b-41d4-a716-446655440001", + }); + }); + + it("returns undefined context when DurableExecutionArn is missing", () => { + const instance = DurableFunctionContextService.instance({ + CheckpointToken: "some-token", + }); + + expect(instance.context).toBeUndefined(); + }); + }); + + describe("spanContext", () => { + beforeEach(() => { + DurableFunctionContextService.reset(); + }); + + it("extracts trace from preserved HTTP headers", () => { + const instance = DurableFunctionContextService.instance(validDurableExecutionEvent); + + const spanContext = instance.spanContext; + + expect(spanContext).not.toBeNull(); + expect(spanContext?.toTraceId()).toBe("1234567890123456789"); + expect(spanContext?.toSpanId()).toBe("9876543210987654321"); + expect(spanContext?.source).toBe("event"); + }); + + it("extracts trace from preserved _datadog object", () => { + const instance = DurableFunctionContextService.instance(durableEventWithDatadogObject); + + const spanContext = instance.spanContext; + + expect(spanContext).not.toBeNull(); + expect(spanContext?.toTraceId()).toBe("1111111111111111111"); + expect(spanContext?.toSpanId()).toBe("2222222222222222222"); + }); + + it("extracts trace from preserved SNS attributes", () => { + const instance = DurableFunctionContextService.instance(durableEventWithSNS); + + const spanContext = instance.spanContext; + + expect(spanContext).not.toBeNull(); + expect(spanContext?.toTraceId()).toBe("3333333333333333333"); + expect(spanContext?.toSpanId()).toBe("4444444444444444444"); + }); + + it("extracts trace from preserved SQS attributes", () => { + const instance = DurableFunctionContextService.instance(durableEventWithSQS); + + const spanContext = instance.spanContext; + + expect(spanContext).not.toBeNull(); + expect(spanContext?.toTraceId()).toBe("5555555555555555555"); + expect(spanContext?.toSpanId()).toBe("6666666666666666666"); + }); + + it("extracts trace from preserved EventBridge detail", () => { + const instance = DurableFunctionContextService.instance(durableEventWithEventBridge); + + const spanContext = instance.spanContext; + + expect(spanContext).not.toBeNull(); + expect(spanContext?.toTraceId()).toBe("7777777777777777777"); + expect(spanContext?.toSpanId()).toBe("8888888888888888888"); + }); + + it("falls back to deterministic when no preserved trace", () => { + const instance = DurableFunctionContextService.instance(durableEventWithoutPreservedTrace); + + const spanContext = instance.spanContext; + + expect(spanContext).not.toBeNull(); + // Deterministic IDs should be based on execution ID + expect(spanContext?.toTraceId()).toBeDefined(); + expect(spanContext?.toSpanId()).toBeDefined(); + // Trace ID should be consistent for the same execution ID + const spanContext2 = instance.spanContext; + expect(spanContext?.toTraceId()).toBe(spanContext2?.toTraceId()); + }); + + it("returns null when context is not set", () => { + const instance = DurableFunctionContextService.instance({}); + + const spanContext = instance.spanContext; + + expect(spanContext).toBeNull(); + }); + }); + + describe("deterministicSha256HashToBigIntString", () => { + beforeEach(() => { + DurableFunctionContextService.reset(); + }); + + it("generates deterministic trace ID (same input = same output)", () => { + const instance = DurableFunctionContextService.instance(durableEventWithoutPreservedTrace); + const hash1 = instance["deterministicSha256HashToBigIntString"]("650e8400-e29b-41d4-a716-446655440002", TRACE_ID); + const hash2 = instance["deterministicSha256HashToBigIntString"]("650e8400-e29b-41d4-a716-446655440002", TRACE_ID); + expect(hash1).toEqual(hash2); + }); + + it("different execution IDs produce different trace IDs", () => { + const instance = DurableFunctionContextService.instance(durableEventWithoutPreservedTrace); + const hash1 = instance["deterministicSha256HashToBigIntString"]("650e8400-e29b-41d4-a716-446655440002", TRACE_ID); + const hash2 = instance["deterministicSha256HashToBigIntString"]("750e8400-e29b-41d4-a716-446655440003", TRACE_ID); + expect(hash1).not.toEqual(hash2); + }); + + it("returns different hashes for TRACE_ID and PARENT_ID types", () => { + const instance = DurableFunctionContextService.instance(durableEventWithoutPreservedTrace); + const traceHash = instance["deterministicSha256HashToBigIntString"]( + "650e8400-e29b-41d4-a716-446655440002", + TRACE_ID, + ); + const parentHash = instance["deterministicSha256HashToBigIntString"]( + "650e8400-e29b-41d4-a716-446655440002", + PARENT_ID, + ); + expect(traceHash).not.toEqual(parentHash); + }); + }); + + describe("reset", () => { + it("clears the singleton instance", () => { + const instance1 = DurableFunctionContextService.instance(validDurableExecutionEvent); + expect(instance1.context).toBeDefined(); + + DurableFunctionContextService.reset(); + + const instance2 = DurableFunctionContextService.instance({}); + expect(instance2.context).toBeUndefined(); + expect(instance1).not.toBe(instance2); + }); + }); +}); diff --git a/src/trace/durable-function-service.ts b/src/trace/durable-function-service.ts new file mode 100644 index 000000000..772bb01b6 --- /dev/null +++ b/src/trace/durable-function-service.ts @@ -0,0 +1,250 @@ +import { logDebug } from "../utils"; +import { SampleMode, TraceSource } from "./trace-context-service"; +import { SpanContextWrapper } from "./span-context-wrapper"; +import { Sha256 } from "@aws-crypto/sha256-js"; +import { extractDurableFunctionContext, DurableFunctionContext } from "./durable-function-context"; + +// Types for preserved event trace context +interface PreservedTraceContext { + traceId: string; + parentId: string; + samplingPriority?: string; + source: "http" | "sns" | "sqs" | "eventbridge" | "custom"; +} + +export const TRACE_ID = "traceId"; +export const PARENT_ID = "spanId"; +export const DD_P_TID = "_dd.p.tid"; + +export class DurableFunctionContextService { + private static _instance: DurableFunctionContextService; + public context?: DurableFunctionContext; + private preservedEvent?: any; + private preservedTraceContext?: PreservedTraceContext | null; + + private constructor(event: any) { + this.context = extractDurableFunctionContext(event); + this.preservedEvent = this.extractPreservedEvent(event); + this.preservedTraceContext = undefined; // Lazy evaluated + } + + public static instance(event?: any): DurableFunctionContextService { + return this._instance || (this._instance = new this(event)); + } + + public static reset(): void { + this._instance = undefined as any; + } + + /** + * Extract the original customer event from InitialExecutionState. + * This event is preserved across all replays and may contain trace context. + */ + private extractPreservedEvent(event: any): any | undefined { + try { + const operations = event?.InitialExecutionState?.Operations; + if (!Array.isArray(operations) || operations.length === 0) { + return undefined; + } + + // First operation contains the execution details with InputPayload + const firstOperation = operations[0]; + const inputPayload = firstOperation?.ExecutionDetails?.InputPayload; + + if (typeof inputPayload === "string") { + return JSON.parse(inputPayload); + } + return undefined; + } catch (error) { + logDebug("Failed to parse preserved event from InputPayload", { error }); + return undefined; + } + } + + /** + * Try to extract trace context from the preserved original event. + * Checks HTTP headers, SNS attributes, SQS attributes, etc. + */ + private extractTraceFromPreservedEvent(): PreservedTraceContext | null { + if (!this.preservedEvent) return null; + + // Check for HTTP headers (API Gateway, ALB, Function URL) + const headers = this.preservedEvent.headers ?? this.preservedEvent.multiValueHeaders; + if (headers && typeof headers === "object") { + const traceId = this.getHeader(headers, "x-datadog-trace-id"); + const parentId = this.getHeader(headers, "x-datadog-parent-id"); + const samplingPriority = this.getHeader(headers, "x-datadog-sampling-priority"); + + if (traceId && parentId) { + logDebug("Found trace context in preserved HTTP headers", { traceId, parentId }); + return { traceId, parentId, samplingPriority, source: "http" }; + } + } + + // Check for _datadog object (common injection point) + if (this.preservedEvent._datadog) { + const dd = this.preservedEvent._datadog; + if (dd["x-datadog-trace-id"] && dd["x-datadog-parent-id"]) { + logDebug("Found trace context in preserved _datadog object", dd); + return { + traceId: dd["x-datadog-trace-id"], + parentId: dd["x-datadog-parent-id"], + samplingPriority: dd["x-datadog-sampling-priority"], + source: "custom", + }; + } + } + + // Check SNS Records + if (this.preservedEvent.Records?.[0]?.Sns?.MessageAttributes) { + const attrs = this.preservedEvent.Records[0].Sns.MessageAttributes; + const traceId = attrs["_datadog.trace-id"]?.Value ?? attrs["x-datadog-trace-id"]?.Value; + const parentId = attrs["_datadog.parent-id"]?.Value ?? attrs["x-datadog-parent-id"]?.Value; + + if (traceId && parentId) { + logDebug("Found trace context in preserved SNS attributes", { traceId, parentId }); + return { traceId, parentId, source: "sns" }; + } + } + + // Check SQS Records + if (this.preservedEvent.Records?.[0]?.messageAttributes) { + const attrs = this.preservedEvent.Records[0].messageAttributes; + // tslint:disable-next-line:no-string-literal + const datadogTraceId = attrs._datadog_trace_id ?? attrs["x-datadog-trace-id"]; + // tslint:disable-next-line:no-string-literal + const datadogParentId = attrs._datadog_parent_id ?? attrs["x-datadog-parent-id"]; + const traceId = datadogTraceId?.stringValue; + const parentId = datadogParentId?.stringValue; + + if (traceId && parentId) { + logDebug("Found trace context in preserved SQS attributes", { traceId, parentId }); + return { traceId, parentId, source: "sqs" }; + } + } + + // Check EventBridge detail + if (this.preservedEvent.detail?._datadog) { + const dd = this.preservedEvent.detail._datadog; + if (dd["x-datadog-trace-id"] && dd["x-datadog-parent-id"]) { + logDebug("Found trace context in preserved EventBridge detail", dd); + return { + traceId: dd["x-datadog-trace-id"], + parentId: dd["x-datadog-parent-id"], + samplingPriority: dd["x-datadog-sampling-priority"], + source: "eventbridge", + }; + } + } + + return null; + } + + private getHeader(headers: any, key: string): string | undefined { + // Handle both single-value and multi-value headers (case-insensitive) + const lowerKey = key.toLowerCase(); + for (const [k, v] of Object.entries(headers)) { + if (k.toLowerCase() === lowerKey) { + return Array.isArray(v) ? v[0] : (v as string); + } + } + return undefined; + } + + public get spanContext(): SpanContextWrapper | null { + if (!this.context) return null; + + // Lazy evaluate preserved trace context + if (this.preservedTraceContext === undefined) { + this.preservedTraceContext = this.extractTraceFromPreservedEvent(); + } + + let traceId: string; + let parentId: string; + let ptid: string; + let usePreservedContext = false; + + if (this.preservedTraceContext) { + // Use trace context from preserved original event + traceId = this.preservedTraceContext.traceId; + parentId = this.preservedTraceContext.parentId; + ptid = ""; // Will be extracted from trace tags if available + usePreservedContext = true; + logDebug("Using trace context from preserved event", { traceId, parentId }); + } else { + // Fall back to deterministic trace ID from execution ID + traceId = this.deterministicSha256HashToBigIntString(this.context.durable_function_execution_id, TRACE_ID); + ptid = this.deterministicSha256HashToBigIntString(this.context.durable_function_execution_id, DD_P_TID); + // Parent ID includes execution name for uniqueness + parentId = this.deterministicSha256HashToBigIntString( + `${this.context.durable_function_execution_id}#${this.context.durable_function_execution_name}`, + PARENT_ID, + ); + logDebug("Using deterministic trace ID from execution ID", { traceId, parentId }); + } + + // Use sampling priority from preserved event if available, otherwise default to AUTO_KEEP + let sampleMode = SampleMode.AUTO_KEEP; + if (this.preservedTraceContext?.samplingPriority) { + const priority = parseInt(this.preservedTraceContext.samplingPriority, 10); + if (!isNaN(priority)) { + sampleMode = priority; + } + } + + try { + const _DatadogSpanContext = require("dd-trace/packages/dd-trace/src/opentracing/span_context"); + const id = require("dd-trace/packages/dd-trace/src/id"); + + const ddSpanContext = new _DatadogSpanContext({ + traceId: id(traceId, 10), + spanId: id(parentId, 10), + sampling: { priority: sampleMode }, + }); + + if (ptid) { + ddSpanContext._trace.tags["_dd.p.tid"] = id(ptid, 10).toString(16); + } + + logDebug("Created SpanContext for DurableFunction", { + traceContext: ddSpanContext, + source: usePreservedContext ? "preserved_event" : "deterministic", + }); + + return new SpanContextWrapper(ddSpanContext, TraceSource.Event); + } catch (error) { + if (error instanceof Error) { + logDebug("Couldn't generate SpanContext with tracer for durable function.", error); + } + return null; + } + } + + private deterministicSha256HashToBigIntString(s: string, type: string): string { + const binaryString = this.deterministicSha256Hash(s, type); + return BigInt("0b" + binaryString).toString(); + } + + private deterministicSha256Hash(s: string, type: string): string { + const hash = new Sha256(); + hash.update(s); + const uint8Array = hash.digestSync(); + + let intArray = uint8Array.subarray(0, 8); + if (type === TRACE_ID) { + intArray = uint8Array.subarray(8, 16); + } + + const binaryString = intArray.reduce((acc, num) => acc + this.numberToBinaryString(num), ""); + const res = "0" + binaryString.substring(1, 64); + + if (res === "0".repeat(64)) { + return "1"; + } + return res; + } + + private numberToBinaryString(num: number): string { + return num.toString(2).padStart(8, "0"); + } +} diff --git a/src/trace/listener.ts b/src/trace/listener.ts index d846d7d49..e2e71ef15 100644 --- a/src/trace/listener.ts +++ b/src/trace/listener.ts @@ -21,6 +21,7 @@ import { getTraceTree, clearTraceTree } from "../runtime/index"; import { TraceContext, TraceContextService, TraceSource } from "./trace-context-service"; import { StepFunctionContext, StepFunctionContextService } from "./step-function-service"; import { DurableFunctionContext, extractDurableFunctionContext } from "./durable-function-context"; +import { DurableFunctionContextService } from "./durable-function-service"; import { XrayService } from "./xray-service"; import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http"; import { getSpanPointerAttributes, SpanPointerAttributes } from "../utils/span-pointers"; @@ -297,6 +298,7 @@ export class TraceListener { this.stepFunctionContext = undefined; this.durableFunctionContext = undefined; StepFunctionContextService.reset(); + DurableFunctionContextService.reset(); this.contextService.reset(); }