Skip to content

Commit 86993d3

Browse files
Merge pull request #784 from benjchristensen/lift-observer
Lift and Observer+Subscription
2 parents bd87341 + 6725754 commit 86993d3

File tree

206 files changed

+3334
-2889
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

206 files changed

+3334
-2889
lines changed

language-adaptors/rxjava-groovy/src/main/java/rx/lang/groovy/GroovyCreateWrapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616
package rx.lang.groovy;
1717

1818
import groovy.lang.Closure;
19-
import rx.Operator;
19+
import rx.Observable.OnSubscribe;
20+
import rx.Observer;
2021
import rx.Subscription;
21-
import rx.util.functions.Action1;
2222

23-
public class GroovyCreateWrapper<T> implements Action1<Operator<? super T>> {
23+
public class GroovyCreateWrapper<T> implements OnSubscribe<T> {
2424

2525
private final Closure<Void> closure;
2626

@@ -29,7 +29,7 @@ public GroovyCreateWrapper(Closure<Void> closure) {
2929
}
3030

3131
@Override
32-
public void call(Operator<? super T> op) {
32+
public void call(Observer<? super T> op) {
3333
Object o = closure.call(op);
3434
/*
3535
* If the new signature is being used, we will get NULL back.

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@ package rx.lang.scala.examples
1717

1818
import scala.concurrent.duration.DurationInt
1919
import scala.language.postfixOps
20-
2120
import org.junit.Test
2221
import org.mockito.Matchers._
2322
import org.mockito.Mockito._
2423
import org.scalatest.junit.JUnitSuite
25-
2624
import rx.lang.scala._
2725
import rx.lang.scala.schedulers.TestScheduler
26+
import rx.observers.TestObserver
2827

2928
class TestSchedulerExample extends JUnitSuite {
3029

@@ -36,7 +35,7 @@ class TestSchedulerExample extends JUnitSuite {
3635
val o = Observable.interval(1 second, scheduler)
3736

3837
// Wrap Java Observer in Scala Observer, then subscribe
39-
val sub = o.subscribe(Observer(observer))
38+
val sub = o.subscribe(Observer(new TestObserver(observer)))
4039

4140
verify(observer, never).onNext(0L)
4241
verify(observer, never).onCompleted()

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subject.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ package rx.lang.scala
2121
trait Subject[T] extends Observable[T] with Observer[T] {
2222
private [scala] val asJavaSubject: rx.subjects.Subject[_ >: T, _<: T]
2323

24-
val asJavaObservable: rx.Observable[_ <: T] = asJavaSubject
24+
val asJavaObservable: rx.Observable[_ <: T] = asJavaSubject.toObservable()
2525

2626
override val asJavaObserver: rx.Observer[_ >: T] = asJavaSubject
2727
override def onNext(value: T): Unit = { asJavaObserver.onNext(value)}

rxjava-contrib/rxjava-android/src/test/java/rx/android/observables/AndroidObservableTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@
2525
import org.robolectric.Robolectric;
2626
import org.robolectric.RobolectricTestRunner;
2727
import org.robolectric.annotation.Config;
28+
2829
import rx.Observable;
2930
import rx.Observer;
31+
import rx.observers.TestObserver;
3032
import rx.operators.OperationObserveFromAndroidComponent;
31-
3233
import android.app.Activity;
3334
import android.app.Fragment;
3435
import android.os.Build;
3536
import android.support.v4.app.FragmentActivity;
36-
3737
import rx.android.observables.AndroidObservable;
3838

3939

@@ -66,14 +66,14 @@ public void setup() {
6666

6767
@Test
6868
public void itSupportsFragmentsFromTheSupportV4Library() {
69-
AndroidObservable.fromFragment(supportFragment, Observable.just("success")).subscribe(observer);
69+
AndroidObservable.fromFragment(supportFragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
7070
verify(observer).onNext("success");
7171
verify(observer).onCompleted();
7272
}
7373

7474
@Test
7575
public void itSupportsNativeFragments() {
76-
AndroidObservable.fromFragment(fragment, Observable.just("success")).subscribe(observer);
76+
AndroidObservable.fromFragment(fragment, Observable.just("success")).subscribe(new TestObserver<String>(observer));
7777
verify(observer).onNext("success");
7878
verify(observer).onCompleted();
7979
}

rxjava-contrib/rxjava-android/src/test/java/rx/android/operators/OperationObserveFromAndroidComponentTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import rx.Observer;
4343
import rx.Subscription;
4444
import rx.android.schedulers.AndroidSchedulers;
45+
import rx.observers.TestObserver;
4546
import rx.operators.OperationObserveFromAndroidComponent;
4647
import rx.schedulers.Schedulers;
4748
import rx.subjects.PublishSubject;
@@ -128,7 +129,7 @@ public void call(Integer i) {
128129
@Test
129130
public void itForwardsOnNextOnCompletedSequenceToTargetObserver() {
130131
Observable<Integer> source = Observable.from(1, 2, 3);
131-
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
132+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
132133
verify(mockObserver, times(3)).onNext(anyInt());
133134
verify(mockObserver).onCompleted();
134135
verify(mockObserver, never()).onError(any(Exception.class));
@@ -138,7 +139,7 @@ public void itForwardsOnNextOnCompletedSequenceToTargetObserver() {
138139
public void itForwardsOnErrorToTargetObserver() {
139140
final Exception exception = new Exception();
140141
Observable<Integer> source = Observable.error(exception);
141-
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
142+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
142143
verify(mockObserver).onError(exception);
143144
verify(mockObserver, never()).onNext(anyInt());
144145
verify(mockObserver, never()).onCompleted();
@@ -148,7 +149,7 @@ public void itForwardsOnErrorToTargetObserver() {
148149
public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Throwable {
149150
PublishSubject<Integer> source = PublishSubject.create();
150151

151-
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source, mockFragment);
152+
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source.toObservable(), mockFragment);
152153
operator.onSubscribe(mockObserver);
153154

154155
source.onNext(1);
@@ -166,7 +167,7 @@ public void itDropsOnNextOnCompletedSequenceIfTargetComponentIsGone() throws Thr
166167
public void itDropsOnErrorIfTargetComponentIsGone() throws Throwable {
167168
PublishSubject<Integer> source = PublishSubject.create();
168169

169-
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source, mockFragment);
170+
final Observable.OnSubscribeFunc<Integer> operator = newOnSubscribeFragmentInstance(source.toObservable(), mockFragment);
170171
operator.onSubscribe(mockObserver);
171172

172173
source.onNext(1);
@@ -202,7 +203,7 @@ private void releaseComponentRef(Observable.OnSubscribeFunc<Integer> operator) t
202203
@Test
203204
public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
204205
PublishSubject<Integer> source = PublishSubject.create();
205-
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
206+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source.toObservable(), mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
206207

207208
source.onNext(1);
208209

@@ -218,7 +219,7 @@ public void itDoesNotForwardOnNextOnCompletedSequenceIfFragmentIsDetached() {
218219
@Test
219220
public void itDoesNotForwardOnErrorIfFragmentIsDetached() {
220221
PublishSubject<Integer> source = PublishSubject.create();
221-
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source, mockFragment).subscribe(mockObserver);
222+
OperationObserveFromAndroidComponent.observeFromAndroidComponent(source.toObservable(), mockFragment).subscribe(new TestObserver<Integer>(mockObserver));
222223

223224
source.onNext(1);
224225

@@ -244,7 +245,7 @@ public Subscription onSubscribe(Observer<? super Integer> o) {
244245
});
245246

246247
Subscription sub = OperationObserveFromAndroidComponent.observeFromAndroidComponent(
247-
testObservable, mockActivity).subscribe(mockObserver);
248+
testObservable, mockActivity).subscribe(new TestObserver<Integer>(mockObserver));
248249
sub.unsubscribe();
249250

250251
assertTrue(s.isUnsubscribed());

rxjava-contrib/rxjava-android/src/test/java/rx/android/schedulers/HandlerThreadSchedulerTest.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,21 @@
1515
*/
1616
package rx.android.schedulers;
1717

18-
import android.os.Handler;
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.TimeUnit;
1922

2023
import org.junit.Test;
2124
import org.junit.runner.RunWith;
2225
import org.mockito.ArgumentCaptor;
23-
import static org.hamcrest.CoreMatchers.equalTo;
24-
import static org.junit.Assert.assertNotNull;
25-
import static org.junit.Assert.assertThat;
2626
import org.robolectric.RobolectricTestRunner;
2727
import org.robolectric.annotation.Config;
2828

2929
import rx.Scheduler;
3030
import rx.Subscription;
31-
import rx.operators.SafeObservableSubscription;
3231
import rx.util.functions.Func2;
33-
34-
import java.util.concurrent.TimeUnit;
35-
36-
import static org.mockito.Matchers.eq;
37-
import static org.mockito.Mockito.mock;
38-
import static org.mockito.Mockito.verify;
32+
import android.os.Handler;
3933

4034
@RunWith(RobolectricTestRunner.class)
4135
@Config(manifest=Config.NONE)

rxjava-contrib/rxjava-apache-http/src/main/java/rx/apache/http/consumers/ResponseConsumerEventStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ protected void onResponseReceived(HttpResponse response) throws HttpException, I
8585

8686
@Override
8787
public Subscription onSubscribe(Observer<? super byte[]> observer) {
88-
parentSubscription.add(contentSubject.subscribe(observer));
88+
parentSubscription.add(contentSubject.toObservable().subscribe(observer));
8989
return parentSubscription;
9090
}
9191
});

0 commit comments

Comments
 (0)