Skip to content

Commit 5ddc510

Browse files
Merge pull request #1568 from benjchristensen/compose-transformer
Compose/Transformer
2 parents 379602a + 3e4aaf6 commit 5ddc510

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,34 @@ public void call(Subscriber<? super R> o) {
179179
}
180180
});
181181
}
182+
183+
184+
/**
185+
* Compose Observables together with a function.
186+
*
187+
* This works on the Observables themselves whereas `lift` works on the internal Subscriber/Observers.
188+
*
189+
* Lift should be used when creating an operator that acts on the underlying data.
190+
* Compose should be used when acting on the observable itself, such as composing multiple operators.
191+
*
192+
* @param transformer
193+
* @return
194+
*/
195+
public <R> Observable<R> compose(Transformer<T, R> transformer) {
196+
return transformer.call(this);
197+
}
198+
199+
/**
200+
* Transformer function for `compose`
201+
*/
202+
public static interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
203+
// cover for generics insanity
204+
}
205+
206+
182207

183208
/* *********************************************************************************************************
184-
* Observers Below Here
209+
* Operators Below Here
185210
* *********************************************************************************************************
186211
*/
187212

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.mockito.MockitoAnnotations;
4747

4848
import rx.Observable.OnSubscribe;
49+
import rx.Observable.Transformer;
4950
import rx.exceptions.OnErrorNotImplementedException;
5051
import rx.functions.Action1;
5152
import rx.functions.Action2;
@@ -1100,4 +1101,27 @@ public void call(List<Boolean> booleans) {
11001101
}
11011102
assertEquals(nums.length, count.get());
11021103
}
1104+
1105+
@Test
1106+
public void testCompose() {
1107+
TestSubscriber<String> ts = new TestSubscriber<String>();
1108+
Observable.from(1, 2, 3).compose(new Transformer<Integer, String>() {
1109+
1110+
@Override
1111+
public Observable<String> call(Observable<Integer> t1) {
1112+
return t1.map(new Func1<Integer, String>() {
1113+
1114+
@Override
1115+
public String call(Integer t1) {
1116+
return String.valueOf(t1);
1117+
}
1118+
1119+
});
1120+
}
1121+
1122+
}).subscribe(ts);
1123+
ts.assertTerminalEvent();
1124+
ts.assertNoErrors();
1125+
ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
1126+
}
11031127
}

0 commit comments

Comments
 (0)