Skip to content

Commit e21b8a4

Browse files
trivikraduh95
authored andcommitted
stream: limit iter from sync iterable batches
Bound sync iterable normalization in from() and fromSync() to FROM_BATCH_SIZE. This avoids unbounded batches for from() sync iterable fallbacks and lets fromSync() coalesce plain Uint8Array values for writev paths. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #63324 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Ethan Arrowood <ethan@arrowood.dev>
1 parent 3bdb64d commit e21b8a4

5 files changed

Lines changed: 216 additions & 20 deletions

File tree

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Measures batching behavior for stream/iter from() and fromSync()
2+
// with plain synchronous Uint8Array iterables.
3+
'use strict';
4+
5+
const common = require('../common.js');
6+
const { closeSync, openSync, writeSync, writevSync } = require('fs');
7+
const { devNull } = require('os');
8+
9+
const bench = common.createBenchmark(main, {
10+
method: ['from-first-batch', 'from-sync-writev'],
11+
chunks: [256, 4096, 16384],
12+
chunkSize: [16],
13+
n: [100, 1000],
14+
}, {
15+
flags: ['--experimental-stream-iter'],
16+
combinationFilter({ method, chunks, n }) {
17+
if (n === 1) {
18+
return true;
19+
}
20+
if (method === 'from-first-batch') {
21+
return n === 1000;
22+
}
23+
return n === 100 && chunks !== 16384;
24+
},
25+
test: {
26+
chunks: 256,
27+
chunkSize: 16,
28+
n: 1,
29+
},
30+
});
31+
32+
function main({ method, chunks, chunkSize, n }) {
33+
switch (method) {
34+
case 'from-first-batch':
35+
return benchFromFirstBatch(chunks, chunkSize, n);
36+
case 'from-sync-writev':
37+
return benchFromSyncWritev(chunks, chunkSize, n);
38+
}
39+
}
40+
41+
function* source(chunks, chunk) {
42+
for (let i = 0; i < chunks; i++) {
43+
yield chunk;
44+
}
45+
}
46+
47+
function benchFromFirstBatch(chunks, chunkSize, n) {
48+
const { from } = require('stream/iter');
49+
const chunk = new Uint8Array(chunkSize);
50+
let seen = 0;
51+
52+
(async () => {
53+
bench.start();
54+
for (let i = 0; i < n; i++) {
55+
const iterator = from(source(chunks, chunk))[Symbol.asyncIterator]();
56+
const { value, done } = await iterator.next();
57+
if (done || value.length === 0) {
58+
throw new Error('expected a batch');
59+
}
60+
seen += value.length;
61+
}
62+
bench.end(n);
63+
if (seen === 0) {
64+
throw new Error('expected chunks');
65+
}
66+
})();
67+
}
68+
69+
function benchFromSyncWritev(chunks, chunkSize, n) {
70+
const { pipeToSync } = require('stream/iter');
71+
const chunk = new Uint8Array(chunkSize);
72+
const expected = chunks * chunkSize * n;
73+
let seen = 0;
74+
let total = 0;
75+
const fd = openSync(devNull, 'w');
76+
const writer = {
77+
writeSync(chunk) {
78+
writeSync(fd, chunk);
79+
seen++;
80+
},
81+
writevSync(batch) {
82+
writevSync(fd, batch);
83+
seen += batch.length;
84+
},
85+
};
86+
87+
try {
88+
bench.start();
89+
for (let i = 0; i < n; i++) {
90+
total += pipeToSync(source(chunks, chunk), writer);
91+
}
92+
bench.end(chunks * n);
93+
} finally {
94+
closeSync(fd);
95+
}
96+
97+
if (total !== expected || seen !== chunks * n) {
98+
throw new Error('unexpected chunk count');
99+
}
100+
}

lib/internal/streams/iter/from.js

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ const {
4747
toUint8Array,
4848
} = require('internal/streams/iter/utils');
4949

50-
// Maximum number of chunks to yield per batch from from(Uint8Array[]).
50+
// Maximum number of chunks to yield per batch from from()/fromSync().
5151
// Bounds peak memory when arrays flow through transforms, which must
5252
// allocate output for the entire batch at once.
5353
const FROM_BATCH_SIZE = 128;
@@ -190,33 +190,66 @@ function isUint8ArrayBatch(value) {
190190
return true;
191191
}
192192

193+
function* yieldBoundedBatch(batch) {
194+
if (batch.length === 0) {
195+
return;
196+
}
197+
if (batch.length <= FROM_BATCH_SIZE) {
198+
yield batch;
199+
return;
200+
}
201+
for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) {
202+
yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE);
203+
}
204+
}
205+
193206
/**
194207
* Normalize a sync streamable source, yielding batches of Uint8Array.
195208
* @param {Iterable} source
196209
* @yields {Uint8Array[]}
197210
*/
198211
function* normalizeSyncSource(source) {
212+
let batch = [];
213+
199214
for (const value of source) {
200215
// Fast path 1: value is already a Uint8Array[] batch
201216
if (isUint8ArrayBatch(value)) {
202-
if (value.length > 0) {
203-
yield value;
217+
if (batch.length > 0) {
218+
yield batch;
219+
batch = [];
204220
}
221+
yield* yieldBoundedBatch(value);
205222
continue;
206223
}
207224
// Fast path 2: value is a single Uint8Array (very common)
208225
if (isUint8Array(value)) {
209-
yield [value];
226+
ArrayPrototypePush(batch, value);
227+
if (batch.length === FROM_BATCH_SIZE) {
228+
yield batch;
229+
batch = [];
230+
}
210231
continue;
211232
}
212233
// Slow path: normalize the value
213-
const batch = [];
214-
for (const chunk of normalizeSyncValue(value)) {
215-
ArrayPrototypePush(batch, chunk);
216-
}
217234
if (batch.length > 0) {
218235
yield batch;
236+
batch = [];
237+
}
238+
let valueBatch = [];
239+
for (const chunk of normalizeSyncValue(value)) {
240+
ArrayPrototypePush(valueBatch, chunk);
241+
if (valueBatch.length === FROM_BATCH_SIZE) {
242+
yield valueBatch;
243+
valueBatch = [];
244+
}
219245
}
246+
if (valueBatch.length > 0) {
247+
yield valueBatch;
248+
}
249+
}
250+
251+
if (batch.length > 0) {
252+
yield batch;
220253
}
221254
}
222255

@@ -329,36 +362,42 @@ async function* normalizeAsyncSource(source) {
329362
return;
330363
}
331364

332-
// Fall back to sync iteration - batch all sync values together
365+
// Fall back to sync iteration - batch sync values together with a bound.
333366
if (isSyncIterable(source)) {
334-
const batch = [];
367+
let batch = [];
335368

336369
for (const value of source) {
337370
// Fast path 1: value is already a Uint8Array[] batch
338371
if (isUint8ArrayBatch(value)) {
339372
// Flush any accumulated batch first
340373
if (batch.length > 0) {
341-
yield ArrayPrototypeSlice(batch);
342-
batch.length = 0;
343-
}
344-
if (value.length > 0) {
345-
yield value;
374+
yield batch;
375+
batch = [];
346376
}
377+
yield* yieldBoundedBatch(value);
347378
continue;
348379
}
349380
// Fast path 2: value is a single Uint8Array (very common)
350381
if (isUint8Array(value)) {
351382
ArrayPrototypePush(batch, value);
383+
if (batch.length === FROM_BATCH_SIZE) {
384+
yield batch;
385+
batch = [];
386+
}
352387
continue;
353388
}
354389
// Slow path: normalize the value - must flush and yield individually
355390
if (batch.length > 0) {
356-
yield ArrayPrototypeSlice(batch);
357-
batch.length = 0;
391+
yield batch;
392+
batch = [];
358393
}
359-
const asyncBatch = [];
394+
let asyncBatch = [];
360395
for await (const chunk of normalizeAsyncValue(value)) {
361396
ArrayPrototypePush(asyncBatch, chunk);
397+
if (asyncBatch.length === FROM_BATCH_SIZE) {
398+
yield asyncBatch;
399+
asyncBatch = [];
400+
}
362401
}
363402
if (asyncBatch.length > 0) {
364403
yield asyncBatch;

test/parallel/test-stream-iter-from-coverage.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,22 @@ async function testFromSyncSubBatching() {
3131
assert.strictEqual(totalChunks, 200);
3232
}
3333

34+
// fromSync: generic sync iterables of Uint8Array use bounded batches
35+
async function testFromSyncIterableSubBatching() {
36+
function* gen() {
37+
for (let i = 0; i < 200; i++) {
38+
yield new Uint8Array([i & 0xFF]);
39+
}
40+
}
41+
const batches = [];
42+
for (const batch of fromSync(gen())) {
43+
batches.push(batch);
44+
}
45+
assert.strictEqual(batches.length, 2);
46+
assert.strictEqual(batches[0].length, 128);
47+
assert.strictEqual(batches[1].length, 72);
48+
}
49+
3450
// from: Uint8Array[] with > 128 elements triggers sub-batching (async)
3551
async function testFromAsyncSubBatching() {
3652
const bigBatch = Array.from({ length: 200 },
@@ -44,6 +60,22 @@ async function testFromAsyncSubBatching() {
4460
assert.strictEqual(batches[1].length, 72);
4561
}
4662

63+
// from: sync iterables use bounded batches instead of one unbounded batch
64+
async function testFromAsyncSyncIterableSubBatching() {
65+
function* gen() {
66+
for (let i = 0; i < 200; i++) {
67+
yield new Uint8Array([i & 0xFF]);
68+
}
69+
}
70+
const batches = [];
71+
for await (const batch of from(gen())) {
72+
batches.push(batch);
73+
}
74+
assert.strictEqual(batches.length, 2);
75+
assert.strictEqual(batches[0].length, 128);
76+
assert.strictEqual(batches[1].length, 72);
77+
}
78+
4779
// Exact boundary: 128 elements → single batch (no split)
4880
async function testFromSubBatchingBoundary() {
4981
const exactBatch = Array.from({ length: 128 },
@@ -133,7 +165,9 @@ async function testFromSyncInvalidYield() {
133165

134166
Promise.all([
135167
testFromSyncSubBatching(),
168+
testFromSyncIterableSubBatching(),
136169
testFromAsyncSubBatching(),
170+
testFromAsyncSyncIterableSubBatching(),
137171
testFromSubBatchingBoundary(),
138172
testFromSubBatchingBoundaryPlus1(),
139173
testFromSyncDataViewInGenerator(),

test/parallel/test-stream-iter-from-sync.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ function testFromSyncGenerator() {
6666
for (const batch of readable) {
6767
batches.push(batch);
6868
}
69-
assert.strictEqual(batches.length, 2);
69+
assert.strictEqual(batches.length, 1);
70+
assert.strictEqual(batches[0].length, 2);
7071
assert.deepStrictEqual(batches[0][0], new Uint8Array([1, 2]));
71-
assert.deepStrictEqual(batches[1][0], new Uint8Array([3, 4]));
72+
assert.deepStrictEqual(batches[0][1], new Uint8Array([3, 4]));
7273
}
7374

7475
function testFromSyncNestedIterables() {

test/parallel/test-stream-iter-pipeto-writev.js

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,27 @@ async function testPipeToSyncWritev() {
151151
assert.ok(batches.some((b) => b.length > 1));
152152
}
153153

154+
// pipeToSync batches plain Uint8Array chunks for writevSync
155+
async function testPipeToSyncPlainChunksWritev() {
156+
const batches = [];
157+
const writes = [];
158+
const writer = {
159+
writevSync(chunks) { batches.push(chunks); },
160+
writeSync(chunk) { writes.push(chunk); return true; },
161+
endSync() { return 0; },
162+
};
163+
function* source() {
164+
yield new Uint8Array([1]);
165+
yield new Uint8Array([2]);
166+
yield new Uint8Array([3]);
167+
}
168+
const total = pipeToSync(source(), writer);
169+
assert.strictEqual(total, 3);
170+
assert.strictEqual(batches.length, 1);
171+
assert.strictEqual(batches[0].length, 3);
172+
assert.strictEqual(writes.length, 0);
173+
}
174+
154175
// pipeToSync with writer that has write() and writeSync() — writeSync preferred
155176
async function testPipeToSyncWriteFallback() {
156177
const syncWrites = [];
@@ -174,5 +195,6 @@ Promise.all([
174195
testWriteSyncAlwaysFails(),
175196
testPushWriterBlockSyncFalseAccepted(),
176197
testPipeToSyncWritev(),
198+
testPipeToSyncPlainChunksWritev(),
177199
testPipeToSyncWriteFallback(),
178200
]).then(common.mustCall());

0 commit comments

Comments
 (0)