Skip to content

Commit 22e3579

Browse files
trivikraduh95
authored andcommitted
stream: avoid retrying accepted pipeTo writes
PushWriter in block backpressure mode can return false from writeSync() and writevSync() after accepting data. Treat that false return as backpressure and wait for drain instead of retrying the same chunks asynchronously. Fixes: #63296 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63297 Fixes: #63296 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Ethan Arrowood <ethan@arrowood.dev>
1 parent 189d43a commit 22e3579

4 files changed

Lines changed: 76 additions & 3 deletions

File tree

lib/internal/streams/iter/pull.js

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ const {
5151
} = require('internal/streams/iter/utils');
5252

5353
const {
54+
drainableProtocol,
55+
kSyncWriteAcceptedOnFalse,
5456
kValidatedTransform,
5557
} = require('internal/streams/iter/types');
5658

@@ -828,13 +830,33 @@ async function pipeTo(source, ...args) {
828830
const hasWriteSync = typeof writer.writeSync === 'function';
829831
const hasWritevSync = typeof writer.writevSync === 'function';
830832
const hasEndSync = typeof writer.endSync === 'function';
833+
const syncFalseCanBeAccepted = writer[kSyncWriteAcceptedOnFalse] === true;
834+
835+
function syncFalseWasAccepted() {
836+
return syncFalseCanBeAccepted && writer.desiredSize === 0;
837+
}
838+
839+
function waitForSyncBackpressure() {
840+
const ondrain = writer[drainableProtocol];
841+
return ondrain?.call(writer);
842+
}
843+
844+
async function writeBatchAfterAcceptedBackpressure(batch, startIndex) {
845+
await waitForSyncBackpressure();
846+
await writeBatchAsyncFallback(batch, startIndex);
847+
}
848+
831849
// Async fallback for writeBatch when sync write fails partway through.
832850
// Continues writing from batch[startIndex] using async write().
833851
async function writeBatchAsyncFallback(batch, startIndex) {
834852
for (let i = startIndex; i < batch.length; i++) {
835853
const chunk = batch[i];
836854
if (hasWriteSync && writer.writeSync(chunk)) {
837855
// Sync retry succeeded
856+
} else if (syncFalseWasAccepted()) {
857+
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
858+
await waitForSyncBackpressure();
859+
continue;
838860
} else {
839861
const result = writer.write(
840862
chunk, signal ? { __proto__: null, signal } : undefined);
@@ -852,6 +874,12 @@ async function pipeTo(source, ...args) {
852874
function writeBatch(batch) {
853875
if (hasWritev && batch.length > 1) {
854876
if (!hasWritevSync || !writer.writevSync(batch)) {
877+
if (hasWritevSync && syncFalseWasAccepted()) {
878+
for (let i = 0; i < batch.length; i++) {
879+
totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
880+
}
881+
return waitForSyncBackpressure();
882+
}
855883
const opts = signal ? { __proto__: null, signal } : undefined;
856884
return PromisePrototypeThen(writer.writev(batch, opts), () => {
857885
for (let i = 0; i < batch.length; i++) {
@@ -867,6 +895,10 @@ async function pipeTo(source, ...args) {
867895
for (let i = 0; i < batch.length; i++) {
868896
const chunk = batch[i];
869897
if (!hasWriteSync || !writer.writeSync(chunk)) {
898+
if (hasWriteSync && syncFalseWasAccepted()) {
899+
totalBytes += TypedArrayPrototypeGetByteLength(chunk);
900+
return writeBatchAfterAcceptedBackpressure(batch, i + 1);
901+
}
870902
// Sync path failed at index i - fall back to async for the rest.
871903
// Count bytes for chunks already written synchronously (0..i-1).
872904
return writeBatchAsyncFallback(batch, i);

lib/internal/streams/iter/push.js

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const {
3232

3333
const {
3434
drainableProtocol,
35+
kSyncWriteAcceptedOnFalse,
3536
} = require('internal/streams/iter/types');
3637

3738
const {
@@ -560,6 +561,10 @@ class PushWriter {
560561
return this.#queue.desiredSize;
561562
}
562563

564+
get [kSyncWriteAcceptedOnFalse]() {
565+
return this.#queue.backpressurePolicy === 'block';
566+
}
567+
563568
write(chunk, options) {
564569
if (!options?.signal && this.#queue.canWriteSync()) {
565570
const bytes = toUint8Array(chunk);
@@ -586,7 +591,8 @@ class PushWriter {
586591
writeSync(chunk) {
587592
const bytes = toUint8Array(chunk);
588593
const result = this.#queue.writeSync([bytes]);
589-
if (!result && this.#queue.backpressurePolicy === 'block') {
594+
if (!result && this.#queue.backpressurePolicy === 'block' &&
595+
this.#queue.desiredSize === 0) {
590596
// Block policy: force-enqueue and return false as backpressure signal.
591597
// Data IS accepted; false tells caller to slow down.
592598
this.#queue.forceEnqueue([bytes]);
@@ -601,7 +607,8 @@ class PushWriter {
601607
}
602608
const bytes = convertChunks(chunks);
603609
const result = this.#queue.writeSync(bytes);
604-
if (!result && this.#queue.backpressurePolicy === 'block') {
610+
if (!result && this.#queue.backpressurePolicy === 'block' &&
611+
this.#queue.desiredSize === 0) {
605612
this.#queue.forceEnqueue(bytes);
606613
return false;
607614
}

lib/internal/streams/iter/types.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,12 @@ const kValidatedTransform = Symbol('kValidatedTransform');
6464
*/
6565
const kValidatedSource = Symbol('kValidatedSource');
6666

67+
const kSyncWriteAcceptedOnFalse = Symbol('kSyncWriteAcceptedOnFalse');
68+
6769
module.exports = {
6870
broadcastProtocol,
6971
drainableProtocol,
72+
kSyncWriteAcceptedOnFalse,
7073
kValidatedSource,
7174
kValidatedTransform,
7275
shareProtocol,

test/parallel/test-stream-iter-pipeto-writev.js

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55

66
const common = require('../common');
77
const assert = require('assert');
8-
const { pipeTo, pipeToSync } = require('stream/iter');
8+
const { setImmediate: setImmediatePromise } = require('timers/promises');
9+
const { pipeTo, pipeToSync, push, text } = require('stream/iter');
910

1011
// Multi-chunk batch with writevSync (sync success path)
1112
async function testWritevSyncSuccess() {
@@ -104,6 +105,35 @@ async function testWriteSyncAlwaysFails() {
104105
assert.strictEqual(total, 2);
105106
}
106107

108+
// PushWriter block mode accepts sync writes even when returning false for
109+
// backpressure. pipeTo must wait for drain, not retry the same write.
110+
async function assertPushWriterBlockPipeTo(source, expected, expectedTotal) {
111+
const { writer, readable } = push({
112+
highWaterMark: 1,
113+
backpressure: 'block',
114+
});
115+
116+
const pipe = pipeTo(source, writer);
117+
await setImmediatePromise();
118+
const data = await text(readable);
119+
const total = await pipe;
120+
121+
assert.strictEqual(data, expected);
122+
assert.strictEqual(total, expectedTotal);
123+
}
124+
125+
async function testPushWriterBlockSyncFalseAccepted() {
126+
await assertPushWriterBlockPipeTo((async function*() {
127+
yield [new Uint8Array([97])];
128+
yield [new Uint8Array([98])];
129+
})(), 'ab', 2);
130+
131+
await assertPushWriterBlockPipeTo((async function*() {
132+
yield [new Uint8Array([97, 98])];
133+
yield [new Uint8Array([99]), new Uint8Array([100])];
134+
})(), 'abcd', 4);
135+
}
136+
107137
// pipeToSync with writevSync
108138
async function testPipeToSyncWritev() {
109139
const batches = [];
@@ -142,6 +172,7 @@ Promise.all([
142172
testWritevSyncFails(),
143173
testWriteSyncFailsMidBatch(),
144174
testWriteSyncAlwaysFails(),
175+
testPushWriterBlockSyncFalseAccepted(),
145176
testPipeToSyncWritev(),
146177
testPipeToSyncWriteFallback(),
147178
]).then(common.mustCall());

0 commit comments

Comments
 (0)