Skip to content

Commit b2a03c9

Browse files
authored
fix(storage-azure): add stream aborts for error handling and connection closure (#15768)
This adds the exact same stream abort pattern that we use in S3 over to the Azure storage adapter, to help alleviate memory leak issues in production. For testing: the aforementioned memory issues were not reproducible with the local Docker setup; however, community members confirmed the issue is solved in their production environments using a release canary: #13331 (comment) Closes #13331
1 parent c6054c5 commit b2a03c9

File tree

1 file changed

+62
-25
lines changed

1 file changed

+62
-25
lines changed

packages/storage-azure/src/staticHandler.ts

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,60 @@
1-
import type { ContainerClient } from '@azure/storage-blob'
1+
import type { BlobDownloadResponseParsed, ContainerClient } from '@azure/storage-blob'
22
import type { StaticHandler } from '@payloadcms/plugin-cloud-storage/types'
33
import type { CollectionConfig } from 'payload'
4+
import type { Readable } from 'stream'
45

56
import { RestError } from '@azure/storage-blob'
67
import { getFilePrefix } from '@payloadcms/plugin-cloud-storage/utilities'
78
import path from 'path'
89
import { getRangeRequestInfo } from 'payload/internal'
910

11+
const isNodeReadableStream = (
12+
body: BlobDownloadResponseParsed['readableStreamBody'],
13+
): body is Readable => {
14+
return (
15+
typeof body === 'object' &&
16+
body !== null &&
17+
'pipe' in body &&
18+
typeof body.pipe === 'function' &&
19+
'destroy' in body &&
20+
typeof body.destroy === 'function'
21+
)
22+
}
23+
24+
const abortRequestAndDestroyStream = ({
25+
abortController,
26+
blob,
27+
}: {
28+
abortController: AbortController
29+
blob?: BlobDownloadResponseParsed
30+
}) => {
31+
try {
32+
abortController.abort()
33+
} catch {
34+
/* noop */
35+
}
36+
if (blob?.readableStreamBody && isNodeReadableStream(blob.readableStreamBody)) {
37+
blob.readableStreamBody.destroy()
38+
}
39+
}
40+
1041
interface Args {
1142
collection: CollectionConfig
1243
getStorageClient: () => ContainerClient
1344
}
1445

1546
export const getHandler = ({ collection, getStorageClient }: Args): StaticHandler => {
1647
return async (req, { headers: incomingHeaders, params: { clientUploadContext, filename } }) => {
48+
let blob: BlobDownloadResponseParsed | undefined = undefined
49+
let streamed = false
50+
51+
const abortController = new AbortController()
52+
if (req.signal) {
53+
req.signal.addEventListener('abort', () => {
54+
abortRequestAndDestroyStream({ abortController, blob })
55+
})
56+
}
57+
1758
try {
1859
const prefix = await getFilePrefix({ clientUploadContext, collection, filename, req })
1960
const blockBlobClient = getStorageClient().getBlockBlobClient(
@@ -40,13 +81,14 @@ export const getHandler = ({ collection, getStorageClient }: Args): StaticHandle
4081
}
4182

4283
// Download with range if partial
43-
const blob =
84+
blob =
4485
rangeResult.type === 'partial'
4586
? await blockBlobClient.download(
4687
rangeResult.rangeStart,
4788
rangeResult.rangeEnd - rangeResult.rangeStart + 1,
89+
{ abortSignal: abortController.signal },
4890
)
49-
: await blockBlobClient.download()
91+
: await blockBlobClient.download(0, undefined, { abortSignal: abortController.signal })
5092

5193
let headers = new Headers(incomingHeaders)
5294

@@ -83,36 +125,31 @@ export const getHandler = ({ collection, getStorageClient }: Args): StaticHandle
83125
})
84126
}
85127

86-
// Manually create a ReadableStream for the web from a Node.js stream.
87-
const readableStream = new ReadableStream({
88-
start(controller) {
89-
const nodeStream = blob.readableStreamBody
90-
if (!nodeStream) {
91-
throw new Error('No readable stream body')
92-
}
93-
94-
nodeStream.on('data', (chunk) => {
95-
controller.enqueue(new Uint8Array(chunk))
96-
})
97-
nodeStream.on('end', () => {
98-
controller.close()
99-
})
100-
nodeStream.on('error', (err) => {
101-
controller.error(err)
102-
})
103-
},
104-
})
128+
if (!blob.readableStreamBody || !isNodeReadableStream(blob.readableStreamBody)) {
129+
return new Response('Internal Server Error', { status: 500 })
130+
}
105131

106-
return new Response(readableStream, {
107-
headers,
108-
status: rangeResult.status,
132+
const stream = blob.readableStreamBody
133+
stream.on('error', (err: Error) => {
134+
req.payload.logger.error({
135+
err,
136+
msg: 'Error while streaming Azure blob (aborting)',
137+
})
138+
abortRequestAndDestroyStream({ abortController, blob })
109139
})
140+
141+
streamed = true
142+
return new Response(stream, { headers, status: rangeResult.status })
110143
} catch (err: unknown) {
111144
if (err instanceof RestError && err.statusCode === 404) {
112145
return new Response(null, { status: 404, statusText: 'Not Found' })
113146
}
114147
req.payload.logger.error(err)
115148
return new Response('Internal Server Error', { status: 500 })
149+
} finally {
150+
if (!streamed) {
151+
abortRequestAndDestroyStream({ abortController, blob })
152+
}
116153
}
117154
}
118155
}

0 commit comments

Comments
 (0)