Commit 28f071a

Merge pull request #16628 from Budibase/app-export-memory-use
Use bounded concurrency for retrieveDirectory
2 parents e6989ba + 3d3a14e commit 28f071a

2 files changed, +61 -37 lines changed

packages/backend-core/src/objectStore/objectStore.ts

Lines changed: 44 additions & 28 deletions
@@ -25,6 +25,7 @@ import { ReadableStream } from "stream/web"
 import { NodeJsClient } from "@smithy/types"
 import tracer from "dd-trace"
 import { pipeline } from "stream/promises"
+import { utils } from "@budibase/shared-core"
 
 // use this as a temporary store of buckets that are being created
 const STATE = {
@@ -440,22 +441,30 @@ export async function retrieveDirectory(bucketName: string, path: string) {
   await fsp.mkdir(writePath, { recursive: true })
 
   let numObjects = 0
-  for await (const object of listAllObjects(bucketName, path)) {
-    numObjects++
-
-    const filename = object.Key!
-    const stream = await getReadStream(bucketName, filename)
-    const possiblePath = filename.split("/")
-    const dirs = possiblePath.slice(0, possiblePath.length - 1)
-    const possibleDir = join(writePath, ...dirs)
-    if (possiblePath.length > 1 && !fs.existsSync(possibleDir)) {
-      await fsp.mkdir(possibleDir, { recursive: true })
-    }
-    await pipeline(
-      stream,
-      fs.createWriteStream(join(writePath, ...possiblePath), { mode: 0o644 })
-    )
-  }
+  await utils.parallelForeach(
+    listAllObjects(bucketName, path),
+    async object => {
+      numObjects++
+      await tracer.trace("retrieveDirectory.object", async span => {
+        const filename = object.Key!
+        span.addTags({ filename })
+        const stream = await getReadStream(bucketName, filename)
+        const possiblePath = filename.split("/")
+        const dirs = possiblePath.slice(0, possiblePath.length - 1)
+        const possibleDir = join(writePath, ...dirs)
+        if (possiblePath.length > 1 && !fs.existsSync(possibleDir)) {
+          await fsp.mkdir(possibleDir, { recursive: true })
+        }
+        await pipeline(
+          stream,
+          fs.createWriteStream(join(writePath, ...possiblePath), {
+            mode: 0o644,
+          })
+        )
+      })
+    },
+    3 /* max concurrency */
+  )
 
   span.addTags({ numObjects })
   return writePath
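
For context on the hunk above: `listAllObjects` yields objects lazily and `utils.parallelForeach` now accepts an async iterable, so at most three download/write pipelines are in flight at any time rather than the listing being materialised up front. A minimal, self-contained sketch of that call shape (the `listKeys`, `fetchPage` and `downloadObject` helpers below are hypothetical stand-ins, not part of this commit):

```typescript
import { utils } from "@budibase/shared-core"

// Hypothetical paginated listing: keys are produced lazily, page by page,
// so they never need to be collected into one large in-memory array.
async function* listKeys(): AsyncGenerator<string> {
  for (let page = 0; page < 10; page++) {
    const keys = await fetchPage(page) // stand-in for a ListObjects-style call
    yield* keys
  }
}

async function fetchPage(page: number): Promise<string[]> {
  return [`file-${page}-a.txt`, `file-${page}-b.txt`]
}

// Stand-in for the getReadStream + pipeline work done per object above.
async function downloadObject(key: string): Promise<void> {
  await new Promise(resolve => setTimeout(resolve, 10))
  console.log(`downloaded ${key}`)
}

export async function downloadAll(): Promise<void> {
  await utils.parallelForeach(
    listKeys(),
    async key => {
      // At most three of these run concurrently, so only a few
      // streams/buffers exist at any one time.
      await downloadObject(key)
    },
    3 /* max concurrency */
  )
}
```
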
@@ -592,18 +601,25 @@ export async function getReadStream(
   bucketName: string,
   path: string
 ): Promise<Readable> {
-  bucketName = sanitizeBucket(bucketName)
-  path = sanitizeKey(path)
-  const client = ObjectStore()
-  const params = {
-    Bucket: bucketName,
-    Key: path,
-  }
-  const response = await client.getObject(params)
-  if (!response.Body || !(response.Body instanceof stream.Readable)) {
-    throw new Error("Unable to retrieve stream - invalid response")
-  }
-  return response.Body
+  return await tracer.trace("getReadStream", async span => {
+    bucketName = sanitizeBucket(bucketName)
+    path = sanitizeKey(path)
+    span.addTags({ bucketName, path })
+    const client = ObjectStore()
+    const params = {
+      Bucket: bucketName,
+      Key: path,
+    }
+    const response = await client.getObject(params)
+    if (!response.Body || !(response.Body instanceof stream.Readable)) {
+      throw new Error("Unable to retrieve stream - invalid response")
+    }
+    span.addTags({
+      contentLength: response.ContentLength,
+      contentType: response.ContentType,
+    })
+    return response.Body
+  })
 }
 
 export async function getObjectMetadata(
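
The `getReadStream` change wraps the existing body in a dd-trace span: `tracer.trace()` creates the span, passes it to the callback for tagging, and finishes it when the returned promise settles. A minimal sketch of the same pattern, using hypothetical names outside this commit:

```typescript
import tracer from "dd-trace"

// Hypothetical stand-in for the object store client call traced above.
async function fakeGetObject(_bucket: string, _key: string): Promise<string> {
  return "example body"
}

// The span is created by tracer.trace, tagged inside the async callback, and
// finished automatically once the returned promise resolves or rejects.
export async function readWithTracing(bucket: string, key: string): Promise<string> {
  return await tracer.trace("example.read", async span => {
    span.addTags({ bucket, key })
    const body = await fakeGetObject(bucket, key)
    span.addTags({ contentLength: body.length })
    return body
  })
}
```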

packages/shared-core/src/utils.ts

Lines changed: 17 additions & 9 deletions
@@ -39,24 +39,32 @@ export function unreachable(
 }
 
 export async function parallelForeach<T>(
-  items: T[],
+  items: Iterable<T> | AsyncIterable<T>,
   task: (item: T) => Promise<void>,
   maxConcurrency: number
 ): Promise<void> {
   const results: Promise<void>[] = []
-  let index = 0
 
-  const executeNext = async (): Promise<void> => {
-    while (index < items.length) {
-      const currentIndex = index++
-      const item = items[currentIndex]
+  // Check if it's an async iterable
+  const isAsyncIterable = Symbol.asyncIterator in items
+  const iterator = isAsyncIterable
+    ? (items as AsyncIterable<T>)[Symbol.asyncIterator]()
+    : (items as Iterable<T>)[Symbol.iterator]()
 
-      await task(item)
+  const executeNext = async (): Promise<void> => {
+    let result = await (isAsyncIterable
+      ? (iterator as AsyncIterator<T>).next()
+      : Promise.resolve((iterator as Iterator<T>).next()))
+
+    while (!result.done) {
+      await task(result.value)
+      result = await (isAsyncIterable
+        ? (iterator as AsyncIterator<T>).next()
+        : Promise.resolve((iterator as Iterator<T>).next()))
     }
   }
 
-  // Start up to maxConcurrency workers
-  for (let i = 0; i < Math.min(maxConcurrency, items.length); i++) {
+  for (let i = 0; i < maxConcurrency; i++) {
    results.push(executeNext())
   }
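
To illustrate the contract of the updated helper: it starts `maxConcurrency` workers that share one iterator, so no more than that many tasks run at once, whether the source is an array or an async generator. A small sketch (not part of the commit) that measures the in-flight count while draining an async generator:

```typescript
import { utils } from "@budibase/shared-core"

// An async generator source, as retrieveDirectory now uses.
async function* numbers(count: number): AsyncGenerator<number> {
  for (let i = 0; i < count; i++) {
    yield i
  }
}

async function main(): Promise<void> {
  let inFlight = 0
  let maxInFlight = 0

  await utils.parallelForeach(
    numbers(20),
    async () => {
      inFlight++
      maxInFlight = Math.max(maxInFlight, inFlight)
      await new Promise(resolve => setTimeout(resolve, 5)) // simulate work
      inFlight--
    },
    3
  )

  // With maxConcurrency = 3, this should never report more than 3.
  console.log(`max in-flight tasks: ${maxInFlight}`)
}

main().catch(console.error)
```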
