Skip to content

Commit 2a9269c

Browse files
committed
improve: add custom span processor to account for serverless environment
and help with flushing correctly
1 parent 0e2ef73 commit 2a9269c

File tree

8 files changed

+244
-9
lines changed

8 files changed

+244
-9
lines changed

src/__test__/development/metrics.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { l } from '@/lib/clients/logger/logger'
1+
import { l } from '@/lib/clients/logger'
22
import { Sandbox } from 'e2b'
33
import { describe, expect, it } from 'vitest'
44

src/app/(rewrites)/[[...slug]]/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import sitemap from '@/app/sitemap'
22
import { ALLOW_SEO_INDEXING } from '@/configs/flags'
33
import { ROUTE_REWRITE_CONFIG } from '@/configs/rewrites'
44
import { BASE_URL } from '@/configs/urls'
5-
import { l } from '@/lib/clients/logger/logger'
5+
import { l } from '@/lib/clients/logger'
66
import {
77
getRewriteForPath,
88
rewriteContentPagesHtml,

src/app/api/auth/callback/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { AUTH_URLS, PROTECTED_URLS } from '@/configs/urls'
2-
import { l } from '@/lib/clients/logger/logger'
2+
import { l } from '@/lib/clients/logger'
33
import { createClient } from '@/lib/clients/supabase/server'
44
import { encodedRedirect } from '@/lib/utils/auth'
55
import { redirect } from 'next/navigation'

src/app/api/auth/confirm/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { AUTH_URLS, PROTECTED_URLS } from '@/configs/urls'
2-
import { l } from '@/lib/clients/logger/logger'
2+
import { l } from '@/lib/clients/logger'
33
import { createClient } from '@/lib/clients/supabase/server'
44
import { encodedRedirect } from '@/lib/utils/auth'
55
import { redirect } from 'next/navigation'

src/features/dashboard/sandbox/header/refresh.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use client'
22

3-
import { l } from '@/lib/clients/logger/logger'
3+
import { l } from '@/lib/clients/logger'
44
import { PollingButton } from '@/ui/polling-button'
55
import { useCallback, useState } from 'react'
66
import { serializeError } from 'serialize-error'

src/instrumentation.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ export async function register() {
44
if (!process.env.OTEL_EXPORTER_OTLP_ENDPOINT) return
55

66
if (process.env.NEXT_RUNTIME === 'nodejs') {
7-
await import('./instrumentation.node')
7+
await import('./instrumentation/instrumentation.node')
88
}
99

1010
if (process.env.NEXT_RUNTIME === 'edge') {

src/instrumentation.node.ts renamed to src/instrumentation/instrumentation.node.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import {
1010
import { BatchLogRecordProcessor } from '@opentelemetry/sdk-logs'
1111
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'
1212
import { NodeSDK } from '@opentelemetry/sdk-node'
13+
import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base'
1314
import {
1415
ATTR_SERVICE_NAME,
1516
ATTR_SERVICE_VERSION,
1617
} from '@opentelemetry/semantic-conventions'
1718
import { FetchInstrumentation } from '@vercel/otel'
19+
import { NextCompositeSpanProcessor } from './span-processor'
1820

1921
function parseResourceAttributes(
2022
resourceAttrs?: string
@@ -46,6 +48,10 @@ const {
4648
VERCEL_GIT_COMMIT_SHA,
4749
} = process.env
4850

51+
const traceExporter = new OTLPTraceExporter({
52+
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces`,
53+
})
54+
4955
const sdk = new NodeSDK({
5056
resource: resourceFromAttributes({
5157
[ATTR_SERVICE_NAME]: OTEL_SERVICE_NAME || 'e2b-dashboard',
@@ -67,9 +73,9 @@ const sdk = new NodeSDK({
6773
'vercel.git.commit_sha': VERCEL_GIT_COMMIT_SHA,
6874
}),
6975
}),
70-
traceExporter: new OTLPTraceExporter({
71-
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces`,
72-
}),
76+
spanProcessors: [
77+
new NextCompositeSpanProcessor([new BatchSpanProcessor(traceExporter)]),
78+
],
7379
metricReader: new PeriodicExportingMetricReader({
7480
exporter: new OTLPMetricExporter({
7581
url: `${OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics`,
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
import type { Attributes, Context, TextMapGetter } from '@opentelemetry/api'
2+
import { diag, SpanKind } from '@opentelemetry/api'
3+
import type {
4+
ReadableSpan,
5+
Span,
6+
SpanProcessor,
7+
} from '@opentelemetry/sdk-trace-base'
8+
9+
import { TraceFlags } from '@opentelemetry/api'
10+
11+
type AttributesFromHeaderFunc = <Carrier = unknown>(
12+
headers: Carrier,
13+
getter: TextMapGetter<Carrier>
14+
) => Attributes | undefined
15+
16+
type AttributesFromHeaders = Record<string, string> | AttributesFromHeaderFunc
17+
18+
/**
 * Helper ────────────────────────────────────────────────────────────────────
 * Schedules `task` to run once the current response has been handled, so the
 * exporter has time to flush.
 *
 * Strategy, in order of preference:
 *   1. `after(cb)` from `next/server` (Next.js 15), when it can be loaded and
 *      called.
 *   2. Fallback: a best-effort `setTimeout(…, 0)`.
 *
 * @param task Async work to run after the response (typically a flush).
 */
function scheduleAfterResponse(task: () => Promise<void>) {
  try {
    // Avoid a hard dependency so this file can be imported outside a route.
    // `require` is evaluated lazily – if Next isn't around it will throw.
    // eslint-disable-next-line
    const mod = require('next/server') as {
      after?: (cb: () => void | Promise<void>) => void
    }

    if (typeof mod.after === 'function') {
      // Hand the task's promise back to `after`. A fire-and-forget call would
      // make the callback return immediately, leaving nothing for Next to
      // wait on while the flush is still in progress.
      mod.after(() => task())
      return
    }
  } catch {
    /* ignored – we're probably not inside a Next context */
  }

  // 2. Node / local fallback – try our best and hope the process stays alive
  //    long enough. The rejection is handled here because nobody else can
  //    observe this promise; an unhandled rejection could take the process
  //    down over a telemetry failure.
  setTimeout(() => {
    task().catch((err: unknown) => {
      console.error('scheduleAfterResponse: task failed:', err)
    })
  }, 0)
}
52+
53+
/** Returns `true` when the SAMPLED bit is set in `traceFlags`. */
function isSampled(traceFlags: number): boolean {
  const sampledBit = traceFlags & TraceFlags.SAMPLED
  return sampledBit !== 0
}
57+
58+
/**
 * Custom composite span processor for Next.js.
 *
 * Every span event is fanned out to the wrapped `processors`. In addition,
 * the first span seen for a trace id is treated as that trace's root; when it
 * is sampled, a `forceFlush()` is scheduled to run after the response so the
 * trace is exported before the invocation goes idle.
 */
export class NextCompositeSpanProcessor implements SpanProcessor {
  /** How long the after-response task waits for a still-open root span. */
  private static readonly ROOT_SPAN_WAIT_MS = 50

  /**
   * Per-trace book-keeping, keyed by trace id: the id of the span treated as
   * root, plus the non-root spans of that trace that have started but not
   * yet ended.
   */
  private readonly rootSpanIds = new Map<
    string,
    { rootSpanId: string; open: Span[] }
  >()
  /** Resolvers for after-response tasks waiting on a root span to end. */
  private readonly waitSpanEnd = new Map<string, () => void>()
  /** The flush currently running, if any. */
  private flushInFlight: Promise<void> | null = null
  /** A single follow-up flush shared by callers that arrived mid-flush. */
  private flushQueued: Promise<void> | null = null

  /**
   * @param processors Processors that receive every span event.
   * @param attributesFromHeaders Accepted for API compatibility; this class
   *   does not currently read it.
   */
  constructor(
    private readonly processors: SpanProcessor[],
    private readonly attributesFromHeaders?: AttributesFromHeaders
  ) {}

  // ───────────────────────────── infrastructure ────────────────────────────
  /**
   * Flushes all wrapped processors. Never rejects: individual failures are
   * logged through `diag.error`.
   *
   * If a flush is already running, the caller does NOT get that promise back,
   * because spans that ended after the running flush started may not be part
   * of it. Instead, one follow-up flush is queued behind the running one and
   * shared by every caller that arrives in the meantime.
   */
  forceFlush(): Promise<void> {
    const running = this.flushInFlight
    if (!running) return this.startFlush()

    if (!this.flushQueued) {
      const queued: Promise<void> = running.then(() => {
        if (this.flushQueued === queued) this.flushQueued = null
        // If someone already started a fresh flush after `running` settled,
        // it began after our callers queued up, so it covers them too.
        return this.flushInFlight ?? this.startFlush()
      })
      this.flushQueued = queued
    }

    return this.flushQueued
  }

  /** Starts a flush across all processors and tracks it as in-flight. */
  private startFlush(): Promise<void> {
    const flush: Promise<void> = Promise.all(
      this.processors.map((p) =>
        p.forceFlush().catch((e: unknown) => {
          diag.error('forceFlush failed:', e)
        })
      )
    )
      .then(() => undefined) // ensure Promise<void>
      .catch(() => undefined) // already logged
      .finally(() => {
        // Only clear the slot if it still belongs to this flush.
        if (this.flushInFlight === flush) this.flushInFlight = null
      })

    this.flushInFlight = flush
    return flush
  }

  /**
   * Shuts down all wrapped processors. Never rejects: failures are logged the
   * same way `forceFlush` logs them instead of being silently dropped.
   */
  async shutdown(): Promise<void> {
    await Promise.all(
      this.processors.map((p) =>
        p.shutdown().catch((e: unknown) => {
          diag.error('shutdown failed:', e)
        })
      )
    )
  }

  // ────────────────────────────────── onStart ──────────────────────────────
  /**
   * Records the span in the per-trace book-keeping, schedules an
   * after-response flush for a sampled root span, then forwards the event.
   */
  onStart(span: Span, parentContext: Context): void {
    const { traceId, spanId, traceFlags } = span.spanContext()
    // "Root" here means: the first span this processor has seen for the trace.
    const isRoot = !this.rootSpanIds.has(traceId)

    if (isRoot) {
      this.rootSpanIds.set(traceId, { rootSpanId: spanId, open: [] })
    } else {
      this.rootSpanIds.get(traceId)?.open.push(span)
    }

    // Schedule the flush once per trace, and only for sampled traces.
    if (isRoot && isSampled(traceFlags)) {
      // When the *response* (or prerender) is done, flush traces.
      scheduleAfterResponse(async () => {
        if (this.rootSpanIds.has(traceId)) {
          // Root hasn’t finished yet – wait via onEnd(), but only briefly.
          const waiter = new Promise<void>((resolve) =>
            this.waitSpanEnd.set(traceId, resolve)
          )
          let timer: NodeJS.Timeout | undefined

          await Promise.race([
            waiter,
            new Promise((res) => {
              timer = setTimeout(() => {
                this.waitSpanEnd.delete(traceId)
                res(undefined)
              }, NextCompositeSpanProcessor.ROOT_SPAN_WAIT_MS) // same 50 ms guard as Vercel’s impl
            }),
          ])
          if (timer) clearTimeout(timer)
        }

        await this.forceFlush()
      })
    }

    // Fan-out start to underlying processors
    for (const p of this.processors) p.onStart(span, parentContext)
  }

  // ─────────────────────────────────── onEnd ───────────────────────────────
  /**
   * Enriches sampled spans with operation/resource attributes, updates the
   * per-trace book-keeping, forwards the event, and finally releases any
   * after-response task waiting for this trace's root span.
   */
  onEnd(span: ReadableSpan): void {
    const { traceId, spanId, traceFlags } = span.spanContext()
    const root = this.rootSpanIds.get(traceId)
    const isRoot = root?.rootSpanId === spanId

    // Datadog-style resource/operation name enrichment
    if (isSampled(traceFlags)) {
      const resAttrs = getResourceAttributes(span)
      if (resAttrs) Object.assign(span.attributes, resAttrs)
    }

    // Maintain open-span book-keeping
    if (isRoot) {
      // Root finished: no need to force-end children; they will end naturally.
      this.rootSpanIds.delete(traceId)
    } else if (root) {
      root.open = root.open.filter((s) => s.spanContext().spanId !== spanId)
    }

    // Fan-out end (before releasing the waiter, so the flush sees this span)
    for (const p of this.processors) p.onEnd(span)

    // Release waiter if anyone is waiting for the root span to finish
    if (isRoot) {
      const pending = this.waitSpanEnd.get(traceId)
      if (pending) {
        this.waitSpanEnd.delete(traceId)
        pending()
      }
    }
  }
}
177+
178+
/* ───────────────────────── Helpers copied from Vercel impl ─────────────── */
179+
/** Lower-case label for each `SpanKind`, used as a fallback operation name. */
const SPAN_KIND_NAME: Record<SpanKind, string> = {
  [SpanKind.INTERNAL]: 'internal',
  [SpanKind.SERVER]: 'server',
  [SpanKind.CLIENT]: 'client',
  [SpanKind.PRODUCER]: 'producer',
  [SpanKind.CONSUMER]: 'consumer',
}
186+
187+
/**
 * Derives `operation.name` / `resource.name` attributes for a span.
 *
 * Resolution order:
 *   1. Span already has `operation.name` → nothing to add (`undefined`).
 *   2. SERVER span with string `http.method` and `http.route` → `web.request`.
 *   3. Span has a string `next.span_type` (or `span.type`) → that value, plus
 *      a resource name when `http.route` is set.
 *   4. Otherwise → a label derived from the span kind.
 *
 * @param span The span to inspect; it is not modified here.
 * @returns The attributes to merge into the span, or `undefined`.
 */
function getResourceAttributes(span: ReadableSpan): Attributes | undefined {
  const attrs = span.attributes

  // An explicit operation name means the span is already labelled.
  if (attrs['operation.name']) return undefined

  const method = attrs['http.method']
  const route = attrs['http.route']
  const resource =
    attrs['resource.name'] ?? (method && route ? `${method} ${route}` : route)

  const isHttpServerSpan =
    span.kind === SpanKind.SERVER &&
    typeof method === 'string' &&
    typeof route === 'string'
  if (isHttpServerSpan) {
    return { 'operation.name': 'web.request', 'resource.name': resource }
  }

  const spanType = attrs['next.span_type'] ?? attrs['span.type']
  if (typeof spanType === 'string') {
    const named: Attributes = { 'operation.name': spanType }
    if (route) named['resource.name'] = resource
    return named
  }

  const kindLabel =
    span.kind === SpanKind.INTERNAL ? 'internal' : SPAN_KIND_NAME[span.kind]
  return { 'operation.name': kindLabel }
}
223+
224+
/**
 * Builds an operation name of the form `<library>.<name>`.
 *
 * Spaces, `@`, `.` and `/` in `lib` are replaced with `_`, and a single
 * leading `_` is dropped. With an empty `lib` the bare `name` is returned;
 * with an empty `name` only the cleaned library prefix is returned.
 */
function toOperationName(lib: string, name: string) {
  if (!lib) return name

  const underscored = lib.replace(/[ @./]/g, '_')
  const prefix = underscored.startsWith('_')
    ? underscored.slice(1)
    : underscored

  if (!name) return prefix
  return `${prefix}.${name}`
}

0 commit comments

Comments
 (0)