Skip to content

Commit 2d6c393

Browse files
committed
feat(bundle): fix race condition packing CSV for legacy bundle format
1 parent ea250f7 commit 2d6c393

File tree

1 file changed

+8
-10
lines changed

1 file changed

+8
-10
lines changed

stream/bundle.js

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ module.exports.createWriteStream = (options) => {
4747
// create a new tempfile for this placetype
4848
if (!_.has(metadata, placetype)) {
4949
const tmpFileName = tmp.tmpNameSync()
50+
const sink = fs.createWriteStream(tmpFileName)
51+
const closed = new Promise((resolve, reject) => sink.on('error', reject).on('close', resolve))
5052
const store = {
5153
path: tmpFileName,
52-
writable: miss.pipeline.obj(
53-
csv(),
54-
fs.createWriteStream(tmpFileName)
55-
)
54+
writable: miss.pipeline.obj(csv(), sink),
55+
closed
5656
}
5757
metadata[placetype] = store
5858
}
@@ -66,15 +66,13 @@ module.exports.createWriteStream = (options) => {
6666
}
6767

6868
// called at the end
69-
const flush = (done) => {
69+
const flush = async (done) => {
7070
// write meta file(s)
7171
if (options.nometa !== true) {
72-
_.each(metadata, (store, placetype) => {
73-
// end writing to file
72+
// write CSV meta files
73+
_.each(metadata, async (store, placetype) => {
7474
store.writable.end()
75-
76-
// write CSV meta files
77-
// @todo: try getting this to work with pure streams?
75+
await store.closed
7876
const metaFilename = convention.metaFilename(options.collection, placetype, options.vintage)
7977
const header = { name: path.join('meta', metaFilename) }
8078
pack.entry(header, fs.readFileSync(store.path))

0 commit comments

Comments
 (0)