Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: memory leak w/slow sources in parallel-merge and in write-to-stream #5

Merged
merged 5 commits into from
Jan 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions lib/parallel-merge.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// <reference lib="esnext.asynciterable" />
import { getIterator } from './get-iterator'
import { AnyIterable, UnArrayAnyIterable } from './types'
import { AnyIterable, UnArrayAnyIterable, NullOrFunction } from './types'

export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
...iterables: I
Expand All @@ -9,6 +9,35 @@ export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
const concurrentWork = new Set()
const values = new Map()

let lastError = null
let errCb: NullOrFunction = null
let valueCb: NullOrFunction = null

const notifyError = err => {
lastError = err
if (errCb) {
errCb(err)
}
}

const notifyDone = value => {
if (valueCb) {
valueCb(value)
}
}

const waitForQueue = () =>
new Promise((resolve, reject) => {
if (lastError) {
reject(lastError)
}
if (values.size > 0) {
return resolve()
}
valueCb = resolve
errCb = reject
})

const queueNext = input => {
const nextVal = Promise.resolve(input.next()).then(async ({ done, value }) => {
if (!done) {
Expand All @@ -17,17 +46,21 @@ export async function* parallelMerge<I extends Array<AnyIterable<any>>>(
concurrentWork.delete(nextVal)
})
concurrentWork.add(nextVal)
nextVal.then(notifyDone, notifyError)
}

for (const input of inputs) {
queueNext(input)
}

while (true) {
if (concurrentWork.size === 0) {
// We technically don't have to check `values.size` as the for loop should have emptied it
// However I haven't yet found specs verifying that behavior, only tests
// the guard in waitForQueue() checking for values is in place for the same reason
if (concurrentWork.size === 0 && values.size === 0) {
return
}
await Promise.race(concurrentWork)
await waitForQueue()
for (const [input, value] of values) {
values.delete(input)
yield value
Expand Down
1 change: 1 addition & 0 deletions lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export type Iterableish<T> = Iterable<T> | Iterator<T> | AsyncIterable<T> | Asyn
export type AnyIterable<T> = Iterable<T> | AsyncIterable<T>
export type FlatMapValue<B> = B | AnyIterable<B> | undefined | null | Promise<B | AnyIterable<B> | undefined | null>
export type UnArrayAnyIterable<A extends Array<AnyIterable<any>>> = A extends Array<AnyIterable<infer T>> ? T : never
export type NullOrFunction = null | ((anything?: any) => void)
54 changes: 39 additions & 15 deletions lib/write-to-stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// <reference lib="esnext.asynciterable" />
import { AnyIterable } from './types'
import { AnyIterable, NullOrFunction } from './types'

interface IWritable {
once: any
Expand All @@ -14,28 +14,52 @@ function once(event: string, stream: IWritable): Promise<any> {
}

async function _writeToStream(stream: IWritable, iterable: AnyIterable<any>): Promise<void> {
let errorListener
let error
const errorPromise = new Promise((resolve, reject) => {
errorListener = err => {
error = err
reject(err)
let lastError = null
let errCb: NullOrFunction = null
let drainCb: NullOrFunction = null

const notifyError = err => {
lastError = err
if (errCb) {
errCb(err)
}
stream.once('error', errorListener)
}) as Promise<void>
}

const notifyDrain = () => {
if (drainCb) {
drainCb()
}
}

const cleanup = () => {
stream.removeListener('error', notifyError)
stream.removeListener('drain', notifyDrain)
}

stream.once('error', notifyError)

const waitForDrain = () =>
new Promise((resolve, reject) => {
if (lastError) {
return reject(lastError)
}
stream.once('drain', notifyDrain)
drainCb = resolve
errCb = reject
})

for await (const value of iterable) {
if (stream.write(value) === false) {
await Promise.race([errorPromise, once('drain', stream)])
await waitForDrain()
}
if (error) {
return errorPromise
if (lastError) {
break
}
}

stream.removeListener('error', errorListener)
if (error) {
return errorPromise
cleanup()
if (lastError) {
throw lastError
}
}

Expand Down