Skip to content

Commit 04f66f2

Browse files
author
Pedro Kehl
committed
Fix case where pending items flowing will not be decremented after an error
1 parent b0fda08 commit 04f66f2

11 files changed

+128
-64
lines changed

package-lock.json

+27-8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
"license": "ISC",
6161
"devDependencies": {
6262
"@types/jest": "^29.5.12",
63+
"@types/node": "^20.16.0",
6364
"@typescript-eslint/eslint-plugin": "^6.21.0",
6465
"@typescript-eslint/parser": "^6.21.0",
6566
"eslint": "^8.57.0",
@@ -72,4 +73,4 @@
7273
"dependencies": {
7374
"rxjs": "^7.8.1"
7475
}
75-
}
76+
}

src/Caminho.ts

+28-23
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,23 @@ import { parallel } from './operators/parallel'
1010
import { reduce, type ReduceParams } from './operators/reduce'
1111
import { filter, type FilterPredicate } from './operators/filter'
1212

13-
import { applyOperator, isBatch, type OperatorApplier } from './operators/helpers/operatorHelpers'
13+
import {
14+
applyOperator,
15+
isBatch,
16+
type OperatorApplier,
17+
type OperatorApplierWithRunId,
18+
} from './operators/helpers/operatorHelpers'
1419
import { type PendingDataControl, PendingDataControlInMemory } from './utils/PendingDataControl'
1520

1621
import { getOnStepFinished } from './utils/onStepFinished'
1722
import { getOnStepStarted } from './utils/onStepStarted'
23+
import { generateId } from './utils/generateId'
24+
25+
type Generator = (initialBag: ValueBag, runId: string) => AsyncGenerator<ValueBag>
1826

1927
export class Caminho implements CaminhoInterface {
20-
private generator: (initialBag: ValueBag) => AsyncGenerator<ValueBag>
21-
private operators: OperatorApplier[] = []
22-
private finalStep?: OperatorApplier
28+
private generator: Generator
29+
private operators: OperatorApplierWithRunId[] = []
2330
private pendingDataControl?: PendingDataControl
2431

2532
constructor(generatorParams: FromGeneratorParams, private options?: CaminhoOptions) {
@@ -40,14 +47,14 @@ export class Caminho implements CaminhoInterface {
4047

4148
public pipe(params: PipeGenericParams): this {
4249
const operatorApplier = this.getApplierForPipeOrBatch(params)
43-
this.addOperatorApplier(operatorApplier)
50+
this.addOperatorApplier(() => operatorApplier)
4451
return this
4552
}
4653

4754
public parallel(params: PipeGenericParams[]): this {
4855
const operatorAppliers: OperatorApplier[] = params.map(this.getApplierForPipeOrBatch)
4956
const operatorApplier = parallel(params, operatorAppliers)
50-
this.addOperatorApplier(operatorApplier)
57+
this.addOperatorApplier(() => operatorApplier)
5158
return this
5259
}
5360

@@ -64,34 +71,32 @@ export class Caminho implements CaminhoInterface {
6471
}
6572

6673
public async run(initialBag?: ValueBag): Promise<ValueBag> {
67-
const observable$ = this.buildObservable(initialBag)
68-
return lastValueFrom(observable$, { defaultValue: initialBag })
74+
const runId = generateId()
75+
const initial$ = from(this.generator({ ...initialBag }, runId))
76+
const observable$ = this.operators.reduce((acc, operator) => applyOperator(acc, operator, runId), initial$)
77+
78+
const finalObservable$ = this.options?.maxItemsFlowing
79+
? observable$.pipe(tap(() => (this.pendingDataControl as PendingDataControl).decrement(runId)))
80+
: observable$
81+
82+
try {
83+
return await lastValueFrom(finalObservable$, { defaultValue: initialBag })
84+
} finally {
85+
this.pendingDataControl?.destroyBucket(runId)
86+
}
6987
}
7088

71-
private getGenerator(generatorParams: FromGeneratorParams): (initialBag: ValueBag) => AsyncGenerator<ValueBag> {
89+
private getGenerator(generatorParams: FromGeneratorParams): Generator {
7290
const loggers = this.getLoggers(generatorParams)
7391
if (this.options?.maxItemsFlowing) {
7492
const pendingDataControl = this.pendingDataControl as PendingDataControl
75-
this.finalStep = tap({ next: () => pendingDataControl.decrement(), error: () => pendingDataControl.decrement() })
7693
return wrapGeneratorWithBackPressure(generatorParams, this.options.maxItemsFlowing, pendingDataControl, loggers)
7794
}
7895

7996
return wrapGenerator(generatorParams, loggers)
8097
}
8198

82-
private buildObservable(initialBag: ValueBag = {}) {
83-
const initialObservable$ = from(this.generator({ ...initialBag }))
84-
const observable$ = this.operators.reduce(applyOperator, initialObservable$)
85-
86-
if (this.finalStep) {
87-
return observable$
88-
.pipe(this.finalStep)
89-
}
90-
91-
return observable$
92-
}
93-
94-
private addOperatorApplier(operatorApplier: OperatorApplier) {
99+
private addOperatorApplier(operatorApplier: OperatorApplierWithRunId) {
95100
this.operators.push(operatorApplier)
96101
}
97102

src/operators/filter.ts

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
import { filter as filterRxJs } from 'rxjs'
22
import { type PendingDataControl } from '../utils/PendingDataControl'
33
import { type Loggers, type ValueBag } from '../types'
4-
import { type OperatorApplier } from './helpers/operatorHelpers'
4+
import { type OperatorApplierWithRunId } from './helpers/operatorHelpers'
55

66
export type FilterPredicate = (valueBag: ValueBag, index: number) => boolean
77

88
export function filter(
99
predicate: FilterPredicate,
1010
loggers: Loggers,
1111
pendingDataControl?: PendingDataControl,
12-
): OperatorApplier {
13-
function wrappedFilter(valueBag: ValueBag, index: number): boolean {
12+
): OperatorApplierWithRunId {
13+
function wrappedFilter(valueBag: ValueBag, index: number, runId: string): boolean {
1414
const startedAt = new Date()
1515
loggers.onStepStarted([valueBag])
1616
try {
1717
const filterResult = predicate(valueBag, index)
1818
if (!filterResult) {
19-
pendingDataControl?.decrement()
19+
pendingDataControl?.decrement(runId)
2020
}
2121
loggers.onStepFinished([valueBag], startedAt)
2222
return filterResult
@@ -26,5 +26,7 @@ export function filter(
2626
}
2727
}
2828

29-
return filterRxJs(wrappedFilter)
29+
return function filterOperatorWithRunId(runId: string) {
30+
return filterRxJs((valueBag, index) => wrappedFilter(valueBag, index, runId))
31+
}
3032
}

src/operators/generator.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ export function wrapGeneratorWithBackPressure(
3838
loggers: Loggers,
3939
) {
4040
const wrappedGenerator = wrapGenerator(generatorParams, loggers)
41-
return async function* wrappedGeneratorWithBackPressure(initialBag: ValueBag) {
42-
for await (const value of wrappedGenerator(initialBag)) {
43-
pendingDataControl.increment()
41+
return async function* wrappedGeneratorWithBackPressure(initialBag: ValueBag, runId: string) {
42+
for await (const value of wrappedGenerator({ ...initialBag })) {
43+
pendingDataControl.increment(runId)
4444
yield value
4545
if (needsToWaitForBackpressure(pendingDataControl, maxItemsFlowing)) {
4646
await waitOnBackpressure(maxItemsFlowing, pendingDataControl)

src/operators/helpers/operatorHelpers.ts

+4-2
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ import { type BatchParams } from '../batch'
55
import { type PipeParams } from '../pipe'
66

77
export type OperatorApplier = (observable: Observable<ValueBag>) => Observable<ValueBag>
8+
export type OperatorApplierWithRunId = (runId: string) => OperatorApplier
89

910
export function isBatch(params: PipeParams | BatchParams): params is BatchParams {
1011
return !!(params as BatchParams)?.batch
1112
}
1213

1314
export function applyOperator(
1415
observable: Observable<ValueBag>,
15-
operatorApplier: OperatorApplier,
16+
operatorApplier: OperatorApplierWithRunId,
17+
runId: string,
1618
): Observable<ValueBag> {
17-
return operatorApplier(observable)
19+
return operatorApplier(runId)(observable)
1820
}

src/operators/reduce.ts

+13-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { type Observable, map, reduce as reduceRxJs } from 'rxjs'
22
import type { Loggers, ValueBag } from '../types'
33
import { type PendingDataControl } from '../utils/PendingDataControl'
4-
import { type OperatorApplier } from './helpers/operatorHelpers'
4+
import { type OperatorApplierWithRunId } from './helpers/operatorHelpers'
55
import { getNewValueBag } from '../utils/valueBag'
66
import { pick } from '../utils/pick'
77

@@ -18,12 +18,12 @@ export function reduce<T>(
1818
reduceParams: ReduceParams<T>,
1919
loggers: Loggers,
2020
pendingDataControl?: PendingDataControl,
21-
): OperatorApplier {
21+
): OperatorApplierWithRunId {
2222
const { provides, keep, seed } = reduceParams
2323
const immutableSeed = Object.freeze(seed)
2424
let lastBag: ValueBag = {}
2525

26-
function wrappedReduce(acc: T, valueBag: ValueBag, index: number): T {
26+
function wrappedReduce(acc: T, valueBag: ValueBag, index: number, runId: string): T {
2727
const startedAt = new Date()
2828
loggers.onStepStarted([valueBag])
2929
// RxJs doesn't create a structureClone from the seed parameter when start processing.
@@ -39,16 +39,18 @@ export function reduce<T>(
3939
loggers.onStepFinished([valueBag], startedAt, err as Error)
4040
throw err
4141
} finally {
42-
pendingDataControl?.decrement()
42+
pendingDataControl?.decrement(runId)
4343
}
4444
}
4545

46-
return function operatorApplier(observable: Observable<ValueBag>) {
47-
return observable
48-
.pipe(reduceRxJs(wrappedReduce, seed))
49-
.pipe(map((reduceResult: T) => {
50-
pendingDataControl?.increment()
51-
return getNewValueBag(pick(lastBag, keep ?? []), provides, reduceResult)
52-
}))
46+
return function operatorApplierWithRunId(runId: string) {
47+
return function operatorApplier(observable: Observable<ValueBag>) {
48+
return observable
49+
.pipe(reduceRxJs((acc, valueBag, index) => wrappedReduce(acc, valueBag, index, runId), seed))
50+
.pipe(map((reduceResult: T) => {
51+
pendingDataControl?.increment(runId)
52+
return getNewValueBag(pick(lastBag, keep ?? []), provides, reduceResult)
53+
}))
54+
}
5355
}
5456
}

src/utils/PendingDataControl.ts

+16-4
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,29 @@
11
export class PendingDataControlInMemory implements PendingDataControl {
22
public size = 0
3+
private buckets = new Map()
34

4-
increment(value = 1): void {
5+
increment(bucketId: string, value = 1): void {
56
this.size += value
7+
const current = this.buckets.get(bucketId) ?? 0
8+
this.buckets.set(bucketId, current + value)
69
}
710

8-
decrement(value = 1): void {
11+
decrement(bucketId: string, value = 1): void {
912
this.size -= value
13+
const current = this.buckets.get(bucketId)
14+
this.buckets.set(bucketId, current - value)
15+
}
16+
17+
destroyBucket(bucketId: string) {
18+
const inBucket = this.buckets.get(bucketId) ?? 0
19+
this.size -= inBucket
20+
this.buckets.delete(bucketId)
1021
}
1122
}
1223

1324
export type PendingDataControl = {
1425
size: number
15-
increment: (value?: number) => void
16-
decrement: (value?: number) => void
26+
increment: (bucketId: string, value?: number) => void
27+
decrement: (bucketId: string, value?: number) => void
28+
destroyBucket: (bucketId: string) => void
1729
}

src/utils/generateId.ts

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export function generateId() {
2+
return `${Date.now().toString(36)}-${Math.random().toString(36).substr(2, 9)}`
3+
}

test/integration/error.test.ts

+18
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import { fromGenerator } from '../../src'
2+
import { sleep } from '../../src/utils/sleep'
3+
import { getNumberedArray } from '../mocks/array.mock'
24
import { getMockedGenerator, getThrowingGenerator } from '../mocks/generator.mock'
35

46
describe('Error Handling', () => {
@@ -65,4 +67,20 @@ describe('Error Handling', () => {
6567
await expect(caminho.run()).rejects.toMatchObject({ message: 'Operator error' })
6668
expect(caminho.getNumberOfItemsFlowing()).toBe(0)
6769
})
70+
71+
test('Should not keep account for the pending items in the flow after an error', async () => {
72+
const operator = jest.fn()
73+
.mockImplementationOnce(() => sleep(1))
74+
.mockImplementationOnce(() => sleep(1))
75+
.mockImplementationOnce(() => sleep(1))
76+
.mockRejectedValueOnce(new Error('Operator error'))
77+
78+
const generator = getMockedGenerator(getNumberedArray(10))
79+
80+
const caminho = fromGenerator({ fn: generator, provides: 'number' }, { maxItemsFlowing: 5 })
81+
.pipe({ fn: operator })
82+
83+
await expect(caminho.run()).rejects.toMatchObject({ message: 'Operator error' })
84+
expect(caminho.getNumberOfItemsFlowing()).toBe(0)
85+
})
6886
})

0 commit comments

Comments
 (0)