Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make .flatScan work seedless #737

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
28 changes: 17 additions & 11 deletions dist/Bacon.js
Original file line number Diff line number Diff line change
Expand Up @@ -2919,10 +2919,23 @@ function diff(src, start, f) {
return transformP(scan(src, [start, nullMarker], (function (prevTuple, next) { return [next, f(prevTuple[0], next)]; })), composeT(filterT(function (tuple) { return tuple[1] !== nullMarker; }), mapT(function (tuple) { return tuple[1]; })), new Desc(src, "diff", [start, f]));
}

/** @hidden */
function flatScanSeedless(src, f) {
var current;
var isSeeded = false;
return src.flatMapConcat(function (next) {
return (isSeeded ? makeObservable(f(current, next)) : makeObservable(next))
.doAction(function (updated) {
isSeeded = true;
current = updated;
});
}).toProperty();
}
/** @hidden */
function flatScan(src, seed, f) {
var current = seed;
return src.flatMapConcat(function (next) {
// @ts-ignore: TS2722 Cannot invoke an object which is possibly 'undefined'. Cause it's optional!
return makeObservable(f(current, next)).doAction(function (updated) { return current = updated; });
}).toProperty().startWith(seed).withDesc(new Desc(src, "flatScan", [seed, f]));
}
Expand Down Expand Up @@ -4185,17 +4198,10 @@ var EventStream = /** @class */ (function (_super) {
*/
EventStream.prototype.flatMapWithConcurrencyLimit = function (limit, f) { return flatMapWithConcurrencyLimit(this, limit, f); };
EventStream.prototype.flatMapEvent = function (f) { return flatMapEvent(this, f); };
/**
Scans stream with given seed value and accumulator function, resulting to a Property.
Difference to [`scan`](#scan) is that the function `f` can return an [`EventStream`](eventstream.html) or a [`Property`](property.html) instead
of a pure value, meaning that you can use [`flatScan`](#flatscan) for asynchronous updates of state. It serializes
updates so that that the next update will be queued until the previous one has completed.

* @param seed initial value to start with
* @param f transition function from previous state and new value to next state
* @typeparam V2 state and result type
*/
EventStream.prototype.flatScan = function (seed, f) {
if (arguments.length == 1) {
return flatScanSeedless(this, seed);
}
return flatScan(this, seed, f);
};
EventStream.prototype.groupBy = function (keyF, limitF) {
Expand Down Expand Up @@ -5223,7 +5229,7 @@ var $ = {
/**
* Bacon.js version as string
*/
var version = '3.0.5';
var version = '<version>';

exports.$ = $;
exports.Bus = Bus;
Expand Down
2 changes: 1 addition & 1 deletion dist/Bacon.min.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
undefined
undefined
17 changes: 15 additions & 2 deletions dist/Bacon.noAssert.js
Original file line number Diff line number Diff line change
Expand Up @@ -2558,6 +2558,16 @@
f
]));
}
function flatScanSeedless(src, f) {
var current;
var isSeeded = false;
return src.flatMapConcat(function (next) {
return (isSeeded ? makeObservable(f(current, next)) : makeObservable(next)).doAction(function (updated) {
isSeeded = true;
current = updated;
});
}).toProperty();
}
function flatScan(src, seed, f) {
var current = seed;
return src.flatMapConcat(function (next) {
Expand Down Expand Up @@ -3154,6 +3164,9 @@
return flatMapEvent(this, f);
};
EventStream.prototype.flatScan = function (seed, f) {
if (arguments.length == 1) {
return flatScanSeedless(this, seed);
}
return flatScan(this, seed, f);
};
EventStream.prototype.groupBy = function (keyF, limitF) {
Expand Down Expand Up @@ -3818,7 +3831,7 @@
jQuery.fn.asEventStream = $.asEventStream;
}
};
var version = '3.0.5';
var version = '<version>';
exports.$ = $;
exports.Bus = Bus;
exports.CompositeUnsubscribe = CompositeUnsubscribe;
Expand Down Expand Up @@ -3880,4 +3893,4 @@
exports.zipAsArray = zipAsArray;
exports.zipWith = zipWith;
Object.defineProperty(exports, '__esModule', { value: true });
}));
}));
19 changes: 17 additions & 2 deletions src/flatscan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,25 @@ import { Observable, Property } from "./observable";
import { Desc } from "./describe";
import { Function2 } from "./types";

/** @hidden */
export function flatScanSeedless<V>(src: Observable<V>, f: Function2<V, V, Observable<V> | V>): Property<V> {
let current: V;
let isSeeded = false;

return src.flatMapConcat(function (next: V) {
return (isSeeded ? makeObservable(f(current, next)) : makeObservable(next))
.doAction(function (updated: V) {
isSeeded = true;
current = updated;
});
}).toProperty().withDesc(new Desc(src, "flatScan", [f]));
}

/** @hidden */
export function flatScan<In, Out>(src: Observable<In>, seed: Out, f: Function2<Out, In, Observable<Out> | Out>): Property<Out> {
let current = seed
let current = seed;
return src.flatMapConcat((next: In) =>
// @ts-ignore: TS2722 Cannot invoke an object which is possibly 'undefined'. Cause it's optional!
makeObservable(f(current, next)).doAction(updated => current = updated)
).toProperty().startWith(seed).withDesc(new Desc(src, "flatScan", [seed, f]))
}
}
9 changes: 8 additions & 1 deletion src/fold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ import "./scan";
import Observable from "./observable";
import { Desc } from "./describe";
import { Accumulator } from "./scan";
import { Property } from "./observable";;
import { Property } from "./observable";

/** @hidden */
export default function fold<In, Out>(src: Observable<In>, seed: Out, f: Accumulator<In, Out>): Property<Out> {
return <any>src.scan(seed, f)
.last()
.withDesc(new Desc(src, "fold", [seed, f]));
}

/** @hidden */
export function foldSeedless<InOut>(src: Observable<InOut>, f: Accumulator<InOut, InOut>): Property<InOut> {
return <any>src.scan(f)
.last()
.withDesc(new Desc(src, "fold", [f]));
}
52 changes: 41 additions & 11 deletions src/observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import doLogT from "./dolog";
import doErrorT from "./doerror";
import doActionT from "./doaction";
import doEndT from "./doend";
import { Accumulator, default as scan } from "./scan";
import { Accumulator, scanSeedless, default as scan } from "./scan";
import mapEndT from "./mapend";
import mapErrorT from "./maperror";
import { SpawnerOrObservable, EventSpawner, EventOrValue } from "./flatmap_";
Expand All @@ -40,7 +40,7 @@ import { filter } from "./filter";
import { and, not, or } from "./boolean";
import flatMapFirst from "./flatmapfirst";
import addPropertyInitValueToStream from "./internal/addpropertyinitialvaluetostream";
import fold from "./fold";
import { default as fold, foldSeedless } from "./fold";
import { startWithE, startWithP } from "./startwith";
import takeUntil from "./takeuntil";
import flatMap from "./flatmap";
Expand Down Expand Up @@ -69,7 +69,7 @@ import skipWhile from "./skipwhile";
import { groupBy, GroupTransformer } from "./groupby";
import { slidingWindow } from "./slidingwindow";
import { diff, Differ } from "./diff";
import { flatScan } from "./flatscan";
import { flatScan, flatScanSeedless } from "./flatscan";
import { holdWhen } from "./holdwhen";
import { zip } from "./zip";
import decode from "./decode";
Expand Down Expand Up @@ -407,8 +407,16 @@ Works like [`scan`](#scan) but only emits the final
value, i.e. the value just before the observable ends. Returns a
[`Property`](property.html).
*/
fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)

fold<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

fold(f: Accumulator<V, V>): Property<V>

fold<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
Expand Down Expand Up @@ -596,8 +604,15 @@ Only applicable for observables with arrays as values.
}
/** A synonym for [scan](#scan).
*/
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return fold(this, seed, f)
reduce<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

reduce(f: Accumulator<V, V>): Property<V>

reduce<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>foldSeedless(this, seed as any as Accumulator<V, V>);
}
return fold(this, seed as any as V2, f as any as Accumulator<V, V2>)
}

/**
Expand Down Expand Up @@ -639,8 +654,16 @@ identically to EventStream.scan: the `seed` will be the initial value of
seed won't be output as is. Instead, the initial value of `r` will be `f(seed, x)`. This makes sense,
because there can only be 1 initial value for a Property at a time.
*/
scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2> {
return scan(this, seed, f)

scan<V2>(seed: V2, f: Accumulator<V, V2>): Property<V2>

scan(f: Accumulator<V, V>): Property<V>

scan<V2>(seed: V2 | Accumulator<V, V>, f?: Accumulator<V, V2>): Property<V2> {
if (arguments.length === 1) {
return <any>scanSeedless(this, seed as any as Accumulator<V, V>);
}
return <any>scan(this, seed as any as V2, f as any as Accumulator<V, V2>)
}
/**
Skips the first n elements from the stream
Expand Down Expand Up @@ -1408,8 +1431,15 @@ export class EventStream<V> extends Observable<V> {
* @param f transition function from previous state and new value to next state
* @typeparam V2 state and result type
*/
flatScan<V2>(seed: V2, f: Function2<V2, V, Observable<V2>>): Property<V2> {
return <any>flatScan(this, seed, f)
flatScan<V2>(seed: V2, f: Function2<V2, V, Observable<V2>>): Property<V2>

flatScan(f: Function2<V, V, Observable<V>>): Property<V>

flatScan<V2>(seed: V2 | Function2<V2, V, Observable<V2>>, f?: Function2<V2, V, Observable<V2>>): Property<V2> {
if (arguments.length == 1) {
return <any>flatScanSeedless(this, seed as any as Function2<V, V, Observable<V>>)
}
return <any>flatScan(this, seed as any as V2, f as any as Function2<V2, V, Observable<V2>>)
}

/**
Expand Down
28 changes: 27 additions & 1 deletion src/scan.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Observable from "./observable";
import { Property } from "./observable";;
import { Property } from "./observable";
import { Event, hasValue, Initial } from "./event";
import { more, noMore } from "./reply";
import { nop } from "./helpers";
Expand Down Expand Up @@ -58,3 +58,29 @@ export default function scan<In, Out>(src: Observable<In>, seed: Out, f: Accumul
}
return resultProperty = new Property(new Desc(src, "scan", [seed, f]), subscribe)
}

/** @hidden */
export function scanSeedless<V>(src: Observable<V>, f: Accumulator<V, V>): Property<V> {
let acc: V;
let hasAccumulatedFirstValue: Boolean = false;
const subscribe: Subscribe<V> = (sink: EventSink<V>) => {
let unsub = src.subscribeInternal(function(event: Event<V>) {
if (hasValue(event)) {
//console.log("has value: ", hasValue(event), "isInitial:", event.isInitial);
if (!hasAccumulatedFirstValue) {
acc = event.value;
hasAccumulatedFirstValue = true;
return sink(<any>event); // let the initial event pass through
}

acc = f(acc, event.value);
return sink(event.apply(acc));

} else {
return sink(<any>event);
}
});
return unsub;
}
return new Property(new Desc(src, "scan", [f]), subscribe)
}
14 changes: 14 additions & 0 deletions test/flatscan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ describe("EventStream.flatScan", function() {
[0, 1, 3, error(), 6], semiunstable)
);

describe("Without a seed value", () => {
it ("accumulates values with given seed and accumulator function which returns a stream of updated values", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).flatScan(addAsync(1)),
[1, 3, error(), 6]
)
);
it("Serializes updates even when they occur while performing previous update", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).flatScan(addAsync(5)),
[0, error(), 1, 3, 6], semiunstable)
);
});

return it("yields the seed value immediately", function() {
const outputs: number[] = [];
new Bacon.Bus().flatScan(0, (a, b) => <any>1).onValue(value => { outputs.push(value) });
Expand Down
38 changes: 34 additions & 4 deletions test/fold.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,39 @@ describe("EventStream.fold", function() {
);
describe("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).fold(0, add),
() => series(1, [1, 2, error(), 3]).reduce(0, add),
[error(), 6])
);
describe("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([1, 2, error(), 3]).fold(0, add),
[error(), 6], unstable)
);

describe("Without seed value", function(){
it("folds stream into a single-valued Property, passes through errors", () =>
expectPropertyEvents(
() => series(1, [0, 1, 2, error(), 3]).fold(add),
[error(), 6])
);
it("has reduce as synonym", () =>
expectPropertyEvents(
() => series(1, [1, 2, error(), 3]).reduce(add),
[error(), 6])
);
it("works with synchronous source", () =>
expectPropertyEvents(
() => fromArray([0, 1, 2, error(), 3]).fold(add),
[error(), 6], unstable)
);
it("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
() => series(1, range(1, count, true)).fold((x: number,y: number) => x+1),
[count]);
});
});

return describe("works with really large chunks too, with { eager: true }", function() {
const count = 50000;
return expectPropertyEvents(
Expand All @@ -24,10 +49,15 @@ describe("EventStream.fold", function() {
});
});

describe("Property.fold", () =>
describe("Property.fold", () => {
describe("Folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2,3]).toProperty(1).fold(0, add),
() => series(1, [2, 3]).toProperty(1).fold(0, add),
[6])
);
describe("Without seed value folds Property into a single-valued one", () =>
expectPropertyEvents(
() => series(1, [2, 3]).toProperty(1).fold(add),
[6])
)
);
});
Loading