diff --git a/tools/sse-timeout-probe/README.md b/tools/sse-timeout-probe/README.md
new file mode 100644
index 000000000..7344c1750
--- /dev/null
+++ b/tools/sse-timeout-probe/README.md
@@ -0,0 +1,132 @@
+# sse-timeout-probe
+
+Empirical probe for SSE / streaming response lifetime through Databricks Apps.
+Originally built to characterize the cap reported in [ES-1742245] / UI-01 / UI-02.
+Now serves as a reusable diagnostic and a regression test for the platform fix.
+
+[ES-1742245]: https://databricks.atlassian.net/browse/ES-1742245
+
+## What it does
+
+Opens one SSE connection per duration in a configurable ladder, holds each one
+idle (or paced by a server-side heartbeat), and reports how long each connection
+actually survived and how it was terminated. Runs against a Databricks App, an
+EKS / localhost control, or any HTTP origin that speaks the probe's
+`/sse-probe` contract.
+
+## What we used it for
+
+The cap was a Container-path issue: SSE streams through `apps/oauth2-proxy`
+were terminated at exactly **301.5s ± 18ms**, regardless of heartbeat traffic.
+Empirical investigation traced it to `upstream_timeout = "5m"` in
+`apps/oauth2-proxy/proxy.cfg`, which oauth2-proxy maps to Go's
+`transport.ResponseHeaderTimeout`. Although the stdlib documents that timer
+as header-receipt-only, on HTTP/1.1 chunked-encoding upstream connections it
+also fires mid-body — a known quirk acknowledged in
+`apps/runtime/pkg/proxy/local_proxy.go:217-221`.
+
+Spaces apps front oauth2-proxy with TLS (HTTP/2), so the body-affecting variant
+of the quirk doesn't trigger; verified clean to 600s.
+
+Universe fix: bumps `upstream_timeout` to 30m.
+
+## Server contract
+
+Two equivalent server implementations are provided so the probe can be
+deployed against any Databricks App regardless of runtime image:
+
+- `server.ts` — TS / Node, for app images that include `npx` / `tsx`.
+- `server.py` — Python stdlib only, for app images that don't (e.g. Spaces).
+
+Both serve `GET /sse-probe?hold-ms=<ms>&heartbeat-ms=<ms>` with these semantics:
+
+- Send response headers immediately, then `event: probe-start`.
+- If `heartbeat-ms > 0`, emit `: heartbeat <ts>\n\n` at that interval.
+- Hold the connection for `hold-ms` total wall-clock, then send
+  `event: probe-end\ndata: {"reason":"hold-elapsed"}\n\n` and close.
+
+`hold-ms` is clamped to 1h and `heartbeat-ms` to 60s, to prevent malformed URLs
+from collapsing into tight loops.
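+
+An illustrative wire exchange for a 30s hold with a 5s heartbeat (timestamps
+hypothetical, headers trimmed):
+
+```text
+GET /sse-probe?hold-ms=30000&heartbeat-ms=5000 HTTP/1.1
+Accept: text/event-stream
+
+HTTP/1.1 200 OK
+Content-Type: text/event-stream
+Cache-Control: no-cache, no-transform
+X-Accel-Buffering: no
+
+event: probe-start
+data: {"holdMs":30000,"heartbeatMs":5000}
+
+: heartbeat 1700000000000
+
+: heartbeat 1700000005000
+
+event: probe-end
+data: {"reason":"hold-elapsed"}
+```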
+
+## Usage
+
+```bash
+# Inside a workspace: deploy server.ts (Container) or server.py (Spaces) as the
+# app entrypoint. Then from a machine with network access to the app URL:
+tsx tools/sse-timeout-probe/probe.ts \
+  --base-url https://my-app.<workspace>.cloud.databricks.com \
+  --header "Authorization=Bearer <token>" \
+  --durations 30000,60000,90000,120000,150000,180000,240000,300000 \
+  --json | tee apps-results.jsonl
+
+# Or with a session cookie from a logged-in browser:
+... --header "Cookie=<cookie>" ...
+
+# Control run on the same codebase, no proxy in path:
+tsx tools/sse-timeout-probe/probe.ts --base-url http://localhost:8000 \
+  --json | tee local-results.jsonl
+```
+
+### Probe flags
+
+- `--base-url <url>` (required) — base URL of the SSE-serving app.
+- `--path <path>` — SSE endpoint path. Default: `/sse-probe`.
+- `--durations <list>` — comma-separated milliseconds, one connection per
+  entry. Default: `30000,60000,90000,120000,150000,180000,240000,300000`.
+- `--heartbeat <ms>` — if `>0`, the server emits a keepalive comment every
+  N ms. Distinguishes idle-timeout from absolute request-timeout.
+- `--header <K=V>` — extra request header. Repeatable.
+- `--json` — emit one JSON line per result.
+
+## Interpreting results
+
+| Pattern | Diagnosis |
+|---|---|
+| All `outcome: completed`, lifetimes match targets | No cap on this path. |
+| Sharp cliff at duration X, all probes ≥X cut at ~X with `actualLifetimeMs` variance < 100ms | Absolute request-lifetime timer at X. |
+| Cliff at X without heartbeat, no cliff with heartbeat | Idle timeout at X, savable with keepalive. |
+| Cliff at X identical with and without heartbeat | Absolute timeout (heartbeats don't save it). Look for HTTP/1.1 ResponseHeaderTimeout-style quirks. |
+| `outcome: server-close` with `detail: "Body Timeout Error"` and `actualLifetimeMs` near the target hold | Client-side undici `bodyTimeout` (300s default) — not a real proxy cap; rerun with `--heartbeat` to confirm. |
+
+## Verifying the universe fix
+
+Once the universe fix is broadly deployed, re-run against a Container app:
+
+```bash
+tsx tools/sse-timeout-probe/probe.ts \
+  --base-url https://<app-host>.cloud.databricks.com \
+  --header "Authorization=Bearer $(databricks auth token | jq -r .access_token)" \
+  --durations 1500000 --heartbeat 5000 --json
+```
+
+Expected: `outcome: completed`, `actualLifetimeMs ≈ 1500000` (25 minutes).
+Regression: `outcome: server-close` with `actualLifetimeMs` near some lower
+value indicates a new (or returned) cap somewhere in the path.
+
+## Follow-ups
+
+- **WebSocket variant of the probe + wire both probes into `apps/dev-playground`.**
+  UI-02's claim is that WS ping/pong bypasses the timeout differently than SSE;
+  a `ws-timeout-probe` companion measures the WS path on the same axes.
+  Wiring into `dev-playground` makes both probes one `pnpm deploy` away.
+- **Promote AppKit's existing `Last-Event-ID` reconnection pattern as the
+  recommended shape for long-running SSE apps.** Defense in depth so apps
+  degrade gracefully if any future platform timeout reappears. The pieces
+  already exist in `packages/appkit` (`StreamManager`, ring buffer, abort
+  signals) — this is a docs / examples change; a client-side sketch appears
+  in the appendix below.
+- **Higher-N concurrency probe** (N=100, N=500) to actually stress the
+  HTTP/2 multiplexing hypothesis in the original UI-02 doc. We tested N=20
+  and saw no degradation; the doc's claim was about higher concurrency.
+- **CI regression test.** Once the universe fix is broadly deployed, an
+  integration test that deploys a short-lived test app and runs a small
+  probe ladder catches future regressions before they reach customers.
+
+Out of scope for this repo, tracked elsewhere:
+
+- Architectural fix on the Container path (TLS / HTTP/2 between proxies, so
+  the Go ResponseHeaderTimeout body-cut quirk doesn't trigger). Apps platform
+  / networking work.
+- Durable docs of the diagnosis under `apps/tech-docs/`. Universe doc PR
+  after #1867246 lands.
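+
+## Appendix: client-side reconnection sketch
+
+A minimal shape for the `Last-Event-ID` pattern recommended in Follow-ups. This
+is an illustrative sketch, not AppKit's actual API: it assumes the app's SSE
+endpoint stamps every event with an `id:` field and can replay missed events
+when a reconnecting client presents the `Last-Event-ID` header. The endpoint
+path, the `render` callback, and the terminal event name are all hypothetical.
+
+```ts
+// Assumed: a browser context, a server that emits `id:` on every event and
+// can replay from a presented Last-Event-ID. Names below are made up.
+declare function render(chunk: unknown): void;
+
+const source = new EventSource("/api/stream");
+
+source.onmessage = (event) => {
+  // The browser records event.lastEventId and re-sends it as the
+  // Last-Event-ID request header on every automatic reconnect, so a
+  // server holding a ring buffer can resume instead of restarting.
+  render(JSON.parse(event.data));
+};
+
+source.addEventListener("stream-done", () => {
+  // EventSource reconnects forever by default; the server must signal
+  // completion explicitly via a named terminal event.
+  source.close();
+});
+```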
diff --git a/tools/sse-timeout-probe/probe.test.ts b/tools/sse-timeout-probe/probe.test.ts
new file mode 100644
index 000000000..09e628d0d
--- /dev/null
+++ b/tools/sse-timeout-probe/probe.test.ts
@@ -0,0 +1,143 @@
+import type { AddressInfo } from "node:net";
+import { afterEach, beforeEach, describe, expect, it } from "vitest";
+import { classifyFetchError, PROBE_HARD_TIMEOUT, probeOnce } from "./probe";
+import { createProbeServer, parseDurationParam } from "./server";
+
+describe("classifyFetchError", () => {
+  function abortedSignal(reason: unknown): AbortSignal {
+    const c = new AbortController();
+    c.abort(reason);
+    return c.signal;
+  }
+
+  it("returns client-hard-timeout when the signal was aborted with PROBE_HARD_TIMEOUT", () => {
+    // Reproduces Node 22's actual behaviour: fetch throws the abort reason
+    // directly, NOT an AbortError-with-cause. The check has to walk the
+    // signal, not the error.
+    const err = PROBE_HARD_TIMEOUT;
+    const result = classifyFetchError(
+      err,
+      abortedSignal(PROBE_HARD_TIMEOUT),
+      false,
+    );
+    expect(result.outcome).toBe("client-hard-timeout");
+  });
+
+  it("returns server-close when bytes were received before the failure", () => {
+    const err = Object.assign(new Error("fetch failed"), {
+      cause: { message: "ECONNRESET" },
+    });
+    const result = classifyFetchError(err, new AbortController().signal, true);
+    expect(result.outcome).toBe("server-close");
+    expect(result.detail).toBe("ECONNRESET");
+  });
+
+  it("returns network-error when the failure happened before any bytes arrived", () => {
+    const err = Object.assign(new Error("fetch failed"), {
+      cause: { message: "ENOTFOUND" },
+    });
+    const result = classifyFetchError(err, new AbortController().signal, false);
+    expect(result.outcome).toBe("network-error");
+    expect(result.detail).toBe("ENOTFOUND");
+  });
+
+  it("does NOT misclassify aborts with other reasons as client-hard-timeout", () => {
+    // If something else aborts the signal (test harness, parent abort), the
+    // probe must not silently call it a client-hard-timeout.
+    const result = classifyFetchError(
+      new Error("oops"),
+      abortedSignal(new Error("unrelated")),
+      true,
+    );
+    expect(result.outcome).toBe("server-close");
+  });
+
+  it("falls back to e.message when there is no underlying .cause", () => {
+    const result = classifyFetchError(
+      new Error("plain fetch error"),
+      new AbortController().signal,
+      false,
+    );
+    expect(result.detail).toBe("plain fetch error");
+  });
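+
+  it("classifies undici body-timeout failures as server-close (illustrative)", () => {
+    // Hedged addition mirroring the README's "Body Timeout Error" row:
+    // undici's client-side bodyTimeout surfaces as a fetch failure whose
+    // `cause` carries the timeout message after bytes have already streamed,
+    // so it must classify as server-close with the inner detail, never as
+    // network-error. The exact error shape here is an assumption.
+    const err = Object.assign(new Error("terminated"), {
+      cause: { message: "Body Timeout Error" },
+    });
+    const result = classifyFetchError(err, new AbortController().signal, true);
+    expect(result.outcome).toBe("server-close");
+    expect(result.detail).toBe("Body Timeout Error");
+  });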
+ expect(parseDurationParam("abc", 120_000, 60 * 60_000)).toBe(120_000); + }); + + it("clamps to the max", () => { + expect(parseDurationParam("999999999", 120_000, 60 * 60_000)).toBe( + 60 * 60_000, + ); + }); + + it("passes through valid in-range values", () => { + expect(parseDurationParam("30000", 120_000, 60 * 60_000)).toBe(30_000); + }); +}); + +describe("probeOnce (integration via in-process server)", () => { + let server: ReturnType; + let baseUrl: string; + + beforeEach(async () => { + server = createProbeServer(); + await new Promise((resolve) => + server.listen(0, "127.0.0.1", resolve), + ); + const addr = server.address() as AddressInfo; + baseUrl = `http://127.0.0.1:${addr.port}`; + }); + + afterEach(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + it("reports `completed` when the server holds the connection for the full target", async () => { + const result = await probeOnce( + { + baseUrl, + path: "/sse-probe", + durationsMs: [200], + heartbeatMs: 0, + headers: {}, + jsonOutput: false, + }, + 200, + ); + expect(result.outcome).toBe("completed"); + expect(result.actualLifetimeMs).toBeGreaterThanOrEqual(150); + }); + + it("reports `wrong-content-type` when the endpoint isn't an SSE stream", async () => { + // Probe a 404 path so the server returns text/plain, not text/event-stream. + const result = await probeOnce( + { + baseUrl, + path: "/nope", + durationsMs: [200], + heartbeatMs: 0, + headers: {}, + jsonOutput: false, + }, + 200, + ); + expect(["wrong-content-type", "server-close"]).toContain(result.outcome); + }); +}); diff --git a/tools/sse-timeout-probe/probe.ts b/tools/sse-timeout-probe/probe.ts new file mode 100644 index 000000000..e1b2ddea7 --- /dev/null +++ b/tools/sse-timeout-probe/probe.ts @@ -0,0 +1,328 @@ +#!/usr/bin/env tsx +/** + * sse-timeout-probe: Empirically locate the idle-timeout ceiling that kills SSE + * streams on Databricks Apps. + * + * Background: internal field feedback (ES-1742245, and the EMEA Apps "gaps that + * matter" doc) reports ~75% of SSE connections drop mid-stream through the Apps + * reverse proxy. The source doc claims the drop is distinct from any idle timeout, + * but the ticket's own diagnosis (Naïm Achahboun) suggests the drop *is* caused by + * the effective request timeout — multi-agent LLM calls take varying durations, so + * ~30% finish under the ceiling and ~70% exceed it. + * + * This probe reproduces the condition deterministically: it opens one SSE connection + * per configured duration, keeps it idle (or paced by a configurable heartbeat), and + * records how long the connection survived and how it was terminated. Running it + * against a known-good origin (EKS / localhost) vs a Databricks Apps deployment + * gives you the per-layer ceiling without having to triangulate from noisy LLM + * traces. + * + * Usage: tsx tools/sse-timeout-probe/probe.ts --base-url [flags] + * See --help for flags. 
+ */ + +import { performance } from "node:perf_hooks"; + +interface ProbeConfig { + baseUrl: string; + path: string; + durationsMs: number[]; + heartbeatMs: number; + headers: Record; + jsonOutput: boolean; +} + +export interface ProbeResult { + targetDurationMs: number; + actualLifetimeMs: number; + outcome: + | "completed" + | "server-close" + | "network-error" + | "client-hard-timeout" + | "auth-redirect" + | "wrong-content-type"; + detail?: string; + bytesReceived: number; + firstByteMs?: number; +} + +// Tolerance for distinguishing "server held the full target duration" from "server closed early": +// network/event-loop jitter can shave tens of ms off a clean run, so anything within 500ms of +// the target counts as completed. +const COMPLETION_TOLERANCE_MS = 500; + +// Sentinel attached to AbortController.abort(reason) when the client-side +// safety timer fires. Node 22's fetch throws this reason DIRECTLY (not as an +// AbortError with a `.cause` chain), so callers must compare against it via +// signal.reason or a reference-equality check on the thrown value. +export const PROBE_HARD_TIMEOUT = Symbol("probe-hard-timeout"); + +/** + * Classify an exception thrown by `fetch` (or by the body reader) into a + * ProbeResult outcome. Pure / side-effect-free so it can be unit-tested + * without network IO. + * + * @param err The thrown value. + * @param signal The AbortSignal that was passed to fetch. Its `.reason` is + * authoritative for distinguishing user-aborts from network + * errors. + * @param streamStarted Whether we already read at least one byte of the SSE + * body. Mid-stream proxy resets are reported as + * `server-close`; failures before any bytes arrive are + * `network-error`. + */ +export function classifyFetchError( + err: unknown, + signal: AbortSignal, + streamStarted: boolean, +): { outcome: ProbeResult["outcome"]; detail: string } { + if (signal.aborted && signal.reason === PROBE_HARD_TIMEOUT) { + return { + outcome: "client-hard-timeout", + detail: + "client-side hard-timeout fired (server never closed the connection)", + }; + } + const e = err as Error & { cause?: { message?: string; code?: string } }; + // Node's native fetch wraps the underlying socket error in `cause`; surface + // the inner message instead of an opaque "fetch failed". + const detail = e?.cause?.message ?? e?.message ?? String(err); + return { + outcome: streamStarted ? "server-close" : "network-error", + detail, + }; +} + +function parseArgs(argv: string[]): ProbeConfig { + const args = new Map(); + for (let i = 0; i < argv.length; i++) { + const a = argv[i]; + if (a.startsWith("--")) { + const key = a.slice(2); + const value = + argv[i + 1] && !argv[i + 1].startsWith("--") ? argv[++i] : "true"; + args.set(key, value); + } + } + + if (args.has("help") || !args.has("base-url")) { + process.stderr.write( + [ + "usage: tsx tools/sse-timeout-probe/probe.ts --base-url [flags]", + "", + "flags:", + " --base-url required. Base URL of the SSE-serving app.", + " --path SSE endpoint path. Default: /sse-probe", + " --durations comma-separated ms, one connection per entry.", + " Default: 30000,60000,90000,120000,150000,180000,240000,300000", + " --heartbeat if >0, send a heartbeat comment every MS on the", + " *server* side (requires --path to point at the", + " companion server). If 0, connection is fully idle.", + " Default: 0", + " --header extra request header. 
Repeatable.", + " --json emit machine-readable JSON line per result.", + " --help show this message.", + "", + ].join("\n"), + ); + process.exit(args.has("help") ? 0 : 2); + } + + const headers: Record = {}; + for (let i = 0; i < argv.length; i++) { + if (argv[i] === "--header" && argv[i + 1]) { + const [k, ...rest] = argv[i + 1].split("="); + headers[k] = rest.join("="); + i++; + } + } + + const rawDurations = + args.get("durations") ?? + "30000,60000,90000,120000,150000,180000,240000,300000"; + const durations = rawDurations + .split(",") + .map((s) => Number.parseInt(s.trim(), 10)) + .filter((n) => Number.isFinite(n) && n > 0); + if (durations.length === 0) { + process.stderr.write( + `--durations resolved to an empty list (input: "${rawDurations}"). ` + + "Provide one or more positive integer milliseconds.\n", + ); + process.exit(2); + } + + return { + baseUrl: args.get("base-url")!.replace(/\/$/, ""), + path: args.get("path") ?? "/sse-probe", + durationsMs: durations, + heartbeatMs: Number.parseInt(args.get("heartbeat") ?? "0", 10), + headers, + jsonOutput: args.get("json") === "true", + }; +} + +export async function probeOnce( + config: ProbeConfig, + targetDurationMs: number, +): Promise { + const url = new URL(config.path, config.baseUrl); + url.searchParams.set("hold-ms", String(targetDurationMs)); + url.searchParams.set("heartbeat-ms", String(config.heartbeatMs)); + + const start = performance.now(); + const controller = new AbortController(); + const hardTimeout = setTimeout( + () => controller.abort(PROBE_HARD_TIMEOUT), + targetDurationMs + 15_000, + ); + + let bytesReceived = 0; + let firstByteMs: number | undefined; + let outcome: ProbeResult["outcome"] = "completed"; + let detail: string | undefined; + + try { + const resp = await fetch(url, { + method: "GET", + headers: { + Accept: "text/event-stream", + "Cache-Control": "no-cache", + ...config.headers, + }, + signal: controller.signal, + // Don't follow redirects: oauth2-proxy will 302 to a login page when + // the session cookie has expired, and following that would mask an + // auth error as a short-lived "stream". + redirect: "manual", + }); + + if ( + resp.type === "opaqueredirect" || + (resp.status >= 300 && resp.status < 400) + ) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "auth-redirect", + detail: `HTTP ${resp.status} ${resp.statusText} -> ${ + resp.headers.get("location") ?? "(no Location header)" + }`, + bytesReceived: 0, + }; + } + + if (!resp.ok) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "server-close", + detail: `HTTP ${resp.status} ${resp.statusText}`, + bytesReceived: 0, + }; + } + + const ct = resp.headers.get("content-type") ?? 
""; + if (!ct.toLowerCase().startsWith("text/event-stream")) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "wrong-content-type", + detail: `expected text/event-stream, got "${ct || "(none)"}"`, + bytesReceived: 0, + }; + } + + if (!resp.body) { + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome: "network-error", + detail: "no response body", + bytesReceived: 0, + }; + } + + const reader = resp.body.getReader(); + // eslint-disable-next-line no-constant-condition + while (true) { + const { value, done } = await reader.read(); + if (firstByteMs === undefined && value) + firstByteMs = performance.now() - start; + if (done) { + const lifetimeMs = performance.now() - start; + outcome = + lifetimeMs >= targetDurationMs - COMPLETION_TOLERANCE_MS + ? "completed" + : "server-close"; + break; + } + if (value) bytesReceived += value.byteLength; + } + } catch (err) { + const classified = classifyFetchError( + err, + controller.signal, + bytesReceived > 0, + ); + outcome = classified.outcome; + detail = classified.detail; + } finally { + clearTimeout(hardTimeout); + } + + return { + targetDurationMs, + actualLifetimeMs: performance.now() - start, + outcome, + detail, + bytesReceived, + firstByteMs, + }; +} + +function formatResult(r: ProbeResult): string { + const lifeSec = (r.actualLifetimeMs / 1000).toFixed(1); + const ttfb = r.firstByteMs ? `${(r.firstByteMs / 1000).toFixed(1)}s` : "n/a"; + const suffix = r.detail ? ` (${r.detail})` : ""; + return ` target=${r.targetDurationMs / 1000}s lived=${lifeSec}s outcome=${r.outcome} bytes=${r.bytesReceived} ttfb=${ttfb}${suffix}`; +} + +async function main(): Promise { + const config = parseArgs(process.argv.slice(2)); + + if (!config.jsonOutput) { + process.stdout.write( + `sse-timeout-probe → ${config.baseUrl}${config.path}\n`, + ); + process.stdout.write( + ` durations: ${config.durationsMs.map((d) => `${d / 1000}s`).join(", ")}\n`, + ); + process.stdout.write( + ` heartbeat: ${config.heartbeatMs === 0 ? "none (fully idle)" : `${config.heartbeatMs}ms`}\n\n`, + ); + } + + for (const duration of config.durationsMs) { + const result = await probeOnce(config, duration); + if (config.jsonOutput) { + process.stdout.write(`${JSON.stringify(result)}\n`); + } else { + process.stdout.write(`${formatResult(result)}\n`); + } + } +} + +// Only run as a CLI when invoked directly. When imported by tests, this +// guard prevents `parseArgs` from blowing up on missing --base-url. +const invokedDirectly = + typeof require !== "undefined" + ? require.main === module + : import.meta.url === `file://${process.argv[1]}`; +if (invokedDirectly) { + main().catch((err) => { + process.stderr.write(`probe failed: ${(err as Error).message}\n`); + process.exit(1); + }); +} diff --git a/tools/sse-timeout-probe/server.py b/tools/sse-timeout-probe/server.py new file mode 100644 index 000000000..9c832911b --- /dev/null +++ b/tools/sse-timeout-probe/server.py @@ -0,0 +1,104 @@ +"""Companion server for sse-timeout-probe. + +Serves /sse-probe with configurable hold and heartbeat — same wire behavior as +server.ts, rewritten in Python stdlib so it runs on the Spaces microVM image +(no npx/tsx available). The probe client is unchanged. +""" + +import json +import os +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from urllib.parse import parse_qs, urlparse + + +# Mirrors server.ts clamps. 
+# malformed URL, not a legitimate probe — clamp instead of trusting the caller.
+MAX_HOLD_MS = 60 * 60 * 1000
+MAX_HEARTBEAT_MS = 60 * 1000
+
+
+def parse_duration(raw: str | None, default_ms: int, max_ms: int) -> int:
+    if raw is None or raw == "":
+        return default_ms
+    try:
+        n = int(raw)
+    except ValueError:
+        return default_ms
+    if n < 0:
+        return default_ms
+    return min(n, max_ms)
+
+
+class ProbeHandler(BaseHTTPRequestHandler):
+    def log_message(self, format, *args):
+        # Stderr is captured by /logz; stdout flooding is unhelpful for our
+        # measurement of long-lived connections.
+        return
+
+    def do_GET(self):
+        url = urlparse(self.path)
+        if not url.path.startswith("/sse-probe"):
+            self.send_response(404)
+            self.send_header("Content-Type", "text/plain")
+            self.end_headers()
+            self.wfile.write(b"not found - try /sse-probe\n")
+            return
+
+        params = parse_qs(url.query)
+        hold_ms = parse_duration(
+            (params.get("hold-ms") or [None])[0], 120_000, MAX_HOLD_MS
+        )
+        heartbeat_ms = parse_duration(
+            (params.get("heartbeat-ms") or [None])[0], 0, MAX_HEARTBEAT_MS
+        )
+
+        self.send_response(200)
+        self.send_header("Content-Type", "text/event-stream")
+        self.send_header("Cache-Control", "no-cache, no-transform")
+        # X-Accel-Buffering=no asks any nginx in front to flush instead of buffer.
+        self.send_header("X-Accel-Buffering", "no")
+        self.end_headers()
+
+        try:
+            self.wfile.write(
+                f"event: probe-start\ndata: {json.dumps({'holdMs': hold_ms, 'heartbeatMs': heartbeat_ms})}\n\n".encode()
+            )
+            self.wfile.flush()
+
+            start = time.monotonic()
+            deadline = start + hold_ms / 1000
+            next_beat = start + heartbeat_ms / 1000 if heartbeat_ms > 0 else None
+
+            while True:
+                now = time.monotonic()
+                if now >= deadline:
+                    break
+                if next_beat is not None and now >= next_beat:
+                    self.wfile.write(f": heartbeat {int(time.time() * 1000)}\n\n".encode())
+                    self.wfile.flush()
+                    next_beat += heartbeat_ms / 1000
+                # Sleep until the next event (heartbeat or deadline).
+                next_event = min(deadline, next_beat) if next_beat is not None else deadline
+                remaining = next_event - time.monotonic()
+                if remaining > 0:
+                    time.sleep(min(remaining, 1.0))
+
+            self.wfile.write(
+                f"event: probe-end\ndata: {json.dumps({'reason': 'hold-elapsed'})}\n\n".encode()
+            )
+            self.wfile.flush()
+        except (BrokenPipeError, ConnectionResetError):
+            # Client/proxy disconnected mid-stream; nothing to clean up.
+            pass
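+
+
+# Manual smoke test (illustrative; any SSE-capable client works):
+#
+#     curl -N "http://localhost:8000/sse-probe?hold-ms=30000&heartbeat-ms=5000"
+#
+# curl's -N/--no-buffer flag prints heartbeat comments as they arrive instead
+# of buffering, so a premature proxy close is visible in real time.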
+def main():
+    port = int(os.environ.get("DATABRICKS_APP_PORT") or os.environ.get("PORT") or "8000")
+    server = ThreadingHTTPServer(("0.0.0.0", port), ProbeHandler)
+    print(f"sse-timeout-probe server listening on :{port}", flush=True)
+    server.serve_forever()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tools/sse-timeout-probe/server.ts b/tools/sse-timeout-probe/server.ts
new file mode 100644
index 000000000..61a55606d
--- /dev/null
+++ b/tools/sse-timeout-probe/server.ts
@@ -0,0 +1,111 @@
+#!/usr/bin/env tsx
+/**
+ * Companion server for sse-timeout-probe.
+ *
+ * Serves a single SSE endpoint `/sse-probe` that keeps the connection open for a
+ * configurable duration, optionally sending a heartbeat comment. Intended to run
+ * inside a Databricks App so a client in the browser (or CLI) can stream against
+ * it and measure when the effective idle timeout kicks in.
+ *
+ * Deploy this as the app's entrypoint, or mount it alongside a larger app.
+ */
+
+import { createServer } from "node:http";
+
+const port = Number.parseInt(
+  process.env.PORT ?? process.env.DATABRICKS_APP_PORT ?? "8000",
+  10,
+);
+
+// Server-side ladder bounds: hold up to an hour, heartbeat at most every 60s.
+// Anything outside these bounds is almost certainly a malformed URL, not a
+// legitimate probe — clamp instead of trusting the caller, and never let
+// `setTimeout(..., NaN)` collapse to a 1ms tight loop.
+const MAX_HOLD_MS = 60 * 60 * 1000;
+const MAX_HEARTBEAT_MS = 60 * 1000;
+
+export function parseDurationParam(
+  raw: string | null,
+  defaultMs: number,
+  maxMs: number,
+): number {
+  if (raw === null || raw === "") return defaultMs;
+  const n = Number.parseInt(raw, 10);
+  if (!Number.isFinite(n) || n < 0) return defaultMs;
+  return Math.min(n, maxMs);
+}
+
+export function createProbeServer() {
+  return createServer((req, res) => {
+    if (!req.url?.startsWith("/sse-probe")) {
+      res.writeHead(404, { "Content-Type": "text/plain" });
+      res.end("not found — try /sse-probe\n");
+      return;
+    }
+
+    const url = new URL(req.url, `http://${req.headers.host ?? "localhost"}`);
+    const holdMs = parseDurationParam(
+      url.searchParams.get("hold-ms"),
+      120_000,
+      MAX_HOLD_MS,
+    );
+    const heartbeatMs = parseDurationParam(
+      url.searchParams.get("heartbeat-ms"),
+      0,
+      MAX_HEARTBEAT_MS,
+    );
+
+    res.writeHead(200, {
+      "Content-Type": "text/event-stream",
+      "Cache-Control": "no-cache, no-transform",
+      "X-Accel-Buffering": "no",
+    });
+
+    // Initial event so the client can measure time-to-first-byte.
+    res.write(
+      `event: probe-start\ndata: ${JSON.stringify({ holdMs, heartbeatMs })}\n\n`,
+    );
+
+    let heartbeat: NodeJS.Timeout | undefined;
+    const stopHeartbeat = (): void => {
+      if (heartbeat) {
+        clearInterval(heartbeat);
+        heartbeat = undefined;
+      }
+    };
+    if (heartbeatMs > 0) {
+      heartbeat = setInterval(() => {
+        res.write(`: heartbeat ${Date.now()}\n\n`);
+      }, heartbeatMs);
+    }
+
+    const stop = setTimeout(() => {
+      stopHeartbeat();
+      res.write(
+        `event: probe-end\ndata: ${JSON.stringify({ reason: "hold-elapsed" })}\n\n`,
+      );
+      res.end();
+    }, holdMs);
+
+    const cleanup = (): void => {
+      stopHeartbeat();
+      clearTimeout(stop);
+    };
+    req.on("close", cleanup);
+    // Async write failures (proxy half-closes between heartbeats, etc.) land
+    // here as 'error' events on the response. Without a listener, Node's
+    // default behaviour would crash the server.
+    res.on("error", cleanup);
+  });
+}
+
+const invokedDirectly =
+  typeof require !== "undefined"
+    ? require.main === module
+    : import.meta.url === `file://${process.argv[1]}`;
+if (invokedDirectly) {
+  const server = createProbeServer();
+  server.listen(port, () => {
+    process.stdout.write(`sse-timeout-probe server listening on :${port}\n`);
+  });
+}
diff --git a/vitest.config.ts b/vitest.config.ts
index 8c2893b01..5e272f031 100644
--- a/vitest.config.ts
+++ b/vitest.config.ts
@@ -58,6 +58,13 @@ export default defineConfig({
         environment: "node",
       },
     },
+    {
+      test: {
+        name: "tools",
+        root: "./tools",
+        environment: "node",
+      },
+    },
   ],
  },
 });