Commit 5693b62

fix(webapp): propagate abort signal through realtime proxy fetch (#3442)
## Summary

Fixes an RSS-only memory leak in the three realtime proxy routes (`/realtime/v1/runs`, `/realtime/v1/runs/:id`, `/realtime/v1/batches/:id`). Client disconnects during an in-flight long-poll would leave the upstream fetch to Electric running with no way to abort it, so undici kept the socket open and buffered response chunks that would never be consumed.

## Root cause

All three routes flow through `RealtimeClient.streamRun/streamRuns/streamBatch` → `#streamRunsWhere` → `#performElectricRequest` → `longPollingFetch(url, { signal })`. The chain was already signal-aware, but `#streamRunsWhere` hardcoded `signal=undefined` when calling `#performElectricRequest`, so no signal ever reached `longPollingFetch`.

When a downstream client aborts a long-poll mid-flight:

1. Express tears down the downstream response socket.
2. The `longPollingFetch` promise has already resolved (it returns as soon as upstream headers arrive) and handed back `new Response(upstream.body, {...})`.
3. `undici` keeps the upstream socket open and continues buffering chunks into the `ReadableStream` that nothing will ever read from.
4. The upstream connection is eventually closed by Electric's own poll timeout (~20s). During that window the per-request buffers stay in native memory.

These buffers live below V8's accounting: no `heapUsed` or `external` growth, no sign in heap snapshots, only RSS. An isolated standalone reproducer (`fetch` against a slow-streaming upstream, discard the `Response` before consuming its body) measures **~44 KB retained per leaked request** after GC. That's consistent with the undici socket + receive buffer + HTTP parser state for a long-lived chunked response. The pattern is the shape documented in nodejs/undici#1108 and nodejs/undici#2143.

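Steps 2 and 3 above hinge on one property of web streams: dropping a `Response` does not tell its source to stop, while cancelling its body does. The sketch below illustrates only that property with an in-memory `ReadableStream`; it does not involve undici or a real socket, and `makeUpstreamBody` and `demo` are hypothetical names introduced for the illustration.

```typescript
// Hypothetical stand-in for an upstream body: a stream whose source records
// whether it has been told to stop producing.
function makeUpstreamBody(): {
  body: ReadableStream<Uint8Array>;
  isReleased: () => boolean;
} {
  let released = false;
  const body = new ReadableStream<Uint8Array>({
    pull(controller) {
      // Each pull hands over one 32 KB chunk, like a slow chunked response.
      controller.enqueue(new Uint8Array(32 * 1024));
    },
    cancel() {
      // Runs when a consumer cancels the stream, not when a Response is dropped.
      released = true;
    },
  });
  return { body, isReleased: () => released };
}

async function demo(): Promise<{ discarded: boolean; cancelled: boolean }> {
  // Leak shape: wrap the upstream body in a new Response and walk away.
  const leaked = makeUpstreamBody();
  void new Response(leaked.body);

  // Fix shape: cancel the body explicitly so the source can release resources.
  const fixed = makeUpstreamBody();
  const response = new Response(fixed.body);
  await response.body?.cancel();

  return { discarded: leaked.isReleased(), cancelled: fixed.isReleased() };
}
```

Here `demo()` resolves with `discarded` still `false` and `cancelled` set to `true`, which mirrors why the unconsumed upstream body in the proxy kept buffering until Electric closed the connection.
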
## What changed

- **`realtimeClient.server.ts`**: add optional `signal` parameter to `streamRun`, `streamRuns`, `streamBatch`, and the shared `#streamRunsWhere`; thread it through to `#performElectricRequest` instead of hardcoding `undefined`.
- **`realtime.v1.runs.$runId.ts`, `realtime.v1.runs.ts`, `realtime.v1.batches.$batchId.ts`**: pass `getRequestAbortSignal()` (from `httpAsyncStorage.server.ts`) at the call site. This is the signal wired to `res.on('close')` and fires reliably on downstream disconnect.
- **`longPollingFetch.ts`**: belt-and-suspenders: cancel the upstream body explicitly in the error path, and treat `AbortError` as a clean `499` instead of a `500`. This both releases undici's buffers deterministically on error and avoids spurious 500s in request logs when a client legitimately walks away.

## Verification

Standalone reproducer: slow upstream server streams 32 KB chunks every 100 ms for 5 seconds per request. The proxy does `fetch(url)` with varying signal/cancel strategies, creates `new Response(upstream.body, ...)`, and discards it without consuming the body (simulating the leak path).

Results from 1 000 parallel fetches per variant, measured post-GC:

| variant | Δ heap | Δ external | Δ RSS |
| --- | --- | --- | --- |
| A. no signal, body never consumed (the bug) | +0.3 MB | 0 MB | **+59.4 MB** |
| B. signal propagated, aborted after headers (this fix) | −0.1 MB | 0 MB | +15.4 MB |
| C. no signal, explicit `res.body.cancel()` | 0 MB | 0 MB | −25.4 MB |

10-round sustained test of variant B to distinguish accumulating retention from one-time allocator overhead:

```
round  1/10  Δ=+3.2 MB     round  6/10  Δ=-12.5 MB
round  2/10  Δ=-7.6 MB     round  7/10  Δ=-11.9 MB
round  3/10  Δ=-11.7 MB    round  8/10  Δ=-2.6 MB
round  4/10  Δ=+3.2 MB     round  9/10  Δ=-8.0 MB
round  5/10  Δ=-1.2 MB     round 10/10  Δ=-12.6 MB
```

RSS oscillates in a 49-65 MB band with no upward trend, so signal propagation fully releases the buffers.

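The Δ columns above compare memory readings taken before and after each variant. A minimal sketch of that arithmetic, assuming snapshots shaped like the output of Node's `process.memoryUsage()`; `MemorySnapshot` and `deltaMb` are hypothetical names and are not taken from the reproducer itself.

```typescript
// Byte counts using the same field names as Node's process.memoryUsage().
interface MemorySnapshot {
  rss: number;
  heapUsed: number;
  external: number;
}

const BYTES_PER_MB = 1024 * 1024;

// Difference between two snapshots, in MB rounded to one decimal place.
function deltaMb(before: MemorySnapshot, after: MemorySnapshot): MemorySnapshot {
  const diff = (from: number, to: number): number =>
    Math.round(((to - from) / BYTES_PER_MB) * 10) / 10;
  return {
    rss: diff(before.rss, after.rss),
    heapUsed: diff(before.heapUsed, after.heapUsed),
    external: diff(before.external, after.external),
  };
}
```

In Node the two snapshots would come from `process.memoryUsage()`, ideally after forcing a collection with `global.gc()`, which is available when Node is started with `--expose-gc`. A delta where `rss` moves while `heapUsed` and `external` stay near zero is the signature described for variant A.
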
## Risk

- Behavior change only on aborted long-polls: the upstream fetch now cancels promptly instead of running to its natural timeout. This saves both memory and outbound traffic to Electric.
- `AbortError` now surfaces as `499` rather than `500`. Any dashboard or alert that counts 500s in request logs will see slightly fewer of them; this is the intended behavior.
- Signal-aware parameter is optional on `RealtimeClient.streamRun/streamRuns/streamBatch`, so callers that don't opt in get the previous behavior.

## Test plan

- [ ] Existing realtime integration tests pass
- [ ] Dashboard realtime views (runs list, batch details) continue working normally across tab open/close cycles
- [ ] Under a burst of aborted long-polls, server RSS returns to baseline rather than climbing

1 parent 2ce981d commit 5693b62

6 files changed

Lines changed: 48 additions & 15 deletions

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Fix RSS memory leak in the realtime proxy routes. `/realtime/v1/runs`, `/realtime/v1/runs/:id`, and `/realtime/v1/batches/:id` called `fetch()` into Electric with no abort signal, so when a client disconnected mid long-poll, undici kept the upstream socket open and buffered response chunks that would never be consumed — retained only in RSS, invisible to V8 heap tooling. Thread `getRequestAbortSignal()` through `RealtimeClient.streamRun/streamRuns/streamBatch` to `longPollingFetch` and cancel the upstream body in the error path. Isolated reproducer showed ~44 KB retained per leaked request; signal propagation releases it cleanly.

apps/webapp/app/routes/realtime.v1.batches.$batchId.ts

Lines changed: 3 additions & 1 deletion
@@ -1,5 +1,6 @@
 import { z } from "zod";
 import { $replica } from "~/db.server";
+import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
 import { realtimeClient } from "~/services/realtimeClientGlobal.server";
 import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
 
@@ -33,7 +34,8 @@ export const loader = createLoaderApiRoute(
       batchRun.id,
       apiVersion,
       authentication.realtime,
-      request.headers.get("x-trigger-electric-version") ?? undefined
+      request.headers.get("x-trigger-electric-version") ?? undefined,
+      getRequestAbortSignal()
     );
   }
 );

apps/webapp/app/routes/realtime.v1.runs.$runId.ts

Lines changed: 7 additions & 1 deletion
@@ -1,6 +1,7 @@
 import { json } from "@remix-run/server-runtime";
 import { z } from "zod";
 import { $replica } from "~/db.server";
+import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
 import { realtimeClient } from "~/services/realtimeClientGlobal.server";
 import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
 
@@ -46,7 +47,12 @@ export const loader = createLoaderApiRoute(
       run.id,
       apiVersion,
       authentication.realtime,
-      request.headers.get("x-trigger-electric-version") ?? undefined
+      request.headers.get("x-trigger-electric-version") ?? undefined,
+      // Propagate abort on client disconnect so the upstream Electric long-poll
+      // fetch is cancelled too. Without this, undici buffers from the unconsumed
+      // upstream response body accumulate until Electric's poll timeout, causing
+      // steady RSS growth on api (see docs/runbooks for the H1 isolation test).
+      getRequestAbortSignal()
     );
   }
 );
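The commit message describes `getRequestAbortSignal()` as the signal wired to `res.on('close')`. The real helper lives in `httpAsyncStorage.server.ts` and is not shown in this diff, so the following is only a rough sketch of that kind of wiring under stated assumptions: `Closeable`, `abortSignalForResponse`, and `FakeResponse` are hypothetical names, and the request-scoped storage the real helper presumably uses is left out.

```typescript
// Minimal shape of the part of a Node/Express response this sketch needs.
interface Closeable {
  on(event: "close", listener: () => void): unknown;
}

// Returns a signal that aborts once the downstream response closes.
function abortSignalForResponse(res: Closeable): AbortSignal {
  const controller = new AbortController();
  res.on("close", () => controller.abort());
  return controller.signal;
}

// Tiny fake response used to exercise the wiring without an HTTP server.
class FakeResponse implements Closeable {
  private listeners: Array<() => void> = [];

  on(_event: "close", listener: () => void): this {
    this.listeners.push(listener);
    return this;
  }

  // Simulates the client disconnecting.
  close(): void {
    for (const listener of this.listeners) listener();
  }
}

const fakeRes = new FakeResponse();
const requestSignal = abortSignalForResponse(fakeRes);
const abortedBeforeClose = requestSignal.aborted;
fakeRes.close();
const abortedAfterClose = requestSignal.aborted;
```

Passing a signal like this as the last argument to `streamRun` is what lets a downstream disconnect reach the upstream `fetch`. On a real Node response, `close` also fires after a normal completion, at which point aborting an already finished fetch has nothing left to cancel.
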

apps/webapp/app/routes/realtime.v1.runs.ts

Lines changed: 3 additions & 1 deletion
@@ -1,4 +1,5 @@
 import { z } from "zod";
+import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
 import { realtimeClient } from "~/services/realtimeClientGlobal.server";
 import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
 
@@ -31,7 +32,8 @@ export const loader = createLoaderApiRoute(
       searchParams,
       apiVersion,
       authentication.realtime,
-      request.headers.get("x-trigger-electric-version") ?? undefined
+      request.headers.get("x-trigger-electric-version") ?? undefined,
+      getRequestAbortSignal()
     );
   }
 );

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 15 additions & 8 deletions
@@ -115,15 +115,17 @@ export class RealtimeClient {
     runId: string,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     return this.#streamRunsWhere(
       url,
       environment,
       `id='${runId}'`,
       apiVersion,
       requestOptions,
-      clientVersion
+      clientVersion,
+      signal
     );
   }
 
@@ -133,7 +135,8 @@ export class RealtimeClient {
     batchId: string,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     const whereClauses: string[] = [
       `"runtimeEnvironmentId"='${environment.id}'`,
@@ -148,7 +151,8 @@ export class RealtimeClient {
       whereClause,
       apiVersion,
       requestOptions,
-      clientVersion
+      clientVersion,
+      signal
     );
   }
 
@@ -158,7 +162,8 @@ export class RealtimeClient {
     params: RealtimeRunsParams,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     const whereClauses: string[] = [`"runtimeEnvironmentId"='${environment.id}'`];
 
@@ -180,7 +185,8 @@ export class RealtimeClient {
       whereClause,
       apiVersion,
       requestOptions,
-      clientVersion
+      clientVersion,
+      signal
     );
 
     if (createdAtFilter) {
@@ -274,7 +280,8 @@ export class RealtimeClient {
     whereClause: string,
     apiVersion: API_VERSIONS,
     requestOptions?: RealtimeRequestOptions,
-    clientVersion?: string
+    clientVersion?: string,
+    signal?: AbortSignal
   ) {
     const electricUrl = this.#constructRunsElectricUrl(
       url,
@@ -288,7 +295,7 @@ export class RealtimeClient {
       electricUrl,
       environment,
       apiVersion,
-      undefined,
+      signal,
       clientVersion
     );
   }
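The hunks above all apply one pattern: an optional trailing `signal` parameter that each layer forwards unchanged until it reaches the fetch call. A stripped-down sketch of that pattern follows, with an injectable fetch so the forwarding can be observed; `MiniRealtimeClient`, `FetchLike`, and the URL are hypothetical and far simpler than the real `RealtimeClient`.

```typescript
// Hypothetical minimal fetch signature, so a recording fake can be injected.
type FetchLike = (url: string, init?: { signal?: AbortSignal }) => Promise<Response>;

class MiniRealtimeClient {
  constructor(private readonly fetchImpl: FetchLike) {}

  // Public entry point: the optional signal is the last parameter, so
  // existing callers that omit it keep their previous behavior.
  streamRun(baseUrl: string, runId: string, signal?: AbortSignal): Promise<Response> {
    return this.streamWhere(baseUrl, `id='${runId}'`, signal);
  }

  // Shared middle layer: forwards the signal instead of hardcoding undefined.
  private streamWhere(baseUrl: string, where: string, signal?: AbortSignal): Promise<Response> {
    const url = `${baseUrl}?where=${encodeURIComponent(where)}`;
    return this.performRequest(url, signal);
  }

  // Innermost layer: the only place the signal is actually consumed.
  private performRequest(url: string, signal?: AbortSignal): Promise<Response> {
    return this.fetchImpl(url, { signal });
  }
}

// Record which signal reaches the fetch layer for two kinds of caller.
const seenSignals: Array<AbortSignal | undefined> = [];
const recordingFetch: FetchLike = (_url, init) => {
  seenSignals.push(init?.signal);
  return Promise.resolve(new Response(null, { status: 200 }));
};

const client = new MiniRealtimeClient(recordingFetch);
const controller = new AbortController();
void client.streamRun("http://electric.invalid/v1/shape", "run_123", controller.signal);
void client.streamRun("http://electric.invalid/v1/shape", "run_123");
```

After the two calls, `seenSignals[0]` is the caller's signal and `seenSignals[1]` is `undefined`, which matches the Risk note that callers who do not opt in get the previous behavior.
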

apps/webapp/app/utils/longPollingFetch.ts

Lines changed: 14 additions & 4 deletions
@@ -11,8 +11,10 @@ export async function longPollingFetch(
   options?: RequestInit,
   rewriteResponseHeaders?: Record<string, string>
 ) {
+  let upstream: Response | undefined;
   try {
-    let response = await fetch(url, options);
+    upstream = await fetch(url, options);
+    let response = upstream;
 
     if (response.headers.get("content-encoding")) {
       const headers = new Headers(response.headers);
@@ -46,16 +48,24 @@ export async function longPollingFetch(
 
     return response;
   } catch (error) {
+    // Release upstream undici socket + buffers explicitly. Without this the
+    // ReadableStream stays open and undici keeps buffering chunks into memory
+    // until the upstream times out (see H1 isolation test — ~44 KB retained
+    // per unconsumed-body fetch in RSS).
+    try { await upstream?.body?.cancel(); } catch {}
+
+    // AbortError is the expected path when downstream disconnects with a
+    // propagated signal — treat as a clean client-close, not a server error.
+    if (error instanceof Error && error.name === "AbortError") {
+      throw new Response(null, { status: 499 });
+    }
     if (error instanceof TypeError) {
-      // Network error or other fetch-related errors
       logger.error("Network error:", { error: error.message });
       throw new Response("Network error occurred", { status: 503 });
     } else if (error instanceof Error) {
-      // HTTP errors or other known errors
       logger.error("Fetch error:", { error: error.message });
       throw new Response(error.message, { status: 500 });
     } else {
-      // Unknown errors
       logger.error("Unknown error occurred during fetch");
       throw new Response("An unknown error occurred", { status: 500 });
     }
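The `catch` block above amounts to a small classification from thrown value to HTTP status. As a sketch only, the same ordering can be written as a pure function, which makes the mapping easy to check in isolation; `statusForFetchError` is a hypothetical name and the function is not part of the commit.

```typescript
// Maps a value thrown by fetch to the status the proxy responds with,
// following the same order of checks as the catch block in the diff.
function statusForFetchError(error: unknown): number {
  // Client walked away and the propagated signal aborted the fetch.
  if (error instanceof Error && error.name === "AbortError") {
    return 499;
  }
  // fetch reports network-level failures as TypeError.
  if (error instanceof TypeError) {
    return 503;
  }
  // Any other Error, or a non-Error thrown value, is a server-side failure.
  return 500;
}

// Build an error shaped like the one an aborted fetch rejects with.
const abortLike = new Error("This operation was aborted");
abortLike.name = "AbortError";

const statuses = [
  statusForFetchError(abortLike),
  statusForFetchError(new TypeError("fetch failed")),
  statusForFetchError(new Error("boom")),
  statusForFetchError("not an error object"),
];
```

`statuses` evaluates to `[499, 503, 500, 500]`. Note that 499 is not a registered HTTP status code; it is the nginx convention for "client closed request", which fits the intent of keeping client disconnects out of the 500 counts.
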
