1
1
import rx .Observable ;
2
2
import rx .Scheduler ;
3
3
import rx .Subscriber ;
4
+ import rx .Subscription ;
4
5
import rx .functions .Action1 ;
5
6
import rx .schedulers .Schedulers ;
7
+ import rx .schedulers .TestScheduler ;
6
8
7
9
import java .io .File ;
8
10
import java .io .IOException ;
@@ -17,52 +19,44 @@ public static void main(String[] args) throws IOException {
17
19
public static void parseFile (String filename ) throws IOException {
18
20
Scanner scanner = new Scanner (new File (filename ));
19
21
20
- Scheduler scheduler = Schedulers .newThread ();
22
+ //Scheduler scheduler = Schedulers.newThread();
23
+ TestScheduler scheduler = Schedulers .test ();
21
24
22
25
Observable <Stock > stocks = Observable .create (new rx .Observable .OnSubscribe <Stock >() {
23
26
@ Override
24
27
public void call (Subscriber <? super Stock > subscriber ) {
25
28
scheduler .schedule (new Action1 <Scheduler .Inner >() {
26
29
@ Override
27
30
public void call (Scheduler .Inner inner ) {
31
+ try {
32
+ if (scanner .hasNextLine ()) {
33
+ Stock stock = parseLine (scanner .nextLine ());
34
+ subscriber .onNext (stock );
35
+ } else {
36
+ subscriber .onCompleted ();
37
+ }
38
+ } catch (Throwable t ) {
39
+ subscriber .onError (t );
40
+ }
41
+
42
+ final Action1 <Scheduler .Inner > innerAction = this ;
28
43
inner .schedule (new Action1 <Scheduler .Inner >() {
29
44
@ Override
30
45
public void call (Scheduler .Inner inner ) {
31
-
32
- try {
33
- if (scanner .hasNextLine ()) {
34
- Stock stock = parseLine (scanner .nextLine ());
35
- subscriber .onNext (stock );
36
- } else {
37
- subscriber .onCompleted ();
38
- }
39
- } catch (Throwable t ) {
40
- subscriber .onError (t );
41
- }
42
-
43
- final Action1 <Scheduler .Inner > innerAction = this ;
44
- inner .schedule (new Action1 <Scheduler .Inner >() {
45
- @ Override
46
- public void call (Scheduler .Inner inner ) {
47
- innerAction .call (inner );
48
- }
49
- }, 1 , TimeUnit .SECONDS );
50
-
46
+ innerAction .call (inner );
51
47
}
52
- });
48
+ }, 1 , TimeUnit . SECONDS );
53
49
}
54
50
});
55
51
}
56
52
});
57
53
58
- stocks .subscribe (new Action1 <Stock >() {
59
- @ Override
60
- public void call (Stock stock ) {
61
- System .out .println (stock );
62
- }
63
- });
54
+ Subscription subscription = stocks .timestamp (scheduler ).subscribe (System .out ::println );
64
55
65
- System .in .read ();
56
+ while (!subscription .isUnsubscribed ()) {
57
+ System .in .read ();
58
+ scheduler .advanceTimeBy (1 , TimeUnit .SECONDS );
59
+ }
66
60
}
67
61
68
62
public static Stock parseLine (String line ) {
0 commit comments