Skip to content

Commit b5180bb

Browse files
authored
fix: memory leak w/slow sources in parallel-merge and in write-to-stream (#5)
@dko-slapdash reported a issue with Promise.race() where even when the race promise resolves the callbacks it left on the unresolved promises are not garbage collected. This change no longer calls `Promise.race()` more than once on any promise by removing it completely. Closes #4
1 parent fae22d0 commit b5180bb

File tree

3 files changed

+76
-18
lines changed

3 files changed

+76
-18
lines changed

lib/parallel-merge.ts

+36-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/// <reference lib="esnext.asynciterable" />
22
import { getIterator } from './get-iterator'
3-
import { AnyIterable, UnArrayAnyIterable } from './types'
3+
import { AnyIterable, UnArrayAnyIterable, NullOrFunction } from './types'
44

55
export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
66
...iterables: I
@@ -9,6 +9,35 @@ export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
99
const concurrentWork = new Set()
1010
const values = new Map()
1111

12+
let lastError = null
13+
let errCb: NullOrFunction = null
14+
let valueCb: NullOrFunction = null
15+
16+
const notifyError = err => {
17+
lastError = err
18+
if (errCb) {
19+
errCb(err)
20+
}
21+
}
22+
23+
const notifyDone = value => {
24+
if (valueCb) {
25+
valueCb(value)
26+
}
27+
}
28+
29+
const waitForQueue = () =>
30+
new Promise((resolve, reject) => {
31+
if (lastError) {
32+
reject(lastError)
33+
}
34+
if (values.size > 0) {
35+
return resolve()
36+
}
37+
valueCb = resolve
38+
errCb = reject
39+
})
40+
1241
const queueNext = input => {
1342
const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => {
1443
if (!done) {
@@ -17,17 +46,21 @@ export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
1746
concurrentWork.delete(nextVal)
1847
})
1948
concurrentWork.add(nextVal)
49+
nextVal.then(notifyDone, notifyError)
2050
}
2151

2252
for (const input of inputs) {
2353
queueNext(input)
2454
}
2555

2656
while (true) {
27-
if (concurrentWork.size === 0) {
57+
// We technically don't have to check `values.size` as the for loop should have emptied it
58+
// However I haven't yet found specs verifying that behavior, only tests
59+
// the guard in waitForQueue() checking for values is in place for the same reason
60+
if (concurrentWork.size === 0 && values.size === 0) {
2861
return
2962
}
30-
await Promise.race(concurrentWork)
63+
await waitForQueue()
3164
for (const [input, value] of values) {
3265
values.delete(input)
3366
yield value

lib/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export type Iterableish<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | Asyn
33
export type AnyIterable<T> = Iterable<T> | AsyncIterable<T>
44
export type FlatMapValue<B> = B | AnyIterable<B> | undefined | null | Promise<B | AnyIterable<B> | undefined | null>
55
export type UnArrayAnyIterable<A extends Array<AnyIterable<any>>> = A extends Array<AnyIterable<infer T>> ? T : never
6+
export type NullOrFunction = null | ((anything?: any) => void)

lib/write-to-stream.ts

+39-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// <reference lib="esnext.asynciterable" />
2-
import { AnyIterable } from './types'
2+
import { AnyIterable, NullOrFunction } from './types'
33

44
interface IWritable {
55
once: any
@@ -14,28 +14,52 @@ function once(event: string, stream: IWritable): Promise<any> {
1414
}
1515

1616
async function _writeToStream(stream: IWritable, iterable: AnyIterable<any>): Promise<void> {
17-
let errorListener
18-
let error
19-
const errorPromise = new Promise((resolve, reject) => {
20-
errorListener = err => {
21-
error = err
22-
reject(err)
17+
let lastError = null
18+
let errCb: NullOrFunction = null
19+
let drainCb: NullOrFunction = null
20+
21+
const notifyError = err => {
22+
lastError = err
23+
if (errCb) {
24+
errCb(err)
2325
}
24-
stream.once('error', errorListener)
25-
}) as Promise<void>
26+
}
27+
28+
const notifyDrain = () => {
29+
if (drainCb) {
30+
drainCb()
31+
}
32+
}
33+
34+
const cleanup = () => {
35+
stream.removeListener('error', notifyError)
36+
stream.removeListener('drain', notifyDrain)
37+
}
38+
39+
stream.once('error', notifyError)
40+
41+
const waitForDrain = () =>
42+
new Promise((resolve, reject) => {
43+
if (lastError) {
44+
return reject(lastError)
45+
}
46+
stream.once('drain', notifyDrain)
47+
drainCb = resolve
48+
errCb = reject
49+
})
2650

2751
for await (const value of iterable) {
2852
if (stream.write(value) === false) {
29-
await Promise.race([errorPromise, once('drain', stream)])
53+
await waitForDrain()
3054
}
31-
if (error) {
32-
return errorPromise
55+
if (lastError) {
56+
break
3357
}
3458
}
3559

36-
stream.removeListener('error', errorListener)
37-
if (error) {
38-
return errorPromise
60+
cleanup()
61+
if (lastError) {
62+
throw lastError
3963
}
4064
}
4165

0 commit comments

Comments
 (0)