-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathScan 3.linq
47 lines (40 loc) · 1.35 KB
/
Scan 3.linq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<Query Kind="Statements">
<Reference><ApplicationData>\LINQPad\Samples\Programming Reactive Extensions and LINQ\System.Reactive.dll</Reference>
<Namespace>System.Reactive</Namespace>
<Namespace>System.Reactive.Linq</Namespace>
</Query>
/* Scan 3:
*
* We can even make this code even more readable if we turn ThrowIfBelowZero
* into its own Operator, via writing our own Extension Method.
*/
public static class ThrowObservableMixin
{
public static IObservable<int> ThrowIfBelowZero(this IObservable<int> This)
{
return This.SelectMany(refCount => {
if (refCount >= 0) {
return Observable.Return(refCount);
}
return Observable.Throw<int>(new Exception("Refcount dropped below Zero!"));
});
}
}
void Main()
{
var AddRef = new Subject<Unit>();
var Release = new Subject<Unit>();
var referenceCount = Observable.Merge(
AddRef.Select(_ => 1),
Release.Select(_ => -1))
.Scan(0, (acc, x) => acc + x)
.ThrowIfBelowZero();
referenceCount.Subscribe(x => Console.WriteLine("Current RefCount is {0}", x));
AddRef.OnNext(Unit.Default);
AddRef.OnNext(Unit.Default);
Release.OnNext(Unit.Default);
AddRef.OnNext(Unit.Default);
Release.OnNext(Unit.Default);
Release.OnNext(Unit.Default);
Release.OnNext(Unit.Default);
}