|
16 | 16 | package rx.operators;
|
17 | 17 |
|
18 | 18 | import static org.junit.Assert.*;
|
| 19 | +import static org.mockito.Mockito.*; |
19 | 20 |
|
20 | 21 | import java.util.ArrayList;
|
21 | 22 | import java.util.Arrays;
|
|
29 | 30 | import java.util.concurrent.atomic.AtomicReference;
|
30 | 31 |
|
31 | 32 | import org.junit.Test;
|
| 33 | +import org.mockito.Matchers; |
32 | 34 |
|
33 | 35 | import rx.Observable;
|
34 | 36 | import rx.Observable.OnSubscribe;
|
@@ -932,4 +934,36 @@ public void call(final Subscriber<? super Event> op) {
|
932 | 934 | });
|
933 | 935 | };
|
934 | 936 |
|
| 937 | + @Test |
| 938 | + public void testGroupByOnAsynchronousSourceAcceptsMultipleSubscriptions() throws InterruptedException { |
| 939 | + |
| 940 | + // choose an asynchronous source |
| 941 | + Observable<Long> source = Observable.interval(10, TimeUnit.MILLISECONDS).take(1); |
| 942 | + |
| 943 | + // apply groupBy to the source |
| 944 | + Observable<GroupedObservable<Boolean, Long>> stream = source.groupBy(IS_EVEN); |
| 945 | + |
| 946 | + // create two observers |
| 947 | + @SuppressWarnings("unchecked") |
| 948 | + Observer<GroupedObservable<Boolean, Long>> o1 = mock(Observer.class); |
| 949 | + @SuppressWarnings("unchecked") |
| 950 | + Observer<GroupedObservable<Boolean, Long>> o2 = mock(Observer.class); |
| 951 | + |
| 952 | + // subscribe with the observers |
| 953 | + stream.subscribe(o1); |
| 954 | + stream.subscribe(o2); |
| 955 | + |
| 956 | + // check that subscriptions were successful |
| 957 | + verify(o1, never()).onError(Matchers.<Throwable> any()); |
| 958 | + verify(o2, never()).onError(Matchers.<Throwable> any()); |
| 959 | + } |
| 960 | + |
| 961 | + private static Func1<Long, Boolean> IS_EVEN = new Func1<Long, Boolean>() { |
| 962 | + |
| 963 | + @Override |
| 964 | + public Boolean call(Long n) { |
| 965 | + return n % 2 == 0; |
| 966 | + } |
| 967 | + }; |
| 968 | + |
935 | 969 | }
|
0 commit comments