Skip to content

Commit 526313b

Browse files
jasnelladuh95
authored andcommitted
quic: fixup quic stream variable chunk len
Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: #63230 Fixes: #63216 Reviewed-By: Tim Perry <pimterry@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 4f1426d commit 526313b

3 files changed

Lines changed: 110 additions & 16 deletions

File tree

lib/internal/quic/quic.js

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,19 +1279,14 @@ function waitForDrain(stream) {
12791279

12801280
// Writes a batch to the handle, awaiting drain if backpressured.
12811281
// Returns true if the stream was destroyed during the wait.
1282-
// Checks writeDesiredSize before writing to enforce backpressure
1283-
// against the outbound DataQueue's uncommitted bytes.
1282+
// Only waits when writeDesiredSize is 0 (no capacity at all).
1283+
// When there is any capacity, the write proceeds even if the batch
1284+
// is larger -- the C++ side buffers the data and writeDesiredSize
1285+
// drops toward 0, letting the normal drain mechanism take over.
12841286
async function writeBatchWithDrain(handle, stream, batch) {
12851287
const state = getQuicStreamState(stream);
12861288

1287-
// Calculate total batch size for the capacity check.
1288-
let len = 0;
1289-
for (const chunk of batch) len += TypedArrayPrototypeGetByteLength(chunk);
1290-
1291-
// If insufficient capacity, wait for the C++ drain signal which
1292-
// fires when writeDesiredSize transitions from 0 to > 0 (i.e.,
1293-
// ngtcp2 has consumed data from the outbound DataQueue).
1294-
if (len > state.writeDesiredSize) {
1289+
if (state.writeDesiredSize === 0) {
12951290
await waitForDrain(stream);
12961291
if (stream.destroyed) return true;
12971292
}
@@ -2029,9 +2024,15 @@ class QuicStream {
20292024
chunk = toUint8Array(chunk);
20302025
const len = TypedArrayPrototypeGetByteLength(chunk);
20312026
if (len === 0) return true;
2032-
// Refuse the write if the chunk doesn't fit in the available
2033-
// buffer capacity. The caller should wait for drain and retry.
2034-
if (len > stream.#state.writeDesiredSize) return false;
2027+
// Refuse the write only when there is no available capacity at
2028+
// all. When writeDesiredSize > 0 we allow the write even if the
2029+
// chunk is larger than the remaining capacity -- the C++ side
2030+
// will accept the data into the DataQueue and
2031+
// UpdateWriteDesiredSize() will drop writeDesiredSize toward 0,
2032+
// at which point the standard drain mechanism takes over.
2033+
// This follows the Web Streams model where writes beyond the HWM
2034+
// succeed and backpressure applies to *subsequent* writes.
2035+
if (stream.#state.writeDesiredSize === 0) return false;
20352036
const result = handle.write([chunk]);
20362037
if (result === undefined) return false;
20372038
totalBytesWritten += len;
@@ -2070,7 +2071,7 @@ class QuicStream {
20702071
let len = 0;
20712072
for (const c of chunks) len += TypedArrayPrototypeGetByteLength(c);
20722073
if (len === 0) return true;
2073-
if (len > stream.#state.writeDesiredSize) return false;
2074+
if (stream.#state.writeDesiredSize === 0) return false;
20742075
const result = handle.write(chunks);
20752076
if (result === undefined) return false;
20762077
totalBytesWritten += len;

src/quic/streams.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1592,8 +1592,10 @@ void Stream::UpdateWriteDesiredSize() {
15921592
uint32_t old_size = state_->write_desired_size;
15931593
state_->write_desired_size = clamped;
15941594

1595-
// Fire drain when transitioning from 0 to non-zero
1596-
if (old_size == 0 && desired > 0) {
1595+
// Fire drain when transitioning from 0 to non-zero.
1596+
// writeDesiredSize == 0 means the buffer is full or flow control is
1597+
// exhausted, so the JS side may be waiting for capacity.
1598+
if (old_size == 0 && clamped > 0) {
15971599
EmitDrain();
15981600
}
15991601
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Flags: --experimental-quic --experimental-stream-iter --no-warnings
2+
3+
// Test: bidirectional data transfer with varying chunk sizes.
4+
// This is a regression test for a stall caused by a mismatch between
5+
// writeSync (which rejects when chunk > writeDesiredSize) and
6+
// drainableProtocol (which returned null when writeDesiredSize > 0).
7+
// When chunks don't evenly fill the high water mark, writeDesiredSize
8+
// can be positive but smaller than the next chunk, causing the
9+
// while(!writeSync) { dp(); await } loop to spin without yielding.
10+
// See: https://github.com/nodejs/node/issues/63216
11+
12+
import { hasQuic, skip, mustCall } from '../common/index.mjs';
13+
import assert from 'node:assert';
14+
15+
const { strictEqual } = assert;
16+
17+
if (!hasQuic) {
18+
skip('QUIC is not enabled');
19+
}
20+
21+
const { listen, connect } = await import('../common/quic.mjs');
22+
const { bytes, drainableProtocol: dp } = await import('stream/iter');
23+
24+
// Varying chunk sizes — the pattern of alternating large and small
25+
// chunks is effective at triggering the writeDesiredSize gap.
26+
const chunkSizes = [60000, 12, 50000, 1600, 20000, 30000, 0, 100];
27+
const numChunks = chunkSizes.length;
28+
const byteLength = chunkSizes.reduce((a, b) => a + b, 0);
29+
30+
// Build a deterministic payload so we can verify integrity.
31+
function buildChunk(index) {
32+
const chunk = new Uint8Array(chunkSizes[index]);
33+
const val = index & 0xff;
34+
for (let i = 0; i < chunkSizes[index]; i++) {
35+
chunk[i] = (val + i) & 0xff;
36+
}
37+
return chunk;
38+
}
39+
40+
function checksum(data) {
41+
let sum = 0;
42+
for (let i = 0; i < data.byteLength; i++) {
43+
sum = (sum + data[i]) | 0;
44+
}
45+
return sum;
46+
}
47+
48+
// Compute expected checksum.
49+
let expectedChecksum = 0;
50+
for (let i = 0; i < numChunks; i++) {
51+
const chunk = buildChunk(i);
52+
expectedChecksum = (expectedChecksum + checksum(chunk)) | 0;
53+
}
54+
55+
const done = Promise.withResolvers();
56+
57+
const serverEndpoint = await listen(mustCall((serverSession) => {
58+
serverSession.onstream = mustCall(async (stream) => {
59+
const received = await bytes(stream);
60+
strictEqual(received.byteLength, byteLength);
61+
strictEqual(checksum(received), expectedChecksum);
62+
63+
stream.writer.endSync();
64+
await stream.closed;
65+
serverSession.close();
66+
done.resolve();
67+
});
68+
}));
69+
70+
const clientSession = await connect(serverEndpoint.address);
71+
await clientSession.opened;
72+
73+
const stream = await clientSession.createBidirectionalStream();
74+
const w = stream.writer;
75+
76+
// Write chunks, respecting backpressure via drainableProtocol.
77+
for (let i = 0; i < numChunks; i++) {
78+
const chunk = buildChunk(i);
79+
while (!w.writeSync(chunk)) {
80+
// Flow controlled — wait for drain before retrying.
81+
const drainable = w[dp]();
82+
if (drainable) await drainable;
83+
}
84+
}
85+
86+
const totalWritten = w.endSync();
87+
strictEqual(totalWritten, byteLength);
88+
89+
await Promise.all([stream.closed, done.promise]);
90+
await clientSession.close();
91+
await serverEndpoint.close();

0 commit comments

Comments
 (0)