Skip to content

Commit b568649

Browse files
trivikraduh95
authored andcommitted
stream: preserve toReadableSync batch after backpressure
Keep the current batch and index across _read() calls so chunks that remain after push() returns false are emitted on later reads. Fixes: #63275 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63276 Fixes: #63275 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 14d3924 commit b568649

2 files changed

Lines changed: 34 additions & 4 deletions

File tree

lib/internal/streams/iter/classic.js

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -363,23 +363,37 @@ function toReadableSync(source, options = kNullPrototype) {
363363

364364
const ReadableCtor = lazyReadable();
365365
const iterator = source[SymbolIterator]();
366+
let hasBatch = false;
367+
let batch;
368+
let batchIndex = 0;
366369

367370
return new ReadableCtor({
368371
__proto__: null,
369372
highWaterMark,
370373
read() {
371374
for (;;) {
372-
const { value: batch, done } = iterator.next();
375+
if (hasBatch) {
376+
while (batchIndex < batch.length) {
377+
if (!this.push(batch[batchIndex++])) return;
378+
}
379+
batch = undefined;
380+
hasBatch = false;
381+
batchIndex = 0;
382+
}
383+
384+
const result = iterator.next();
385+
const { done } = result;
373386
if (done) {
374387
this.push(null);
375388
return;
376389
}
377-
for (let i = 0; i < batch.length; i++) {
378-
if (!this.push(batch[i])) return;
379-
}
390+
batch = result.value;
391+
hasBatch = true;
380392
}
381393
},
382394
destroy(err, cb) {
395+
batch = undefined;
396+
hasBatch = false;
383397
if (typeof iterator.return === 'function') iterator.return();
384398
cb(err);
385399
},

test/parallel/test-stream-iter-to-readable.js

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,21 @@ async function testBackpressureSync() {
439439
assert.strictEqual(chunks.length, 10);
440440
}
441441

442+
// =============================================================================
443+
// fromStreamIterSync: backpressure within a batch
444+
// =============================================================================
445+
446+
async function testBackpressureSyncMultiChunkBatch() {
447+
function* gen() {
448+
yield [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')];
449+
}
450+
451+
const readable = toReadableSync(gen(), { highWaterMark: 1 });
452+
const result = await collect(readable);
453+
454+
assert.strictEqual(result.toString(), 'abc');
455+
}
456+
442457
// =============================================================================
443458
// fromStreamIterSync: source error
444459
// =============================================================================
@@ -613,6 +628,7 @@ Promise.all([
613628
testWithTransformAsync(),
614629
testBasicSync(),
615630
testBackpressureSync(),
631+
testBackpressureSyncMultiChunkBatch(),
616632
testErrorSync(),
617633
testDestroySync(),
618634
testRoundTrip(),

0 commit comments

Comments
 (0)