Skip to content

Commit 6ab0178

Browse files
committed
[PECO-1541] Optimize UC Volume ingestion
Signed-off-by: Levko Kravets <levko.ne@gmail.com>
1 parent c84ef8a commit 6ab0178

File tree

1 file changed

+31
-4
lines changed

1 file changed

+31
-4
lines changed

lib/DBSQLSession.ts

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import * as fs from 'fs';
22
import * as path from 'path';
3+
import { pipeline } from 'stream';
34
import { stringify, NIL, parse } from 'uuid';
45
import fetch, { HeadersInit } from 'node-fetch';
56
import {
@@ -271,8 +272,23 @@ export default class DBSQLSession implements IDBSQLSession {
271272
if (!response.ok) {
272273
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
273274
}
274-
const buffer = await response.arrayBuffer();
275-
fs.writeFileSync(localFile, Buffer.from(buffer));
275+
276+
return new Promise((resolve, reject) => {
277+
try {
278+
const fileStream = fs.createWriteStream(localFile);
279+
// `pipeline` will do all the dirty job for us, including error handling and closing all the streams properly
280+
// Also, we use callback-style `pipeline` because Promise-style one is not available in Node 14
281+
pipeline(response.body, fileStream, (error) => {
282+
if (error) {
283+
reject(error);
284+
} else {
285+
resolve();
286+
}
287+
});
288+
} catch (error) {
289+
reject(error);
290+
}
291+
});
276292
}
277293

278294
private async handleStagingRemove(presignedUrl: string, headers: HeadersInit): Promise<void> {
@@ -301,8 +317,19 @@ export default class DBSQLSession implements IDBSQLSession {
301317
const connectionProvider = await this.context.getConnectionProvider();
302318
const agent = await connectionProvider.getAgent();
303319

304-
const data = fs.readFileSync(localFile);
305-
const response = await fetch(presignedUrl, { method: 'PUT', headers, agent, body: data });
320+
const fileStream = fs.createReadStream(localFile);
321+
const fileInfo = fs.statSync(localFile, { bigint: true });
322+
323+
const response = await fetch(presignedUrl, {
324+
method: 'PUT',
325+
headers: {
326+
...headers,
327+
// This header is required by server
328+
'Content-Length': fileInfo.size.toString(),
329+
},
330+
agent,
331+
body: fileStream,
332+
});
306333
if (!response.ok) {
307334
throw new StagingError(`HTTP error ${response.status} ${response.statusText}`);
308335
}

0 commit comments

Comments
 (0)