Skip to content

Commit 5586634

Browse files
committed
fix: memory leak w/slow sources in parallel-merge
Works #4
1 parent fae22d0 commit 5586634

File tree

1 file changed

+31
-1
lines changed

1 file changed

+31
-1
lines changed

lib/parallel-merge.ts

+31-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,35 @@ export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
99
const concurrentWork = new Set()
1010
const values = new Map()
1111

12+
type NullOrFunction = null | ((anything: any) => void)
13+
14+
let lastError = null
15+
let errCb: NullOrFunction = null
16+
let valueCb: NullOrFunction = null
17+
18+
const notifyError = err => {
19+
lastError = err
20+
if (errCb) {
21+
errCb(err)
22+
}
23+
}
24+
25+
const notifyDone = value => {
26+
if (valueCb) {
27+
valueCb(value)
28+
}
29+
}
30+
31+
const waitForQueue = () =>
32+
new Promise((resolve, reject) => {
33+
if (lastError) {
34+
reject(lastError)
35+
}
36+
valueCb = resolve
37+
errCb = reject
38+
return this
39+
})
40+
1241
const queueNext = input => {
1342
const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => {
1443
if (!done) {
@@ -17,6 +46,7 @@ export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
1746
concurrentWork.delete(nextVal)
1847
})
1948
concurrentWork.add(nextVal)
49+
nextVal.then(notifyDone, notifyError)
2050
}
2151

2252
for (const input of inputs) {
@@ -27,7 +57,7 @@ export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
2757
if (concurrentWork.size === 0) {
2858
return
2959
}
30-
await Promise.race(concurrentWork)
60+
await waitForQueue()
3161
for (const [input, value] of values) {
3262
values.delete(input)
3363
yield value

0 commit comments

Comments
 (0)