Skip to content

Commit e49154f

Browse files
trivikraduh95
authored andcommitted
stream: add sync iterable fast path to pipeTo
Avoid normalizing sync iterable sources through from() when pipeTo() has no transforms or signal and the writer can accept sync writes. This keeps writes incremental while preserving async fallback for values that still need it. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63318 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Ethan Arrowood <ethan@arrowood.dev>
1 parent 537455e commit e49154f

3 files changed

Lines changed: 155 additions & 6 deletions

File tree

benchmark/streams/iter-throughput-pipeto.js

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const common = require('../common.js');
66
const { Readable, Writable, pipeline } = require('stream');
77

88
const bench = common.createBenchmark(main, {
9-
api: ['classic', 'webstream', 'iter', 'iter-sync'],
9+
api: ['classic', 'webstream', 'iter', 'iter-sync-source', 'iter-sync'],
1010
datasize: [1024 * 1024, 16 * 1024 * 1024, 64 * 1024 * 1024],
1111
n: [5],
1212
}, {
@@ -26,6 +26,8 @@ function main({ api, datasize, n }) {
2626
return benchWebStream(chunk, datasize, n, totalOps);
2727
case 'iter':
2828
return benchIter(chunk, datasize, n, totalOps);
29+
case 'iter-sync-source':
30+
return benchIterSyncSource(chunk, datasize, n, totalOps);
2931
case 'iter-sync':
3032
return benchIterSync(chunk, datasize, n, totalOps);
3133
}
@@ -101,6 +103,29 @@ function benchIter(chunk, datasize, n, totalOps) {
101103
})();
102104
}
103105

106+
function benchIterSyncSource(chunk, datasize, n, totalOps) {
107+
const { pipeTo } = require('stream/iter');
108+
109+
async function run() {
110+
let remaining = datasize;
111+
function* source() {
112+
while (remaining > 0) {
113+
const size = Math.min(remaining, chunk.length);
114+
remaining -= size;
115+
yield size === chunk.length ? chunk : chunk.subarray(0, size);
116+
}
117+
}
118+
const writer = { write() {}, writeSync() { return true; } };
119+
await pipeTo(source(), writer);
120+
}
121+
122+
(async () => {
123+
bench.start();
124+
for (let i = 0; i < n; i++) await run();
125+
bench.end(totalOps);
126+
})();
127+
}
128+
104129
function benchIterSync(chunk, datasize, n, totalOps) {
105130
const { pipeToSync } = require('stream/iter');
106131

lib/internal/streams/iter/pull.js

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
const {
1010
ArrayBufferIsView,
11+
ArrayFromAsync,
12+
ArrayIsArray,
1113
ArrayPrototypePush,
1214
ArrayPrototypeSlice,
1315
PromisePrototypeThen,
@@ -38,7 +40,9 @@ const {
3840
fromSync,
3941
isSyncIterable,
4042
isAsyncIterable,
43+
isPrimitiveChunk,
4144
isUint8ArrayBatch,
45+
normalizeAsyncValue,
4246
} = require('internal/streams/iter/from');
4347

4448
const {
@@ -53,7 +57,10 @@ const {
5357
const {
5458
drainableProtocol,
5559
kSyncWriteAcceptedOnFalse,
60+
kValidatedSource,
5661
kValidatedTransform,
62+
toAsyncStreamable,
63+
toStreamable,
5764
} = require('internal/streams/iter/types');
5865

5966
// =============================================================================
@@ -116,6 +123,22 @@ function parsePipeToArgs(args, requiredMethod) {
116123
};
117124
}
118125

126+
function canUseSyncIterablePipeToFastPath(source, transforms, signal) {
127+
if (signal !== undefined ||
128+
transforms.length !== 0 ||
129+
isPrimitiveChunk(source) ||
130+
ArrayIsArray(source) ||
131+
source?.[kValidatedSource] ||
132+
!isSyncIterable(source) ||
133+
isAsyncIterable(source)) {
134+
return false;
135+
}
136+
137+
// Preserve from()'s top-level protocol precedence for custom iterables.
138+
return typeof source[toAsyncStreamable] !== 'function' &&
139+
typeof source[toStreamable] !== 'function';
140+
}
141+
119142
// =============================================================================
120143
// Transform Output Flattening
121144
// =============================================================================
@@ -822,12 +845,13 @@ async function pipeTo(source, ...args) {
822845
// Check for abort
823846
signal?.throwIfAborted();
824847

825-
// Normalize source via from()
826-
const normalized = from(source);
848+
const hasWriteSync = typeof writer.writeSync === 'function';
849+
const useSyncIterableFastPath =
850+
hasWriteSync && canUseSyncIterablePipeToFastPath(source, transforms, signal);
851+
const normalized = useSyncIterableFastPath ? undefined : from(source);
827852

828853
let totalBytes = 0;
829854
const hasWritev = typeof writer.writev === 'function';
830-
const hasWriteSync = typeof writer.writeSync === 'function';
831855
const hasWritevSync = typeof writer.writevSync === 'function';
832856
const hasEndSync = typeof writer.endSync === 'function';
833857
const syncFalseCanBeAccepted = writer[kSyncWriteAcceptedOnFalse] === true;
@@ -908,8 +932,32 @@ async function pipeTo(source, ...args) {
908932
}
909933

910934
try {
911-
// Fast path: no transforms - iterate normalized source directly
912-
if (transforms.length === 0) {
935+
if (useSyncIterableFastPath) {
936+
// Avoid from()'s async sync-iterable batching path. This keeps writes
937+
// incremental for synchronous sources while preserving async
938+
// normalization for non-primitive yielded values.
939+
for (const value of source) {
940+
if (isUint8ArrayBatch(value)) {
941+
if (value.length > 0) {
942+
const p = writeBatch(value);
943+
if (p) await p;
944+
}
945+
continue;
946+
}
947+
if (isUint8Array(value)) {
948+
const p = writeBatch([value]);
949+
if (p) await p;
950+
continue;
951+
}
952+
953+
const batch = await ArrayFromAsync(normalizeAsyncValue(value));
954+
if (batch.length > 0) {
955+
const p = writeBatch(batch);
956+
if (p) await p;
957+
}
958+
}
959+
} else if (transforms.length === 0) {
960+
// Fast path: no transforms - iterate normalized source directly
913961
if (signal) {
914962
for await (const batch of normalized) {
915963
signal.throwIfAborted();

test/parallel/test-stream-iter-pipeto.js

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,79 @@ async function testPipeToSyncMinimalWriter() {
219219
assert.strictEqual(chunks.length > 0, true);
220220
}
221221

222+
async function testPipeToSyncIterableFastPathWritesIncrementally() {
223+
let pulled = 0;
224+
let firstWritePulled = 0;
225+
const chunks = [];
226+
function* source() {
227+
for (let i = 0; i < 3; i++) {
228+
pulled++;
229+
yield new Uint8Array([0x61 + i]);
230+
}
231+
}
232+
const writer = {
233+
write: common.mustNotCall(),
234+
writeSync(chunk) {
235+
if (firstWritePulled === 0) {
236+
firstWritePulled = pulled;
237+
}
238+
chunks.push(chunk);
239+
return true;
240+
},
241+
};
242+
243+
const totalBytes = await pipeTo(source(), writer);
244+
assert.strictEqual(totalBytes, 3);
245+
assert.strictEqual(firstWritePulled, 1);
246+
assert.deepStrictEqual(chunks, [
247+
new Uint8Array([0x61]),
248+
new Uint8Array([0x62]),
249+
new Uint8Array([0x63]),
250+
]);
251+
}
252+
253+
async function testPipeToSyncIterableFastPathWriteFallback() {
254+
const asyncWrites = [];
255+
const writer = {
256+
writeSync(chunk) {
257+
return chunk[0] !== 0x62;
258+
},
259+
async write(chunk) {
260+
asyncWrites.push(chunk);
261+
},
262+
};
263+
function* source() {
264+
yield new Uint8Array([0x61]);
265+
yield new Uint8Array([0x62]);
266+
yield new Uint8Array([0x63]);
267+
}
268+
269+
const totalBytes = await pipeTo(source(), writer);
270+
assert.strictEqual(totalBytes, 3);
271+
assert.deepStrictEqual(asyncWrites, [new Uint8Array([0x62])]);
272+
}
273+
274+
async function testPipeToSyncIterableFastPathAsyncValue() {
275+
const chunks = [];
276+
const writer = {
277+
write: common.mustNotCall(),
278+
writeSync(chunk) {
279+
chunks.push(chunk);
280+
return true;
281+
},
282+
};
283+
function* source() {
284+
yield Promise.resolve('a');
285+
yield new Uint8Array([0x62]);
286+
}
287+
288+
const totalBytes = await pipeTo(source(), writer);
289+
assert.strictEqual(totalBytes, 2);
290+
const result = new TextDecoder().decode(
291+
new Uint8Array(chunks.reduce((acc, c) => [...acc, ...c], [])));
292+
assert.strictEqual(result, 'ab');
293+
}
294+
222295
Promise.all([
223296
testPipeToSync(),
224297
testPipeTo(),
@@ -234,4 +307,7 @@ Promise.all([
234307
testPipeToSyncPreventClose(),
235308
testPipeToMinimalWriter(),
236309
testPipeToSyncMinimalWriter(),
310+
testPipeToSyncIterableFastPathWritesIncrementally(),
311+
testPipeToSyncIterableFastPathWriteFallback(),
312+
testPipeToSyncIterableFastPathAsyncValue(),
237313
]).then(common.mustCall());

0 commit comments

Comments
 (0)