Skip to content

Commit b0fda08

Browse files
Pedro Kehlpedrokehl
Pedro Kehl
authored andcommitted
Fix case that backpressure lose consistency on error
1 parent 7d359db commit b0fda08

File tree

4 files changed

+14
-5
lines changed

4 files changed

+14
-5
lines changed

package-lock.json

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "caminho",
3-
"version": "1.7.0",
3+
"version": "1.7.1",
44
"description": "Tool for creating efficient data pipelines in a JavaScript environment",
55
"main": "./dist/cjs/index.js",
66
"module": "./dist/esm5/index.js",
@@ -72,4 +72,4 @@
7272
"dependencies": {
7373
"rxjs": "^7.8.1"
7474
}
75-
}
75+
}

src/Caminho.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ export class Caminho implements CaminhoInterface {
7272
const loggers = this.getLoggers(generatorParams)
7373
if (this.options?.maxItemsFlowing) {
7474
const pendingDataControl = this.pendingDataControl as PendingDataControl
75-
this.finalStep = tap(() => pendingDataControl.decrement())
75+
this.finalStep = tap({ next: () => pendingDataControl.decrement(), error: () => pendingDataControl.decrement() })
7676
return wrapGeneratorWithBackPressure(generatorParams, this.options.maxItemsFlowing, pendingDataControl, loggers)
7777
}
7878

test/integration/error.test.ts

+9
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,13 @@ describe('Error Handling', () => {
5656

5757
await expect(caminho.run()).rejects.toMatchObject({ message: 'Reduce error' })
5858
})
59+
60+
test('Should not interfere with the backpressure when error happens', async () => {
61+
const operator = jest.fn().mockRejectedValueOnce(new Error('Operator error')).mockResolvedValue(null)
62+
const caminho = fromGenerator({ fn: getMockedGenerator([1, 2]), provides: 'number' }, { maxItemsFlowing: 1 })
63+
.pipe({ fn: operator })
64+
65+
await expect(caminho.run()).rejects.toMatchObject({ message: 'Operator error' })
66+
expect(caminho.getNumberOfItemsFlowing()).toBe(0)
67+
})
5968
})

0 commit comments

Comments
 (0)