-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathwrite-to-stream.ts
73 lines (62 loc) · 1.67 KB
/
write-to-stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/// <reference lib="esnext.asynciterable" />
import { AnyIterable, NullOrFunction } from './types'
/**
 * Minimal structural view of the destination stream used by this module.
 *
 * Members are intentionally typed as `any`, so any object exposing these
 * three properties is accepted.
 */
interface IWritable {
  /** Invoked as `write(value)`; a strict `false` result makes the writer wait for a `'drain'` event. */
  write: any
  /** Invoked as `once(eventName, listener)` to subscribe to `'error'` and `'drain'`. */
  once: any
  /** Invoked as `removeListener(eventName, listener)` to detach the listeners added through `once`. */
  removeListener: any
}
/**
 * Promise-based wrapper around a one-shot event subscription.
 *
 * @param event - Name of the event to wait for.
 * @param stream - Object whose `once` method is used to subscribe.
 * @returns A promise resolved with the first argument the listener receives.
 *   The promise is never rejected by this helper.
 */
function once(event: string, stream: IWritable): Promise<any> {
  // Only the resolve callback is used; it is handed to the stream as the listener.
  const subscribe = (settle: (payload?: any) => void): void => {
    stream.once(event, settle)
  }
  return new Promise<any>(subscribe)
}
/**
 * Writes every value produced by `iterable` to `stream`, honouring backpressure.
 *
 * Each value is passed to `stream.write`. When `write` returns exactly `false`
 * the loop pauses until the stream emits `'drain'`. The first `'error'` event
 * emitted by the stream stops the loop and is rethrown to the caller.
 *
 * The listeners registered here are always removed before the returned promise
 * settles, including when the drain wait is rejected by a stream error or when
 * the source iterable itself throws. This function never ends the stream.
 *
 * @param stream - Destination exposing `write`, `once` and `removeListener`.
 * @param iterable - Sync or async iterable of values to write.
 * @returns A promise that resolves once every value has been handed to `write`.
 * @throws The value emitted with the stream's first `'error'` event, or whatever
 *   the source iterable throws.
 */
async function _writeToStream(stream: IWritable, iterable: AnyIterable<any>): Promise<void> {
  let lastError: unknown = null
  let errCb: NullOrFunction = null
  let drainCb: NullOrFunction = null

  // Remember the error and, if a drain wait is pending, reject it right away.
  const notifyError = (err: unknown) => {
    lastError = err
    if (errCb) {
      errCb(err)
    }
  }

  const notifyDrain = () => {
    if (drainCb) {
      drainCb()
    }
  }

  const cleanup = () => {
    stream.removeListener('error', notifyError)
    stream.removeListener('drain', notifyDrain)
  }

  // `once`: only the first error is captured; it is enough to abort the write loop.
  stream.once('error', notifyError)

  const waitForDrain = () =>
    new Promise((resolve, reject) => {
      // The stream may already have failed before we started waiting.
      if (lastError) {
        return reject(lastError)
      }
      stream.once('drain', notifyDrain)
      drainCb = resolve
      errCb = reject
    })

  try {
    for await (const value of iterable) {
      if (stream.write(value) === false) {
        await waitForDrain()
      }
      if (lastError) {
        break
      }
    }
  } finally {
    // Runs on normal completion, on `break`, and on any throw (rejected drain
    // wait or failing source), so no listener is left attached to the stream.
    cleanup()
  }

  if (lastError) {
    throw lastError
  }
}
/**
 * Writes the values of an iterable to a stream, waiting for `'drain'` whenever
 * the stream signals backpressure.
 *
 * Supports two call shapes:
 * - `writeToStream(stream, iterable)` starts writing immediately and returns
 *   the resulting promise.
 * - `writeToStream(stream)` returns a function that accepts the iterable later.
 *
 * @param stream - Destination to write to.
 * @param iterable - Values to write; when omitted (strictly `undefined`) the
 *   curried form is returned instead.
 * @returns A promise for the finished write, or a function producing one.
 */
export function writeToStream(stream: IWritable): (iterable: AnyIterable<any>) => Promise<void>
export function writeToStream(stream: IWritable, iterable: AnyIterable<any>): Promise<void>
export function writeToStream(stream: IWritable, iterable?: AnyIterable<any>) {
  return iterable === undefined
    ? (curriedIterable: AnyIterable<any>) => _writeToStream(stream, curriedIterable)
    : _writeToStream(stream, iterable)
}