From 0b5dc282ab1c8d58183f49256164306f2f3b089b Mon Sep 17 00:00:00 2001 From: Lukas Stracke Date: Fri, 6 Mar 2026 15:36:38 +0100 Subject: [PATCH 1/2] fix(core): Replace global interval with trace-specific interval based flushing --- packages/core/src/tracing/spans/spanBuffer.ts | 98 +++++++++---------- .../test/lib/tracing/spans/spanBuffer.test.ts | 4 +- 2 files changed, 50 insertions(+), 52 deletions(-) diff --git a/packages/core/src/tracing/spans/spanBuffer.ts b/packages/core/src/tracing/spans/spanBuffer.ts index d6451acb5e44..5020b946d1ab 100644 --- a/packages/core/src/tracing/spans/spanBuffer.ts +++ b/packages/core/src/tracing/spans/spanBuffer.ts @@ -16,6 +16,12 @@ const MAX_SPANS_PER_ENVELOPE = 1000; const MAX_TRACE_WEIGHT_IN_BYTES = 5_000_000; +interface TraceBucket { + spans: Set; + size: number; + timeout: ReturnType; +} + export interface SpanBufferOptions { /** * Max spans per trace before auto-flush @@ -26,7 +32,8 @@ export interface SpanBufferOptions { maxSpanLimit?: number; /** - * Flush interval in ms + * Per-trace flush timeout in ms. A timeout is started when a trace bucket is first created + * and fires flush() for that specific trace when it expires. * Must be greater than 0. * * @default 5_000 @@ -44,7 +51,7 @@ export interface SpanBufferOptions { /** * A buffer for serialized streamed span JSON objects that flushes them to Sentry in Span v2 envelopes. - * Handles interval-based flushing, size thresholds, and graceful shutdown. + * Handles per-trace timeout-based flushing, size thresholds, and graceful shutdown. * Also handles computation of the Dynamic Sampling Context (DSC) for the trace, if it wasn't yet * frozen onto the segment span. * @@ -54,19 +61,16 @@ export interface SpanBufferOptions { * still active and modifyable when child spans are added to the buffer. */ export class SpanBuffer { - /* Bucket spans by their trace id */ - private _traceMap: Map>; - private _traceWeightMap: Map; + /* Bucket spans by their trace id, along with accumulated size and a per-trace flush timeout */ + private _traceBuckets: Map; - private _flushIntervalId: ReturnType | null; private _client: Client; private _maxSpanLimit: number; private _flushInterval: number; private _maxTraceWeight: number; public constructor(client: Client, options?: SpanBufferOptions) { - this._traceMap = new Map(); - this._traceWeightMap = new Map(); + this._traceBuckets = new Map(); this._client = client; const { maxSpanLimit, flushInterval, maxTraceWeightInBytes } = options ?? {}; @@ -79,9 +83,6 @@ export class SpanBuffer { this._maxTraceWeight = maxTraceWeightInBytes && maxTraceWeightInBytes > 0 ? maxTraceWeightInBytes : MAX_TRACE_WEIGHT_IN_BYTES; - this._flushIntervalId = null; - this._debounceFlushInterval(); - this._client.on('flush', () => { this.drain(); }); @@ -89,11 +90,10 @@ export class SpanBuffer { this._client.on('close', () => { // No need to drain the buffer here as `Client.close()` internally already calls `Client.flush()` // which already invokes the `flush` hook and thus drains the buffer. - if (this._flushIntervalId) { - clearInterval(this._flushIntervalId); - } - this._traceMap.clear(); - this._traceWeightMap.clear(); + this._traceBuckets.forEach(bucket => { + clearTimeout(bucket.timeout); + }); + this._traceBuckets.clear(); }); } @@ -102,20 +102,27 @@ export class SpanBuffer { */ public add(spanJSON: SerializedStreamedSpanWithSegmentSpan): void { const traceId = spanJSON.trace_id; - let traceBucket = this._traceMap.get(traceId); - if (traceBucket) { - traceBucket.add(spanJSON); - } else { - traceBucket = new Set([spanJSON]); - this._traceMap.set(traceId, traceBucket); - } + const existingBucket = this._traceBuckets.get(traceId); - const newWeight = (this._traceWeightMap.get(traceId) ?? 0) + estimateSerializedSpanSizeInBytes(spanJSON); - this._traceWeightMap.set(traceId, newWeight); + if (existingBucket) { + existingBucket.spans.add(spanJSON); + existingBucket.size += estimateSerializedSpanSizeInBytes(spanJSON); - if (traceBucket.size >= this._maxSpanLimit || newWeight >= this._maxTraceWeight) { - this.flush(traceId); - this._debounceFlushInterval(); + if (existingBucket.spans.size >= this._maxSpanLimit || existingBucket.size >= this._maxTraceWeight) { + this.flush(traceId); + } + } else { + const size = estimateSerializedSpanSizeInBytes(spanJSON); + const timeout = safeUnref( + setTimeout(() => { + this.flush(traceId); + }, this._flushInterval), + ); + this._traceBuckets.set(traceId, { spans: new Set([spanJSON]), size, timeout }); + + if (size >= this._maxTraceWeight) { + this.flush(traceId); + } } } @@ -123,36 +130,35 @@ export class SpanBuffer { * Drain and flush all buffered traces. */ public drain(): void { - if (!this._traceMap.size) { + if (!this._traceBuckets.size) { return; } - DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceMap.size} traces`); + DEBUG_BUILD && debug.log(`Flushing span tree map with ${this._traceBuckets.size} traces`); - this._traceMap.forEach((_, traceId) => { + this._traceBuckets.forEach((_, traceId) => { this.flush(traceId); }); - this._debounceFlushInterval(); } /** * Flush spans of a specific trace. - * In contrast to {@link SpanBuffer.flush}, this method does not flush all traces, but only the one with the given traceId. + * In contrast to {@link SpanBuffer.drain}, this method does not flush all traces, but only the one with the given traceId. */ public flush(traceId: string): void { - const traceBucket = this._traceMap.get(traceId); - if (!traceBucket) { + const bucket = this._traceBuckets.get(traceId); + if (!bucket) { return; } - if (!traceBucket.size) { - // we should never get here, given we always add a span when we create a new bucket + if (!bucket.spans.size) { + // we should never get here, given we always add a span when we create a new bucket // and delete the bucket once we flush out the trace this._removeTrace(traceId); return; } - const spans = Array.from(traceBucket); + const spans = Array.from(bucket.spans); const segmentSpan = spans[0]?._segmentSpan; if (!segmentSpan) { @@ -181,18 +187,10 @@ export class SpanBuffer { } private _removeTrace(traceId: string): void { - this._traceMap.delete(traceId); - this._traceWeightMap.delete(traceId); - } - - private _debounceFlushInterval(): void { - if (this._flushIntervalId) { - clearInterval(this._flushIntervalId); + const bucket = this._traceBuckets.get(traceId); + if (bucket) { + clearTimeout(bucket.timeout); } - this._flushIntervalId = safeUnref( - setInterval(() => { - this.drain(); - }, this._flushInterval), - ); + this._traceBuckets.delete(traceId); } } diff --git a/packages/core/test/lib/tracing/spans/spanBuffer.test.ts b/packages/core/test/lib/tracing/spans/spanBuffer.test.ts index 44a6f6f954db..cbcb1bf7ea59 100644 --- a/packages/core/test/lib/tracing/spans/spanBuffer.test.ts +++ b/packages/core/test/lib/tracing/spans/spanBuffer.test.ts @@ -70,7 +70,7 @@ describe('SpanBuffer', () => { expect(sentEnvelopes[1]?.[1]?.[0]?.[1]?.items[0]?.trace_id).toBe('trace456'); }); - it('drains on interval', () => { + it('flushes trace after per-trace timeout', () => { const buffer = new SpanBuffer(client, { flushInterval: 1000 }); const segmentSpan1 = new SentrySpan({ name: 'segment', sampled: true }); @@ -106,7 +106,7 @@ describe('SpanBuffer', () => { expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); - // since the buffer is now empty, it should not send anything anymore + // the trace bucket was removed after flushing, so no timeout remains and no further sends occur vi.advanceTimersByTime(1000); expect(sendEnvelopeSpy).toHaveBeenCalledTimes(1); }); From 033c64a4442dd35677da5f960e06bacdff2c817c Mon Sep 17 00:00:00 2001 From: Lukas Stracke Date: Fri, 6 Mar 2026 15:39:43 +0100 Subject: [PATCH 2/2] reduce bundle size --- packages/core/src/tracing/spans/spanBuffer.ts | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/packages/core/src/tracing/spans/spanBuffer.ts b/packages/core/src/tracing/spans/spanBuffer.ts index 5020b946d1ab..cd011df5ac49 100644 --- a/packages/core/src/tracing/spans/spanBuffer.ts +++ b/packages/core/src/tracing/spans/spanBuffer.ts @@ -102,27 +102,26 @@ export class SpanBuffer { */ public add(spanJSON: SerializedStreamedSpanWithSegmentSpan): void { const traceId = spanJSON.trace_id; - const existingBucket = this._traceBuckets.get(traceId); - - if (existingBucket) { - existingBucket.spans.add(spanJSON); - existingBucket.size += estimateSerializedSpanSizeInBytes(spanJSON); - - if (existingBucket.spans.size >= this._maxSpanLimit || existingBucket.size >= this._maxTraceWeight) { - this.flush(traceId); - } - } else { - const size = estimateSerializedSpanSizeInBytes(spanJSON); - const timeout = safeUnref( - setTimeout(() => { - this.flush(traceId); - }, this._flushInterval), - ); - this._traceBuckets.set(traceId, { spans: new Set([spanJSON]), size, timeout }); - - if (size >= this._maxTraceWeight) { - this.flush(traceId); - } + let bucket = this._traceBuckets.get(traceId); + + if (!bucket) { + bucket = { + spans: new Set(), + size: 0, + timeout: safeUnref( + setTimeout(() => { + this.flush(traceId); + }, this._flushInterval), + ), + }; + this._traceBuckets.set(traceId, bucket); + } + + bucket.spans.add(spanJSON); + bucket.size += estimateSerializedSpanSizeInBytes(spanJSON); + + if (bucket.spans.size >= this._maxSpanLimit || bucket.size >= this._maxTraceWeight) { + this.flush(traceId); } }