Skip to content

Commit b0f4233

Browse files
rluvaton and ronag authored
stream: add highWaterMark for the map operator
This is done so that we don't wait for the first items to finish before starting new ones.

Fixes: #46132
Co-authored-by: Robert Nagy <[email protected]>
PR-URL: #49249
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
1 parent da197d1 commit b0f4233

File tree

4 files changed

+226
-16
lines changed

4 files changed

+226
-16
lines changed

doc/api/stream.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2012,6 +2012,10 @@ showBoth();
20122012
added:
20132013
- v17.4.0
20142014
- v16.14.0
2015+
changes:
2016+
- version: REPLACEME
2017+
pr-url: https://github.com/nodejs/node/pull/49249
2018+
description: added `highWaterMark` in options.
20152019
-->
20162020

20172021
> Stability: 1 - Experimental
@@ -2025,6 +2029,8 @@ added:
20252029
* `options` {Object}
20262030
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
20272031
on the stream at once. **Default:** `1`.
2032+
* `highWaterMark` {number} how many items to buffer while waiting for user
2033+
consumption of the mapped items. **Default:** `concurrency * 2 - 1`.
20282034
* `signal` {AbortSignal} allows destroying the stream if the signal is
20292035
aborted.
20302036
* Returns: {Readable} a stream mapped with the function `fn`.
@@ -2059,6 +2065,10 @@ for await (const result of dnsResults) {
20592065
added:
20602066
- v17.4.0
20612067
- v16.14.0
2068+
changes:
2069+
- version: REPLACEME
2070+
pr-url: https://github.com/nodejs/node/pull/49249
2071+
description: added `highWaterMark` in options.
20622072
-->
20632073

20642074
> Stability: 1 - Experimental
@@ -2071,6 +2081,8 @@ added:
20712081
* `options` {Object}
20722082
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
20732083
on the stream at once. **Default:** `1`.
2084+
* `highWaterMark` {number} how many items to buffer while waiting for user
2085+
consumption of the filtered items. **Default:** `concurrency * 2 - 1`.
20742086
* `signal` {AbortSignal} allows destroying the stream if the signal is
20752087
aborted.
20762088
* Returns: {Readable} a stream filtered with the predicate `fn`.

lib/internal/streams/operators.js

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ const {
3232
NumberIsNaN,
3333
Promise,
3434
PromiseReject,
35+
PromiseResolve,
3536
PromisePrototypeThen,
3637
Symbol,
3738
} = primordials;
@@ -81,7 +82,15 @@ function map(fn, options) {
8182
concurrency = MathFloor(options.concurrency);
8283
}
8384

84-
validateInteger(concurrency, 'concurrency', 1);
85+
let highWaterMark = concurrency - 1;
86+
if (options?.highWaterMark != null) {
87+
highWaterMark = MathFloor(options.highWaterMark);
88+
}
89+
90+
validateInteger(concurrency, 'options.concurrency', 1);
91+
validateInteger(highWaterMark, 'options.highWaterMark', 0);
92+
93+
highWaterMark += concurrency;
8594

8695
return async function* map() {
8796
const signal = AbortSignal.any([options?.signal].filter(Boolean));
@@ -92,9 +101,28 @@ function map(fn, options) {
92101
let next;
93102
let resume;
94103
let done = false;
104+
let cnt = 0;
95105

96-
function onDone() {
106+
function onCatch() {
97107
done = true;
108+
afterItemProcessed();
109+
}
110+
111+
function afterItemProcessed() {
112+
cnt -= 1;
113+
maybeResume();
114+
}
115+
116+
function maybeResume() {
117+
if (
118+
resume &&
119+
!done &&
120+
cnt < concurrency &&
121+
queue.length < highWaterMark
122+
) {
123+
resume();
124+
resume = null;
125+
}
98126
}
99127

100128
async function pump() {
@@ -110,25 +138,27 @@ function map(fn, options) {
110138

111139
try {
112140
val = fn(val, signalOpt);
141+
142+
if (val === kEmpty) {
143+
continue;
144+
}
145+
146+
val = PromiseResolve(val);
113147
} catch (err) {
114148
val = PromiseReject(err);
115149
}
116150

117-
if (val === kEmpty) {
118-
continue;
119-
}
151+
cnt += 1;
120152

121-
if (typeof val?.catch === 'function') {
122-
val.catch(onDone);
123-
}
153+
PromisePrototypeThen(val, afterItemProcessed, onCatch);
124154

125155
queue.push(val);
126156
if (next) {
127157
next();
128158
next = null;
129159
}
130160

131-
if (!done && queue.length && queue.length >= concurrency) {
161+
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
132162
await new Promise((resolve) => {
133163
resume = resolve;
134164
});
@@ -137,7 +167,7 @@ function map(fn, options) {
137167
queue.push(kEof);
138168
} catch (err) {
139169
const val = PromiseReject(err);
140-
PromisePrototypeThen(val, undefined, onDone);
170+
PromisePrototypeThen(val, afterItemProcessed, onCatch);
141171
queue.push(val);
142172
} finally {
143173
done = true;
@@ -168,10 +198,7 @@ function map(fn, options) {
168198
}
169199

170200
queue.shift();
171-
if (resume) {
172-
resume();
173-
resume = null;
174-
}
201+
maybeResume();
175202
}
176203

177204
await new Promise((resolve) => {

test/parallel/test-stream-forEach.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ const { once } = require('events');
9696
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
9797
calls++;
9898
await once(signal, 'abort');
99-
}, { signal: ac.signal, concurrency: 2 });
99+
}, { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
100100
// pump
101101
assert.rejects(async () => {
102102
await forEachPromise;

test/parallel/test-stream-map.js

Lines changed: 172 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,25 @@ const assert = require('assert');
88
const { once } = require('events');
99
const { setTimeout } = require('timers/promises');
1010

11+
function createDependentPromises(n) {
12+
const promiseAndResolveArray = [];
13+
14+
for (let i = 0; i < n; i++) {
15+
let res;
16+
const promise = new Promise((resolve) => {
17+
if (i === 0) {
18+
res = resolve;
19+
return;
20+
}
21+
res = () => promiseAndResolveArray[i - 1][0].then(resolve);
22+
});
23+
24+
promiseAndResolveArray.push([promise, res]);
25+
}
26+
27+
return promiseAndResolveArray;
28+
}
29+
1130
{
1231
// Map works on synchronous streams with a synchronous mapper
1332
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
@@ -143,7 +162,7 @@ const { setTimeout } = require('timers/promises');
143162
const stream = range.map(common.mustCall(async (_, { signal }) => {
144163
await once(signal, 'abort');
145164
throw signal.reason;
146-
}, 2), { signal: ac.signal, concurrency: 2 });
165+
}, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
147166
// pump
148167
assert.rejects(async () => {
149168
for await (const item of stream) {
@@ -173,12 +192,164 @@ const { setTimeout } = require('timers/promises');
173192
})().then(common.mustCall());
174193
}
175194

195+
196+
{
197+
// highWaterMark with small concurrency
198+
const finishOrder = [];
199+
200+
const promises = createDependentPromises(4);
201+
202+
const raw = Readable.from([2, 0, 1, 3]);
203+
const stream = raw.map(async (item) => {
204+
const [promise, resolve] = promises[item];
205+
resolve();
206+
207+
await promise;
208+
finishOrder.push(item);
209+
return item;
210+
}, { concurrency: 2 });
211+
212+
(async () => {
213+
await stream.toArray();
214+
215+
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
216+
})().then(common.mustCall(), common.mustNotCall());
217+
}
218+
219+
{
220+
// highWaterMark with a lot of items and large concurrency
221+
const finishOrder = [];
222+
223+
const promises = createDependentPromises(20);
224+
225+
const input = [10, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 11, 12, 13, 18, 15, 16, 17, 14, 19];
226+
const raw = Readable.from(input);
227+
// Should be
228+
// 10, 1, 0, 3, 4, 2 | next: 0
229+
// 10, 1, 3, 4, 2, 5 | next: 1
230+
// 10, 3, 4, 2, 5, 7 | next: 2
231+
// 10, 3, 4, 5, 7, 8 | next: 3
232+
// 10, 4, 5, 7, 8, 9 | next: 4
233+
// 10, 5, 7, 8, 9, 6 | next: 5
234+
// 10, 7, 8, 9, 6, 11 | next: 6
235+
// 10, 7, 8, 9, 11, 12 | next: 7
236+
// 10, 8, 9, 11, 12, 13 | next: 8
237+
// 10, 9, 11, 12, 13, 18 | next: 9
238+
// 10, 11, 12, 13, 18, 15 | next: 10
239+
// 11, 12, 13, 18, 15, 16 | next: 11
240+
// 12, 13, 18, 15, 16, 17 | next: 12
241+
// 13, 18, 15, 16, 17, 14 | next: 13
242+
// 18, 15, 16, 17, 14, 19 | next: 14
243+
// 18, 15, 16, 17, 19 | next: 15
244+
// 18, 16, 17, 19 | next: 16
245+
// 18, 17, 19 | next: 17
246+
// 18, 19 | next: 18
247+
// 19 | next: 19
248+
//
249+
250+
const stream = raw.map(async (item) => {
251+
const [promise, resolve] = promises[item];
252+
resolve();
253+
254+
await promise;
255+
finishOrder.push(item);
256+
return item;
257+
}, { concurrency: 6 });
258+
259+
(async () => {
260+
const outputOrder = await stream.toArray();
261+
262+
assert.deepStrictEqual(outputOrder, input);
263+
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
264+
})().then(common.mustCall(), common.mustNotCall());
265+
}
266+
267+
{
268+
// Custom highWaterMark with a lot of items and large concurrency
269+
const finishOrder = [];
270+
271+
const promises = createDependentPromises(20);
272+
273+
const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19];
274+
const raw = Readable.from(input);
275+
// Should be
276+
// 11, 1, 0, 3, 4 | next: 0, buffer: []
277+
// 11, 1, 3, 4, 2 | next: 1, buffer: [0]
278+
// 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1]
279+
// 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2]
280+
// 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3]
281+
// 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4]
282+
// 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5]
283+
// 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
284+
// 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
285+
// 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
286+
// 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
287+
// 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
288+
// 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
289+
// 13, 18, 15, 16, 17 | next: 13, buffer: []
290+
// 18, 15, 16, 17, 14 | next: 14, buffer: []
291+
// 18, 15, 16, 17, 19 | next: 15, buffer: [14]
292+
// 18, 16, 17, 19 | next: 16, buffer: [14, 15]
293+
// 18, 17, 19 | next: 17, buffer: [14, 15, 16]
294+
// 18, 19 | next: 18, buffer: [14, 15, 16, 17]
295+
// 19 | next: 19, buffer: [] -- all items flushed
296+
//
297+
298+
const stream = raw.map(async (item) => {
299+
const [promise, resolve] = promises[item];
300+
resolve();
301+
302+
await promise;
303+
finishOrder.push(item);
304+
return item;
305+
}, { concurrency: 5, highWaterMark: 7 });
306+
307+
(async () => {
308+
const outputOrder = await stream.toArray();
309+
310+
assert.deepStrictEqual(outputOrder, input);
311+
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
312+
})().then(common.mustCall(), common.mustNotCall());
313+
}
314+
315+
{
316+
// When there is a delay between the first and the next item it should not wait for a filled queue
317+
// before yielding to the user
318+
const promises = createDependentPromises(3);
319+
320+
const raw = Readable.from([0, 1, 2]);
321+
322+
const stream = raw
323+
.map(async (item) => {
324+
if (item !== 0) {
325+
await promises[item][0];
326+
}
327+
328+
return item;
329+
}, { concurrency: 2 })
330+
.map((item) => {
331+
// eslint-disable-next-line no-unused-vars
332+
for (const [_, resolve] of promises) {
333+
resolve();
334+
}
335+
336+
return item;
337+
});
338+
339+
(async () => {
340+
await stream.toArray();
341+
})().then(common.mustCall(), common.mustNotCall());
342+
}
343+
176344
{
177345
// Error cases
178346
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
179347
assert.throws(() => Readable.from([1]).map((x) => x, {
180348
concurrency: 'Foo'
181349
}), /ERR_OUT_OF_RANGE/);
350+
assert.throws(() => Readable.from([1]).map((x) => x, {
351+
concurrency: -1
352+
}), /ERR_OUT_OF_RANGE/);
182353
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
183354
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
184355
}

0 commit comments

Comments
 (0)