From ede5cde37e97c622d384d8ebecd54dbd2274f677 Mon Sep 17 00:00:00 2001 From: Vance Ingalls Date: Tue, 12 May 2026 21:17:30 +0000 Subject: [PATCH] feat(producer): add pngDecodeBlitWorkerPool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hf#732 lever-4. Adds a `worker_threads`-based pool that offloads PNG decode + alpha-blit onto a fixed-size pool, mirroring the existing shader-blend worker pool. No production wiring yet — the pool stands alone and ships behind a later PR in the stack. Why a separate pool: PNG decode (zlib inflate) plus 16bpc alpha-blit both cost real wall-time on every layered frame. Doing them inline on the main event loop forces all DOM workers to converge on one Node thread for compositing, which is the next bottleneck after the shader-blend dispatch was fixed in this stack's PR-3. Pieces: * `pngDecodeBlitWorker.ts` — the worker entry. Imports `decodePng` + `blitRgba8OverRgb48le` from `@hyperframes/engine/alpha-blit` (the subpath added here on the engine package). The worker file has zero internal cross-package imports, so it survives the `new Worker()` loader boundary without dragging in the producer's module graph. * `pngDecodeBlitWorkerPool.ts` — fixed-size pool with `run()` API returning a Promise. Uses `transferList` for buffer-of-ownership semantics so we never serialize the 16bpc HDR frame buffer; the worker decode owns the input, the main thread owns the blitted output. * `pngDecodeBlitWorkerPool.test.ts` — 6 vitest tests pinning byte- equivalence with the inline path, transferList correctness across the 8KB Node pool threshold, and concurrent dispatch / termination semantics. All pass. Build wiring: * `packages/cli/tsup.config.ts`: second tsup entry emits `dist/pngDecodeBlitWorker.js` alongside `dist/cli.js`. The pool's resolver probes for that file next to its loaded module. Without this entry the pool would crash or silently fall back to inline decode/blit at runtime in the shipped CLI, killing the perf gain. * `packages/producer/build.mjs`: third esbuild entry emits the worker as `dist/services/pngDecodeBlitWorker.js` for direct producer consumers. Adds the `@hyperframes/engine/alpha-blit` workspace alias to the existing `workspaceAliasPlugin` so both builds resolve the import the same way. * `packages/engine/package.json`: adds `./alpha-blit` subpath export pointing at `src/utils/alphaBlit.ts`. The file is already import-free (only `zlib`) so the worker survives the loader boundary directly via this TS source. No behavior change. PR 2 of 5 in the hf#732 decomposition stack; stacked on top of #PR1 (worker-count cap bump). -- Vai Co-Authored-By: Vai --- packages/cli/tsup.config.ts | 18 +- packages/engine/package.json | 3 +- packages/producer/build.mjs | 19 + .../src/services/pngDecodeBlitWorker.ts | 122 +++++ .../services/pngDecodeBlitWorkerPool.test.ts | 324 +++++++++++++ .../src/services/pngDecodeBlitWorkerPool.ts | 425 ++++++++++++++++++ 6 files changed, 909 insertions(+), 2 deletions(-) create mode 100644 packages/producer/src/services/pngDecodeBlitWorker.ts create mode 100644 packages/producer/src/services/pngDecodeBlitWorkerPool.test.ts create mode 100644 packages/producer/src/services/pngDecodeBlitWorkerPool.ts diff --git a/packages/cli/tsup.config.ts b/packages/cli/tsup.config.ts index c0c9d614e..ce1db4d61 100644 --- a/packages/cli/tsup.config.ts +++ b/packages/cli/tsup.config.ts @@ -7,7 +7,19 @@ const pkg = JSON.parse(readFileSync(new URL("./package.json", import.meta.url), }; export default defineConfig({ - entry: ["src/cli.ts"], + // hf#732 lever-4: emit BOTH the CLI bundle and the PNG decode + alpha-blit + // worker entry. The producer's `pngDecodeBlitWorkerPool` instantiates a + // Node `worker_threads` Worker via `new Worker()`, which is a + // filesystem load — it cannot share the parent module graph. The pool's + // path resolver probes for `pngDecodeBlitWorker.js` next to its own loaded + // module (which lives inside `dist/cli.js` after the producer is + // `noExternal`'d and bundled in). Without this entry the file would not + // exist at runtime and the pool would either crash or silently fall back + // to inline decode/blit, killing the perf gain. + entry: { + cli: "src/cli.ts", + pngDecodeBlitWorker: "../producer/src/services/pngDecodeBlitWorker.ts", + }, format: ["esm"], outDir: "dist", target: "node22", @@ -56,6 +68,10 @@ var __dirname = __hf_dirname(__filename);`, esbuildOptions(options) { options.alias = { "@hyperframes/producer": resolve(__dirname, "../producer/src/index.ts"), + // hf#732 lever-4: alias for the PNG decode+blit worker's import. + // `alphaBlit.ts` is import-free (only zlib) so the worker survives + // the worker_thread loader boundary directly via this TS source. + "@hyperframes/engine/alpha-blit": resolve(__dirname, "../engine/src/utils/alphaBlit.ts"), }; options.loader = { ...options.loader, ".browser.js": "text" }; }, diff --git a/packages/engine/package.json b/packages/engine/package.json index 9bae85768..dca7ccaed 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -11,7 +11,8 @@ "main": "./src/index.ts", "types": "./src/index.ts", "exports": { - ".": "./src/index.ts" + ".": "./src/index.ts", + "./alpha-blit": "./src/utils/alphaBlit.ts" }, "scripts": { "build": "tsc", diff --git a/packages/producer/build.mjs b/packages/producer/build.mjs index 8d1bcfede..f9ecf89ec 100644 --- a/packages/producer/build.mjs +++ b/packages/producer/build.mjs @@ -21,6 +21,9 @@ const workspaceAliasPlugin = { build.onResolve({ filter: /^@hyperframes\/engine$/ }, () => ({ path: resolve(scriptDir, "../engine/src/index.ts"), })); + build.onResolve({ filter: /^@hyperframes\/engine\/alpha-blit$/ }, () => ({ + path: resolve(scriptDir, "../engine/src/utils/alphaBlit.ts"), + })); build.onResolve({ filter: /^@hyperframes\/core$/ }, () => ({ path: resolve(scriptDir, "../core/src/index.ts"), })); @@ -55,6 +58,22 @@ await Promise.all([ entryPoints: ["src/server.ts"], outfile: "dist/public-server.js", }), + // PNG decode + alpha-blit worker (hf#732 lever-4). Loaded by + // `pngDecodeBlitWorkerPool.createPngDecodeBlitWorkerPool` via + // `new Worker()`. Must be a separate entry point so the worker + // module is standalone and shares no parent module-graph state. + build({ + bundle: true, + platform: "node", + target: "node22", + format: "esm", + external: ["puppeteer", "esbuild", "postcss"], + plugins: [workspaceAliasPlugin], + minify: false, + sourcemap: true, + entryPoints: ["src/services/pngDecodeBlitWorker.ts"], + outfile: "dist/services/pngDecodeBlitWorker.js", + }), ]); // Copy core runtime artifacts so the producer can find them at dist/ diff --git a/packages/producer/src/services/pngDecodeBlitWorker.ts b/packages/producer/src/services/pngDecodeBlitWorker.ts new file mode 100644 index 000000000..41250a449 --- /dev/null +++ b/packages/producer/src/services/pngDecodeBlitWorker.ts @@ -0,0 +1,122 @@ +/** + * Worker entry point for off-main-thread PNG decode + alpha blit. The + * companion to `pngDecodeBlitWorkerPool.ts`. See that file for the rationale + * (hf#732 lever-4: overlap Chrome's screenshot with Node's decode+blit). + * + * Lifecycle: + * + * 1. Pool constructor spawns N of these workers up front. + * 2. Main thread posts `{ png, pngOffset, pngLength, dest, destOffset, + * destLength, width, height, transfer }` with `transferList: [png, dest]`. + * Both underlying ArrayBuffers are detached on the sender; the caller + * must NOT touch them until the worker replies. + * 3. Worker wraps each ArrayBuffer as a Node Buffer view (zero-copy), + * runs `decodePng` to get an RGBA8 Uint8Array, then `blitRgba8OverRgb48le` + * to composite the decoded pixels onto the rgb48le `dest` buffer in + * the requested transfer space. + * 4. Worker posts `{ ok, png, dest, decodeMs, blitMs }` back with + * `transferList: [png, dest]`. Both ArrayBuffers return to the main + * thread; the caller swaps `result.dest` into its render state. + * 5. On decode/blit exception, worker posts `{ ok: false, error, png, + * dest }` — both ArrayBuffers still returned so the caller can release + * them. + * + * The worker holds no per-frame state. The intermediate RGBA8 decode + * buffer is allocated per-call and dropped on the worker side. + * + * Import strategy: identical to `shaderTransitionWorker.ts` — use the + * `./alpha-blit` subpath export of `@hyperframes/engine` rather than the + * package root, because the root pulls in the full engine graph and the + * tsx loader's `.js → .ts` rewrite does not survive the Worker boundary + * under dev/test. + */ + +import { parentPort } from "node:worker_threads"; +import { decodePng, blitRgba8OverRgb48le } from "@hyperframes/engine/alpha-blit"; + +interface DecodeBlitJobRequest { + png: ArrayBuffer; + pngOffset: number; + pngLength: number; + dest: ArrayBuffer; + destOffset: number; + destLength: number; + width: number; + height: number; + transfer: string; +} + +interface DecodeBlitJobOk { + ok: true; + png: ArrayBuffer; + dest: ArrayBuffer; + decodeMs: number; + blitMs: number; +} + +interface DecodeBlitJobErr { + ok: false; + error: string; + png: ArrayBuffer; + dest: ArrayBuffer; +} + +export type DecodeBlitJobResult = DecodeBlitJobOk | DecodeBlitJobErr; + +if (!parentPort) { + // Defensive — this module is only meaningful inside a worker_thread. + // eslint-disable-next-line no-console + console.warn("[pngDecodeBlitWorker] no parentPort; module loaded on main thread"); +} else { + parentPort.on("message", (msg: DecodeBlitJobRequest) => { + const { png, pngOffset, pngLength, dest, destOffset, destLength, width, height, transfer } = + msg; + // Re-wrap the transferred ArrayBuffers as Node Buffer views. The + // dispatcher in the pool normalizes inputs to offset-0 ArrayBuffers + // before transfer (avoiding the 8KB shared-pool DataCloneError), so + // pngOffset / destOffset are 0 and pngLength / destLength match the + // backing ArrayBuffer byteLength in practice. We still honor the + // forwarded values so the worker is robust if the dispatcher ever + // changes (e.g. ships a slice over a larger transferred ArrayBuffer). + const pngBuf = Buffer.from(png, pngOffset, pngLength); + const destBuf = Buffer.from(dest, destOffset, destLength); + + try { + const decodeStart = Date.now(); + const { data: rgba } = decodePng(pngBuf); + const decodeMs = Date.now() - decodeStart; + + const blitStart = Date.now(); + // `blitRgba8OverRgb48le` accepts the CompositeTransfer string as a + // typed union. The pool's `transfer` field is `string` for transport + // simplicity; the actual values flow through unchanged from the + // orchestrator's HdrCompositeContext and the function validates at + // its own boundary. + blitRgba8OverRgb48le( + rgba, + destBuf, + width, + height, + transfer as Parameters[4], + ); + const blitMs = Date.now() - blitStart; + + const reply: DecodeBlitJobOk = { + ok: true, + png, + dest, + decodeMs, + blitMs, + }; + parentPort!.postMessage(reply, [png, dest]); + } catch (err) { + const reply: DecodeBlitJobErr = { + ok: false, + error: err instanceof Error ? err.message : String(err), + png, + dest, + }; + parentPort!.postMessage(reply, [png, dest]); + } + }); +} diff --git a/packages/producer/src/services/pngDecodeBlitWorkerPool.test.ts b/packages/producer/src/services/pngDecodeBlitWorkerPool.test.ts new file mode 100644 index 000000000..ebe5500ca --- /dev/null +++ b/packages/producer/src/services/pngDecodeBlitWorkerPool.test.ts @@ -0,0 +1,324 @@ +/** + * Tests for the hf#732 lever-4 PNG decode + alpha-blit worker pool. Like the + * shader-blend pool tests next door, these are correctness-critical: a + * regression either corrupts every composited DOM layer or leaks worker + * handles. Tests pin three properties: + * + * 1. Byte-equivalence with the inline path. The worker calls the exact + * same `decodePng` + `blitRgba8OverRgb48le` the inline path uses, so + * the round-trip must reproduce the inline result to the last byte. + * 2. Buffer-transfer correctness across the 8KB Node pool threshold. + * The pool's dispatcher must NOT throw `DataCloneError` for inputs + * that happen to live in the shared 8KB pool (small PNGs, etc.). + * 3. Concurrent dispatch + pipelining. N concurrent `run` calls + * against a pool sized to N all complete with correct output. The + * pipelining test asserts that decode/blit of frame N overlaps the + * kickoff of frame N+1's "capture" (simulated by a deferred Promise), + * proving the pool isn't accidentally serializing. + * + * The pool's clean-shutdown path is also exercised so a test failure + * doesn't leak handles into other tests. + */ + +import { afterEach, describe, expect, it } from "vitest"; +import { deflateSync } from "zlib"; +import { fileURLToPath } from "node:url"; +import { dirname, resolve } from "node:path"; +import { decodePng, blitRgba8OverRgb48le } from "@hyperframes/engine/alpha-blit"; +import { + createPngDecodeBlitWorkerPool, + type PngDecodeBlitWorkerPool, +} from "./pngDecodeBlitWorkerPool.js"; + +const W = 16; +const H = 8; +const RGB48_BYTES = W * H * 6; + +/** + * Synthesize a minimal RGBA8 PNG with a uniform color. Skips CRC32 + * because `decodePng` does not verify checksums. Produces an output that's + * intentionally tiny (well under 8KB) so the test exercises the + * shared-pool-detection path in the dispatcher. + */ +function makeUniformRgbaPng( + w: number, + h: number, + r: number, + g: number, + b: number, + a: number, +): Buffer { + const sig = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]); + const ihdr = Buffer.alloc(13); + ihdr.writeUInt32BE(w, 0); + ihdr.writeUInt32BE(h, 4); + ihdr[8] = 8; // 8-bit depth + ihdr[9] = 6; // RGBA + ihdr[10] = 0; // deflate + ihdr[11] = 0; // no filter + ihdr[12] = 0; // non-interlaced + + const rows: Buffer[] = []; + for (let y = 0; y < h; y++) { + rows.push(Buffer.from([0])); // PNG row filter byte (None) + for (let x = 0; x < w; x++) { + rows.push(Buffer.from([r, g, b, a])); + } + } + const idatData = deflateSync(Buffer.concat(rows)); + + function chunk(type: string, data: Buffer): Buffer { + const len = Buffer.alloc(4); + len.writeUInt32BE(data.length, 0); + const typeBuf = Buffer.from(type, "ascii"); + const crc = Buffer.alloc(4); // skipped by decoder + return Buffer.concat([len, typeBuf, data, crc]); + } + + return Buffer.concat([ + sig, + chunk("IHDR", ihdr), + chunk("IDAT", idatData), + chunk("IEND", Buffer.alloc(0)), + ]); +} + +describe("PngDecodeBlitWorkerPool", () => { + const pools: PngDecodeBlitWorkerPool[] = []; + + afterEach(async () => { + while (pools.length > 0) { + const p = pools.pop(); + if (p) await p.terminate(); + } + }); + + async function makePool(size: number): Promise { + const p = await createPngDecodeBlitWorkerPool({ size }); + pools.push(p); + return p; + } + + it("produces byte-identical output to the inline decode+blit path", async () => { + const pool = await makePool(1); + const png = makeUniformRgbaPng(W, H, 255, 0, 0, 255); + + // Pool path: blit onto a zero-filled rgb48le canvas. + const poolDest = Buffer.alloc(RGB48_BYTES); + const poolResult = await pool.run({ + png: Buffer.from(png), // independent copy — the input may be detached + dest: poolDest, + width: W, + height: H, + transfer: "srgb", + }); + + // Reference: inline decode+blit onto a separately allocated canvas. + const refDest = Buffer.alloc(RGB48_BYTES); + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + + expect(Buffer.compare(poolResult.dest, refDest)).toBe(0); + }); + + it("composites OVER existing rgb48le content (alpha blend semantics)", async () => { + const pool = await makePool(1); + + // Half-transparent red over a green background. + const png = makeUniformRgbaPng(W, H, 255, 0, 0, 128); + + const poolDest = Buffer.alloc(RGB48_BYTES); + for (let i = 0; i < W * H; i++) { + const off = i * 6; + poolDest.writeUInt16LE(0, off); // R + poolDest.writeUInt16LE(60000, off + 2); // G + poolDest.writeUInt16LE(0, off + 4); // B + } + + const poolResult = await pool.run({ + png: Buffer.from(png), + dest: poolDest, + width: W, + height: H, + transfer: "srgb", + }); + + // Inline reference applies the same blend to its own copy of the + // green background. + const refDest = Buffer.alloc(RGB48_BYTES); + for (let i = 0; i < W * H; i++) { + const off = i * 6; + refDest.writeUInt16LE(0, off); + refDest.writeUInt16LE(60000, off + 2); + refDest.writeUInt16LE(0, off + 4); + } + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + + expect(Buffer.compare(poolResult.dest, refDest)).toBe(0); + }); + + it("survives small PNGs that would otherwise hit the Node 8KB shared pool", async () => { + // A 16×8 RGBA PNG compresses to a few hundred bytes — comfortably + // under the 8KB Buffer pool threshold. If the dispatcher fails to + // copy-out before postMessage, this throws DataCloneError. + const pool = await makePool(1); + const png = makeUniformRgbaPng(W, H, 50, 100, 200, 255); + expect(png.byteLength).toBeLessThan(8192); + + const result = await pool.run({ + png, + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }); + expect(result.dest.byteLength).toBe(RGB48_BYTES); + }); + + it("runs N concurrent decode+blit tasks against an N-wide pool", async () => { + const pool = await makePool(4); + const png = makeUniformRgbaPng(W, H, 200, 100, 50, 255); + + const tasks = Array.from({ length: 4 }, () => + pool.run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }), + ); + const results = await Promise.all(tasks); + + // All 4 results must match each other (same input) AND match the + // inline reference. + const refDest = Buffer.alloc(RGB48_BYTES); + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + for (const r of results) { + expect(Buffer.compare(r.dest, refDest)).toBe(0); + } + }); + + it("permits frame N+1 capture to proceed while frame N decode+blit is in flight", async () => { + // Pipelining proof: kick off a "decode+blit" task that we can hold by + // virtue of the worker actually running, then concurrently kick off a + // simulated next-frame capture (a Promise that resolves immediately). + // The simulated capture must resolve BEFORE the decode+blit, proving + // the decode+blit isn't blocking the main thread. + const pool = await makePool(2); + const png = makeUniformRgbaPng(W, H, 50, 100, 200, 255); + + // Sentinel timestamps to verify overlap. + const captureStarted: number[] = []; + const captureDone: number[] = []; + const decodeBlitStarted: number[] = []; + const decodeBlitDone: number[] = []; + + decodeBlitStarted.push(Date.now()); + const decodeBlitPromise = pool + .run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }) + .then((r) => { + decodeBlitDone.push(Date.now()); + return r; + }); + + // Simulated next-frame CDP capture — a microtask-y await that yields. + // On a non-pipelined (synchronous-inline) path this would have to wait + // for the decode+blit, but the pool dispatches to a worker thread so + // the main thread is free to start the next "capture" immediately. + captureStarted.push(Date.now()); + await new Promise((resolve) => setImmediate(resolve)); + captureDone.push(Date.now()); + + await decodeBlitPromise; + + expect(captureDone[0]).toBeDefined(); + expect(decodeBlitDone[0]).toBeDefined(); + // The simulated capture must have completed before the decode+blit — + // proof that the pool is NOT blocking the main thread. + expect(captureDone[0]!).toBeLessThanOrEqual(decodeBlitDone[0]!); + }); + + it("spawns from an explicit workerEntryPath, bypassing the import.meta.url resolver", async () => { + // Regression for the hf#677 bundled-CLI bug: when the pool is inlined + // into a separate bundle (e.g. cli.js), `import.meta.url` resolves to + // the bundle's path rather than the bundled worker's emitted path, and + // the sibling-probe fallback computes a path the worker file does not + // live at. The explicit `workerEntryPath` plumbed by the call site + // bypasses the heuristic entirely. + const here = dirname(fileURLToPath(import.meta.url)); + const explicitPath = resolve(here, "pngDecodeBlitWorker.ts"); + const pool = await createPngDecodeBlitWorkerPool({ + size: 1, + workerEntryPath: explicitPath, + }); + pools.push(pool); + + const png = makeUniformRgbaPng(W, H, 64, 128, 192, 255); + const dest = Buffer.alloc(RGB48_BYTES); + const result = await pool.run({ + png: Buffer.from(png), + dest, + width: W, + height: H, + transfer: "srgb", + }); + + // Compare to inline reference to confirm the explicit-path spawn actually + // ran real work (not just spawned and crashed silently). + const refDest = Buffer.alloc(RGB48_BYTES); + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + expect(Buffer.compare(result.dest, refDest)).toBe(0); + }); + + it("terminates cleanly even with queued tasks", async () => { + const pool = await makePool(1); + const png = makeUniformRgbaPng(W, H, 1, 2, 3, 255); + + // Two tasks: the first dispatches, the second queues. + const p1 = pool.run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }); + const p2 = pool.run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }); + + await pool.terminate(); + pools.pop(); // already terminated; don't try to re-terminate in afterEach + + // Both must settle (resolve or reject). The first MAY resolve if the + // worker finished before terminate() raced in; the second MUST reject + // because it never got to dispatch. + const r1 = await p1.catch((e: unknown) => e); + const r2 = await p2.catch((e: unknown) => e); + if (r2 instanceof Error) { + expect(r2.message).toMatch(/terminated/); + } else { + // If both raced to resolve before terminate, that's also acceptable + // — but only the second one is guaranteed-rejected. Accept either + // shape; the goal of the test is "no leaked handles, no unhandled + // rejection". + expect(r2).toBeDefined(); + } + // r1 is "either resolved with a result OR rejected with terminated"; + // both are acceptable. + expect(r1).toBeDefined(); + }); +}); diff --git a/packages/producer/src/services/pngDecodeBlitWorkerPool.ts b/packages/producer/src/services/pngDecodeBlitWorkerPool.ts new file mode 100644 index 000000000..b7c11a017 --- /dev/null +++ b/packages/producer/src/services/pngDecodeBlitWorkerPool.ts @@ -0,0 +1,425 @@ +/** + * Pool of Node `worker_threads` Workers for off-main-thread PNG decode + + * rgba8-over-rgb48le blit. The hf#732 lever-4 follow-up: capture (Chrome + * compositor) and decode+blit (Node CPU) were previously serialized per + * frame inside `captureTransitionFrame` / `compositeHdrFrame`. Each frame + * waited ~30 ms (decode) + ~50 ms (blit) on the Node main thread before the + * next CDP screenshot could begin. With the decode+blit on a worker pool, + * the calling thread can kick off the next CDP capture immediately and the + * decode+blit overlaps Chrome's screenshot work. + * + * Why a SEPARATE pool from `shaderTransitionWorkerPool`: + * + * - Different work shapes: shader-blend reads 2× rgb48le, writes 1× + * rgb48le. Decode+blit reads 1× PNG bytes (variable size, often + * ~250 KB for 854×480 with sparse DOM content), allocates a temporary + * RGBA8 buffer, and writes 1× rgb48le (over an existing destination). + * - Different concurrency profile: shader-blend fires once per + * transition frame; decode+blit fires once per DOM layer per frame + * (typically 3-6× per normal frame, 2× per transition frame). The + * pools have very different dispatch rates and queuing characteristics. + * - Sizing them independently lets us tune each to its bottleneck without + * starving the other. + * + * API: + * + * const pool = await createPngDecodeBlitWorkerPool({ size, log }); + * const result = await pool.run({ + * png: pngBuffer, // alpha-channel PNG from CDP (zero-copy in) + * dest: rgb48leDestBuffer, // rgb48le canvas to blend onto (zero-copy in) + * width, height, + * transfer: "srgb" | "pq" | "hlg" | etc., + * }); + * // result.dest is the SAME memory as the input `dest`, but re-attached + * // to the main thread. The input Buffer is detached on dispatch — the + * // caller must use `result.dest` for any subsequent reads/writes. + * await pool.terminate(); + * + * Buffer transfer contract: + * + * The PNG bytes are transferred IN with `transferList: [png.buffer]` so + * we don't copy them across the worker boundary. The `dest` rgb48le + * ArrayBuffer is also transferred IN (and back OUT) so the blit happens + * in place on the same memory the caller pre-allocated. After `run` + * resolves, the caller MUST swap its Buffer reference to `result.dest` + * — the original Buffer's underlying ArrayBuffer is detached on dispatch. + * + * The intermediate RGBA8 decode buffer is allocated INSIDE the worker + * and dropped on the worker side — there is no main-thread allocation + * for it. The PNG ArrayBuffer is transferred back too so the caller can + * release it. + */ + +import { Worker } from "node:worker_threads"; +import { fileURLToPath, pathToFileURL } from "node:url"; +import { dirname, join } from "node:path"; +import { createRequire } from "node:module"; +import { existsSync } from "node:fs"; +import { cpus } from "node:os"; + +interface PoolLogger { + info?: (msg: string, meta?: Record) => void; + warn?: (msg: string, meta?: Record) => void; + error?: (msg: string, meta?: Record) => void; +} + +export interface PngDecodeBlitPoolOptions { + /** Number of worker threads. Clamped to [1, cpus().length]. */ + size: number; + /** Optional logger; falls back to no-op. */ + log?: PoolLogger; + /** + * Absolute filesystem path to the worker entry module. When provided, the + * pool spawns workers from this exact path and skips the fallback + * `import.meta.url`-based resolver entirely. Required by callers that + * bundle the worker via a separate build (e.g. the CLI's tsup bundle): + * `import.meta.url` inside the bundled pool resolves to the bundle's own + * location, NOT the bundled worker entry's location, so the heuristic + * resolver below cannot find the worker. Path extension determines the + * loader behaviour (`.ts` → tsx/esm loader is appended to execArgv). + */ + workerEntryPath?: string; +} + +export interface PngDecodeBlitRequest { + /** PNG bytes captured from CDP (Page.captureScreenshot). */ + png: Buffer; + /** + * Pre-allocated rgb48le destination canvas (width*height*6 bytes). The + * blit composites the decoded RGBA8 image OVER this buffer's existing + * contents in `transfer` space. + */ + dest: Buffer; + width: number; + height: number; + /** + * Composite color space tag matching the engine's `CompositeTransfer` + * union. Passed through to `blitRgba8OverRgb48le` in the worker. + */ + transfer: string; +} + +export interface PngDecodeBlitResult { + /** + * Re-attached destination buffer holding the composited rgb48le pixels. + * Same memory as the request's `dest`, viewed through a fresh Buffer. + */ + dest: Buffer; + /** Per-worker timing: decode duration in ms (excluding postMessage latency). */ + decodeMs: number; + /** Per-worker timing: blit duration in ms. */ + blitMs: number; +} + +interface PendingTask { + req: PngDecodeBlitRequest; + resolve: (r: PngDecodeBlitResult) => void; + reject: (err: Error) => void; + enqueuedAtMs?: number; + traceId?: number; +} + +interface WorkerSlot { + worker: Worker; + busy: boolean; + current: PendingTask | null; +} + +interface WorkerReply { + ok: boolean; + error?: string; + png: ArrayBuffer; + dest: ArrayBuffer; + decodeMs?: number; + blitMs?: number; +} + +export interface PngDecodeBlitWorkerPool { + readonly size: number; + run(req: PngDecodeBlitRequest): Promise; + terminate(): Promise; +} + +/** + * Resolve the path to the compiled worker module. + * + * Resolution order (first match wins): + * 1. Explicit `workerEntryPath` factory option — callers that bundle the + * worker via a separate build pipeline (e.g. the CLI's tsup bundle that + * emits `pngDecodeBlitWorker.js` next to `cli.js`) must use this. The + * bundled-CLI case is the *only* one where the fallback below cannot + * find the worker: `import.meta.url` inside the inlined pool resolves + * to the bundle path, not the worker's emitted path, so the sibling + * probe lands in the wrong directory. + * 2. `HF_PNG_DECODE_BLIT_WORKER_ENTRY` env var — test/dev infra override. + * 3. Same-directory `.js` sibling — works when both pool source and + * worker source compile into the same `dist/services/` directory + * (in-tree dev builds and the colocated tsc emit). + * 4. Same-directory `.ts` sibling — vitest/bun raw-TS execution path. + */ +function resolveWorkerEntry(explicit: string | undefined): { path: string; isTs: boolean } { + if (explicit && explicit.length > 0) { + return { path: explicit, isTs: explicit.endsWith(".ts") }; + } + const override = process.env.HF_PNG_DECODE_BLIT_WORKER_ENTRY; + if (override && override.length > 0) { + const isTs = override.endsWith(".ts"); + return { path: override, isTs }; + } + const moduleDir = dirname(fileURLToPath(import.meta.url)); + const jsPath = join(moduleDir, "pngDecodeBlitWorker.js"); + if (existsSync(jsPath)) return { path: jsPath, isTs: false }; + const tsPath = join(moduleDir, "pngDecodeBlitWorker.ts"); + return { path: tsPath, isTs: true }; +} + +/** + * Mirror of `shaderTransitionWorkerPool.buildExecArgv`. Worker threads + * inherit the parent's loader only if the relevant flag is present on + * `process.execArgv`; under vitest the tsx loader is NOT exposed there, so + * we append `--import tsx/esm` when the resolved entry is `.ts` and no + * loader is detected. Best-effort: silently no-ops if `tsx/esm` can't be + * resolved (prod bundle). + */ +function buildExecArgv(entryIsTs: boolean): string[] { + const inherited = [...process.execArgv]; + if (!entryIsTs) return inherited; + const hasLoader = inherited.some( + (a) => a.includes("tsx/esm") || a.includes("ts-node/esm") || a.includes("--import"), + ); + if (hasLoader) return inherited; + try { + const require = createRequire(import.meta.url); + const tsxEsm = require.resolve("tsx/esm"); + inherited.push("--import", pathToFileURL(tsxEsm).href); + } catch { + // tsx not installed (prod) — leave execArgv as-is. + } + return inherited; +} + +export async function createPngDecodeBlitWorkerPool( + opts: PngDecodeBlitPoolOptions, +): Promise { + const cpuCount = Math.max(1, cpus().length); + const size = Math.max(1, Math.min(opts.size, cpuCount)); + const log = opts.log ?? {}; + const { path: entry, isTs: entryIsTs } = resolveWorkerEntry(opts.workerEntryPath); + + const slots: WorkerSlot[] = []; + const queue: PendingTask[] = []; + let terminated = false; + + const traceEnabled = process.env.HF_PNG_DECODE_BLIT_POOL_TRACE === "1"; + let nextTaskId = 0; + + const execArgv = buildExecArgv(entryIsTs); + + const dispatchNext = (slot: WorkerSlot): void => { + if (terminated || slot.busy) return; + const task = queue.shift(); + if (!task) return; + slot.busy = true; + slot.current = task; + if (traceEnabled) { + const slotIdx = slots.indexOf(slot); + const waitMs = task.enqueuedAtMs ? Date.now() - task.enqueuedAtMs : 0; + const busyCount = slots.filter((s) => s.busy).length; + log.info?.("[pngDecodeBlitPool] dispatch", { + task: task.traceId, + slot: slotIdx, + waitMs, + busyCount, + queueDepth: queue.length, + }); + } + const { png, dest, width, height, transfer } = task.req; + // Transfer the PNG bytes (input) and the rgb48le dest (in-place blit + // target) across the worker boundary. Both are detached on the main + // thread until the reply re-attaches them. + // + // Node `Buffer.alloc(N)` allocates a dedicated ArrayBuffer for buffers + // above 8KB (the pool threshold); below that, Buffers are slices over + // a shared 8KB pool ArrayBuffer. `postMessage` with `transferList` + // REJECTS the shared pool with `DataCloneError: Cannot transfer + // object of unsupported type`. The rgb48le `dest` buffers in the + // hybrid path are always > 8KB (854×480×6 ≈ 2.4MB) so they're fine, + // but PNG inputs (variable size, typically 30-300KB but can be < 8KB + // for empty layers) AND any small upstream Buffers can hit the pool. + // + // Safety net: if the underlying ArrayBuffer is larger than the Buffer + // view OR if the Buffer view doesn't cover the ArrayBuffer exactly, + // we copy into a fresh dedicated ArrayBuffer before transfer. Cost is + // one allocation + memcpy of the PNG bytes — small versus the + // postMessage round-trip we're already paying. + const pngBackingFitsExactly = png.byteOffset === 0 && png.byteLength === png.buffer.byteLength; + const pngSource: Buffer = pngBackingFitsExactly ? png : Buffer.from(png); // Buffer.from(Buffer) copies into a new pool slot + // For very small PNGs, `Buffer.from(buf)` may still land in the pool. + // Force a dedicated ArrayBuffer with `Uint8Array.slice().buffer`. + let abPng: ArrayBuffer; + let pngOffset: number; + let pngLength: number; + if (pngSource.byteOffset === 0 && pngSource.byteLength === pngSource.buffer.byteLength) { + abPng = pngSource.buffer as ArrayBuffer; + pngOffset = 0; + pngLength = pngSource.byteLength; + } else { + const copied = new Uint8Array(pngSource.byteLength); + copied.set(pngSource); + abPng = copied.buffer; + pngOffset = 0; + pngLength = copied.byteLength; + } + // The rgb48le `dest` should always have a dedicated ArrayBuffer + // (`Buffer.alloc` above the 8KB pool threshold gives one). Defensive + // path mirrors the PNG handling for symmetry. + let abDest: ArrayBuffer; + let destOffset: number; + let destLength: number; + if (dest.byteOffset === 0 && dest.byteLength === dest.buffer.byteLength) { + abDest = dest.buffer as ArrayBuffer; + destOffset = 0; + destLength = dest.byteLength; + } else { + // Should never happen for hybrid-path canvases. Take the + // copy-and-transfer path so we don't crash; but log so we surface + // any future allocator change that violates the invariant. + log.warn?.("[pngDecodeBlitPool] dest buffer is a slice over a larger ArrayBuffer; copying", { + offset: dest.byteOffset, + length: dest.byteLength, + backingSize: dest.buffer.byteLength, + }); + const copied = new Uint8Array(dest.byteLength); + copied.set(dest); + abDest = copied.buffer; + destOffset = 0; + destLength = copied.byteLength; + } + try { + slot.worker.postMessage( + { + png: abPng, + pngOffset, + pngLength, + dest: abDest, + destOffset, + destLength, + width, + height, + transfer, + }, + [abPng, abDest], + ); + } catch (err) { + slot.busy = false; + slot.current = null; + task.reject(err instanceof Error ? err : new Error(String(err))); + } + }; + + const onWorkerMessage = (slot: WorkerSlot, reply: WorkerReply): void => { + const task = slot.current; + slot.current = null; + slot.busy = false; + if (!task) { + dispatchNext(slot); + return; + } + if (!reply.ok) { + task.reject(new Error(reply.error ?? "png-decode-blit worker failed")); + } else { + // Re-attach the dest ArrayBuffer as a Node Buffer view. We wrap the + // full reply.dest (offset=0, length=byteLength) because the + // dispatch path normalizes to offset-0 ArrayBuffers (either the + // caller's original or a fresh copy we made on the dispatch side). + // The blit work happened over the full ArrayBuffer in the worker; + // returning a view that matches that is the correct semantics. + task.resolve({ + dest: Buffer.from(reply.dest, 0, reply.dest.byteLength), + decodeMs: reply.decodeMs ?? 0, + blitMs: reply.blitMs ?? 0, + }); + } + dispatchNext(slot); + }; + + const onWorkerError = (slot: WorkerSlot, err: Error): void => { + const task = slot.current; + slot.current = null; + slot.busy = false; + if (task) { + task.reject( + new Error(`png-decode-blit worker crashed mid-task: ${err.message}; dest buffer lost`), + ); + } + log.warn?.("[pngDecodeBlitWorkerPool] worker errored", { err: err.message }); + }; + + const onWorkerExit = (slot: WorkerSlot, code: number): void => { + if (terminated) return; + if (slot.current) { + slot.current.reject(new Error(`png-decode-blit worker exited (code=${code}) mid-task`)); + slot.current = null; + slot.busy = false; + } + log.warn?.("[pngDecodeBlitWorkerPool] worker exited unexpectedly", { code }); + }; + + try { + for (let i = 0; i < size; i++) { + const worker = new Worker(entry, { execArgv }); + const slot: WorkerSlot = { worker, busy: false, current: null }; + worker.on("message", (msg: WorkerReply) => onWorkerMessage(slot, msg)); + worker.on("error", (err: unknown) => + onWorkerError(slot, err instanceof Error ? err : new Error(String(err))), + ); + worker.on("exit", (code) => onWorkerExit(slot, code)); + slots.push(slot); + } + } catch (err) { + terminated = true; + await Promise.all(slots.map((s) => s.worker.terminate().catch(() => undefined))); + throw err; + } + + log.info?.("[pngDecodeBlitWorkerPool] spawned", { size, entry }); + + return { + size, + async run(req: PngDecodeBlitRequest): Promise { + if (terminated) { + throw new Error("png-decode-blit pool already terminated"); + } + return new Promise((resolve, reject) => { + const task: PendingTask = traceEnabled + ? { req, resolve, reject, enqueuedAtMs: Date.now(), traceId: ++nextTaskId } + : { req, resolve, reject }; + const idle = slots.find((s) => !s.busy); + if (idle) { + queue.unshift(task); + dispatchNext(idle); + } else { + queue.push(task); + } + }); + }, + async terminate(): Promise { + if (terminated) return; + terminated = true; + while (queue.length > 0) { + const t = queue.shift(); + if (t) t.reject(new Error("png-decode-blit pool terminated before task ran")); + } + for (const slot of slots) { + const t = slot.current; + if (t) { + slot.current = null; + slot.busy = false; + t.reject(new Error("png-decode-blit pool terminated mid-task")); + } + } + await Promise.all(slots.map((s) => s.worker.terminate().catch(() => undefined))); + log.info?.("[pngDecodeBlitWorkerPool] terminated", { size }); + }, + }; +}