Skip to content

Migrate to rxjava2 #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 18 additions & 16 deletions src/test/java/com/oreilly/rxjava/ch1/Chapter1.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.oreilly.rxjava.ch1;

import com.oreilly.rxjava.util.Sleeper;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import org.junit.Ignore;
import org.junit.Test;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.schedulers.Schedulers;


import java.time.Duration;
import java.util.Map;
Expand All @@ -25,7 +26,7 @@ public class Chapter1 {
public void sample_6() throws Exception {
Observable.create(s -> {
s.onNext("Hello World!");
s.onCompleted();
s.onComplete();
}).subscribe(hello -> System.out.println(hello));
}

Expand All @@ -36,7 +37,7 @@ public void sample_17() throws Exception {

Observable.create(s -> {
s.onNext(cache.get(SOME_KEY));
s.onCompleted();
s.onComplete();
}).subscribe(value -> System.out.println(value));
}

Expand All @@ -48,14 +49,14 @@ public void sample_35() throws Exception {
if (fromCache != null) {
// emit synchronously
s.onNext(fromCache);
s.onCompleted();
s.onComplete();
} else {
// fetch asynchronously
getDataAsynchronously(SOME_KEY)
.onResponse(v -> {
putInCache(SOME_KEY, v);
s.onNext(v);
s.onCompleted();
s.onComplete();
})
.onFailure(exception -> {
s.onError(exception);
Expand Down Expand Up @@ -90,7 +91,7 @@ public void sample_81() throws Exception {
s.onNext(1);
s.onNext(2);
s.onNext(3);
s.onCompleted();
s.onComplete();
});

o.map(i -> "Number " + i)
Expand Down Expand Up @@ -119,7 +120,7 @@ public void sample_108() throws Exception {
s.onNext("two");
s.onNext("three");
s.onNext("four");
s.onCompleted();
s.onComplete();
}).start();
});
}
Expand Down Expand Up @@ -151,15 +152,15 @@ public void sample_142() throws Exception {
new Thread(() -> {
s.onNext("one");
s.onNext("two");
s.onCompleted();
s.onComplete();
}).start();
});

Observable<String> b = Observable.create(s -> {
new Thread(() -> {
s.onNext("three");
s.onNext("four");
s.onCompleted();
s.onComplete();
}).start();
});

Expand All @@ -173,7 +174,7 @@ public void sample_164() throws Exception {
Observable<String> someData = Observable.create(s -> {
getDataFromServerWithCallback(args, data -> {
s.onNext(data);
s.onCompleted();
s.onComplete();
});
});

Expand Down Expand Up @@ -265,7 +266,8 @@ public void sample_254() throws Exception {
@Test
public void sample_265() throws Exception {
// merge a & b into an Observable stream of 2 values
Observable<String> a_merge_b = getDataA().mergeWith(getDataB());
Observable<String> a_merge_b = getDataA().mergeWith(getDataB())
.toObservable();
}

public static Single<String> getDataA() {
Expand All @@ -283,7 +285,7 @@ public void sample_277() throws Exception {
Single<String> s2 = getDataAsSingle(2);

// o3 is now a stream of s1 and s2 that emits each item without waiting
Observable<String> o3 = Single.merge(s1, s2);
Observable<String> o3 = Single.merge(s1, s2).toObservable();
}

private Single<String> getDataAsSingle(int i) {
Expand All @@ -299,7 +301,7 @@ static Completable writeToDatabase(Object data) {
return Completable.create(s -> {
doAsyncWrite(data,
// callback for successful completion
() -> s.onCompleted(),
() -> s.onComplete(),
// callback for failure with Throwable
error -> s.onError(error));
});
Expand Down