From 5f74f6ef85903282766451faca7934f67ed3f4ae Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Tue, 21 Apr 2026 16:17:53 +0000 Subject: [PATCH 1/4] tools/sse-timeout-probe: empirical reproducer for UI-01 / UI-02 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a tiny TS reproducer for the SSE idle-timeout gap reported in ES-1742245 (field-facing "AI Value Roadmap" app dropping ~75% of SSE connections through the Apps reverse proxy). Two files: - probe.ts — opens one SSE connection per duration in a configurable ladder; records lifetime, bytes, and how the connection ended (completed / server-close / network-error). - server.ts — companion server that responds on /sse-probe, holding the connection open for the requested duration with an optional heartbeat comment. Deploy as an app entrypoint to measure the Databricks-hosted ceiling vs an EKS / localhost control. - README.md — usage, what to look for (sharp cliff at 60s/90s/120s/180s maps back to apps/gateway vs oauth2-proxy vs DP ApiProxy envoy), and how heartbeat behavior distinguishes idle timeouts from absolute request timeouts. Why this is a separate PR: UI-01's source doc and ES-1742245 disagree on whether the drop is timeout-driven or buffering-driven. Running this probe against a dogfood app answers that question empirically and tells us which fix to pursue (per-route request_timeout raise, heartbeat middleware, or buffering / HTTP/2 hardening). Draft because the fix itself depends on those results. Co-authored-by: Isaac --- tools/sse-timeout-probe/README.md | 71 ++++++++++ tools/sse-timeout-probe/probe.ts | 217 ++++++++++++++++++++++++++++++ tools/sse-timeout-probe/server.ts | 59 ++++++++ 3 files changed, 347 insertions(+) create mode 100644 tools/sse-timeout-probe/README.md create mode 100644 tools/sse-timeout-probe/probe.ts create mode 100644 tools/sse-timeout-probe/server.ts diff --git a/tools/sse-timeout-probe/README.md b/tools/sse-timeout-probe/README.md new file mode 100644 index 000000000..f8c053fa8 --- /dev/null +++ b/tools/sse-timeout-probe/README.md @@ -0,0 +1,71 @@ +# sse-timeout-probe + +Empirical reproducer for the Databricks Apps SSE timeout gap +([ES-1742245](https://databricks.atlassian.net/browse/ES-1742245), UI-01 / UI-02 in +the EMEA Apps "gaps that matter" doc). + +## What it does + +Opens one SSE connection per duration in a configurable ladder, holds each one +idle (or paced by a server-side heartbeat), and reports how long each connection +actually survived and how it was terminated. Runs against a Databricks App, an +EKS control, or `localhost`. Running it against a known-good origin and the +Databricks-hosted one in sequence gives you the per-layer ceiling without +having to triangulate from noisy LLM traces. + +## Why this exists + +The source doc (`Apps Gaps That Matter to EMEA Apps`) and ES-1742245 disagree +about what kills SSE connections. The doc says the drop is "distinct from the +120s idle timeout" and blames buffering / HTTP/2 multiplexing. The ticket's own +diagnosis (Naïm Achahboun) says the drop *is* caused by the effective request +timeout — multi-agent LLM calls take varying durations, so some finish under +the ceiling and some don't. + +This probe answers the question deterministically: hold an idle SSE for X +seconds, see if it survives, record where the timeout lives. Comparing results +across durations and with/without heartbeat tells you whether the ceiling is +idle-based (heartbeats save it) or absolute (heartbeats don't). 
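For a quick verdict from two runs without reaching for a notebook, a post-processing sketch along these lines is enough (a minimal sketch, not part of this PR; the script name, the result-file names, the idle-vs-`--heartbeat` pairing, and the "earliest early close" rule are all illustrative assumptions):

```ts
// compare-runs.ts — minimal sketch. Reads two JSONL files written by
// `probe.ts --json` and reports whether the ceiling looks idle-based or absolute.
import { readFileSync } from "node:fs";

interface ResultLine {
  targetDurationMs: number;
  outcome: string;
}

// Earliest target duration at which the stream was closed before the hold elapsed.
function firstCliffMs(path: string): number | undefined {
  const results = readFileSync(path, "utf8")
    .split("\n")
    .filter(Boolean)
    .map((line) => JSON.parse(line) as ResultLine);
  const earlyCloses = results
    .filter((r) => r.outcome === "server-close")
    .map((r) => r.targetDurationMs)
    .sort((a, b) => a - b);
  return earlyCloses[0];
}

const idleCliff = firstCliffMs("apps-idle-results.jsonl"); // run without --heartbeat
const beatCliff = firstCliffMs("apps-heartbeat-results.jsonl"); // run with --heartbeat

if (idleCliff === undefined) {
  console.log("no cliff even when idle — the drop is probably not timeout-driven");
} else if (beatCliff === undefined || beatCliff > idleCliff) {
  console.log(`idle timeout near ${idleCliff / 1000}s — heartbeats move or remove it`);
} else {
  console.log(`absolute timeout near ${idleCliff / 1000}s — heartbeats don't help`);
}
```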
+ +## Usage + +```bash +# Inside a workspace: deploy `server.ts` as the app entrypoint. +# Then from a machine with network access to the app URL: +tsx tools/sse-timeout-probe/probe.ts \ + --base-url https://my-app..cloud.databricks.com \ + --header "Cookie=" \ + --durations 30000,60000,90000,120000,150000,180000,240000,300000 \ + --json | tee apps-results.jsonl + +# Control run against the same codebase on EKS/localhost: +tsx tools/sse-timeout-probe/probe.ts --base-url http://localhost:8000 \ + --json | tee local-results.jsonl + +# Compare: if `outcome: server-close` clusters sharply at a duration in the +# Apps run but not locally, that duration is your ceiling. +``` + +Flags: +- `--base-url ` (required) — base URL of the SSE-serving app. +- `--path ` — SSE endpoint path. Default: `/sse-probe`. +- `--durations ` — comma-separated milliseconds, one connection per entry. Default: `30000,60000,90000,120000,150000,180000,240000,300000`. +- `--heartbeat ` — if `>0`, the server emits a keepalive comment every N ms. Distinguishes idle-timeout from absolute request-timeout. +- `--header ` — extra request header. Repeatable (e.g. for auth cookies). +- `--json` — emit one JSON line per result. + +## What to look for + +- **Sharp cliff at ~60s, 90s, 120s, or 180s** → that's the effective ceiling. Cross-reference with: + - `apps/gateway` (`request_timeout=60`, `pool_idle_timeout=90`, `header_read_timeout=30`) + - `apps/oauth2-proxy` (`DefaultUpstreamTimeout=30`) + - DP ApiProxy envoy (`idle_timeout=180s` / `1200s`) +- **No cliff, `outcome: completed` throughout** → the drop isn't timeout-driven; follow the buffering / HTTP/2 hypothesis. +- **Cliff moves when `--heartbeat` is added** → idle-timeout. Document the heartbeat pattern as the fix. +- **Cliff is identical with and without heartbeat** → absolute request timeout. Requires per-route override on `apps/gateway`. + +## Follow-ups + +- Wire the companion server into `apps/dev-playground` so probing is one `pnpm deploy` away. +- Export results to a small notebook template for the comparison visualization. +- Add a WebSocket variant of the probe so UI-02 (ping/pong bypass) can be measured on the same axes. diff --git a/tools/sse-timeout-probe/probe.ts b/tools/sse-timeout-probe/probe.ts new file mode 100644 index 000000000..8e4f59008 --- /dev/null +++ b/tools/sse-timeout-probe/probe.ts @@ -0,0 +1,217 @@ +#!/usr/bin/env tsx +/** + * sse-timeout-probe: Empirically locate the idle-timeout ceiling that kills SSE + * streams on Databricks Apps. + * + * Background: internal field feedback (ES-1742245, and the EMEA Apps "gaps that + * matter" doc) reports ~75% of SSE connections drop mid-stream through the Apps + * reverse proxy. The source doc claims the drop is distinct from any idle timeout, + * but the ticket's own diagnosis (Naïm Achahboun) suggests the drop *is* caused by + * the effective request timeout — multi-agent LLM calls take varying durations, so + * ~30% finish under the ceiling and ~70% exceed it. + * + * This probe reproduces the condition deterministically: it opens one SSE connection + * per configured duration, keeps it idle (or paced by a configurable heartbeat), and + * records how long the connection survived and how it was terminated. Running it + * against a known-good origin (EKS / localhost) vs a Databricks Apps deployment + * gives you the per-layer ceiling without having to triangulate from noisy LLM + * traces. + * + * Usage: tsx tools/sse-timeout-probe/probe.ts --base-url [flags] + * See --help for flags. 
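 *
 * Example of a single --json result line (values are illustrative; keys mirror
 * the ProbeResult interface below, with `detail` omitted when undefined):
 *
 *   {"targetDurationMs":120000,"actualLifetimeMs":60112,"outcome":"server-close","bytesReceived":57,"firstByteMs":143}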
+ */ + +import { performance } from "node:perf_hooks"; + +interface ProbeConfig { + baseUrl: string; + path: string; + durationsMs: number[]; + heartbeatMs: number; + headers: Record; + jsonOutput: boolean; +} + +interface ProbeResult { + targetDurationMs: number; + actualLifetimeMs: number; + outcome: "completed" | "server-close" | "network-error" | "timeout-header"; + detail?: string; + bytesReceived: number; + firstByteMs?: number; +} + +function parseArgs(argv: string[]): ProbeConfig { + const args = new Map(); + for (let i = 0; i < argv.length; i++) { + const a = argv[i]; + if (a.startsWith("--")) { + const key = a.slice(2); + const value = argv[i + 1] && !argv[i + 1].startsWith("--") ? argv[++i] : "true"; + args.set(key, value); + } + } + + if (args.has("help") || !args.has("base-url")) { + process.stderr.write( + [ + "usage: tsx tools/sse-timeout-probe/probe.ts --base-url [flags]", + "", + "flags:", + " --base-url required. Base URL of the SSE-serving app.", + " --path SSE endpoint path. Default: /sse-probe", + " --durations comma-separated ms, one connection per entry.", + " Default: 30000,60000,90000,120000,150000,180000,240000,300000", + " --heartbeat if >0, send a heartbeat comment every MS on the", + " *server* side (requires --path to point at the", + " companion server). If 0, connection is fully idle.", + " Default: 0", + " --header extra request header. Repeatable.", + " --json emit machine-readable JSON line per result.", + " --help show this message.", + "", + ].join("\n"), + ); + process.exit(args.has("help") ? 0 : 2); + } + + const headers: Record = {}; + for (const raw of argv.filter((a) => a.startsWith("--header"))) { + // handled below via repeated flag — placeholder to satisfy linter + void raw; + } + // repeat-flag parser + for (let i = 0; i < argv.length; i++) { + if (argv[i] === "--header" && argv[i + 1]) { + const [k, ...rest] = argv[i + 1].split("="); + headers[k] = rest.join("="); + i++; + } + } + + const durations = ( + args.get("durations") ?? "30000,60000,90000,120000,150000,180000,240000,300000" + ) + .split(",") + .map((s) => Number.parseInt(s.trim(), 10)) + .filter((n) => Number.isFinite(n) && n > 0); + + return { + baseUrl: args.get("base-url")!.replace(/\/$/, ""), + path: args.get("path") ?? "/sse-probe", + durationsMs: durations, + heartbeatMs: Number.parseInt(args.get("heartbeat") ?? 
"0", 10), + headers, + jsonOutput: args.get("json") === "true", + }; +} + +async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise { + const url = new URL(config.path, config.baseUrl); + url.searchParams.set("hold-ms", String(targetDurationMs)); + url.searchParams.set("heartbeat-ms", String(config.heartbeatMs)); + + const start = performance.now(); + const controller = new AbortController(); + const hardTimeout = setTimeout(() => controller.abort(new Error("probe-hard-timeout")), targetDurationMs + 15_000); + + let bytesReceived = 0; + let firstByteMs: number | undefined; + let outcome: ProbeResult["outcome"] = "completed"; + let detail: string | undefined; + + try { + const resp = await fetch(url, { + method: "GET", + headers: { + Accept: "text/event-stream", + "Cache-Control": "no-cache", + ...config.headers, + }, + signal: controller.signal, + }); + + if (!resp.ok) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "server-close", + detail: `HTTP ${resp.status} ${resp.statusText}`, + bytesReceived: 0, + }; + } + + if (!resp.body) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "network-error", + detail: "no response body", + bytesReceived: 0, + }; + } + + const reader = resp.body.getReader(); + // eslint-disable-next-line no-constant-condition + while (true) { + const { value, done } = await reader.read(); + if (firstByteMs === undefined && value) firstByteMs = performance.now() - start; + if (done) { + outcome = "server-close"; + break; + } + if (value) bytesReceived += value.byteLength; + } + } catch (err) { + const e = err as Error; + if (e.name === "AbortError" && (e as Error & { cause?: Error }).cause?.message === "probe-hard-timeout") { + outcome = "timeout-header"; + detail = "probe hard-timeout triggered (connection never closed)"; + } else { + outcome = "network-error"; + detail = e.message; + } + } finally { + clearTimeout(hardTimeout); + } + + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome, + detail, + bytesReceived, + firstByteMs, + }; +} + +function formatResult(r: ProbeResult): string { + const lifeSec = (r.actualLifetimeMs / 1000).toFixed(1); + const ttfb = r.firstByteMs ? `${(r.firstByteMs / 1000).toFixed(1)}s` : "n/a"; + const suffix = r.detail ? ` (${r.detail})` : ""; + return ` target=${r.targetDurationMs / 1000}s lived=${lifeSec}s outcome=${r.outcome} bytes=${r.bytesReceived} ttfb=${ttfb}${suffix}`; +} + +async function main(): Promise { + const config = parseArgs(process.argv.slice(2)); + + if (!config.jsonOutput) { + process.stdout.write(`sse-timeout-probe → ${config.baseUrl}${config.path}\n`); + process.stdout.write(` durations: ${config.durationsMs.map((d) => `${d / 1000}s`).join(", ")}\n`); + process.stdout.write(` heartbeat: ${config.heartbeatMs === 0 ? 
"none (fully idle)" : `${config.heartbeatMs}ms`}\n\n`); + } + + for (const duration of config.durationsMs) { + const result = await probeOnce(config, duration); + if (config.jsonOutput) { + process.stdout.write(`${JSON.stringify(result)}\n`); + } else { + process.stdout.write(`${formatResult(result)}\n`); + } + } +} + +main().catch((err) => { + process.stderr.write(`probe failed: ${(err as Error).message}\n`); + process.exit(1); +}); diff --git a/tools/sse-timeout-probe/server.ts b/tools/sse-timeout-probe/server.ts new file mode 100644 index 000000000..e08a4578e --- /dev/null +++ b/tools/sse-timeout-probe/server.ts @@ -0,0 +1,59 @@ +#!/usr/bin/env tsx +/** + * Companion server for sse-timeout-probe. + * + * Serves a single SSE endpoint `/sse-probe` that keeps the connection open for a + * configurable duration, optionally sending a heartbeat comment. Intended to run + * inside a Databricks App so a client in the browser (or CLI) can stream against + * it and measure when the effective idle timeout kicks in. + * + * Deploy this as the app's entrypoint, or mount it alongside a larger app. + */ + +import { createServer } from "node:http"; + +const port = Number.parseInt(process.env.PORT ?? process.env.DATABRICKS_APP_PORT ?? "8000", 10); + +const server = createServer((req, res) => { + if (!req.url?.startsWith("/sse-probe")) { + res.writeHead(404, { "Content-Type": "text/plain" }); + res.end("not found — try /sse-probe\n"); + return; + } + + const url = new URL(req.url, `http://${req.headers.host ?? "localhost"}`); + const holdMs = Math.max(0, Number.parseInt(url.searchParams.get("hold-ms") ?? "120000", 10)); + const heartbeatMs = Math.max(0, Number.parseInt(url.searchParams.get("heartbeat-ms") ?? "0", 10)); + + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }); + + // Initial event so the client can measure time-to-first-byte. + res.write(`event: probe-start\ndata: ${JSON.stringify({ holdMs, heartbeatMs })}\n\n`); + + let heartbeat: NodeJS.Timeout | undefined; + if (heartbeatMs > 0) { + heartbeat = setInterval(() => { + res.write(`: heartbeat ${Date.now()}\n\n`); + }, heartbeatMs); + } + + const stop = setTimeout(() => { + if (heartbeat) clearInterval(heartbeat); + res.write(`event: probe-end\ndata: ${JSON.stringify({ reason: "hold-elapsed" })}\n\n`); + res.end(); + }, holdMs); + + req.on("close", () => { + if (heartbeat) clearInterval(heartbeat); + clearTimeout(stop); + }); +}); + +server.listen(port, () => { + process.stdout.write(`sse-timeout-probe server listening on :${port}\n`); +}); From 1764aa9dd8b87e58e06f0f1d312a722dac79ad11 Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Mon, 27 Apr 2026 17:25:12 +0000 Subject: [PATCH 2/4] chore(tools): address review feedback on sse-timeout-probe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - probe.ts: distinguish `completed` (server held full target) from `server-close` (server closed early) by comparing lifetime against the target with a 500ms tolerance — `completed` was previously unreachable. - probe.ts: rename outcome `timeout-header` -> `client-hard-timeout` to reflect what actually happened (the client's safety timer fired, not an HTTP timeout response). - probe.ts: drop the placeholder header-parse loop; the actual parser is the second loop. - server.ts: drop the redundant `Connection: keep-alive` header (managed by Node's HTTP layer; ignored on HTTP/2). 
- server.ts: guard heartbeat and probe-end writes with try/catch so a half-closed connection mid-interval doesn't crash the server. Signed-off-by: James Broadhead --- tools/sse-timeout-probe/probe.ts | 20 +++++++++++--------- tools/sse-timeout-probe/server.ts | 27 +++++++++++++++++++++------ 2 files changed, 32 insertions(+), 15 deletions(-) diff --git a/tools/sse-timeout-probe/probe.ts b/tools/sse-timeout-probe/probe.ts index 8e4f59008..eebb7dc81 100644 --- a/tools/sse-timeout-probe/probe.ts +++ b/tools/sse-timeout-probe/probe.ts @@ -35,12 +35,17 @@ interface ProbeConfig { interface ProbeResult { targetDurationMs: number; actualLifetimeMs: number; - outcome: "completed" | "server-close" | "network-error" | "timeout-header"; + outcome: "completed" | "server-close" | "network-error" | "client-hard-timeout"; detail?: string; bytesReceived: number; firstByteMs?: number; } +// Tolerance for distinguishing "server held the full target duration" from "server closed early": +// network/event-loop jitter can shave tens of ms off a clean run, so anything within 500ms of +// the target counts as completed. +const COMPLETION_TOLERANCE_MS = 500; + function parseArgs(argv: string[]): ProbeConfig { const args = new Map(); for (let i = 0; i < argv.length; i++) { @@ -76,11 +81,6 @@ function parseArgs(argv: string[]): ProbeConfig { } const headers: Record = {}; - for (const raw of argv.filter((a) => a.startsWith("--header"))) { - // handled below via repeated flag — placeholder to satisfy linter - void raw; - } - // repeat-flag parser for (let i = 0; i < argv.length; i++) { if (argv[i] === "--header" && argv[i + 1]) { const [k, ...rest] = argv[i + 1].split("="); @@ -157,7 +157,9 @@ async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise const { value, done } = await reader.read(); if (firstByteMs === undefined && value) firstByteMs = performance.now() - start; if (done) { - outcome = "server-close"; + const lifetimeMs = performance.now() - start; + outcome = + lifetimeMs >= targetDurationMs - COMPLETION_TOLERANCE_MS ? 
"completed" : "server-close"; break; } if (value) bytesReceived += value.byteLength; @@ -165,8 +167,8 @@ async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise } catch (err) { const e = err as Error; if (e.name === "AbortError" && (e as Error & { cause?: Error }).cause?.message === "probe-hard-timeout") { - outcome = "timeout-header"; - detail = "probe hard-timeout triggered (connection never closed)"; + outcome = "client-hard-timeout"; + detail = "client-side hard-timeout fired (server never closed the connection)"; } else { outcome = "network-error"; detail = e.message; diff --git a/tools/sse-timeout-probe/server.ts b/tools/sse-timeout-probe/server.ts index e08a4578e..982730f7c 100644 --- a/tools/sse-timeout-probe/server.ts +++ b/tools/sse-timeout-probe/server.ts @@ -28,7 +28,6 @@ const server = createServer((req, res) => { res.writeHead(200, { "Content-Type": "text/event-stream", "Cache-Control": "no-cache, no-transform", - Connection: "keep-alive", "X-Accel-Buffering": "no", }); @@ -36,20 +35,36 @@ const server = createServer((req, res) => { res.write(`event: probe-start\ndata: ${JSON.stringify({ holdMs, heartbeatMs })}\n\n`); let heartbeat: NodeJS.Timeout | undefined; + const stopHeartbeat = (): void => { + if (heartbeat) { + clearInterval(heartbeat); + heartbeat = undefined; + } + }; if (heartbeatMs > 0) { heartbeat = setInterval(() => { - res.write(`: heartbeat ${Date.now()}\n\n`); + // A proxy may half-close the connection between intervals; the next write + // would then throw asynchronously. Stop the interval if so. + try { + res.write(`: heartbeat ${Date.now()}\n\n`); + } catch { + stopHeartbeat(); + } }, heartbeatMs); } const stop = setTimeout(() => { - if (heartbeat) clearInterval(heartbeat); - res.write(`event: probe-end\ndata: ${JSON.stringify({ reason: "hold-elapsed" })}\n\n`); - res.end(); + stopHeartbeat(); + try { + res.write(`event: probe-end\ndata: ${JSON.stringify({ reason: "hold-elapsed" })}\n\n`); + res.end(); + } catch { + // already closed by client/proxy — nothing to do. + } }, holdMs); req.on("close", () => { - if (heartbeat) clearInterval(heartbeat); + stopHeartbeat(); clearTimeout(stop); }); }); From e1d18a65ab47de1742a7af0f75b5034a01a55b31 Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Mon, 27 Apr 2026 20:06:01 +0000 Subject: [PATCH 3/4] fix: address ACE review findings on sse-timeout-probe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - probe.ts: `client-hard-timeout` outcome was unreachable. Node 22's fetch throws AbortController.abort(reason) DIRECTLY (not as an AbortError-with-cause), so the previous `e.name === 'AbortError'` check never matched. Switch the abort reason to a Symbol sentinel and detect via `signal.aborted && signal.reason === PROBE_HARD_TIMEOUT`. - probe.ts: extract classifyFetchError() so the outcome-classification logic is unit-testable without networking. - probe.ts: distinguish 'server-close' (proxy/upstream reset mid-stream) from 'network-error' (failure before any bytes arrived) using a streamStarted flag. Surface the underlying socket message via `error.cause.message` instead of an opaque 'fetch failed'. - probe.ts: add new outcomes `auth-redirect` and `wrong-content-type` with `redirect: 'manual'` + an explicit content-type check, so an oauth2-proxy login page no longer masquerades as a short-lived stream. - probe.ts: fail fast (exit 2) when --durations resolves to an empty list, instead of silently exiting 0 with no probes run. 
- server.ts: validate hold-ms / heartbeat-ms with a parseDurationParam helper that rejects NaN/negative values and clamps to safe maxima. Math.max(0, NaN) was returning NaN, which collapsed setTimeout to 1ms. - server.ts: drop the dead try/catch around res.write — Node returns a boolean for backpressure rather than throwing synchronously. Add a proper res.on('error', cleanup) handler for the actual async failure path. - vitest.config.ts: register a 'tools' project so the new tools/sse-timeout-probe/probe.test.ts runs under `pnpm test`. Tests cover: hard-timeout vs server-close vs network-error classification, the unrelated-abort-reason guard rail, the cause fallback, parseDurationParam edge cases, and an in-process integration smoke test of the completed/wrong-content-type paths. Signed-off-by: James Broadhead --- tools/sse-timeout-probe/probe.test.ts | 143 +++++++++++++++++++++++ tools/sse-timeout-probe/probe.ts | 159 ++++++++++++++++++++++---- tools/sse-timeout-probe/server.ts | 135 ++++++++++++++-------- vitest.config.ts | 7 ++ 4 files changed, 370 insertions(+), 74 deletions(-) create mode 100644 tools/sse-timeout-probe/probe.test.ts diff --git a/tools/sse-timeout-probe/probe.test.ts b/tools/sse-timeout-probe/probe.test.ts new file mode 100644 index 000000000..09e628d0d --- /dev/null +++ b/tools/sse-timeout-probe/probe.test.ts @@ -0,0 +1,143 @@ +import type { AddressInfo } from "node:net"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { classifyFetchError, PROBE_HARD_TIMEOUT, probeOnce } from "./probe"; +import { createProbeServer, parseDurationParam } from "./server"; + +describe("classifyFetchError", () => { + function abortedSignal(reason: unknown): AbortSignal { + const c = new AbortController(); + c.abort(reason); + return c.signal; + } + + it("returns client-hard-timeout when the signal was aborted with PROBE_HARD_TIMEOUT", () => { + // Reproduces Node 22's actual behaviour: fetch throws the abort reason + // directly, NOT an AbortError-with-cause. The check has to walk the + // signal, not the error. + const err = PROBE_HARD_TIMEOUT; + const result = classifyFetchError( + err, + abortedSignal(PROBE_HARD_TIMEOUT), + false, + ); + expect(result.outcome).toBe("client-hard-timeout"); + }); + + it("returns server-close when bytes were received before the failure", () => { + const err = Object.assign(new Error("fetch failed"), { + cause: { message: "ECONNRESET" }, + }); + const result = classifyFetchError(err, new AbortController().signal, true); + expect(result.outcome).toBe("server-close"); + expect(result.detail).toBe("ECONNRESET"); + }); + + it("returns network-error when the failure happened before any bytes arrived", () => { + const err = Object.assign(new Error("fetch failed"), { + cause: { message: "ENOTFOUND" }, + }); + const result = classifyFetchError(err, new AbortController().signal, false); + expect(result.outcome).toBe("network-error"); + expect(result.detail).toBe("ENOTFOUND"); + }); + + it("does NOT misclassify aborts with other reasons as client-hard-timeout", () => { + // If something else aborts the signal (test harness, parent abort), the + // probe must not silently call it a client-hard-timeout. 
+ const result = classifyFetchError( + new Error("oops"), + abortedSignal(new Error("unrelated")), + true, + ); + expect(result.outcome).toBe("server-close"); + }); + + it("falls back to e.message when there is no underlying .cause", () => { + const result = classifyFetchError( + new Error("plain fetch error"), + new AbortController().signal, + false, + ); + expect(result.detail).toBe("plain fetch error"); + }); +}); + +describe("parseDurationParam", () => { + it("uses the default for null/empty/non-numeric inputs", () => { + expect(parseDurationParam(null, 120_000, 60 * 60_000)).toBe(120_000); + expect(parseDurationParam("", 120_000, 60 * 60_000)).toBe(120_000); + expect(parseDurationParam("not-a-number", 120_000, 60 * 60_000)).toBe( + 120_000, + ); + }); + + it("uses the default for negative values", () => { + expect(parseDurationParam("-100", 120_000, 60 * 60_000)).toBe(120_000); + }); + + it("uses the default for NaN-producing inputs (this guards against the 1ms-setTimeout bug)", () => { + // Math.max(0, NaN) === NaN, and setTimeout(..., NaN) collapses to 1ms, + // which would produce a tight heartbeat loop and bogus measurements. + expect(parseDurationParam("abc", 120_000, 60 * 60_000)).toBe(120_000); + }); + + it("clamps to the max", () => { + expect(parseDurationParam("999999999", 120_000, 60 * 60_000)).toBe( + 60 * 60_000, + ); + }); + + it("passes through valid in-range values", () => { + expect(parseDurationParam("30000", 120_000, 60 * 60_000)).toBe(30_000); + }); +}); + +describe("probeOnce (integration via in-process server)", () => { + let server: ReturnType; + let baseUrl: string; + + beforeEach(async () => { + server = createProbeServer(); + await new Promise((resolve) => + server.listen(0, "127.0.0.1", resolve), + ); + const addr = server.address() as AddressInfo; + baseUrl = `http://127.0.0.1:${addr.port}`; + }); + + afterEach(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + it("reports `completed` when the server holds the connection for the full target", async () => { + const result = await probeOnce( + { + baseUrl, + path: "/sse-probe", + durationsMs: [200], + heartbeatMs: 0, + headers: {}, + jsonOutput: false, + }, + 200, + ); + expect(result.outcome).toBe("completed"); + expect(result.actualLifetimeMs).toBeGreaterThanOrEqual(150); + }); + + it("reports `wrong-content-type` when the endpoint isn't an SSE stream", async () => { + // Probe a 404 path so the server returns text/plain, not text/event-stream. + const result = await probeOnce( + { + baseUrl, + path: "/nope", + durationsMs: [200], + heartbeatMs: 0, + headers: {}, + jsonOutput: false, + }, + 200, + ); + expect(["wrong-content-type", "server-close"]).toContain(result.outcome); + }); +}); diff --git a/tools/sse-timeout-probe/probe.ts b/tools/sse-timeout-probe/probe.ts index eebb7dc81..e1b2ddea7 100644 --- a/tools/sse-timeout-probe/probe.ts +++ b/tools/sse-timeout-probe/probe.ts @@ -32,10 +32,16 @@ interface ProbeConfig { jsonOutput: boolean; } -interface ProbeResult { +export interface ProbeResult { targetDurationMs: number; actualLifetimeMs: number; - outcome: "completed" | "server-close" | "network-error" | "client-hard-timeout"; + outcome: + | "completed" + | "server-close" + | "network-error" + | "client-hard-timeout" + | "auth-redirect" + | "wrong-content-type"; detail?: string; bytesReceived: number; firstByteMs?: number; @@ -46,13 +52,56 @@ interface ProbeResult { // the target counts as completed. 
const COMPLETION_TOLERANCE_MS = 500; +// Sentinel attached to AbortController.abort(reason) when the client-side +// safety timer fires. Node 22's fetch throws this reason DIRECTLY (not as an +// AbortError with a `.cause` chain), so callers must compare against it via +// signal.reason or a reference-equality check on the thrown value. +export const PROBE_HARD_TIMEOUT = Symbol("probe-hard-timeout"); + +/** + * Classify an exception thrown by `fetch` (or by the body reader) into a + * ProbeResult outcome. Pure / side-effect-free so it can be unit-tested + * without network IO. + * + * @param err The thrown value. + * @param signal The AbortSignal that was passed to fetch. Its `.reason` is + * authoritative for distinguishing user-aborts from network + * errors. + * @param streamStarted Whether we already read at least one byte of the SSE + * body. Mid-stream proxy resets are reported as + * `server-close`; failures before any bytes arrive are + * `network-error`. + */ +export function classifyFetchError( + err: unknown, + signal: AbortSignal, + streamStarted: boolean, +): { outcome: ProbeResult["outcome"]; detail: string } { + if (signal.aborted && signal.reason === PROBE_HARD_TIMEOUT) { + return { + outcome: "client-hard-timeout", + detail: + "client-side hard-timeout fired (server never closed the connection)", + }; + } + const e = err as Error & { cause?: { message?: string; code?: string } }; + // Node's native fetch wraps the underlying socket error in `cause`; surface + // the inner message instead of an opaque "fetch failed". + const detail = e?.cause?.message ?? e?.message ?? String(err); + return { + outcome: streamStarted ? "server-close" : "network-error", + detail, + }; +} + function parseArgs(argv: string[]): ProbeConfig { const args = new Map(); for (let i = 0; i < argv.length; i++) { const a = argv[i]; if (a.startsWith("--")) { const key = a.slice(2); - const value = argv[i + 1] && !argv[i + 1].startsWith("--") ? argv[++i] : "true"; + const value = + argv[i + 1] && !argv[i + 1].startsWith("--") ? argv[++i] : "true"; args.set(key, value); } } @@ -89,12 +138,20 @@ function parseArgs(argv: string[]): ProbeConfig { } } - const durations = ( - args.get("durations") ?? "30000,60000,90000,120000,150000,180000,240000,300000" - ) + const rawDurations = + args.get("durations") ?? + "30000,60000,90000,120000,150000,180000,240000,300000"; + const durations = rawDurations .split(",") .map((s) => Number.parseInt(s.trim(), 10)) .filter((n) => Number.isFinite(n) && n > 0); + if (durations.length === 0) { + process.stderr.write( + `--durations resolved to an empty list (input: "${rawDurations}"). 
` + + "Provide one or more positive integer milliseconds.\n", + ); + process.exit(2); + } return { baseUrl: args.get("base-url")!.replace(/\/$/, ""), @@ -106,14 +163,20 @@ function parseArgs(argv: string[]): ProbeConfig { }; } -async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise { +export async function probeOnce( + config: ProbeConfig, + targetDurationMs: number, +): Promise { const url = new URL(config.path, config.baseUrl); url.searchParams.set("hold-ms", String(targetDurationMs)); url.searchParams.set("heartbeat-ms", String(config.heartbeatMs)); const start = performance.now(); const controller = new AbortController(); - const hardTimeout = setTimeout(() => controller.abort(new Error("probe-hard-timeout")), targetDurationMs + 15_000); + const hardTimeout = setTimeout( + () => controller.abort(PROBE_HARD_TIMEOUT), + targetDurationMs + 15_000, + ); let bytesReceived = 0; let firstByteMs: number | undefined; @@ -129,8 +192,27 @@ async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise ...config.headers, }, signal: controller.signal, + // Don't follow redirects: oauth2-proxy will 302 to a login page when + // the session cookie has expired, and following that would mask an + // auth error as a short-lived "stream". + redirect: "manual", }); + if ( + resp.type === "opaqueredirect" || + (resp.status >= 300 && resp.status < 400) + ) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "auth-redirect", + detail: `HTTP ${resp.status} ${resp.statusText} -> ${ + resp.headers.get("location") ?? "(no Location header)" + }`, + bytesReceived: 0, + }; + } + if (!resp.ok) { return { targetDurationMs, @@ -141,6 +223,17 @@ async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise }; } + const ct = resp.headers.get("content-type") ?? ""; + if (!ct.toLowerCase().startsWith("text/event-stream")) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "wrong-content-type", + detail: `expected text/event-stream, got "${ct || "(none)"}"`, + bytesReceived: 0, + }; + } + if (!resp.body) { return { targetDurationMs, @@ -155,24 +248,26 @@ async function probeOnce(config: ProbeConfig, targetDurationMs: number): Promise // eslint-disable-next-line no-constant-condition while (true) { const { value, done } = await reader.read(); - if (firstByteMs === undefined && value) firstByteMs = performance.now() - start; + if (firstByteMs === undefined && value) + firstByteMs = performance.now() - start; if (done) { const lifetimeMs = performance.now() - start; outcome = - lifetimeMs >= targetDurationMs - COMPLETION_TOLERANCE_MS ? "completed" : "server-close"; + lifetimeMs >= targetDurationMs - COMPLETION_TOLERANCE_MS + ? 
"completed" + : "server-close"; break; } if (value) bytesReceived += value.byteLength; } } catch (err) { - const e = err as Error; - if (e.name === "AbortError" && (e as Error & { cause?: Error }).cause?.message === "probe-hard-timeout") { - outcome = "client-hard-timeout"; - detail = "client-side hard-timeout fired (server never closed the connection)"; - } else { - outcome = "network-error"; - detail = e.message; - } + const classified = classifyFetchError( + err, + controller.signal, + bytesReceived > 0, + ); + outcome = classified.outcome; + detail = classified.detail; } finally { clearTimeout(hardTimeout); } @@ -198,9 +293,15 @@ async function main(): Promise { const config = parseArgs(process.argv.slice(2)); if (!config.jsonOutput) { - process.stdout.write(`sse-timeout-probe → ${config.baseUrl}${config.path}\n`); - process.stdout.write(` durations: ${config.durationsMs.map((d) => `${d / 1000}s`).join(", ")}\n`); - process.stdout.write(` heartbeat: ${config.heartbeatMs === 0 ? "none (fully idle)" : `${config.heartbeatMs}ms`}\n\n`); + process.stdout.write( + `sse-timeout-probe → ${config.baseUrl}${config.path}\n`, + ); + process.stdout.write( + ` durations: ${config.durationsMs.map((d) => `${d / 1000}s`).join(", ")}\n`, + ); + process.stdout.write( + ` heartbeat: ${config.heartbeatMs === 0 ? "none (fully idle)" : `${config.heartbeatMs}ms`}\n\n`, + ); } for (const duration of config.durationsMs) { @@ -213,7 +314,15 @@ async function main(): Promise { } } -main().catch((err) => { - process.stderr.write(`probe failed: ${(err as Error).message}\n`); - process.exit(1); -}); +// Only run as a CLI when invoked directly. When imported by tests, this +// guard prevents `parseArgs` from blowing up on missing --base-url. +const invokedDirectly = + typeof require !== "undefined" + ? require.main === module + : import.meta.url === `file://${process.argv[1]}`; +if (invokedDirectly) { + main().catch((err) => { + process.stderr.write(`probe failed: ${(err as Error).message}\n`); + process.exit(1); + }); +} diff --git a/tools/sse-timeout-probe/server.ts b/tools/sse-timeout-probe/server.ts index 982730f7c..61a55606d 100644 --- a/tools/sse-timeout-probe/server.ts +++ b/tools/sse-timeout-probe/server.ts @@ -12,63 +12,100 @@ import { createServer } from "node:http"; -const port = Number.parseInt(process.env.PORT ?? process.env.DATABRICKS_APP_PORT ?? "8000", 10); +const port = Number.parseInt( + process.env.PORT ?? process.env.DATABRICKS_APP_PORT ?? "8000", + 10, +); -const server = createServer((req, res) => { - if (!req.url?.startsWith("/sse-probe")) { - res.writeHead(404, { "Content-Type": "text/plain" }); - res.end("not found — try /sse-probe\n"); - return; - } +// Server-side ladder bounds: hold up to an hour, heartbeat at most every 60s. +// Anything outside these bounds is almost certainly a malformed URL, not a +// legitimate probe — clamp instead of trusting the caller, and never let +// `setTimeout(..., NaN)` collapse to a 1ms tight loop. +const MAX_HOLD_MS = 60 * 60 * 1000; +const MAX_HEARTBEAT_MS = 60 * 1000; - const url = new URL(req.url, `http://${req.headers.host ?? "localhost"}`); - const holdMs = Math.max(0, Number.parseInt(url.searchParams.get("hold-ms") ?? "120000", 10)); - const heartbeatMs = Math.max(0, Number.parseInt(url.searchParams.get("heartbeat-ms") ?? 
"0", 10)); +export function parseDurationParam( + raw: string | null, + defaultMs: number, + maxMs: number, +): number { + if (raw === null || raw === "") return defaultMs; + const n = Number.parseInt(raw, 10); + if (!Number.isFinite(n) || n < 0) return defaultMs; + return Math.min(n, maxMs); +} - res.writeHead(200, { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache, no-transform", - "X-Accel-Buffering": "no", - }); +export function createProbeServer() { + return createServer((req, res) => { + if (!req.url?.startsWith("/sse-probe")) { + res.writeHead(404, { "Content-Type": "text/plain" }); + res.end("not found — try /sse-probe\n"); + return; + } - // Initial event so the client can measure time-to-first-byte. - res.write(`event: probe-start\ndata: ${JSON.stringify({ holdMs, heartbeatMs })}\n\n`); + const url = new URL(req.url, `http://${req.headers.host ?? "localhost"}`); + const holdMs = parseDurationParam( + url.searchParams.get("hold-ms"), + 120_000, + MAX_HOLD_MS, + ); + const heartbeatMs = parseDurationParam( + url.searchParams.get("heartbeat-ms"), + 0, + MAX_HEARTBEAT_MS, + ); - let heartbeat: NodeJS.Timeout | undefined; - const stopHeartbeat = (): void => { - if (heartbeat) { - clearInterval(heartbeat); - heartbeat = undefined; - } - }; - if (heartbeatMs > 0) { - heartbeat = setInterval(() => { - // A proxy may half-close the connection between intervals; the next write - // would then throw asynchronously. Stop the interval if so. - try { - res.write(`: heartbeat ${Date.now()}\n\n`); - } catch { - stopHeartbeat(); + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + "X-Accel-Buffering": "no", + }); + + // Initial event so the client can measure time-to-first-byte. + res.write( + `event: probe-start\ndata: ${JSON.stringify({ holdMs, heartbeatMs })}\n\n`, + ); + + let heartbeat: NodeJS.Timeout | undefined; + const stopHeartbeat = (): void => { + if (heartbeat) { + clearInterval(heartbeat); + heartbeat = undefined; } - }, heartbeatMs); - } + }; + if (heartbeatMs > 0) { + heartbeat = setInterval(() => { + res.write(`: heartbeat ${Date.now()}\n\n`); + }, heartbeatMs); + } - const stop = setTimeout(() => { - stopHeartbeat(); - try { - res.write(`event: probe-end\ndata: ${JSON.stringify({ reason: "hold-elapsed" })}\n\n`); + const stop = setTimeout(() => { + stopHeartbeat(); + res.write( + `event: probe-end\ndata: ${JSON.stringify({ reason: "hold-elapsed" })}\n\n`, + ); res.end(); - } catch { - // already closed by client/proxy — nothing to do. - } - }, holdMs); + }, holdMs); - req.on("close", () => { - stopHeartbeat(); - clearTimeout(stop); + const cleanup = (): void => { + stopHeartbeat(); + clearTimeout(stop); + }; + req.on("close", cleanup); + // Async write failures (proxy half-closes between heartbeats, etc.) land + // here as 'error' events on the response. Without a listener, Node's + // default behaviour would crash the server. + res.on("error", cleanup); }); -}); +} -server.listen(port, () => { - process.stdout.write(`sse-timeout-probe server listening on :${port}\n`); -}); +const invokedDirectly = + typeof require !== "undefined" + ? 
require.main === module + : import.meta.url === `file://${process.argv[1]}`; +if (invokedDirectly) { + const server = createProbeServer(); + server.listen(port, () => { + process.stdout.write(`sse-timeout-probe server listening on :${port}\n`); + }); +} diff --git a/vitest.config.ts b/vitest.config.ts index 8c2893b01..5e272f031 100644 --- a/vitest.config.ts +++ b/vitest.config.ts @@ -58,6 +58,13 @@ export default defineConfig({ environment: "node", }, }, + { + test: { + name: "tools", + root: "./tools", + environment: "node", + }, + }, ], }, }); From 9b74f3edb2d05fd2915983b3a25fcd80db7ddb61 Mon Sep 17 00:00:00 2001 From: James Broadhead Date: Wed, 29 Apr 2026 09:37:00 +0000 Subject: [PATCH 4/4] docs(tools/sse-timeout-probe): rewrite with empirical findings; add Python server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the speculative diagnostic framing in the README with the actual finding: oauth2-proxy upstream_timeout = 5m on the Container path, manifesting as Go's ResponseHeaderTimeout body-cut quirk on HTTP/1.1 chunked-encoding upstream connections. Adds server.py — Python stdlib equivalent of server.ts — so the probe can also be deployed against app images that don't include npx/tsx (e.g. Spaces apps). Updates the Follow-ups section to combined WS-variant + dev-playground wiring, and points at the universe fix (databricks-eng/universe#1867246). Refs: ES-1742245 Signed-off-by: James Broadhead --- tools/sse-timeout-probe/README.md | 139 +++++++++++++++++++++--------- tools/sse-timeout-probe/server.py | 104 ++++++++++++++++++++++ 2 files changed, 204 insertions(+), 39 deletions(-) create mode 100644 tools/sse-timeout-probe/server.py diff --git a/tools/sse-timeout-probe/README.md b/tools/sse-timeout-probe/README.md index f8c053fa8..7344c1750 100644 --- a/tools/sse-timeout-probe/README.md +++ b/tools/sse-timeout-probe/README.md @@ -1,71 +1,132 @@ # sse-timeout-probe -Empirical reproducer for the Databricks Apps SSE timeout gap -([ES-1742245](https://databricks.atlassian.net/browse/ES-1742245), UI-01 / UI-02 in -the EMEA Apps "gaps that matter" doc). +Empirical probe for SSE / streaming response lifetime through Databricks Apps. +Originally built to characterize the cap reported in [ES-1742245] / UI-01 / UI-02. +Now serves as a reusable diagnostic and a regression test for the platform fix. + +[ES-1742245]: https://databricks.atlassian.net/browse/ES-1742245 ## What it does Opens one SSE connection per duration in a configurable ladder, holds each one idle (or paced by a server-side heartbeat), and reports how long each connection actually survived and how it was terminated. Runs against a Databricks App, an -EKS control, or `localhost`. Running it against a known-good origin and the -Databricks-hosted one in sequence gives you the per-layer ceiling without -having to triangulate from noisy LLM traces. +EKS / localhost control, or any HTTP origin that speaks the probe's +`/sse-probe` contract. + +## What we used it for + +The cap was a Container-path issue: SSE streams through `apps/oauth2-proxy` +were terminated at exactly **301.5s ± 18ms**, regardless of heartbeat traffic. +Empirical investigation traced it to `upstream_timeout = "5m"` in +`apps/oauth2-proxy/proxy.cfg`, which oauth2-proxy maps to Go's +`transport.ResponseHeaderTimeout`. 
Despite the stdlib documenting that timer +as header-receipt-only, on HTTP/1.1 chunked-encoding upstream connections it +also fires mid-body — a known quirk acknowledged in +`apps/runtime/pkg/proxy/local_proxy.go:217-221`. + +Spaces apps front oauth2-proxy with TLS (HTTP/2) so the body-affecting variant +of the quirk doesn't trigger; verified clean to 600s. + +Universe fix: +(bumps `upstream_timeout` to 30m). -## Why this exists +## Server contract -The source doc (`Apps Gaps That Matter to EMEA Apps`) and ES-1742245 disagree -about what kills SSE connections. The doc says the drop is "distinct from the -120s idle timeout" and blames buffering / HTTP/2 multiplexing. The ticket's own -diagnosis (Naïm Achahboun) says the drop *is* caused by the effective request -timeout — multi-agent LLM calls take varying durations, so some finish under -the ceiling and some don't. +Two equivalent server implementations are provided so the probe can be +deployed against any Databricks App regardless of runtime image: -This probe answers the question deterministically: hold an idle SSE for X -seconds, see if it survives, record where the timeout lives. Comparing results -across durations and with/without heartbeat tells you whether the ceiling is -idle-based (heartbeats save it) or absolute (heartbeats don't). +- `server.ts` — TS / Node, for app images that include `npx` / `tsx`. +- `server.py` — Python stdlib only, for app images that don't (e.g. Spaces). + +Both serve `GET /sse-probe?hold-ms=&heartbeat-ms=` with these semantics: + +- Send response headers immediately, then `event: probe-start`. +- If `heartbeat-ms > 0`, emit `: heartbeat \n\n` at that interval. +- Hold the connection for `hold-ms` total wall-clock, then send + `event: probe-end\ndata: {"reason":"hold-elapsed"}\n\n` and close. + +`hold-ms` is clamped to 1h, `heartbeat-ms` to 60s, to prevent malformed URLs +from collapsing into tight loops. ## Usage ```bash -# Inside a workspace: deploy `server.ts` as the app entrypoint. -# Then from a machine with network access to the app URL: +# Inside a workspace: deploy server.ts (Container) or server.py (Spaces) as the +# app entrypoint. Then from a machine with network access to the app URL: tsx tools/sse-timeout-probe/probe.ts \ --base-url https://my-app..cloud.databricks.com \ - --header "Cookie=" \ + --header "Authorization=Bearer " \ --durations 30000,60000,90000,120000,150000,180000,240000,300000 \ --json | tee apps-results.jsonl -# Control run against the same codebase on EKS/localhost: +# Or with a session cookie from a logged-in browser: +... --header "Cookie=" ... + +# Control run on the same codebase, no proxy in path: tsx tools/sse-timeout-probe/probe.ts --base-url http://localhost:8000 \ --json | tee local-results.jsonl - -# Compare: if `outcome: server-close` clusters sharply at a duration in the -# Apps run but not locally, that duration is your ceiling. ``` -Flags: +### Probe flags + - `--base-url ` (required) — base URL of the SSE-serving app. - `--path ` — SSE endpoint path. Default: `/sse-probe`. -- `--durations ` — comma-separated milliseconds, one connection per entry. Default: `30000,60000,90000,120000,150000,180000,240000,300000`. -- `--heartbeat ` — if `>0`, the server emits a keepalive comment every N ms. Distinguishes idle-timeout from absolute request-timeout. -- `--header ` — extra request header. Repeatable (e.g. for auth cookies). +- `--durations ` — comma-separated milliseconds, one connection per + entry. Default: `30000,60000,90000,120000,150000,180000,240000,300000`. 
+- `--heartbeat ` — if `>0`, the server emits a keepalive comment every + N ms. Distinguishes idle-timeout from absolute request-timeout. +- `--header ` — extra request header. Repeatable. - `--json` — emit one JSON line per result. -## What to look for +## Interpreting results + +| Pattern | Diagnosis | +|---|---| +| All `outcome: completed`, lifetimes match targets | No cap on this path. | +| Sharp cliff at duration X, all probes ≥X cut at ~X with `actualLifetimeMs` variance < 100ms | Absolute request-lifetime timer at X. | +| Cliff at X without heartbeat, no cliff with heartbeat | Idle timeout at X, savable with keepalive. | +| Cliff at X identical with and without heartbeat | Absolute timeout (heartbeats don't save it). Look for HTTP/1.1 ResponseHeaderTimeout-style quirks. | +| `outcome: server-close` with `detail: "Body Timeout Error"` and `actualLifetimeMs` near the target hold | Client-side undici `bodyTimeout` (300s default) — not a real proxy cap; rerun with `--heartbeat` to confirm. | + +## Verifying the universe fix + +Once is broadly +deployed, re-run against a Container app: + +```bash +tsx tools/sse-timeout-probe/probe.ts \ + --base-url https://.cloud.databricks.com \ + --header "Authorization=Bearer $(databricks auth token | jq -r .access_token)" \ + --durations 1500000 --heartbeat 5000 --json +``` -- **Sharp cliff at ~60s, 90s, 120s, or 180s** → that's the effective ceiling. Cross-reference with: - - `apps/gateway` (`request_timeout=60`, `pool_idle_timeout=90`, `header_read_timeout=30`) - - `apps/oauth2-proxy` (`DefaultUpstreamTimeout=30`) - - DP ApiProxy envoy (`idle_timeout=180s` / `1200s`) -- **No cliff, `outcome: completed` throughout** → the drop isn't timeout-driven; follow the buffering / HTTP/2 hypothesis. -- **Cliff moves when `--heartbeat` is added** → idle-timeout. Document the heartbeat pattern as the fix. -- **Cliff is identical with and without heartbeat** → absolute request timeout. Requires per-route override on `apps/gateway`. +Expected: `outcome: completed`, `actualLifetimeMs ≈ 1500000` (25 minutes). +Regression: `outcome: server-close` with `actualLifetimeMs` near some lower +value indicates a new (or returned) cap somewhere in the path. ## Follow-ups -- Wire the companion server into `apps/dev-playground` so probing is one `pnpm deploy` away. -- Export results to a small notebook template for the comparison visualization. -- Add a WebSocket variant of the probe so UI-02 (ping/pong bypass) can be measured on the same axes. +- **WebSocket variant of the probe + wire both probes into `apps/dev-playground`.** + UI-02's claim is that WS ping/pong bypasses the timeout differently than SSE; + a `ws-timeout-probe` companion measures the WS path on the same axes. + Wiring into `dev-playground` makes both probes one `pnpm deploy` away. +- **Promote AppKit's existing `Last-Event-ID` reconnection pattern as the + recommended shape for long-running SSE apps.** Defense in depth so apps + degrade gracefully if any future platform timeout reappears. The pieces + already exist in `packages/appkit` (`StreamManager`, ring buffer, abort + signals) — this is a docs / examples change. +- **Higher-N concurrency probe** (N=100, N=500) to actually stress the + HTTP/2 multiplexing hypothesis in the original UI-02 doc. We tested N=20 + and saw no degradation; the doc's claim was about higher concurrency. 
+- **CI regression test.** Once the universe fix is broadly deployed, an + integration test that deploys a short-lived test app and runs a small + probe ladder catches future regressions before they reach customers. + +Out of scope for this repo, tracked elsewhere: + +- Architectural fix on the Container path (TLS / HTTP/2 between proxies, so + the Go ResponseHeaderTimeout body-cut quirk doesn't trigger). Apps platform + / networking work. +- Durable docs of the diagnosis under `apps/tech-docs/`. Universe doc PR + after #1867246 lands. diff --git a/tools/sse-timeout-probe/server.py b/tools/sse-timeout-probe/server.py new file mode 100644 index 000000000..9c832911b --- /dev/null +++ b/tools/sse-timeout-probe/server.py @@ -0,0 +1,104 @@ +"""Companion server for sse-timeout-probe. + +Serves /sse-probe with configurable hold and heartbeat — same wire behavior as +server.ts, rewritten in Python stdlib so it runs on the Spaces microVM image +(no npx/tsx available). The probe client is unchanged. +""" + +import json +import os +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from urllib.parse import parse_qs, urlparse + + +# Mirrors server.ts clamps. Anything outside these is almost certainly a +# malformed URL, not a legitimate probe — clamp instead of trusting the caller. +MAX_HOLD_MS = 60 * 60 * 1000 +MAX_HEARTBEAT_MS = 60 * 1000 + + +def parse_duration(raw: str | None, default_ms: int, max_ms: int) -> int: + if raw is None or raw == "": + return default_ms + try: + n = int(raw) + except ValueError: + return default_ms + if n < 0: + return default_ms + return min(n, max_ms) + + +class ProbeHandler(BaseHTTPRequestHandler): + def log_message(self, format, *args): + # Stderr is captured by /logz; stdout flooding is unhelpful for our + # measurement of long-lived connections. + return + + def do_GET(self): + url = urlparse(self.path) + if not url.path.startswith("/sse-probe"): + self.send_response(404) + self.send_header("Content-Type", "text/plain") + self.end_headers() + self.wfile.write(b"not found - try /sse-probe\n") + return + + params = parse_qs(url.query) + hold_ms = parse_duration( + (params.get("hold-ms") or [None])[0], 120_000, MAX_HOLD_MS + ) + heartbeat_ms = parse_duration( + (params.get("heartbeat-ms") or [None])[0], 0, MAX_HEARTBEAT_MS + ) + + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache, no-transform") + # X-Accel-Buffering=no asks any nginx in front to flush instead of buffer. + self.send_header("X-Accel-Buffering", "no") + self.end_headers() + + try: + self.wfile.write( + f"event: probe-start\ndata: {json.dumps({'holdMs': hold_ms, 'heartbeatMs': heartbeat_ms})}\n\n".encode() + ) + self.wfile.flush() + + start = time.monotonic() + deadline = start + hold_ms / 1000 + next_beat = start + heartbeat_ms / 1000 if heartbeat_ms > 0 else None + + while True: + now = time.monotonic() + if now >= deadline: + break + if next_beat is not None and now >= next_beat: + self.wfile.write(f": heartbeat {int(time.time() * 1000)}\n\n".encode()) + self.wfile.flush() + next_beat += heartbeat_ms / 1000 + # Sleep until the next event (heartbeat or deadline). 
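                # (The sleep below is capped at 1s so the loop wakes at least once a
                # second to re-check the deadline, rather than sleeping the whole
                # remaining hold in a single call.)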
+ next_event = min(deadline, next_beat) if next_beat is not None else deadline + remaining = next_event - time.monotonic() + if remaining > 0: + time.sleep(min(remaining, 1.0)) + + self.wfile.write( + f"event: probe-end\ndata: {json.dumps({'reason': 'hold-elapsed'})}\n\n".encode() + ) + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError): + # Client/proxy disconnected mid-stream; nothing to clean up. + pass + + +def main(): + port = int(os.environ.get("DATABRICKS_APP_PORT") or os.environ.get("PORT") or "8000") + server = ThreadingHTTPServer(("0.0.0.0", port), ProbeHandler) + print(f"sse-timeout-probe server listening on :{port}", flush=True) + server.serve_forever() + + +if __name__ == "__main__": + main()
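
# Deployment hint (illustrative, not verified against any specific app image):
# on a runtime without npx/tsx, an app.yaml command along the lines of
#   command: ["python", "tools/sse-timeout-probe/server.py"]
# is enough; the server picks up DATABRICKS_APP_PORT / PORT on its own.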