Skip to content

Commit 5aaae26

Browse files
committed
wip: pruning features of file tree serialization
1 parent 83a10a6 commit 5aaae26

2 files changed

Lines changed: 106 additions & 201 deletions

File tree

src/vaults/fileTree.ts

Lines changed: 78 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -342,65 +342,43 @@ async function* encodeContent(
342342
function serializerStreamFactory(
343343
fs: FileSystem | FileSystemReadable,
344344
treeGen: AsyncGenerator<TreeNode, void, void>,
345-
yieldContents: boolean = true,
346345
): ReadableStream<Uint8Array> {
347-
const files: Array<[number, string]> = [];
348-
let treeDataGen: AsyncGenerator<TreeNode, void, void> | undefined = treeGen;
349-
let contentsGen: AsyncGenerator<Uint8Array, void, void> | undefined =
350-
undefined;
351-
// Will get the next content chunk or return undefined if there is no more data to send
346+
let contentsGen: AsyncGenerator<Uint8Array, void, void> | undefined;
347+
let fileNode: TreeNode | undefined;
348+
349+
async function getNextFileNode(): Promise<TreeNode | undefined> {
350+
while (true) {
351+
const result = await treeGen.next();
352+
if (result.done) return undefined;
353+
if (result.value.type === 'FILE') return result.value;
354+
// If it's not a file, keep iterating
355+
}
356+
}
352357
async function getNextContentChunk(): Promise<Uint8Array | undefined> {
353-
if (!yieldContents) return undefined;
354358
while (true) {
355359
if (contentsGen == null) {
356-
const next = files.shift();
357-
// No more files means we're done
358-
if (next == null) return undefined;
359-
const [iNode, path] = next;
360-
contentsGen = encodeContent(fs, path, iNode);
360+
fileNode = await getNextFileNode();
361+
if (fileNode == null) return undefined;
362+
contentsGen = encodeContent(fs, fileNode.path, fileNode.iNode);
361363
}
362-
const result = await contentsGen.next();
363-
if (!result.done) return result.value;
364-
else contentsGen = undefined;
364+
const contentChunk = await contentsGen.next();
365+
if (!contentChunk.done) return contentChunk.value;
366+
contentsGen = undefined;
365367
}
366368
}
367369
async function cleanup(reason: unknown) {
368-
await treeDataGen?.throw(reason).catch(() => {});
370+
await treeGen?.throw(reason).catch(() => {});
369371
await contentsGen?.throw(reason).catch(() => {});
370372
}
371373
return new ReadableStream<Uint8Array>({
372-
start: (controller) => {
373-
controller.enqueue(generateGenericHeader({ type: HeaderType.TREE }));
374-
},
375374
pull: async (controller) => {
376375
try {
377-
if (treeDataGen != null) {
378-
const result = await treeGen.next();
379-
if (!result.done) {
380-
// If a file, add to the file list to encode contents later
381-
if (result.value.type === 'FILE') {
382-
files.push([result.value.iNode, result.value.path]);
383-
}
384-
// Normal tree nodes are just serialized and converted to `UInt8Array`
385-
const jsonSerialized = JSON.stringify(result.value);
386-
controller.enqueue(
387-
vaultsUtils.bufferToUint8ArrayCopyless(
388-
Buffer.from(jsonSerialized, 'utf-8'),
389-
),
390-
);
391-
} else {
392-
const treeDoneMessage = JSON.stringify({ type: 'DONE' });
393-
controller.enqueue(
394-
vaultsUtils.bufferToUint8ArrayCopyless(
395-
Buffer.from(treeDoneMessage, 'utf-8'),
396-
),
397-
);
398-
treeDataGen = undefined;
399-
}
400-
} else {
401-
const contentDataChunk = await getNextContentChunk();
402-
if (contentDataChunk == null) return controller.close();
403-
controller.enqueue(contentDataChunk);
376+
const contentChunk = await getNextContentChunk();
377+
if (contentChunk == null) {
378+
return controller.close();
379+
}
380+
else {
381+
controller.enqueue(contentChunk);
404382
}
405383
} catch (e) {
406384
await cleanup(e);
@@ -469,84 +447,17 @@ function parseTreeNode(data: unknown): asserts data is TreeNode {
469447
*/
470448
function parserTransformStreamFactory(): TransformStream<
471449
Uint8Array,
472-
TreeNode | ContentNode | Uint8Array
450+
string | ContentNode | Uint8Array
473451
> {
474452
let workingBuffer: Uint8Array = new Uint8Array(0);
475-
let phase: 'START' | 'TREE' | 'CONTENT' = 'START';
476-
let jsonParser: JSONParser | undefined = undefined;
477-
let lastChunk: Uint8Array | undefined;
478453
let contentLength: bigint | undefined = undefined;
479-
const enterTreeState = (
480-
controller: TransformStreamDefaultController<
481-
TreeNode | ContentNode | Uint8Array
482-
>,
483-
initialChunk: Uint8Array,
484-
) => {
485-
let done = false;
486-
phase = 'TREE';
487-
workingBuffer = new Uint8Array(0);
488-
// Setting up the JSON stream parser
489-
jsonParser = new JSONParser({
490-
separator: '',
491-
paths: ['$'],
492-
});
493-
const handleEnd = (e?: unknown) => {
494-
if (e != null && !(done && e instanceof TokenizerError)) {
495-
controller.error(e);
496-
return;
497-
}
498-
if (e instanceof TokenizerError) {
499-
// Extracting error position.
500-
const match = e.message.match(/at position "(.*)" in state/);
501-
if (match == null) {
502-
controller.error(
503-
new utilsErrors.ErrorUtilsUndefinedBehaviour(
504-
'failed to match for buffer index',
505-
),
506-
);
507-
return;
508-
}
509-
const bufferIndex = parseInt(match[1]);
510-
if (isNaN(bufferIndex)) {
511-
controller.error(
512-
new utilsErrors.ErrorUtilsUndefinedBehaviour(
513-
'failed to parse buffer index',
514-
),
515-
);
516-
return;
517-
}
518-
if (lastChunk == null) {
519-
controller.error(
520-
new utilsErrors.ErrorUtilsUndefinedBehaviour(
521-
'lastChunk was undefined',
522-
),
523-
);
524-
return;
525-
}
526-
workingBuffer = lastChunk.subarray(bufferIndex);
527-
}
528-
jsonParser = undefined;
529-
};
530-
jsonParser.onEnd = handleEnd;
531-
jsonParser.onError = handleEnd;
532-
jsonParser.onValue = (value) => {
533-
const message = value.value;
534-
if (isDoneMessage(message)) {
535-
done = true;
536-
jsonParser?.end();
537-
phase = 'CONTENT';
538-
return;
539-
}
540-
parseTreeNode(message);
541-
controller.enqueue(message);
542-
};
543-
jsonParser.write(initialChunk);
544-
};
545-
/* Check if any chunks have been processed. If the stream is being flushed
546-
* without processing any chunks, then something went wrong with the stream.
547-
*/
548454
let processedChunks: boolean = false;
549-
return new TransformStream<Uint8Array, TreeNode | ContentNode | Uint8Array>({
455+
456+
return new TransformStream<Uint8Array, ContentNode | Uint8Array | string>({
457+
/**
458+
* Check if any chunks have been processed. If the stream is being flushed
459+
* without processing any chunks, then something went wrong with the stream.
460+
*/
550461
flush: (controller) => {
551462
if (!processedChunks) {
552463
controller.error(
@@ -556,91 +467,58 @@ function parserTransformStreamFactory(): TransformStream<
556467
},
557468
transform: (chunk, controller) => {
558469
if (chunk.byteLength > 0) processedChunks = true;
559-
switch (phase) {
560-
case 'START': {
561-
workingBuffer = vaultsUtils.uint8ArrayConcat([workingBuffer, chunk]);
562-
// Start phase expects a TREE header to indicate start of TREE data
563-
const { data, remainder } = parseGenericHeader(workingBuffer);
564-
if (data == null) {
565-
// Wait for more data
566-
workingBuffer = remainder;
567-
return;
568-
}
569-
if (data.type !== HeaderType.TREE) {
570-
controller.error(
571-
new validationErrors.ErrorParse(
572-
`expected TREE header, got "${HeaderType[data.type]}"`,
573-
),
574-
);
575-
return;
576-
}
577-
// We have the tree header, so we switch to tree mode
578-
enterTreeState(controller, remainder);
579-
lastChunk = remainder;
580-
return;
581-
}
582-
case 'TREE':
583-
{
584-
// Tree needs to parse a JSON stream
585-
lastChunk = chunk;
586-
jsonParser?.write(chunk);
587-
}
588-
return;
589-
case 'CONTENT':
590-
{
591-
workingBuffer = vaultsUtils.uint8ArrayConcat([
592-
workingBuffer,
593-
chunk,
594-
]);
595-
if (contentLength == null) {
596-
const genericHeader = parseGenericHeader(workingBuffer);
597-
if (genericHeader.data == null) return;
598-
if (genericHeader.data.type === HeaderType.TREE) {
599-
enterTreeState(controller, genericHeader.remainder);
600-
lastChunk = genericHeader.remainder;
601-
return;
602-
}
603-
if (genericHeader.data.type !== HeaderType.CONTENT) {
604-
controller.error(
605-
new validationErrors.ErrorParse(
606-
`expected CONTENT or TREE message, got "${genericHeader.data.type}"`,
607-
),
608-
);
609-
return;
610-
}
611-
const contentHeader = parseContentHeader(genericHeader.remainder);
612-
if (contentHeader.data == null) return;
613-
614-
const { dataSize, iNode } = contentHeader.data;
615-
controller.enqueue({ type: 'CONTENT', dataSize, iNode });
616-
contentLength = dataSize;
617-
workingBuffer = contentHeader.remainder;
618-
}
619-
// We yield the whole buffer, or split it for the next header
620-
if (workingBuffer.byteLength === 0) return;
621-
if (workingBuffer.byteLength <= contentLength) {
622-
contentLength -= BigInt(workingBuffer.byteLength);
623-
controller.enqueue(workingBuffer);
624-
workingBuffer = new Uint8Array(0);
625-
if (contentLength === 0n) contentLength = undefined;
626-
return;
627-
} else {
628-
controller.enqueue(
629-
workingBuffer.subarray(0, Number(contentLength)),
630-
);
631-
workingBuffer = workingBuffer.subarray(Number(contentLength));
632-
contentLength = undefined;
633-
}
634-
}
635-
return;
636-
default:
470+
workingBuffer = vaultsUtils.uint8ArrayConcat([
471+
workingBuffer,
472+
chunk,
473+
]);
474+
if (contentLength == null) {
475+
const genericHeader = parseGenericHeader(workingBuffer);
476+
if (genericHeader.data == null) return;
477+
if (genericHeader.data.type !== HeaderType.CONTENT) {
637478
controller.error(
638-
new utilsErrors.ErrorUtilsUndefinedBehaviour(
639-
`invalid state "${phase}"`,
479+
new validationErrors.ErrorParse(
480+
`expected CONTENT message, got "${genericHeader.data.type}"`,
640481
),
641482
);
642483
return;
484+
}
485+
const contentHeader = parseContentHeader(genericHeader.remainder);
486+
if (contentHeader.data == null) return;
487+
488+
const { dataSize, iNode } = contentHeader.data;
489+
controller.enqueue({ type: 'CONTENT', dataSize, iNode });
490+
contentLength = dataSize;
491+
workingBuffer = contentHeader.remainder;
492+
}
493+
// We yield the whole buffer, or split it for the next header
494+
if (workingBuffer.byteLength === 0) return;
495+
if (workingBuffer.byteLength <= contentLength) {
496+
contentLength -= BigInt(workingBuffer.byteLength);
497+
const fileContents = new TextDecoder().decode(workingBuffer); // newcode
498+
controller.enqueue(fileContents); // newcode
499+
// controller.enqueue(workingBuffer);
500+
workingBuffer = new Uint8Array(0);
501+
if (contentLength === 0n) contentLength = undefined;
502+
// return;
503+
} else {
504+
// controller.enqueue(
505+
// workingBuffer.subarray(0, Number(contentLength)),
506+
// );
507+
const contentChunk = workingBuffer.subarray(0, Number(contentLength)); // new
508+
            const contentString = new TextDecoder().decode(contentChunk); // new
509+
            controller.enqueue(contentString); // new
510+
workingBuffer = workingBuffer.subarray(Number(contentLength));
511+
contentLength = undefined;
643512
}
513+
// return;
514+
// default:
515+
// controller.error(
516+
// new utilsErrors.ErrorUtilsUndefinedBehaviour(
517+
// `invalid state "${phase}"`,
518+
// ),
519+
// );
520+
// return;
521+
// }
644522
},
645523
});
646524
}

tests/vaults/fileTree.test.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import os from 'os';
44
import path from 'path';
55
import { ReadableStream } from 'stream/web';
66
import { test } from '@fast-check/jest';
7-
import fc from 'fast-check';
7+
import fc, { uint8Array } from 'fast-check';
88
import * as fileTree from '@/vaults/fileTree';
99
import * as vaultsTestUtils from './utils';
1010

@@ -718,5 +718,32 @@ describe('fileTree', () => {
718718
// TODO: tests for
719719
// - empty files
720720
// - files larger than content chunks
721+
722+
// TEST: DEBUGGGG
723+
test('view serializer', async () => {
724+
const fileTreeGen = fileTree.globWalk({
725+
fs,
726+
yieldStats: false,
727+
yieldRoot: false,
728+
yieldFiles: true,
729+
yieldParents: false,
730+
yieldDirectories: false,
731+
});
732+
let serializedStream = fileTree.serializerStreamFactory(fs, fileTreeGen);
733+
// const data: Array<Uint8Array> = [];
734+
// for await (const d of serializedStream) {
735+
// data.push(d);
736+
// }
737+
// console.log(data.map((v) => Buffer.from(v as Uint8Array).toString()));
738+
739+
// serializedStream = fileTree.serializerStreamFactory(fs, fileTreeGen);
740+
const parserTransform = fileTree.parserTransformStreamFactory();
741+
const outputStream = serializedStream.pipeThrough(parserTransform);
742+
const output: Array<string> = [];
743+
for await (const d of outputStream) {
744+
output.push(d);
745+
}
746+
console.log(output);
747+
});
721748
});
722749
});

0 commit comments

Comments
 (0)