diff --git a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts index f88aa6158..ba8251832 100644 --- a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts +++ b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts @@ -147,15 +147,37 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise } const workerCanvases: Buffer[] = sessions.map(() => Buffer.alloc(bufSize)); - const workerTransitionBuffers: Array = sessions.map(() => - hasTransitions - ? { - bufferA: Buffer.alloc(bufSize), - bufferB: Buffer.alloc(bufSize), - output: Buffer.alloc(bufSize), - } - : null, + // hf#732 PR 5: K-deep ring of transition buffer-triples per worker. The + // ring lets capture-N+1 proceed on the DOM worker while the shader-blend + // pool is still working on frames N-K+1..N. Without the ring (PR 4), each + // worker awaited its own blend before the next capture, capping the pool + // at <=1 task per worker. With K=4, the pool sees up to min(N_workers * K, + // poolSize) concurrent blends, which empirically pushes shader-render + // wall time another ~10-20% past PR 4 alone. + // + // The ideal K is `blend_per_frame / capture_per_frame`. For 854x480 + // rgb48le with the more complex shaders this is ~910ms / ~175ms ≈ 5. + // K=4 strikes a perf vs. memory balance. Override via + // `HF_TRANSITION_RING_DEPTH` if a workload's blend/capture ratio is very + // different (simpler shaders that blend in ~100ms tolerate K=1-2 without + // perf loss). + const DEFAULT_TRANSITION_RING_DEPTH = 4; + const TRANSITION_RING_DEPTH = Math.max( + 1, + Number(process.env.HF_TRANSITION_RING_DEPTH ?? String(DEFAULT_TRANSITION_RING_DEPTH)), ); + const workerTransitionRings: Array = sessions.map(() => { + if (!hasTransitions) return null; + const ring: LayeredTransitionBuffers[] = []; + for (let k = 0; k < TRANSITION_RING_DEPTH; k++) { + ring.push({ + bufferA: Buffer.alloc(bufSize), + bufferB: Buffer.alloc(bufSize), + output: Buffer.alloc(bufSize), + }); + } + return ring; + }); const workerRanges = distributeLayeredHybridFrameRanges(totalFrames, activeWorkerCount); let framesWritten = 0; const reorderBuffer = createFrameReorderBuffer(0, totalFrames); @@ -185,15 +207,32 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise const session = sessions[w]; const canvas = workerCanvases[w]; const range = workerRanges[w]; - const buffers = workerTransitionBuffers[w]; + const ring = workerTransitionRings[w]; if (!session || !canvas || !range) return; + // Per-ring-slot in-flight promise. When a slot is mid-blend, its + // promise is non-null; before reusing the slot for a new capture we + // await it so the buffer triple is free + the encoder has seen the + // earlier frame (writeEncoded gates ordering via the reorder buffer). + const ringInFlight: Array | null> = ring ? ring.map(() => null) : []; + let nextRingIdx = 0; for (let i = range.start; i < range.end; i++) { assertNotAborted(); const time = (i * job.config.fps.den) / job.config.fps.num; const activeTransition = transitionFramesSet.has(i) ? transitionRanges.find((t) => i >= t.startFrame && i <= t.endFrame) : undefined; - if (activeTransition && buffers) { + if (activeTransition && ring) { + // Pick the next ring slot. If it's still in flight from an earlier + // capture, await it to drain before reusing its buffer triple. + const slot = nextRingIdx; + nextRingIdx = (nextRingIdx + 1) % TRANSITION_RING_DEPTH; + const prev = ringInFlight[slot]; + if (prev) await prev; + const buffers = ring[slot]; + if (!buffers) continue; + // CAPTURE on the DOM worker (this thread). Fills bufferA/bufferB + // synchronously w.r.t. this loop — DOM work can't be pipelined + // because the per-worker browser session is single-threaded. await captureTransitionFrameOnWorker({ session, frameIdx: i, @@ -216,28 +255,48 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise ? 1 : (i - activeTransition.startFrame) / (activeTransition.endFrame - activeTransition.startFrame); - if (poolRef) { - const blendStart = Date.now(); - const result = await poolRef.run({ - shader: activeTransition.shader, - bufferA: buffers.bufferA, - bufferB: buffers.bufferB, - output: buffers.output, - width, - height, - progress, - }); - buffers.bufferA = result.bufferA; - buffers.bufferB = result.bufferB; - buffers.output = result.output; - addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); - } else { - const transitionFn: TransitionFn = TRANSITIONS[activeTransition.shader] ?? crossfade; - const blendStart = Date.now(); - transitionFn(buffers.bufferA, buffers.bufferB, buffers.output, width, height, progress); - addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); - } - await writeEncoded(i, buffers.output); + // BLEND + ENCODE without awaiting. The promise drains back into + // `ringInFlight[slot]`; the next iteration that picks `slot` + // awaits it. The encoder reorder buffer fences ordering so out- + // of-order blend completion is fine. + const frameIdx = i; + const dispatch: Promise = (async () => { + if (poolRef) { + const blendStart = Date.now(); + const result = await poolRef.run({ + shader: activeTransition.shader, + bufferA: buffers.bufferA, + bufferB: buffers.bufferB, + output: buffers.output, + width, + height, + progress, + }); + buffers.bufferA = result.bufferA; + buffers.bufferB = result.bufferB; + buffers.output = result.output; + addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); + } else { + const transitionFn: TransitionFn = TRANSITIONS[activeTransition.shader] ?? crossfade; + const blendStart = Date.now(); + transitionFn( + buffers.bufferA, + buffers.bufferB, + buffers.output, + width, + height, + progress, + ); + addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); + } + await writeEncoded(frameIdx, buffers.output); + })(); + // Catch on a separate handle so an unhandled-rejection can't fire + // if no one awaits this slot before the worker exits. The error + // is re-thrown on the next await (slot reuse OR end-of-task drain). + ringInFlight[slot] = dispatch.catch((err: unknown) => { + throw err instanceof Error ? err : new Error(String(err)); + }); } else { const beforeCaptureHook = session.onBeforeCapture; let timingStart = Date.now(); @@ -268,6 +327,12 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise await writeEncoded(i, canvas); } } + // Drain any pipelined blends still in flight on this worker before + // returning. If any rejected, the rejection bubbles here so + // `Promise.all` over `workerTaskOf` sees the failure. + for (const pending of ringInFlight) { + if (pending) await pending; + } }; await Promise.all(sessions.map((_, w) => workerTaskOf(w))); await reorderBuffer.waitForAllDone();