diff --git a/packages/cli/tsup.config.ts b/packages/cli/tsup.config.ts index c0c9d614e..ce1db4d61 100644 --- a/packages/cli/tsup.config.ts +++ b/packages/cli/tsup.config.ts @@ -7,7 +7,19 @@ const pkg = JSON.parse(readFileSync(new URL("./package.json", import.meta.url), }; export default defineConfig({ - entry: ["src/cli.ts"], + // hf#732 lever-4: emit BOTH the CLI bundle and the PNG decode + alpha-blit + // worker entry. The producer's `pngDecodeBlitWorkerPool` instantiates a + // Node `worker_threads` Worker via `new Worker()`, which is a + // filesystem load — it cannot share the parent module graph. The pool's + // path resolver probes for `pngDecodeBlitWorker.js` next to its own loaded + // module (which lives inside `dist/cli.js` after the producer is + // `noExternal`'d and bundled in). Without this entry the file would not + // exist at runtime and the pool would either crash or silently fall back + // to inline decode/blit, killing the perf gain. + entry: { + cli: "src/cli.ts", + pngDecodeBlitWorker: "../producer/src/services/pngDecodeBlitWorker.ts", + }, format: ["esm"], outDir: "dist", target: "node22", @@ -56,6 +68,10 @@ var __dirname = __hf_dirname(__filename);`, esbuildOptions(options) { options.alias = { "@hyperframes/producer": resolve(__dirname, "../producer/src/index.ts"), + // hf#732 lever-4: alias for the PNG decode+blit worker's import. + // `alphaBlit.ts` is import-free (only zlib) so the worker survives + // the worker_thread loader boundary directly via this TS source. + "@hyperframes/engine/alpha-blit": resolve(__dirname, "../engine/src/utils/alphaBlit.ts"), }; options.loader = { ...options.loader, ".browser.js": "text" }; }, diff --git a/packages/engine/package.json b/packages/engine/package.json index 9bae85768..dca7ccaed 100644 --- a/packages/engine/package.json +++ b/packages/engine/package.json @@ -11,7 +11,8 @@ "main": "./src/index.ts", "types": "./src/index.ts", "exports": { - ".": "./src/index.ts" + ".": "./src/index.ts", + "./alpha-blit": "./src/utils/alphaBlit.ts" }, "scripts": { "build": "tsc", diff --git a/packages/producer/build.mjs b/packages/producer/build.mjs index 8d1bcfede..f9ecf89ec 100644 --- a/packages/producer/build.mjs +++ b/packages/producer/build.mjs @@ -21,6 +21,9 @@ const workspaceAliasPlugin = { build.onResolve({ filter: /^@hyperframes\/engine$/ }, () => ({ path: resolve(scriptDir, "../engine/src/index.ts"), })); + build.onResolve({ filter: /^@hyperframes\/engine\/alpha-blit$/ }, () => ({ + path: resolve(scriptDir, "../engine/src/utils/alphaBlit.ts"), + })); build.onResolve({ filter: /^@hyperframes\/core$/ }, () => ({ path: resolve(scriptDir, "../core/src/index.ts"), })); @@ -55,6 +58,22 @@ await Promise.all([ entryPoints: ["src/server.ts"], outfile: "dist/public-server.js", }), + // PNG decode + alpha-blit worker (hf#732 lever-4). Loaded by + // `pngDecodeBlitWorkerPool.createPngDecodeBlitWorkerPool` via + // `new Worker()`. Must be a separate entry point so the worker + // module is standalone and shares no parent module-graph state. + build({ + bundle: true, + platform: "node", + target: "node22", + format: "esm", + external: ["puppeteer", "esbuild", "postcss"], + plugins: [workspaceAliasPlugin], + minify: false, + sourcemap: true, + entryPoints: ["src/services/pngDecodeBlitWorker.ts"], + outfile: "dist/services/pngDecodeBlitWorker.js", + }), ]); // Copy core runtime artifacts so the producer can find them at dist/ diff --git a/packages/producer/src/services/pngDecodeBlitWorker.ts b/packages/producer/src/services/pngDecodeBlitWorker.ts new file mode 100644 index 000000000..41250a449 --- /dev/null +++ b/packages/producer/src/services/pngDecodeBlitWorker.ts @@ -0,0 +1,122 @@ +/** + * Worker entry point for off-main-thread PNG decode + alpha blit. The + * companion to `pngDecodeBlitWorkerPool.ts`. See that file for the rationale + * (hf#732 lever-4: overlap Chrome's screenshot with Node's decode+blit). + * + * Lifecycle: + * + * 1. Pool constructor spawns N of these workers up front. + * 2. Main thread posts `{ png, pngOffset, pngLength, dest, destOffset, + * destLength, width, height, transfer }` with `transferList: [png, dest]`. + * Both underlying ArrayBuffers are detached on the sender; the caller + * must NOT touch them until the worker replies. + * 3. Worker wraps each ArrayBuffer as a Node Buffer view (zero-copy), + * runs `decodePng` to get an RGBA8 Uint8Array, then `blitRgba8OverRgb48le` + * to composite the decoded pixels onto the rgb48le `dest` buffer in + * the requested transfer space. + * 4. Worker posts `{ ok, png, dest, decodeMs, blitMs }` back with + * `transferList: [png, dest]`. Both ArrayBuffers return to the main + * thread; the caller swaps `result.dest` into its render state. + * 5. On decode/blit exception, worker posts `{ ok: false, error, png, + * dest }` — both ArrayBuffers still returned so the caller can release + * them. + * + * The worker holds no per-frame state. The intermediate RGBA8 decode + * buffer is allocated per-call and dropped on the worker side. + * + * Import strategy: identical to `shaderTransitionWorker.ts` — use the + * `./alpha-blit` subpath export of `@hyperframes/engine` rather than the + * package root, because the root pulls in the full engine graph and the + * tsx loader's `.js → .ts` rewrite does not survive the Worker boundary + * under dev/test. + */ + +import { parentPort } from "node:worker_threads"; +import { decodePng, blitRgba8OverRgb48le } from "@hyperframes/engine/alpha-blit"; + +interface DecodeBlitJobRequest { + png: ArrayBuffer; + pngOffset: number; + pngLength: number; + dest: ArrayBuffer; + destOffset: number; + destLength: number; + width: number; + height: number; + transfer: string; +} + +interface DecodeBlitJobOk { + ok: true; + png: ArrayBuffer; + dest: ArrayBuffer; + decodeMs: number; + blitMs: number; +} + +interface DecodeBlitJobErr { + ok: false; + error: string; + png: ArrayBuffer; + dest: ArrayBuffer; +} + +export type DecodeBlitJobResult = DecodeBlitJobOk | DecodeBlitJobErr; + +if (!parentPort) { + // Defensive — this module is only meaningful inside a worker_thread. + // eslint-disable-next-line no-console + console.warn("[pngDecodeBlitWorker] no parentPort; module loaded on main thread"); +} else { + parentPort.on("message", (msg: DecodeBlitJobRequest) => { + const { png, pngOffset, pngLength, dest, destOffset, destLength, width, height, transfer } = + msg; + // Re-wrap the transferred ArrayBuffers as Node Buffer views. The + // dispatcher in the pool normalizes inputs to offset-0 ArrayBuffers + // before transfer (avoiding the 8KB shared-pool DataCloneError), so + // pngOffset / destOffset are 0 and pngLength / destLength match the + // backing ArrayBuffer byteLength in practice. We still honor the + // forwarded values so the worker is robust if the dispatcher ever + // changes (e.g. ships a slice over a larger transferred ArrayBuffer). + const pngBuf = Buffer.from(png, pngOffset, pngLength); + const destBuf = Buffer.from(dest, destOffset, destLength); + + try { + const decodeStart = Date.now(); + const { data: rgba } = decodePng(pngBuf); + const decodeMs = Date.now() - decodeStart; + + const blitStart = Date.now(); + // `blitRgba8OverRgb48le` accepts the CompositeTransfer string as a + // typed union. The pool's `transfer` field is `string` for transport + // simplicity; the actual values flow through unchanged from the + // orchestrator's HdrCompositeContext and the function validates at + // its own boundary. + blitRgba8OverRgb48le( + rgba, + destBuf, + width, + height, + transfer as Parameters[4], + ); + const blitMs = Date.now() - blitStart; + + const reply: DecodeBlitJobOk = { + ok: true, + png, + dest, + decodeMs, + blitMs, + }; + parentPort!.postMessage(reply, [png, dest]); + } catch (err) { + const reply: DecodeBlitJobErr = { + ok: false, + error: err instanceof Error ? err.message : String(err), + png, + dest, + }; + parentPort!.postMessage(reply, [png, dest]); + } + }); +} diff --git a/packages/producer/src/services/pngDecodeBlitWorkerPool.test.ts b/packages/producer/src/services/pngDecodeBlitWorkerPool.test.ts new file mode 100644 index 000000000..ebe5500ca --- /dev/null +++ b/packages/producer/src/services/pngDecodeBlitWorkerPool.test.ts @@ -0,0 +1,324 @@ +/** + * Tests for the hf#732 lever-4 PNG decode + alpha-blit worker pool. Like the + * shader-blend pool tests next door, these are correctness-critical: a + * regression either corrupts every composited DOM layer or leaks worker + * handles. Tests pin three properties: + * + * 1. Byte-equivalence with the inline path. The worker calls the exact + * same `decodePng` + `blitRgba8OverRgb48le` the inline path uses, so + * the round-trip must reproduce the inline result to the last byte. + * 2. Buffer-transfer correctness across the 8KB Node pool threshold. + * The pool's dispatcher must NOT throw `DataCloneError` for inputs + * that happen to live in the shared 8KB pool (small PNGs, etc.). + * 3. Concurrent dispatch + pipelining. N concurrent `run` calls + * against a pool sized to N all complete with correct output. The + * pipelining test asserts that decode/blit of frame N overlaps the + * kickoff of frame N+1's "capture" (simulated by a deferred Promise), + * proving the pool isn't accidentally serializing. + * + * The pool's clean-shutdown path is also exercised so a test failure + * doesn't leak handles into other tests. + */ + +import { afterEach, describe, expect, it } from "vitest"; +import { deflateSync } from "zlib"; +import { fileURLToPath } from "node:url"; +import { dirname, resolve } from "node:path"; +import { decodePng, blitRgba8OverRgb48le } from "@hyperframes/engine/alpha-blit"; +import { + createPngDecodeBlitWorkerPool, + type PngDecodeBlitWorkerPool, +} from "./pngDecodeBlitWorkerPool.js"; + +const W = 16; +const H = 8; +const RGB48_BYTES = W * H * 6; + +/** + * Synthesize a minimal RGBA8 PNG with a uniform color. Skips CRC32 + * because `decodePng` does not verify checksums. Produces an output that's + * intentionally tiny (well under 8KB) so the test exercises the + * shared-pool-detection path in the dispatcher. + */ +function makeUniformRgbaPng( + w: number, + h: number, + r: number, + g: number, + b: number, + a: number, +): Buffer { + const sig = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a]); + const ihdr = Buffer.alloc(13); + ihdr.writeUInt32BE(w, 0); + ihdr.writeUInt32BE(h, 4); + ihdr[8] = 8; // 8-bit depth + ihdr[9] = 6; // RGBA + ihdr[10] = 0; // deflate + ihdr[11] = 0; // no filter + ihdr[12] = 0; // non-interlaced + + const rows: Buffer[] = []; + for (let y = 0; y < h; y++) { + rows.push(Buffer.from([0])); // PNG row filter byte (None) + for (let x = 0; x < w; x++) { + rows.push(Buffer.from([r, g, b, a])); + } + } + const idatData = deflateSync(Buffer.concat(rows)); + + function chunk(type: string, data: Buffer): Buffer { + const len = Buffer.alloc(4); + len.writeUInt32BE(data.length, 0); + const typeBuf = Buffer.from(type, "ascii"); + const crc = Buffer.alloc(4); // skipped by decoder + return Buffer.concat([len, typeBuf, data, crc]); + } + + return Buffer.concat([ + sig, + chunk("IHDR", ihdr), + chunk("IDAT", idatData), + chunk("IEND", Buffer.alloc(0)), + ]); +} + +describe("PngDecodeBlitWorkerPool", () => { + const pools: PngDecodeBlitWorkerPool[] = []; + + afterEach(async () => { + while (pools.length > 0) { + const p = pools.pop(); + if (p) await p.terminate(); + } + }); + + async function makePool(size: number): Promise { + const p = await createPngDecodeBlitWorkerPool({ size }); + pools.push(p); + return p; + } + + it("produces byte-identical output to the inline decode+blit path", async () => { + const pool = await makePool(1); + const png = makeUniformRgbaPng(W, H, 255, 0, 0, 255); + + // Pool path: blit onto a zero-filled rgb48le canvas. + const poolDest = Buffer.alloc(RGB48_BYTES); + const poolResult = await pool.run({ + png: Buffer.from(png), // independent copy — the input may be detached + dest: poolDest, + width: W, + height: H, + transfer: "srgb", + }); + + // Reference: inline decode+blit onto a separately allocated canvas. + const refDest = Buffer.alloc(RGB48_BYTES); + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + + expect(Buffer.compare(poolResult.dest, refDest)).toBe(0); + }); + + it("composites OVER existing rgb48le content (alpha blend semantics)", async () => { + const pool = await makePool(1); + + // Half-transparent red over a green background. + const png = makeUniformRgbaPng(W, H, 255, 0, 0, 128); + + const poolDest = Buffer.alloc(RGB48_BYTES); + for (let i = 0; i < W * H; i++) { + const off = i * 6; + poolDest.writeUInt16LE(0, off); // R + poolDest.writeUInt16LE(60000, off + 2); // G + poolDest.writeUInt16LE(0, off + 4); // B + } + + const poolResult = await pool.run({ + png: Buffer.from(png), + dest: poolDest, + width: W, + height: H, + transfer: "srgb", + }); + + // Inline reference applies the same blend to its own copy of the + // green background. + const refDest = Buffer.alloc(RGB48_BYTES); + for (let i = 0; i < W * H; i++) { + const off = i * 6; + refDest.writeUInt16LE(0, off); + refDest.writeUInt16LE(60000, off + 2); + refDest.writeUInt16LE(0, off + 4); + } + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + + expect(Buffer.compare(poolResult.dest, refDest)).toBe(0); + }); + + it("survives small PNGs that would otherwise hit the Node 8KB shared pool", async () => { + // A 16×8 RGBA PNG compresses to a few hundred bytes — comfortably + // under the 8KB Buffer pool threshold. If the dispatcher fails to + // copy-out before postMessage, this throws DataCloneError. + const pool = await makePool(1); + const png = makeUniformRgbaPng(W, H, 50, 100, 200, 255); + expect(png.byteLength).toBeLessThan(8192); + + const result = await pool.run({ + png, + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }); + expect(result.dest.byteLength).toBe(RGB48_BYTES); + }); + + it("runs N concurrent decode+blit tasks against an N-wide pool", async () => { + const pool = await makePool(4); + const png = makeUniformRgbaPng(W, H, 200, 100, 50, 255); + + const tasks = Array.from({ length: 4 }, () => + pool.run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }), + ); + const results = await Promise.all(tasks); + + // All 4 results must match each other (same input) AND match the + // inline reference. + const refDest = Buffer.alloc(RGB48_BYTES); + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + for (const r of results) { + expect(Buffer.compare(r.dest, refDest)).toBe(0); + } + }); + + it("permits frame N+1 capture to proceed while frame N decode+blit is in flight", async () => { + // Pipelining proof: kick off a "decode+blit" task that we can hold by + // virtue of the worker actually running, then concurrently kick off a + // simulated next-frame capture (a Promise that resolves immediately). + // The simulated capture must resolve BEFORE the decode+blit, proving + // the decode+blit isn't blocking the main thread. + const pool = await makePool(2); + const png = makeUniformRgbaPng(W, H, 50, 100, 200, 255); + + // Sentinel timestamps to verify overlap. + const captureStarted: number[] = []; + const captureDone: number[] = []; + const decodeBlitStarted: number[] = []; + const decodeBlitDone: number[] = []; + + decodeBlitStarted.push(Date.now()); + const decodeBlitPromise = pool + .run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }) + .then((r) => { + decodeBlitDone.push(Date.now()); + return r; + }); + + // Simulated next-frame CDP capture — a microtask-y await that yields. + // On a non-pipelined (synchronous-inline) path this would have to wait + // for the decode+blit, but the pool dispatches to a worker thread so + // the main thread is free to start the next "capture" immediately. + captureStarted.push(Date.now()); + await new Promise((resolve) => setImmediate(resolve)); + captureDone.push(Date.now()); + + await decodeBlitPromise; + + expect(captureDone[0]).toBeDefined(); + expect(decodeBlitDone[0]).toBeDefined(); + // The simulated capture must have completed before the decode+blit — + // proof that the pool is NOT blocking the main thread. + expect(captureDone[0]!).toBeLessThanOrEqual(decodeBlitDone[0]!); + }); + + it("spawns from an explicit workerEntryPath, bypassing the import.meta.url resolver", async () => { + // Regression for the hf#677 bundled-CLI bug: when the pool is inlined + // into a separate bundle (e.g. cli.js), `import.meta.url` resolves to + // the bundle's path rather than the bundled worker's emitted path, and + // the sibling-probe fallback computes a path the worker file does not + // live at. The explicit `workerEntryPath` plumbed by the call site + // bypasses the heuristic entirely. + const here = dirname(fileURLToPath(import.meta.url)); + const explicitPath = resolve(here, "pngDecodeBlitWorker.ts"); + const pool = await createPngDecodeBlitWorkerPool({ + size: 1, + workerEntryPath: explicitPath, + }); + pools.push(pool); + + const png = makeUniformRgbaPng(W, H, 64, 128, 192, 255); + const dest = Buffer.alloc(RGB48_BYTES); + const result = await pool.run({ + png: Buffer.from(png), + dest, + width: W, + height: H, + transfer: "srgb", + }); + + // Compare to inline reference to confirm the explicit-path spawn actually + // ran real work (not just spawned and crashed silently). + const refDest = Buffer.alloc(RGB48_BYTES); + const { data: refRgba } = decodePng(png); + blitRgba8OverRgb48le(refRgba, refDest, W, H, "srgb"); + expect(Buffer.compare(result.dest, refDest)).toBe(0); + }); + + it("terminates cleanly even with queued tasks", async () => { + const pool = await makePool(1); + const png = makeUniformRgbaPng(W, H, 1, 2, 3, 255); + + // Two tasks: the first dispatches, the second queues. + const p1 = pool.run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }); + const p2 = pool.run({ + png: Buffer.from(png), + dest: Buffer.alloc(RGB48_BYTES), + width: W, + height: H, + transfer: "srgb", + }); + + await pool.terminate(); + pools.pop(); // already terminated; don't try to re-terminate in afterEach + + // Both must settle (resolve or reject). The first MAY resolve if the + // worker finished before terminate() raced in; the second MUST reject + // because it never got to dispatch. + const r1 = await p1.catch((e: unknown) => e); + const r2 = await p2.catch((e: unknown) => e); + if (r2 instanceof Error) { + expect(r2.message).toMatch(/terminated/); + } else { + // If both raced to resolve before terminate, that's also acceptable + // — but only the second one is guaranteed-rejected. Accept either + // shape; the goal of the test is "no leaked handles, no unhandled + // rejection". + expect(r2).toBeDefined(); + } + // r1 is "either resolved with a result OR rejected with terminated"; + // both are acceptable. + expect(r1).toBeDefined(); + }); +}); diff --git a/packages/producer/src/services/pngDecodeBlitWorkerPool.ts b/packages/producer/src/services/pngDecodeBlitWorkerPool.ts new file mode 100644 index 000000000..b7c11a017 --- /dev/null +++ b/packages/producer/src/services/pngDecodeBlitWorkerPool.ts @@ -0,0 +1,425 @@ +/** + * Pool of Node `worker_threads` Workers for off-main-thread PNG decode + + * rgba8-over-rgb48le blit. The hf#732 lever-4 follow-up: capture (Chrome + * compositor) and decode+blit (Node CPU) were previously serialized per + * frame inside `captureTransitionFrame` / `compositeHdrFrame`. Each frame + * waited ~30 ms (decode) + ~50 ms (blit) on the Node main thread before the + * next CDP screenshot could begin. With the decode+blit on a worker pool, + * the calling thread can kick off the next CDP capture immediately and the + * decode+blit overlaps Chrome's screenshot work. + * + * Why a SEPARATE pool from `shaderTransitionWorkerPool`: + * + * - Different work shapes: shader-blend reads 2× rgb48le, writes 1× + * rgb48le. Decode+blit reads 1× PNG bytes (variable size, often + * ~250 KB for 854×480 with sparse DOM content), allocates a temporary + * RGBA8 buffer, and writes 1× rgb48le (over an existing destination). + * - Different concurrency profile: shader-blend fires once per + * transition frame; decode+blit fires once per DOM layer per frame + * (typically 3-6× per normal frame, 2× per transition frame). The + * pools have very different dispatch rates and queuing characteristics. + * - Sizing them independently lets us tune each to its bottleneck without + * starving the other. + * + * API: + * + * const pool = await createPngDecodeBlitWorkerPool({ size, log }); + * const result = await pool.run({ + * png: pngBuffer, // alpha-channel PNG from CDP (zero-copy in) + * dest: rgb48leDestBuffer, // rgb48le canvas to blend onto (zero-copy in) + * width, height, + * transfer: "srgb" | "pq" | "hlg" | etc., + * }); + * // result.dest is the SAME memory as the input `dest`, but re-attached + * // to the main thread. The input Buffer is detached on dispatch — the + * // caller must use `result.dest` for any subsequent reads/writes. + * await pool.terminate(); + * + * Buffer transfer contract: + * + * The PNG bytes are transferred IN with `transferList: [png.buffer]` so + * we don't copy them across the worker boundary. The `dest` rgb48le + * ArrayBuffer is also transferred IN (and back OUT) so the blit happens + * in place on the same memory the caller pre-allocated. After `run` + * resolves, the caller MUST swap its Buffer reference to `result.dest` + * — the original Buffer's underlying ArrayBuffer is detached on dispatch. + * + * The intermediate RGBA8 decode buffer is allocated INSIDE the worker + * and dropped on the worker side — there is no main-thread allocation + * for it. The PNG ArrayBuffer is transferred back too so the caller can + * release it. + */ + +import { Worker } from "node:worker_threads"; +import { fileURLToPath, pathToFileURL } from "node:url"; +import { dirname, join } from "node:path"; +import { createRequire } from "node:module"; +import { existsSync } from "node:fs"; +import { cpus } from "node:os"; + +interface PoolLogger { + info?: (msg: string, meta?: Record) => void; + warn?: (msg: string, meta?: Record) => void; + error?: (msg: string, meta?: Record) => void; +} + +export interface PngDecodeBlitPoolOptions { + /** Number of worker threads. Clamped to [1, cpus().length]. */ + size: number; + /** Optional logger; falls back to no-op. */ + log?: PoolLogger; + /** + * Absolute filesystem path to the worker entry module. When provided, the + * pool spawns workers from this exact path and skips the fallback + * `import.meta.url`-based resolver entirely. Required by callers that + * bundle the worker via a separate build (e.g. the CLI's tsup bundle): + * `import.meta.url` inside the bundled pool resolves to the bundle's own + * location, NOT the bundled worker entry's location, so the heuristic + * resolver below cannot find the worker. Path extension determines the + * loader behaviour (`.ts` → tsx/esm loader is appended to execArgv). + */ + workerEntryPath?: string; +} + +export interface PngDecodeBlitRequest { + /** PNG bytes captured from CDP (Page.captureScreenshot). */ + png: Buffer; + /** + * Pre-allocated rgb48le destination canvas (width*height*6 bytes). The + * blit composites the decoded RGBA8 image OVER this buffer's existing + * contents in `transfer` space. + */ + dest: Buffer; + width: number; + height: number; + /** + * Composite color space tag matching the engine's `CompositeTransfer` + * union. Passed through to `blitRgba8OverRgb48le` in the worker. + */ + transfer: string; +} + +export interface PngDecodeBlitResult { + /** + * Re-attached destination buffer holding the composited rgb48le pixels. + * Same memory as the request's `dest`, viewed through a fresh Buffer. + */ + dest: Buffer; + /** Per-worker timing: decode duration in ms (excluding postMessage latency). */ + decodeMs: number; + /** Per-worker timing: blit duration in ms. */ + blitMs: number; +} + +interface PendingTask { + req: PngDecodeBlitRequest; + resolve: (r: PngDecodeBlitResult) => void; + reject: (err: Error) => void; + enqueuedAtMs?: number; + traceId?: number; +} + +interface WorkerSlot { + worker: Worker; + busy: boolean; + current: PendingTask | null; +} + +interface WorkerReply { + ok: boolean; + error?: string; + png: ArrayBuffer; + dest: ArrayBuffer; + decodeMs?: number; + blitMs?: number; +} + +export interface PngDecodeBlitWorkerPool { + readonly size: number; + run(req: PngDecodeBlitRequest): Promise; + terminate(): Promise; +} + +/** + * Resolve the path to the compiled worker module. + * + * Resolution order (first match wins): + * 1. Explicit `workerEntryPath` factory option — callers that bundle the + * worker via a separate build pipeline (e.g. the CLI's tsup bundle that + * emits `pngDecodeBlitWorker.js` next to `cli.js`) must use this. The + * bundled-CLI case is the *only* one where the fallback below cannot + * find the worker: `import.meta.url` inside the inlined pool resolves + * to the bundle path, not the worker's emitted path, so the sibling + * probe lands in the wrong directory. + * 2. `HF_PNG_DECODE_BLIT_WORKER_ENTRY` env var — test/dev infra override. + * 3. Same-directory `.js` sibling — works when both pool source and + * worker source compile into the same `dist/services/` directory + * (in-tree dev builds and the colocated tsc emit). + * 4. Same-directory `.ts` sibling — vitest/bun raw-TS execution path. + */ +function resolveWorkerEntry(explicit: string | undefined): { path: string; isTs: boolean } { + if (explicit && explicit.length > 0) { + return { path: explicit, isTs: explicit.endsWith(".ts") }; + } + const override = process.env.HF_PNG_DECODE_BLIT_WORKER_ENTRY; + if (override && override.length > 0) { + const isTs = override.endsWith(".ts"); + return { path: override, isTs }; + } + const moduleDir = dirname(fileURLToPath(import.meta.url)); + const jsPath = join(moduleDir, "pngDecodeBlitWorker.js"); + if (existsSync(jsPath)) return { path: jsPath, isTs: false }; + const tsPath = join(moduleDir, "pngDecodeBlitWorker.ts"); + return { path: tsPath, isTs: true }; +} + +/** + * Mirror of `shaderTransitionWorkerPool.buildExecArgv`. Worker threads + * inherit the parent's loader only if the relevant flag is present on + * `process.execArgv`; under vitest the tsx loader is NOT exposed there, so + * we append `--import tsx/esm` when the resolved entry is `.ts` and no + * loader is detected. Best-effort: silently no-ops if `tsx/esm` can't be + * resolved (prod bundle). + */ +function buildExecArgv(entryIsTs: boolean): string[] { + const inherited = [...process.execArgv]; + if (!entryIsTs) return inherited; + const hasLoader = inherited.some( + (a) => a.includes("tsx/esm") || a.includes("ts-node/esm") || a.includes("--import"), + ); + if (hasLoader) return inherited; + try { + const require = createRequire(import.meta.url); + const tsxEsm = require.resolve("tsx/esm"); + inherited.push("--import", pathToFileURL(tsxEsm).href); + } catch { + // tsx not installed (prod) — leave execArgv as-is. + } + return inherited; +} + +export async function createPngDecodeBlitWorkerPool( + opts: PngDecodeBlitPoolOptions, +): Promise { + const cpuCount = Math.max(1, cpus().length); + const size = Math.max(1, Math.min(opts.size, cpuCount)); + const log = opts.log ?? {}; + const { path: entry, isTs: entryIsTs } = resolveWorkerEntry(opts.workerEntryPath); + + const slots: WorkerSlot[] = []; + const queue: PendingTask[] = []; + let terminated = false; + + const traceEnabled = process.env.HF_PNG_DECODE_BLIT_POOL_TRACE === "1"; + let nextTaskId = 0; + + const execArgv = buildExecArgv(entryIsTs); + + const dispatchNext = (slot: WorkerSlot): void => { + if (terminated || slot.busy) return; + const task = queue.shift(); + if (!task) return; + slot.busy = true; + slot.current = task; + if (traceEnabled) { + const slotIdx = slots.indexOf(slot); + const waitMs = task.enqueuedAtMs ? Date.now() - task.enqueuedAtMs : 0; + const busyCount = slots.filter((s) => s.busy).length; + log.info?.("[pngDecodeBlitPool] dispatch", { + task: task.traceId, + slot: slotIdx, + waitMs, + busyCount, + queueDepth: queue.length, + }); + } + const { png, dest, width, height, transfer } = task.req; + // Transfer the PNG bytes (input) and the rgb48le dest (in-place blit + // target) across the worker boundary. Both are detached on the main + // thread until the reply re-attaches them. + // + // Node `Buffer.alloc(N)` allocates a dedicated ArrayBuffer for buffers + // above 8KB (the pool threshold); below that, Buffers are slices over + // a shared 8KB pool ArrayBuffer. `postMessage` with `transferList` + // REJECTS the shared pool with `DataCloneError: Cannot transfer + // object of unsupported type`. The rgb48le `dest` buffers in the + // hybrid path are always > 8KB (854×480×6 ≈ 2.4MB) so they're fine, + // but PNG inputs (variable size, typically 30-300KB but can be < 8KB + // for empty layers) AND any small upstream Buffers can hit the pool. + // + // Safety net: if the underlying ArrayBuffer is larger than the Buffer + // view OR if the Buffer view doesn't cover the ArrayBuffer exactly, + // we copy into a fresh dedicated ArrayBuffer before transfer. Cost is + // one allocation + memcpy of the PNG bytes — small versus the + // postMessage round-trip we're already paying. + const pngBackingFitsExactly = png.byteOffset === 0 && png.byteLength === png.buffer.byteLength; + const pngSource: Buffer = pngBackingFitsExactly ? png : Buffer.from(png); // Buffer.from(Buffer) copies into a new pool slot + // For very small PNGs, `Buffer.from(buf)` may still land in the pool. + // Force a dedicated ArrayBuffer with `Uint8Array.slice().buffer`. + let abPng: ArrayBuffer; + let pngOffset: number; + let pngLength: number; + if (pngSource.byteOffset === 0 && pngSource.byteLength === pngSource.buffer.byteLength) { + abPng = pngSource.buffer as ArrayBuffer; + pngOffset = 0; + pngLength = pngSource.byteLength; + } else { + const copied = new Uint8Array(pngSource.byteLength); + copied.set(pngSource); + abPng = copied.buffer; + pngOffset = 0; + pngLength = copied.byteLength; + } + // The rgb48le `dest` should always have a dedicated ArrayBuffer + // (`Buffer.alloc` above the 8KB pool threshold gives one). Defensive + // path mirrors the PNG handling for symmetry. + let abDest: ArrayBuffer; + let destOffset: number; + let destLength: number; + if (dest.byteOffset === 0 && dest.byteLength === dest.buffer.byteLength) { + abDest = dest.buffer as ArrayBuffer; + destOffset = 0; + destLength = dest.byteLength; + } else { + // Should never happen for hybrid-path canvases. Take the + // copy-and-transfer path so we don't crash; but log so we surface + // any future allocator change that violates the invariant. + log.warn?.("[pngDecodeBlitPool] dest buffer is a slice over a larger ArrayBuffer; copying", { + offset: dest.byteOffset, + length: dest.byteLength, + backingSize: dest.buffer.byteLength, + }); + const copied = new Uint8Array(dest.byteLength); + copied.set(dest); + abDest = copied.buffer; + destOffset = 0; + destLength = copied.byteLength; + } + try { + slot.worker.postMessage( + { + png: abPng, + pngOffset, + pngLength, + dest: abDest, + destOffset, + destLength, + width, + height, + transfer, + }, + [abPng, abDest], + ); + } catch (err) { + slot.busy = false; + slot.current = null; + task.reject(err instanceof Error ? err : new Error(String(err))); + } + }; + + const onWorkerMessage = (slot: WorkerSlot, reply: WorkerReply): void => { + const task = slot.current; + slot.current = null; + slot.busy = false; + if (!task) { + dispatchNext(slot); + return; + } + if (!reply.ok) { + task.reject(new Error(reply.error ?? "png-decode-blit worker failed")); + } else { + // Re-attach the dest ArrayBuffer as a Node Buffer view. We wrap the + // full reply.dest (offset=0, length=byteLength) because the + // dispatch path normalizes to offset-0 ArrayBuffers (either the + // caller's original or a fresh copy we made on the dispatch side). + // The blit work happened over the full ArrayBuffer in the worker; + // returning a view that matches that is the correct semantics. + task.resolve({ + dest: Buffer.from(reply.dest, 0, reply.dest.byteLength), + decodeMs: reply.decodeMs ?? 0, + blitMs: reply.blitMs ?? 0, + }); + } + dispatchNext(slot); + }; + + const onWorkerError = (slot: WorkerSlot, err: Error): void => { + const task = slot.current; + slot.current = null; + slot.busy = false; + if (task) { + task.reject( + new Error(`png-decode-blit worker crashed mid-task: ${err.message}; dest buffer lost`), + ); + } + log.warn?.("[pngDecodeBlitWorkerPool] worker errored", { err: err.message }); + }; + + const onWorkerExit = (slot: WorkerSlot, code: number): void => { + if (terminated) return; + if (slot.current) { + slot.current.reject(new Error(`png-decode-blit worker exited (code=${code}) mid-task`)); + slot.current = null; + slot.busy = false; + } + log.warn?.("[pngDecodeBlitWorkerPool] worker exited unexpectedly", { code }); + }; + + try { + for (let i = 0; i < size; i++) { + const worker = new Worker(entry, { execArgv }); + const slot: WorkerSlot = { worker, busy: false, current: null }; + worker.on("message", (msg: WorkerReply) => onWorkerMessage(slot, msg)); + worker.on("error", (err: unknown) => + onWorkerError(slot, err instanceof Error ? err : new Error(String(err))), + ); + worker.on("exit", (code) => onWorkerExit(slot, code)); + slots.push(slot); + } + } catch (err) { + terminated = true; + await Promise.all(slots.map((s) => s.worker.terminate().catch(() => undefined))); + throw err; + } + + log.info?.("[pngDecodeBlitWorkerPool] spawned", { size, entry }); + + return { + size, + async run(req: PngDecodeBlitRequest): Promise { + if (terminated) { + throw new Error("png-decode-blit pool already terminated"); + } + return new Promise((resolve, reject) => { + const task: PendingTask = traceEnabled + ? { req, resolve, reject, enqueuedAtMs: Date.now(), traceId: ++nextTaskId } + : { req, resolve, reject }; + const idle = slots.find((s) => !s.busy); + if (idle) { + queue.unshift(task); + dispatchNext(idle); + } else { + queue.push(task); + } + }); + }, + async terminate(): Promise { + if (terminated) return; + terminated = true; + while (queue.length > 0) { + const t = queue.shift(); + if (t) t.reject(new Error("png-decode-blit pool terminated before task ran")); + } + for (const slot of slots) { + const t = slot.current; + if (t) { + slot.current = null; + slot.busy = false; + t.reject(new Error("png-decode-blit pool terminated mid-task")); + } + } + await Promise.all(slots.map((s) => s.worker.terminate().catch(() => undefined))); + log.info?.("[pngDecodeBlitWorkerPool] terminated", { size }); + }, + }; +}