-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathindex.js
83 lines (71 loc) · 1.82 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
'use strict';
const
assert = require('assert'),
byline = require('byline');
/**
 * Splits a line-oriented input stream (e.g. a CSV file) into several output
 * streams. The first line read is treated as the header and is repeated at the
 * top of every chunk; each chunk then receives up to `lineLimit` further lines.
 * If the input contains only a header, a single header-only chunk is produced.
 *
 * @param {stream.Readable} inputStream - Source stream, read line by line via `byline`.
 * @param {Object} opts
 * @param {string} [opts.delimiter='\n'] - Written after every line (a falsy value falls back to '\n').
 * @param {number} opts.lineLimit - Maximum number of non-header lines per chunk; must be > 0.
 * @param {function(number): stream.Writable} createOutputStreamCallback -
 *   Called with the zero-based chunk index; must return the stream that chunk is written to.
 * @returns {Promise<{totalChunks: number, options: {delimiter: string, lineLimit: number}}>}
 *   Resolves once the input has ended and `end()` has been called on the last
 *   output stream. Rejects on invalid arguments, on an input/line stream
 *   error, on an exception raised while writing a chunk, or when the input
 *   contains no lines at all.
 */
function split(inputStream, opts, createOutputStreamCallback) {
  let outputStream = null;
  let chunkIndex = 0;
  let lineIndex = 0;
  let header;
  // Once the promise is settled, every later stream event must be ignored.
  let settled = false;

  return new Promise((resolve, reject) => {
    // Read the options inside the executor so that a missing `opts` results in
    // a rejected promise (via the lineLimit assertion) rather than a
    // synchronous TypeError.
    const settings = opts || {};
    const options = {
      delimiter: settings.delimiter || '\n',
      lineLimit: settings.lineLimit
    };

    assert(inputStream, 'Provide inputStream');
    // NOTE: the check also rejects 0 although the message says "non-negative";
    // the text is kept unchanged for callers that match on it.
    assert(options.lineLimit > 0, 'Provide non-negative lineLimit');

    let lineStream;

    function handleError(err) {
      if (settled) {
        return;
      }
      settled = true;
      if (outputStream) {
        try {
          outputStream.end();
        } catch (endErr) {
          // Closing is best effort here; `err` is the failure worth reporting.
        }
      }
      reject(err);
    }

    inputStream.on('error', handleError);

    try {
      lineStream = byline(inputStream);
    } catch (err) {
      handleError(err);
      return;
    }

    lineStream.on('data', line => {
      if (settled) {
        // A failure already closed the current chunk; do not write after end().
        return;
      }
      // Exceptions thrown from an event listener would bypass the promise
      // entirely, so convert them into a rejection.
      try {
        if (header === undefined) {
          header = line;
          return;
        }
        if (lineIndex === 0) {
          // Start of a new chunk: close the previous one and repeat the header.
          if (outputStream) {
            outputStream.end();
          }
          outputStream = createOutputStreamCallback(chunkIndex++);
          outputStream.write(header);
          outputStream.write(options.delimiter);
        }
        outputStream.write(line);
        outputStream.write(options.delimiter);
        // Wraps back to 0 after `lineLimit` lines, which triggers a new chunk.
        lineIndex = (lineIndex + 1) % options.lineLimit;
      } catch (err) {
        handleError(err);
      }
    });

    lineStream.on('error', handleError);

    lineStream.on('end', () => {
      if (settled) {
        return;
      }
      try {
        if (header === undefined) {
          handleError(new Error('The provided CSV is empty'));
          return;
        }
        if (outputStream) {
          outputStream.end();
        } else {
          // Header-only input: still emit one chunk containing just the header.
          outputStream = createOutputStreamCallback(chunkIndex++);
          outputStream.write(header);
          outputStream.write(options.delimiter);
          outputStream.end();
        }
        settled = true;
        resolve({
          totalChunks: chunkIndex,
          options: options
        });
      } catch (err) {
        handleError(err);
      }
    });
  });
}
// Public API of this module: only `split` is exported.
module.exports = { split: split };