@jasnell
Here is a test. (I just wanted to trigger a problem I had seen while creating my PR. I could not provoke it — I did see some stalls, but they are gone now; maybe I did something wrong, or the network load on my machine changed.)
// Flags: --experimental-quic --experimental-stream-iter --no-warnings
// Test: bidirectional stream echo with binary.
// The client sends a message, the server reads it and echoes it back.
// Both directions of the bidi stream carry data and are properly FIN'd.
// Verifies that both client and server can read and write on the same stream.
import { hasQuic, skip, mustCall } from '../common/index.mjs';
import assert from 'node:assert';
const { strictEqual } = assert;
if (!hasQuic) {
  skip('QUIC is not enabled');
}
const { listen, connect } = await import('../common/quic.mjs');
const { bytes, drainableProtocol: dp } = await import('stream/iter');
// A mix of large and small chunk sizes — including 0 and 1MB — to exercise
// packetization, coalescing, and flow control on both sides.
const chunkSizes = [60000, 12, 1000000, 50000, 1600, 20000, 1000000, 30000, 0, 100];
const numChunks = chunkSizes.length;
// Total payload size; used to verify nothing was lost or duplicated.
const byteLength = chunkSizes.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
const sleep = (ms) => new Promise((res) => setTimeout(res, ms));
// Build a deterministic payload so we can verify integrity.
// Build a deterministic payload so we can verify integrity.
// Chunk `index` has chunkSizes[index] bytes, byte at offset i equal to
// (index + i) mod 256.
function buildChunk(index) {
  const base = index & 0xff;
  return Uint8Array.from(
    { length: chunkSizes[index] },
    (_, offset) => (base + offset) & 0xff,
  );
}
// Fold every byte of a typed array into a 32-bit additive checksum.
// Order-insensitive, so chunk boundaries do not affect the result.
function checksum(data) {
  let sum = 0;
  for (const byte of data) {
    sum = (sum + byte) | 0;
  }
  return sum;
}
// Compute the expected checksum over the full payload up front,
// before any data is sent.
let expectedChecksum = 0;
for (let index = 0; index < numChunks; index++) {
  expectedChecksum = (expectedChecksum + checksum(buildChunk(index))) | 0;
}
// Resolved once the server has echoed everything and closed its session.
const done = Promise.withResolvers();
// Server: for each incoming bidi stream, echo every received chunk back,
// then FIN the echo direction and close the session.
const serverEndpoint = await listen(mustCall((serverSession) => {
serverSession.onstream = mustCall(async (stream) => {
const writer = stream.writer
// NOTE(review): `written` is accumulated but never asserted against — confirm
// whether it should be checked or removed.
let written = 0;
// Each iteration yields an array of chunks as they arrive off the wire.
for await (const chunks of stream) {
for (const chunk of chunks) {
written += chunk.byteLength;
console.log('chunk', chunk.byteLength)
// writeSync returns false when flow control refuses the write;
// retry the same chunk after the drain promise resolves.
while (!writer.writeSync(chunk)) {
// Flow controlled — wait for drain before retrying.
const drainable = writer[dp]();
if (drainable) await drainable;
}
}
}
// The read loop exits only after the client FIN'd its write side,
// so it is now safe to FIN the echo direction too.
writer.end();
// await stream.closed; // may be we are not closing
serverSession.close();
done.resolve();
});
}));
const clientSession = await connect(serverEndpoint.address);
await clientSession.opened;
// A single bidi stream carries data in both directions.
const stream = await clientSession.createBidirectionalStream();
// Chunks read back from the echo, collected for verification below.
const readChunks = [];
// Drain the echoed data from the stream, then verify that the total byte
// count and the additive checksum match what the writer produced.
const readFromStream = async (stream) => {
  for await (const chunks of stream) {
    for (const chunk of chunks) {
      readChunks.push(chunk);
    }
  }
  let receivedBytes = 0;
  let receivedChecksum = 0;
  for (const chunk of readChunks) {
    receivedBytes += chunk.byteLength;
    receivedChecksum = (receivedChecksum + checksum(chunk)) | 0;
  }
  strictEqual(receivedBytes, byteLength);
  strictEqual(receivedChecksum, expectedChecksum);
};
// Write every chunk to the stream, honoring flow control, then FIN the
// write side synchronously.
const writeToStream = async (stream) => {
  const writer = stream.writer;
  for (let index = 0; index < numChunks; index++) {
    const payload = buildChunk(index);
    // Retry the same payload until flow control accepts it.
    while (!writer.writeSync(payload)) {
      // Flow controlled — wait for drain before retrying.
      const drainable = writer[dp]();
      if (drainable) await drainable;
    }
    // Pace the writes slightly so reads and writes interleave.
    await sleep(1);
  }
  writer.endSync();
};
// Run both directions concurrently — the echo cannot complete otherwise,
// since the server only FINs after the client's write side is done.
await Promise.all([readFromStream(stream), writeToStream(stream)])
// Wait for the stream to fully close and for the server to signal completion.
await Promise.all([stream.closed, done.promise]);
await clientSession.close();
await serverEndpoint.close();
What you see here is that the chunks that arrive are limited by the UDP packet size:
chunk 1000
chunk 1156
chunk 1156
chunk 1156
chunk 1156
chunk 1156
chunk 1156
chunk 1156
chunk 1156
.......
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 6
chunk 5664
chunk 8192
chunk 3439
chunk 1141
chunk 1153
chunk 1153
chunk 1153
chunk 1153
chunk 1154
chunk 1154
.......
chunk 1154
chunk 1144
chunk 506
chunk 8192
chunk 8192
chunk 8192
chunk 3705
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1147
chunk 4608
chunk 8192
chunk 2167
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1146
chunk 4993
chunk 8192
chunk 1804
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
.....
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 3496
chunk 8192
chunk 8192
chunk 8192
chunk 8192
chunk 652
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1145
chunk 5355
chunk 8192
chunk 3742
chunk 1143
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 4
chunk 4571
chunk 8192
chunk 8192
chunk 7877
chunk 1143
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
......
chunk 1154
chunk 1154
chunk 1154
chunk 1148
chunk 1154
chunk 1154
chunk 1154
.......
chunk 1153
chunk 1153
chunk 1153
chunk 1153
chunk 1153
chunk 1153
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1147
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1153
chunk 1153
chunk 1153
chunk 1153
chunk 1153
chunk 1153
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1153
chunk 1153
chunk 1153
chunk 1148
chunk 1154
chunk 1154
chunk 1154
.......
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1145
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
.......
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 1154
chunk 3
chunk 100
This is not astonishing given the current design of the underlying node_blob. However, I believe this can result in poor performance, as the per-packet overhead is really high, and it may lead to memory fragmentation.
(See #60237 — for example, commit
78b724d
does this early on in the code.
I can rebase and rework this part if desired.)
Side note: I will try to create some other tests to trigger some cases I know from other implementations.
But so far it is working really great. So I may soon move on to implementing WebTransport on the JS side on top of this, see what is missing, and run tests against browsers. (Though in the long run a complete native solution is desired.)
@jasnell
Here is a test. (I just wanted to trigger a problem I had seen while creating my PR. I could not provoke it — I did see some stalls, but they are gone now; maybe I did something wrong, or the network load on my machine changed.)
What you see here is that the chunks that arrive are limited by the UDP packet size:
This is not astonishing given the current design of the underlying node_blob. However, I believe this can result in poor performance, as the per-packet overhead is really high, and it may lead to memory fragmentation.
(See #60237 — for example, commit
78b724d
does this early on in the code.
I can rebase and rework this part if desired.)
Side note: I will try to create some other tests to trigger some cases I know from other implementations.
But so far it is working really great. So I may soon move on to implementing WebTransport on the JS side on top of this, see what is missing, and run tests against browsers. (Though in the long run a complete native solution is desired.)