From bebcd70499034ffc9e5422b25b837bb50fefdfc7 Mon Sep 17 00:00:00 2001 From: Vance Ingalls Date: Sun, 14 Jun 2026 14:51:13 -0700 Subject: [PATCH 1/3] perf(engine,producer): worker-offload JPEG encode for drawElement fast capture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Moves per-frame JPEG encode (~7.4ms, 57% of frame cost) off the page main thread into an in-page OffscreenCanvas Worker, then pipelines so frame N encodes while frame N+1 seeks+paints. Target ~1.65× (1 worker) wall-time speedup on macOS hardware-GPU drawElement renders. New machinery: - EngineConfig.enableDrawElementWorkerEncode (default false, env HF_DE_WORKER_ENCODE) - drawElementService: WorkerEncodeState, initDrawElementWorkerEncode, cleanupDrawElementWorkerEncode, produceDrawElementFrame - frameCapture: CaptureSession.workerEncodeEnabled, captureFrameToBufferPipelined - captureStreamingStage: runWorkerEncodePipelineLoop helper + depth-2 dispatch Gated off by default. No effect on BeginFrame/Linux, SwiftShader, PNG, or any render without useDrawElement+enableDrawElementWorkerEncode=true. Co-Authored-By: Claude Sonnet 4.6 --- packages/engine/src/config.ts | 13 + packages/engine/src/index.ts | 1 + .../engine/src/services/drawElementService.ts | 268 ++++++++++++++++++ packages/engine/src/services/frameCapture.ts | 83 ++++++ .../render/stages/captureStreamingStage.ts | 93 ++++-- 5 files changed, 438 insertions(+), 20 deletions(-) diff --git a/packages/engine/src/config.ts b/packages/engine/src/config.ts index face25a5a..1f383c3f2 100644 --- a/packages/engine/src/config.ts +++ b/packages/engine/src/config.ts @@ -61,6 +61,14 @@ export interface EngineConfig { * Env fallback: `PRODUCER_EXPERIMENTAL_FAST_CAPTURE`. */ useDrawElement: boolean; + /** + * EXPERIMENTAL. Pipeline JPEG encode into an in-page OffscreenCanvas Worker + * for the drawElement fast-capture path (macOS hardware GPU only). 
The worker + * encodes frame N while the main thread seeks+paints frame N+1, targeting + * ~1.65–1.96× wall-time speedup. No-op unless `useDrawElement` is also true. + * Default: off. Env: `HF_DE_WORKER_ENCODE=true`. + */ + enableDrawElementWorkerEncode: boolean; /** * Low-memory render profile. When `true`, the orchestrator collapses the * pipeline to its cheapest shape on memory-constrained hosts: it skips the @@ -216,6 +224,7 @@ export const DEFAULT_CONFIG: EngineConfig = { protocolTimeout: 300_000, forceScreenshot: false, useDrawElement: false, + enableDrawElementWorkerEncode: false, // Auto-detected per host in `resolveConfig`; defaults off for the raw // DEFAULT_CONFIG (used directly by tests and worker-sizing fallbacks). lowMemoryMode: false, @@ -316,6 +325,10 @@ export function resolveConfig(overrides?: Partial): EngineConfig { forceScreenshot: envBool("PRODUCER_FORCE_SCREENSHOT", DEFAULT_CONFIG.forceScreenshot), useDrawElement: envBool("PRODUCER_EXPERIMENTAL_FAST_CAPTURE", DEFAULT_CONFIG.useDrawElement), + enableDrawElementWorkerEncode: envBool( + "HF_DE_WORKER_ENCODE", + DEFAULT_CONFIG.enableDrawElementWorkerEncode, + ), lowMemoryMode: resolveLowMemoryMode(), enablePageSideCompositing: envBool( "HF_PAGE_SIDE_COMPOSITING", diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 61bcaa060..d2e7c9f81 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -71,6 +71,7 @@ export { closeCaptureSession, captureFrame, captureFrameToBuffer, + captureFrameToBufferPipelined, discardWarmupCapture, getCompositionDuration, getCapturePerfSummary, diff --git a/packages/engine/src/services/drawElementService.ts b/packages/engine/src/services/drawElementService.ts index 7286baffe..ae2056bc7 100644 --- a/packages/engine/src/services/drawElementService.ts +++ b/packages/engine/src/services/drawElementService.ts @@ -392,3 +392,271 @@ export async function captureDrawElementFrame( if (!base64) throw new Error("drawElement: 
toDataURL returned no base64 payload"); return Buffer.from(base64, "base64"); } + +// ── Worker-encode pipeline ──────────────────────────────────────────────────── +// +// Architecture: an in-page OffscreenCanvas Worker encodes JPEG frames off the +// main thread. The main thread does seek+paint+drawElement+createImageBitmap +// (the "produce" phase) and immediately transfers the bitmap to the worker. +// The worker encodes it concurrently while the main thread processes the next +// frame — hiding ~7.4ms of encode cost behind ~8.4ms of produce work. +// +// The worker posts the encoded bytes back by calling window.__hfFrameReady +// (a Puppeteer exposeFunction binding that calls a node-side callback). +// Node resolves the per-frame Promise from that callback. + +interface WorkerEncodeEntry { + resolve: (buf: Buffer) => void; + reject: (err: Error) => void; +} + +interface WorkerEncodeState { + nextId: number; + pending: Map; +} + +const workerEncodeStates = new WeakMap(); + +/** + * Initialize the in-page JPEG encode Worker for a session. Must be called + * after page navigation (post-`initializeSession`) and before any + * `produceDrawElementFrame` calls. + * + * Safe to call multiple times for the same page (e.g. session reuse after + * navigation): the exposeFunction binding survives navigation, but the + * in-page Worker is re-created. Pending promises from a prior navigation are + * rejected with a "session reused" error. + */ +export async function initDrawElementWorkerEncode(page: Page): Promise { + const existing = workerEncodeStates.get(page); + + if (existing) { + // Session reused after navigation — reject stale pending promises. + for (const entry of existing.pending.values()) { + entry.reject(new Error("drawElement worker encode: session reused, frame dropped")); + } + existing.pending.clear(); + } else { + // First init for this page: register the node-side callback. 
+ const state: WorkerEncodeState = { nextId: 0, pending: new Map() }; + workerEncodeStates.set(page, state); + await page.exposeFunction("__hfFrameReady", (id: number, b64: string) => { + const s = workerEncodeStates.get(page); + if (!s) return; + const entry = s.pending.get(id); + if (!entry) return; + s.pending.delete(id); + entry.resolve(Buffer.from(b64, "base64")); + }); + } + + // Inject (or re-create) the in-page Worker after each navigation. + await page.evaluate(() => { + type EncWin = Window & { __hfEncWorker?: Worker }; + const ew = window as EncWin; + if (ew.__hfEncWorker) { + ew.__hfEncWorker.terminate(); + ew.__hfEncWorker = undefined; + } + const workerSrc = ` + self.onmessage = async (e) => { + const { bmp, id, w, h, q } = e.data; + const oc = new OffscreenCanvas(w, h); + const c = oc.getContext('2d'); + c.drawImage(bmp, 0, 0); + bmp.close(); + const blob = await oc.convertToBlob({ type: 'image/jpeg', quality: q }); + const ab = await blob.arrayBuffer(); + const u = new Uint8Array(ab); + let s = ''; const CH = 0x8000; + for (let i = 0; i < u.length; i += CH) + s += String.fromCharCode.apply(null, u.subarray(i, i + CH)); + self.postMessage({ id, b64: btoa(s) }); + }; + `; + const url = URL.createObjectURL(new Blob([workerSrc], { type: "text/javascript" })); + ew.__hfEncWorker = new Worker(url); + ew.__hfEncWorker.onmessage = (ev: MessageEvent) => { + const { id, b64 } = ev.data as { id: number; b64: string }; + (window as EncWin & { __hfFrameReady?: (id: number, b64: string) => void }).__hfFrameReady?.( + id, + b64, + ); + }; + ew.__hfEncWorker.onerror = (err: ErrorEvent) => { + console.error("[hfEncWorker] unhandled error:", err.message); + }; + }); +} + +/** + * Clean up the worker encode state for a session being closed. Rejects any + * pending frame promises and removes the WeakMap entry. Safe to call even if + * `initDrawElementWorkerEncode` was never called for this page. 
+ */ +export function cleanupDrawElementWorkerEncode(page: Page): void { + const state = workerEncodeStates.get(page); + if (!state) return; + for (const entry of state.pending.values()) { + entry.reject(new Error("drawElement worker encode: session closed")); + } + state.pending.clear(); + workerEncodeStates.delete(page); +} + +/** + * Pipelined drawElement frame capture: produce phase only. + * + * Performs seek-prep, paint-wait, drawElementImage, compositing, and + * `createImageBitmap` on the main thread, then transfers the bitmap to the + * in-page encode worker. Returns as soon as the bitmap is transferred — the + * worker encodes asynchronously. The returned `encodeResult` resolves when + * the worker posts the encoded frame back to node. + * + * Call `initDrawElementWorkerEncode` once per page before using this function. + * + * JPEG only (png falls back to synchronous `captureDrawElementFrame`). + */ +export async function produceDrawElementFrame( + page: Page, + width: number, + height: number, + quality = 80, + syncToPaintEvent = true, +): Promise<{ encodeResult: Promise }> { + const state = workerEncodeStates.get(page); + if (!state) { + throw new Error( + "drawElement worker encode not initialized; call initDrawElementWorkerEncode first", + ); + } + + const frameId = ++state.nextId; + const encodeResult = new Promise((resolve, reject) => { + state.pending.set(frameId, { resolve, reject }); + }); + + // Do paint-wait + drawElement composite + createImageBitmap + postMessage. + // Resolves as soon as the bitmap is transferred (not when encode is done). 
+ await page.evaluate( + ({ w, h, q, sync, fid }: { w: number; h: number; q: number; sync: boolean; fid: number }) => { + const canvas = document.getElementById("__hf_de_canvas") as HTMLCanvasElement | null; + const root = document.querySelector("[data-composition-id]") as HTMLElement | null; + if (!canvas || !root) throw new Error("drawElement canvas not initialized"); + const ctx = canvas.getContext("2d"); + if (!ctx) throw new Error("drawElement: 2d context unavailable"); + + type AccelWindow = Window & { + __hf_accel_canvases?: HTMLCanvasElement[]; + __hf_canvas_2d?: HTMLCanvasElement[]; + __hf3d?: { update: () => void }; + }; + const aw = window as AccelWindow; + aw.__hf3d?.update(); + const accel = (aw.__hf_accel_canvases ?? []).filter((c) => root.contains(c)); + if (!sync) { + for (const c of (aw.__hf_canvas_2d ?? []).filter((c2) => root.contains(c2))) { + if (!accel.includes(c)) accel.push(c); + } + accel.sort((a, b) => + a.compareDocumentPosition(b) & Node.DOCUMENT_POSITION_FOLLOWING ? 
-1 : 1, + ); + } + for (const c of accel) { + if (c.style.visibility !== "hidden") c.style.visibility = "hidden"; + } + + return new Promise((resolveCapture, rejectCapture) => { + let settled = false; + const drawAndKick = () => { + if (settled) return; + settled = true; + try { + ctx.clearRect(0, 0, w, h); + let bg = ""; + for (let el = root.parentElement; el; el = el.parentElement) { + const c = getComputedStyle(el).backgroundColor; + if (c && c !== "transparent" && c !== "rgba(0, 0, 0, 0)") { + bg = c; + break; + } + } + if (!bg) bg = "#fff"; + if (bg) { + ctx.fillStyle = bg; + ctx.fillRect(0, 0, w, h); + } + // fallow-ignore-next-line code-duplication + const rootRect = root.getBoundingClientRect(); + for (const c of accel) { + if (c.hasAttribute("data-hf-3d")) continue; + const r = c.getBoundingClientRect(); + try { + ctx.drawImage(c, r.left - rootRect.left, r.top - rootRect.top, r.width, r.height); + } catch { + // skip + } + } + ( + ctx as unknown as { drawElementImage(el: Element, x: number, y: number): void } + ).drawElementImage(root, 0, 0); + // fallow-ignore-next-line code-duplication + for (const c of accel) { + if (!c.hasAttribute("data-hf-3d")) continue; + const r = c.getBoundingClientRect(); + try { + ctx.drawImage(c, r.left - rootRect.left, r.top - rootRect.top, r.width, r.height); + } catch { + // skip + } + } + } catch (e) { + rejectCapture(e instanceof Error ? e : new Error(String(e))); + return; + } + // Snapshot canvas and hand off to encode worker. createImageBitmap + // captures a pixel snapshot at this point; after .then resolves, the + // canvas is safe to overwrite for the next frame. 
+ setTimeout(() => { + createImageBitmap(canvas) + .then((bmp) => { + type EncWin = Window & { __hfEncWorker?: Worker }; + const ew = window as EncWin; + if (!ew.__hfEncWorker) { + rejectCapture(new Error("drawElement: encode worker not initialized")); + return; + } + ew.__hfEncWorker.postMessage({ bmp, id: fid, w, h, q: q / 100 }, [bmp]); + resolveCapture(); + }) + .catch((e: unknown) => { + rejectCapture(e instanceof Error ? e : new Error(String(e))); + }); + }, 0); + }; + + if (!sync) { + drawAndKick(); + return; + } + const onPaint = () => { + canvas.removeEventListener("paint", onPaint); + drawAndKick(); + }; + canvas.addEventListener("paint", onPaint); + const tick = document.getElementById("__hf_de_tick"); + if (tick) { + tick.style.backgroundColor = + tick.style.backgroundColor === "rgb(0, 0, 0)" ? "rgb(1, 1, 1)" : "rgb(0, 0, 0)"; + } + setTimeout(() => { + canvas.removeEventListener("paint", onPaint); + drawAndKick(); + }, 250); + }); + }, + { w: width, h: height, q: quality, sync: syncToPaintEvent, fid: frameId }, + ); + + return { encodeResult }; +} diff --git a/packages/engine/src/services/frameCapture.ts b/packages/engine/src/services/frameCapture.ts index 7e3a96e61..442db71c1 100644 --- a/packages/engine/src/services/frameCapture.ts +++ b/packages/engine/src/services/frameCapture.ts @@ -35,6 +35,9 @@ import { captureDrawElementFrame, resolveDrawElementCaptureMode, instrumentAcceleratedCanvases, + initDrawElementWorkerEncode, + cleanupDrawElementWorkerEncode, + produceDrawElementFrame, } from "./drawElementService.js"; import { initThreeDProjection, @@ -99,6 +102,12 @@ export interface CaptureSession { isSwiftShader?: boolean; /** drawElementImage canvas was injected and is ready for capture. */ drawElementReady?: boolean; + /** + * Worker-encode pipeline is active for this session. Set by + * `initDrawElementOrTransparentBackground` when `enableDrawElementWorkerEncode` + * is true and capture mode resolved to "drawelement". 
+ */ + workerEncodeEnabled?: boolean; } // Circular buffer for browser console messages dumped on render failure diagnostics. @@ -485,6 +494,18 @@ async function initDrawElementOrTransparentBackground( session.captureMode = "drawelement"; session.drawElementReady = true; logInitPhase("drawElement canvas injected"); + // Worker-encode pipeline: macOS hardware GPU path only (syncToPaintEvent=true, + // beginFrameTimeTicks=0). Skip for BeginFrame (Linux/Docker) and transparent + // (PNG) output — those use the existing synchronous path unchanged. + const workerEncodeEnabled = + (session.config?.enableDrawElementWorkerEncode ?? false) && + !transparent && + session.beginFrameTimeTicks === 0; + if (workerEncodeEnabled) { + await initDrawElementWorkerEncode(page); + session.workerEncodeEnabled = true; + logInitPhase("drawElement worker encode initialized"); + } } } else if (session.options.format === "png") { await initTransparentBackground(session.page); @@ -1641,6 +1662,65 @@ export async function captureFrameToBuffer( return { buffer, captureTimeMs }; } +/** + * Pipelined drawElement frame capture for the worker-encode path. + * + * Performs seek prep + paint-wait + drawElementImage + composite + + * `createImageBitmap` + transfers the bitmap to the in-page encode worker. + * Returns `encodeResult` immediately (before the worker finishes encoding). + * The caller overlaps frame N's encode with frame N+1's produce phase. + * + * Requirements: + * - `session.workerEncodeEnabled` must be true (set by initializeSession when + * `config.enableDrawElementWorkerEncode` is true and mode resolved to drawelement). + * - JPEG format only. PNG falls back to `captureFrameToBuffer`. + * - macOS hardware GPU path (syncToPaintEvent=true, beginFrameTimeTicks=0). + * BeginFrame (Linux) uses the standard synchronous path unchanged. 
+ */ +export async function captureFrameToBufferPipelined( + session: CaptureSession, + frameIndex: number, + time: number, +): Promise<{ encodeResult: Promise; captureTimeMs: number }> { + const { page, options } = session; + const startTime = Date.now(); + + const { quantizedTime, seekMs, beforeCaptureMs } = await prepareFrameForCapture( + session, + frameIndex, + time, + ); + void quantizedTime; + + if (session.beginFrameTimeTicks > 0) { + const client = await getCdpSession(page); + await client.send("HeadlessExperimental.beginFrame", { + frameTimeTicks: session.beginFrameTimeTicks + frameIndex * session.beginFrameIntervalMs, + interval: session.beginFrameIntervalMs, + noDisplayUpdates: false, + }); + } + + const { encodeResult } = await produceDrawElementFrame( + page, + options.width, + options.height, + options.quality ?? 80, + session.beginFrameTimeTicks === 0, + ); + + const captureTimeMs = Date.now() - startTime; + + session.capturePerf.frames += 1; + session.capturePerf.seekMs += seekMs; + session.capturePerf.beforeCaptureMs += beforeCaptureMs; + // screenshotMs reflects produce time only (encode is async, not tracked here) + session.capturePerf.screenshotMs += captureTimeMs - seekMs - beforeCaptureMs; + session.capturePerf.totalMs += captureTimeMs; + + return { encodeResult, captureTimeMs }; +} + /** * Type of the "inner capture" function consumed by * {@link discardWarmupCapture}. 
Matches the real `captureFrameCore` signature @@ -1720,6 +1800,9 @@ export async function closeCaptureSession(session: CaptureSession): Promise, + assertNotAborted: () => void, + onProgress: CaptureStreamingStageInput["onProgress"], +): Promise { + let prev: { idx: number; encodeResult: Promise } | null = null; + + const drainPrev = async (): Promise => { + if (!prev) return; + const buf = await prev.encodeResult; + await reorderBuffer.waitForFrame(prev.idx); + ensureFrameWritten(await currentEncoder.writeFrame(buf), prev.idx); + reorderBuffer.advanceTo(prev.idx + 1); + job.framesRendered = prev.idx + 1; + updateJobStatus( + job, + "rendering", + `Streaming frame ${prev.idx + 1}/${totalFrames}`, + Math.round(25 + ((prev.idx + 1) / totalFrames) * 55), + onProgress, + ); + }; + + for (let i = 0; i < totalFrames; i++) { + assertNotAborted(); + const time = (i * job.config.fps.den) / job.config.fps.num; + const { encodeResult } = await captureFrameToBufferPipelined(session, i, time); + await drainPrev(); + prev = { idx: i, encodeResult }; + } + await drainPrev(); +} + export async function runCaptureStreamingStage( input: CaptureStreamingStageInput, ): Promise { @@ -259,29 +298,43 @@ export async function runCaptureStreamingStage( assertNotAborted(); lastBrowserConsole = session.browserConsoleBuffer; - for (let i = 0; i < totalFrames; i++) { - assertNotAborted(); - const time = (i * job.config.fps.den) / job.config.fps.num; - const { buffer } = await captureFrameToBuffer(session, i, time); - await reorderBuffer.waitForFrame(i); - ensureFrameWritten(await currentEncoder.writeFrame(buffer), i); - reorderBuffer.advanceTo(i + 1); - job.framesRendered = i + 1; - - const frameProgress = (i + 1) / totalFrames; - const progress = 25 + frameProgress * 55; - - // Keep status cadence identical to disk sequential capture; the - // capture error wrapper below must remain separate from finally so it - // can throw with the browser console before encoder cleanup runs. 
- // fallow-ignore-next-line code-duplication - updateJobStatus( + if (session.workerEncodeEnabled) { + // Worker-encode pipeline: depth-2. Frame N's in-page Worker encodes + // while frame N+1's main thread does seek+paint+drawElement+kick. + await runWorkerEncodePipelineLoop( + session, + totalFrames, job, - "rendering", - `Streaming frame ${i + 1}/${totalFrames}`, - Math.round(progress), + currentEncoder, + reorderBuffer, + assertNotAborted, onProgress, ); + } else { + for (let i = 0; i < totalFrames; i++) { + assertNotAborted(); + const time = (i * job.config.fps.den) / job.config.fps.num; + const { buffer } = await captureFrameToBuffer(session, i, time); + await reorderBuffer.waitForFrame(i); + ensureFrameWritten(await currentEncoder.writeFrame(buffer), i); + reorderBuffer.advanceTo(i + 1); + job.framesRendered = i + 1; + + const frameProgress = (i + 1) / totalFrames; + const progress = 25 + frameProgress * 55; + + // Keep status cadence identical to disk sequential capture; the + // capture error wrapper below must remain separate from finally so it + // can throw with the browser console before encoder cleanup runs. + // fallow-ignore-next-line code-duplication + updateJobStatus( + job, + "rendering", + `Streaming frame ${i + 1}/${totalFrames}`, + Math.round(progress), + onProgress, + ); + } } // This must mirror disk capture: catch wraps the original failure with // browser diagnostics, finally only handles cleanup. From 00e1792e11e36148ab60774bfcff1492cc4cbc37 Mon Sep 17 00:00:00 2001 From: Vance Ingalls Date: Sun, 14 Jun 2026 16:14:28 -0700 Subject: [PATCH 2/3] fix(engine,producer): harden worker-encode pipeline (review findings) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Liveness/correctness: - Worker encode failures now propagate: the in-page worker wraps encode in try/catch + null-checks getContext and posts {id,error} on failure; its onerror posts a fatal signal (id=-1). 
The node binding rejects the matching pending promise (or all of them on fatal) instead of leaving encodeResult pending forever — previously any worker throw hung the render to timeout. - Pipeline loop attaches a no-op catch to the orphaned in-flight encode on abort/throw so cleanup's rejection is not an unhandled promise rejection. - drainPrev checks assertNotAborted before awaiting the encode, so an abort raised during the preceding produce step is observed before the drain (encode wait + ffmpeg write) starts, not one iteration later; an abort raised during the wait itself is still only seen at the next check. - captureFrameToBufferPipelined wraps capture in a try/catch that calls captureFrameErrorDiagnostics on failure (when the session is initialized) and rethrows, restoring per-frame frame-error PNG/HTML/JSON the serial path produced. Lifecycle/efficiency: - __hfFrameReady binding tracked via a separate workerEncodeBoundPages WeakSet so a re-init after cleanup doesn't call exposeFunction twice ('already exists'). - URL.revokeObjectURL after Worker construction (was leaked per init). - Drop the unconditional setTimeout(0) around createImageBitmap (~1-4ms/frame of macrotask latency on the produce critical path; encode runs in the worker). - Reset nextId on session reuse; remove the unreachable BeginFrame branch from the pipelined path (gated to beginFrameTimeTicks===0). Base64 stays in the worker (off the main thread) by design; a binary side-channel to node is a follow-up. Gated off by default. 
Co-Authored-By: Claude Opus 4.8 (1M context) Co-authored-by: Miguel Ángel --- .../engine/src/services/drawElementService.ts | 132 ++++++++++++------ packages/engine/src/services/frameCapture.ts | 70 ++++++---- .../render/stages/captureStreamingStage.ts | 25 +++- 3 files changed, 147 insertions(+), 80 deletions(-) diff --git a/packages/engine/src/services/drawElementService.ts b/packages/engine/src/services/drawElementService.ts index ae2056bc7..d49d681ed 100644 --- a/packages/engine/src/services/drawElementService.ts +++ b/packages/engine/src/services/drawElementService.ts @@ -416,6 +416,12 @@ interface WorkerEncodeState { } const workerEncodeStates = new WeakMap(); +// Pages that already have the `__hfFrameReady` binding installed. The binding +// survives navigation and cannot be cleanly removed, so its lifetime is +// tracked separately from WorkerEncodeState (which is recreated per session). +// Without this, a re-init after cleanup would call exposeFunction twice and +// throw "already exists". +const workerEncodeBoundPages = new WeakSet(); /** * Initialize the in-page JPEG encode Worker for a session. Must be called @@ -431,60 +437,95 @@ export async function initDrawElementWorkerEncode(page: Page): Promise { const existing = workerEncodeStates.get(page); if (existing) { - // Session reused after navigation — reject stale pending promises. + // Session reused after navigation — reject stale pending promises and + // reset the frame-id counter so ids track the new render's frame indices. for (const entry of existing.pending.values()) { entry.reject(new Error("drawElement worker encode: session reused, frame dropped")); } existing.pending.clear(); + existing.nextId = 0; } else { - // First init for this page: register the node-side callback. 
const state: WorkerEncodeState = { nextId: 0, pending: new Map() }; workerEncodeStates.set(page, state); - await page.exposeFunction("__hfFrameReady", (id: number, b64: string) => { + } + + // Register the node-side callback ONCE per page. The exposeFunction binding + // survives navigation and cannot be re-added (throws "already exists"), so + // guard with workerEncodeBoundPages rather than the per-session state. The + // callback reads the CURRENT WorkerEncodeState live, so it works across + // re-inits where the state object is replaced. + if (!workerEncodeBoundPages.has(page)) { + workerEncodeBoundPages.add(page); + await page.exposeFunction("__hfFrameReady", (id: number, b64: string, error?: string) => { const s = workerEncodeStates.get(page); if (!s) return; + // id < 0 is a fatal worker signal (e.g. worker onerror): the worker is + // dead and no frame will ever come back — reject every in-flight frame + // so awaiters fail fast instead of hanging forever. + if (id < 0) { + for (const entry of s.pending.values()) { + entry.reject(new Error(`drawElement worker encode failed: ${error ?? "worker error"}`)); + } + s.pending.clear(); + return; + } const entry = s.pending.get(id); if (!entry) return; s.pending.delete(id); - entry.resolve(Buffer.from(b64, "base64")); + if (error) entry.reject(new Error(`drawElement worker encode failed: ${error}`)); + else entry.resolve(Buffer.from(b64, "base64")); }); } // Inject (or re-create) the in-page Worker after each navigation. await page.evaluate(() => { - type EncWin = Window & { __hfEncWorker?: Worker }; + type EncWin = Window & { + __hfEncWorker?: Worker; + __hfFrameReady?: (id: number, b64: string, error?: string) => void; + }; const ew = window as EncWin; if (ew.__hfEncWorker) { ew.__hfEncWorker.terminate(); ew.__hfEncWorker = undefined; } + // Base64 is done INSIDE the worker (off the main thread) so it never + // competes with the produce phase; the worker posts a string the page + // relays to node. 
On any encode failure the worker posts an error for that + // frame's id so the node-side promise rejects instead of hanging. const workerSrc = ` self.onmessage = async (e) => { const { bmp, id, w, h, q } = e.data; - const oc = new OffscreenCanvas(w, h); - const c = oc.getContext('2d'); - c.drawImage(bmp, 0, 0); - bmp.close(); - const blob = await oc.convertToBlob({ type: 'image/jpeg', quality: q }); - const ab = await blob.arrayBuffer(); - const u = new Uint8Array(ab); - let s = ''; const CH = 0x8000; - for (let i = 0; i < u.length; i += CH) - s += String.fromCharCode.apply(null, u.subarray(i, i + CH)); - self.postMessage({ id, b64: btoa(s) }); + try { + const oc = new OffscreenCanvas(w, h); + const c = oc.getContext('2d'); + if (!c) throw new Error('OffscreenCanvas 2d context unavailable'); + c.drawImage(bmp, 0, 0); + bmp.close(); + const blob = await oc.convertToBlob({ type: 'image/jpeg', quality: q }); + const ab = await blob.arrayBuffer(); + const u = new Uint8Array(ab); + let s = ''; const CH = 0x8000; + for (let i = 0; i < u.length; i += CH) + s += String.fromCharCode.apply(null, u.subarray(i, i + CH)); + self.postMessage({ id, b64: btoa(s) }); + } catch (err) { + try { if (bmp) bmp.close(); } catch (_) {} + self.postMessage({ id, error: (err && err.message) || String(err) }); + } }; `; const url = URL.createObjectURL(new Blob([workerSrc], { type: "text/javascript" })); - ew.__hfEncWorker = new Worker(url); - ew.__hfEncWorker.onmessage = (ev: MessageEvent) => { - const { id, b64 } = ev.data as { id: number; b64: string }; - (window as EncWin & { __hfFrameReady?: (id: number, b64: string) => void }).__hfFrameReady?.( - id, - b64, - ); + const worker = new Worker(url); + URL.revokeObjectURL(url); // only needed for Worker construction + ew.__hfEncWorker = worker; + worker.onmessage = (ev: MessageEvent) => { + const d = ev.data as { id: number; b64?: string; error?: string }; + ew.__hfFrameReady?.(d.id, d.b64 ?? 
"", d.error); }; - ew.__hfEncWorker.onerror = (err: ErrorEvent) => { - console.error("[hfEncWorker] unhandled error:", err.message); + worker.onerror = (err: ErrorEvent) => { + // Fatal, not tied to a frame id — signal node (id = -1) to reject all + // in-flight frames so the pipeline fails fast instead of hanging. + ew.__hfFrameReady?.(-1, "", err.message || "worker fatal error"); }; }); } @@ -614,25 +655,28 @@ export async function produceDrawElementFrame( rejectCapture(e instanceof Error ? e : new Error(String(e))); return; } - // Snapshot canvas and hand off to encode worker. createImageBitmap - // captures a pixel snapshot at this point; after .then resolves, the - // canvas is safe to overwrite for the next frame. - setTimeout(() => { - createImageBitmap(canvas) - .then((bmp) => { - type EncWin = Window & { __hfEncWorker?: Worker }; - const ew = window as EncWin; - if (!ew.__hfEncWorker) { - rejectCapture(new Error("drawElement: encode worker not initialized")); - return; - } - ew.__hfEncWorker.postMessage({ bmp, id: fid, w, h, q: q / 100 }, [bmp]); - resolveCapture(); - }) - .catch((e: unknown) => { - rejectCapture(e instanceof Error ? e : new Error(String(e))); - }); - }, 0); + // Snapshot the canvas and hand off to the encode worker. + // createImageBitmap is async (does not block the paint handler) and + // captures a pixel snapshot now; resolveCapture only fires after the + // bitmap is transferred, so the canvas is safe to overwrite for the + // next frame once this evaluate's promise resolves. No setTimeout(0) + // wrapper — it added ~1-4ms/frame of macrotask latency on the produce + // critical path for no benefit (the heavy encode runs in the worker, + // not the paint handler). 
+ createImageBitmap(canvas) + .then((bmp) => { + type EncWin = Window & { __hfEncWorker?: Worker }; + const ew = window as EncWin; + if (!ew.__hfEncWorker) { + rejectCapture(new Error("drawElement: encode worker not initialized")); + return; + } + ew.__hfEncWorker.postMessage({ bmp, id: fid, w, h, q: q / 100 }, [bmp]); + resolveCapture(); + }) + .catch((e: unknown) => { + rejectCapture(e instanceof Error ? e : new Error(String(e))); + }); }; if (!sync) { diff --git a/packages/engine/src/services/frameCapture.ts b/packages/engine/src/services/frameCapture.ts index 442db71c1..4b0d24dc0 100644 --- a/packages/engine/src/services/frameCapture.ts +++ b/packages/engine/src/services/frameCapture.ts @@ -1685,40 +1685,50 @@ export async function captureFrameToBufferPipelined( const { page, options } = session; const startTime = Date.now(); - const { quantizedTime, seekMs, beforeCaptureMs } = await prepareFrameForCapture( - session, - frameIndex, - time, - ); - void quantizedTime; - - if (session.beginFrameTimeTicks > 0) { - const client = await getCdpSession(page); - await client.send("HeadlessExperimental.beginFrame", { - frameTimeTicks: session.beginFrameTimeTicks + frameIndex * session.beginFrameIntervalMs, - interval: session.beginFrameIntervalMs, - noDisplayUpdates: false, - }); - } + try { + const { quantizedTime, seekMs, beforeCaptureMs } = await prepareFrameForCapture( + session, + frameIndex, + time, + ); + void quantizedTime; - const { encodeResult } = await produceDrawElementFrame( - page, - options.width, - options.height, - options.quality ?? 80, - session.beginFrameTimeTicks === 0, - ); + // Worker-encode is gated to the macOS GPU path (beginFrameTimeTicks === 0, + // syncToPaintEvent = true); see initDrawElementOrTransparentBackground. The + // BeginFrame branch present in the synchronous captureFrameCore is therefore + // unreachable here and intentionally omitted. 
+ const { encodeResult } = await produceDrawElementFrame( + page, + options.width, + options.height, + options.quality ?? 80, + true, + ); - const captureTimeMs = Date.now() - startTime; + const captureTimeMs = Date.now() - startTime; - session.capturePerf.frames += 1; - session.capturePerf.seekMs += seekMs; - session.capturePerf.beforeCaptureMs += beforeCaptureMs; - // screenshotMs reflects produce time only (encode is async, not tracked here) - session.capturePerf.screenshotMs += captureTimeMs - seekMs - beforeCaptureMs; - session.capturePerf.totalMs += captureTimeMs; + session.capturePerf.frames += 1; + session.capturePerf.seekMs += seekMs; + session.capturePerf.beforeCaptureMs += beforeCaptureMs; + // screenshotMs reflects produce time only (encode is async, not tracked here) + session.capturePerf.screenshotMs += captureTimeMs - seekMs - beforeCaptureMs; + session.capturePerf.totalMs += captureTimeMs; - return { encodeResult, captureTimeMs }; + return { encodeResult, captureTimeMs }; + } catch (captureError) { + // Mirror captureFrameCore: capture per-frame diagnostics (frame-error + // PNG/HTML/JSON + console tail) before rethrowing so pipelined-path + // failures are debuggable like the serial path. + if (session.isInitialized) { + await captureFrameErrorDiagnostics( + session, + frameIndex, + time, + captureError instanceof Error ? 
captureError : new Error(String(captureError)), + ); + } + throw captureError; + } } /** diff --git a/packages/producer/src/services/render/stages/captureStreamingStage.ts b/packages/producer/src/services/render/stages/captureStreamingStage.ts index 68539ca16..718d3d9b8 100644 --- a/packages/producer/src/services/render/stages/captureStreamingStage.ts +++ b/packages/producer/src/services/render/stages/captureStreamingStage.ts @@ -138,6 +138,10 @@ async function runWorkerEncodePipelineLoop( const drainPrev = async (): Promise => { if (!prev) return; + // Observe aborts while parked here (the encode wait + ffmpeg write are the + // longest stretch of the loop); without this an abort isn't seen until the + // next produce iteration. + assertNotAborted(); const buf = await prev.encodeResult; await reorderBuffer.waitForFrame(prev.idx); ensureFrameWritten(await currentEncoder.writeFrame(buf), prev.idx); @@ -152,14 +156,23 @@ async function runWorkerEncodePipelineLoop( ); }; - for (let i = 0; i < totalFrames; i++) { - assertNotAborted(); - const time = (i * job.config.fps.den) / job.config.fps.num; - const { encodeResult } = await captureFrameToBufferPipelined(session, i, time); + try { + for (let i = 0; i < totalFrames; i++) { + assertNotAborted(); + const time = (i * job.config.fps.den) / job.config.fps.num; + const { encodeResult } = await captureFrameToBufferPipelined(session, i, time); + await drainPrev(); + prev = { idx: i, encodeResult }; + } await drainPrev(); - prev = { idx: i, encodeResult }; + } catch (err) { + // On abort/throw the previously-produced frame's encode is still in flight + // and never awaited; closeCaptureSession → cleanupDrawElementWorkerEncode + // will reject it. Attach a no-op catch so that rejection is not an + // unhandled promise rejection (which can crash the producer worker). 
+ if (prev) prev.encodeResult.catch(() => {}); + throw err; } - await drainPrev(); } export async function runCaptureStreamingStage( From 52fd9c2c9f921e2c589b55b389594fc7c52f5ae5 Mon Sep 17 00:00:00 2001 From: Vance Ingalls Date: Sun, 14 Jun 2026 19:20:59 -0700 Subject: [PATCH 3/3] fix(engine,producer): close worker-encode orphan/hang/corruption gaps (re-review) Re-review found the prior orphan-rejection fix attached to the wrong promise. - Unhandled-rejection guard moved to source: produceDrawElementFrame attaches a no-op .catch to every encodeResult at creation, covering the depth-2 loop's orphaned in-flight frame. Removed the ineffective loop-level prev.catch. - Per-frame encode watchdog (30s): a lost worker message no longer hangs the render to the protocol timeout (onerror->id=-1 only covered crashes). - Empty-frame guard: payload-less worker success rejects instead of resolving a 0-byte Buffer ffmpeg would write as a corrupt frame. - Worker reuses one OffscreenCanvas across frames (was per-frame alloc). - Close the ImageBitmap on the worker-missing reject path (GPU leak). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../engine/src/services/drawElementService.ts | 50 +++++++++++++++++-- .../render/stages/captureStreamingStage.ts | 26 ++++------ 2 files changed, 56 insertions(+), 20 deletions(-) diff --git a/packages/engine/src/services/drawElementService.ts b/packages/engine/src/services/drawElementService.ts index d49d681ed..0fff1c1b3 100644 --- a/packages/engine/src/services/drawElementService.ts +++ b/packages/engine/src/services/drawElementService.ts @@ -472,8 +472,15 @@ export async function initDrawElementWorkerEncode(page: Page): Promise { const entry = s.pending.get(id); if (!entry) return; s.pending.delete(id); - if (error) entry.reject(new Error(`drawElement worker encode failed: ${error}`)); - else entry.resolve(Buffer.from(b64, "base64")); + if (error) { + entry.reject(new Error(`drawElement worker encode failed: ${error}`)); + } else if (!b64) { + // A success message with no payload would otherwise resolve a 0-byte + // Buffer and ffmpeg would write a corrupt/empty frame silently. Fail loud. + entry.reject(new Error(`drawElement worker encode returned empty frame (frame ${id})`)); + } else { + entry.resolve(Buffer.from(b64, "base64")); + } }); } @@ -493,11 +500,17 @@ export async function initDrawElementWorkerEncode(page: Page): Promise { // relays to node. On any encode failure the worker posts an error for that // frame's id so the node-side promise rejects instead of hanging. const workerSrc = ` + // Reuse one OffscreenCanvas across frames (dimensions are constant for a + // render) — a fresh canvas per frame churns ~w*h*4 bytes of backing store + // every frame and pressures GC on the encode hot path. 
+ let oc = null, c = null; self.onmessage = async (e) => { const { bmp, id, w, h, q } = e.data; try { - const oc = new OffscreenCanvas(w, h); - const c = oc.getContext('2d'); + if (!oc || oc.width !== w || oc.height !== h) { + oc = new OffscreenCanvas(w, h); + c = oc.getContext('2d'); + } if (!c) throw new Error('OffscreenCanvas 2d context unavailable'); c.drawImage(bmp, 0, 0); bmp.close(); @@ -574,8 +587,34 @@ export async function produceDrawElementFrame( const frameId = ++state.nextId; const encodeResult = new Promise((resolve, reject) => { - state.pending.set(frameId, { resolve, reject }); + // Watchdog: worker.onerror (→ id=-1, reject-all) covers worker CRASHES, but + // a lost message (page navigation, OOM-killed worker with no ErrorEvent, a + // dropped postMessage) would never settle this promise — `drainPrev`'s + // `await encodeResult` would then hang the whole render to the protocol + // timeout. Bound it so the render fails with a clear error. Generous vs the + // ~10ms encode to avoid false positives on large frames. + const timer = setTimeout(() => { + if (state.pending.delete(frameId)) { + reject(new Error(`drawElement worker encode timed out (frame ${frameId})`)); + } + }, 30_000); + state.pending.set(frameId, { + resolve: (b) => { + clearTimeout(timer); + resolve(b); + }, + reject: (e) => { + clearTimeout(timer); + reject(e); + }, + }); }); + // Guard against an unhandled rejection if the caller never awaits this promise + // (the depth-2 pipeline loop orphans the just-produced frame's encode when an + // earlier frame's drain throws or the render aborts). The loop's own + // `await encodeResult` still observes rejections on its separate reaction + // chain; this only suppresses the no-awaiter case. + void encodeResult.catch(() => {}); // Do paint-wait + drawElement composite + createImageBitmap + postMessage. // Resolves as soon as the bitmap is transferred (not when encode is done). 
@@ -668,6 +707,7 @@ export async function produceDrawElementFrame( type EncWin = Window & { __hfEncWorker?: Worker }; const ew = window as EncWin; if (!ew.__hfEncWorker) { + bmp.close(); // don't leak the GPU-backed ImageBitmap on this reject path rejectCapture(new Error("drawElement: encode worker not initialized")); return; } diff --git a/packages/producer/src/services/render/stages/captureStreamingStage.ts b/packages/producer/src/services/render/stages/captureStreamingStage.ts index 718d3d9b8..81585f83c 100644 --- a/packages/producer/src/services/render/stages/captureStreamingStage.ts +++ b/packages/producer/src/services/render/stages/captureStreamingStage.ts @@ -156,23 +156,19 @@ async function runWorkerEncodePipelineLoop( ); }; - try { - for (let i = 0; i < totalFrames; i++) { - assertNotAborted(); - const time = (i * job.config.fps.den) / job.config.fps.num; - const { encodeResult } = await captureFrameToBufferPipelined(session, i, time); - await drainPrev(); - prev = { idx: i, encodeResult }; - } + // On abort/throw the just-produced frame's encode is still in flight and never + // awaited (it isn't `prev` yet); cleanupDrawElementWorkerEncode rejects it on + // close. produceDrawElementFrame attaches a no-op catch to every encodeResult + // at creation so that orphaned rejection is never an unhandled rejection — so + // the loop needs no special guard here. + for (let i = 0; i < totalFrames; i++) { + assertNotAborted(); + const time = (i * job.config.fps.den) / job.config.fps.num; + const { encodeResult } = await captureFrameToBufferPipelined(session, i, time); await drainPrev(); - } catch (err) { - // On abort/throw the previously-produced frame's encode is still in flight - // and never awaited; closeCaptureSession → cleanupDrawElementWorkerEncode - // will reject it. Attach a no-op catch so that rejection is not an - // unhandled promise rejection (which can crash the producer worker). 
- if (prev) prev.encodeResult.catch(() => {}); - throw err; + prev = { idx: i, encodeResult }; } + await drainPrev(); } export async function runCaptureStreamingStage(