Skip to content

Commit 936e185

Browse files
Merge pull request #1458 from benjchristensen/remove-pivot
Remove Pivot Operator
2 parents 341c8f7 + 68c7332 commit 936e185

File tree

5 files changed

+0
-969
lines changed

5 files changed

+0
-969
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,20 +1424,4 @@ class RxScalaDemo extends JUnitSuite {
14241424
o.take(3).toBlocking.foreach(println)
14251425
}
14261426

1427-
@Test def pivotExample() {
1428-
val o1 = (1 to 20).toObservable.groupBy(i => if (i <= 10) "x" else "y").map {
1429-
case (t: String, o: Observable[Int]) => (t, o.groupBy(i => i % 2 == 0))
1430-
}
1431-
println("o1:")
1432-
(for ((k1, o) <- o1;
1433-
(k2, vs) <- o;
1434-
v <- vs
1435-
) yield (k1, k2, v)).subscribe(println(_))
1436-
val o2 = o1.pivot
1437-
println("o2:")
1438-
(for ((k1, o) <- o2;
1439-
(k2, vs) <- o;
1440-
v <- vs
1441-
) yield (k1, k2, v)).subscribe(println(_))
1442-
}
14431427
}

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -3808,62 +3808,6 @@ trait Observable[+T]
38083808
asJavaObservable.subscribe(onNext, onError, onComplete)
38093809
}
38103810

3811-
/**
3812-
* Pivots a sequence of `(K1, Observable[(K2, Observable[U])])`s emitted by an `Observable` so as to swap the group
3813-
* and and the set on which their items are grouped.
3814-
* <p>
3815-
* <img width="640" height="580" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png">
3816-
*
3817-
* For example an `Observable` such as `this = Observable[(String, Observable[(Boolean, Observable[Integer])])`:
3818-
* <ul>
3819-
* <li>o1.odd: 1, 3, 5, 7, 9 on Thread 1</li>
3820-
* <li>o1.even: 2, 4, 6, 8, 10 on Thread 1</li>
3821-
* <li>o2.odd: 11, 13, 15, 17, 19 on Thread 2</li>
3822-
* <li>o2.even: 12, 14, 16, 18, 20 on Thread 2</li>
3823-
* </ul>
3824-
* is pivoted to become `this = Observable[(Boolean, Observable[(String, Observable[Integer])])`:
3825-
*
3826-
* <ul>
3827-
* <li>odd.o1: 1, 3, 5, 7, 9 on Thread 1</li>
3828-
* <li>odd.o2: 11, 13, 15, 17, 19 on Thread 2</li>
3829-
* <li>even.o1: 2, 4, 6, 8, 10 on Thread 1</li>
3830-
* <li>even.o2: 12, 14, 16, 18, 20 on Thread 2</li>
3831-
* </ul>
3832-
* <p>
3833-
* <img width="640" height="1140" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png">
3834-
* <p>
3835-
* <em>Note:</em> A `(K, Observable[_])` will cache the items it is to emit until such time as it
3836-
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
3837-
* `(K, Observable[_])`s that do not concern you. Instead, you can signal to them that they may
3838-
* discard their buffers by applying an operator like `take(0)` to them.
3839-
*
3840-
* @return an `Observable`containing a stream of nested `(K1, Observable[(K2, Observable[U])])`s with swapped
3841-
* inner-outer keys.
3842-
*/
3843-
def pivot[U, K1, K2](implicit evidence: Observable[T] <:< Observable[(K1, Observable[(K2, Observable[U])])]): Observable[(K2, Observable[(K1, Observable[U])])] = {
3844-
import rx.observables.{GroupedObservable => JGroupedObservable}
3845-
val f1 = new Func1[(K1, Observable[(K2, Observable[U])]), JGroupedObservable[K1, JGroupedObservable[K2, U]]]() {
3846-
override def call(t1: (K1, Observable[(K2, Observable[U])])): JGroupedObservable[K1, JGroupedObservable[K2, U]] = {
3847-
val jo = t1._2.asJavaObservable.asInstanceOf[rx.Observable[(K2, Observable[U])]].map[JGroupedObservable[K2, U]](new Func1[(K2, Observable[U]), JGroupedObservable[K2, U]]() {
3848-
override def call(t2: (K2, Observable[U])): JGroupedObservable[K2, U] = {
3849-
JGroupedObservable.from(t2._1, t2._2.asJavaObservable.asInstanceOf[rx.Observable[U]])
3850-
}
3851-
})
3852-
JGroupedObservable.from(t1._1, jo)
3853-
}
3854-
}
3855-
val o1: Observable[(K1, Observable[(K2, Observable[U])])] = this
3856-
val o2 = toScalaObservable[JGroupedObservable[K2, JGroupedObservable[K1, U]]](rx.Observable.pivot(o1.asJavaObservable.map(f1)))
3857-
o2.map {
3858-
(jgo1: JGroupedObservable[K2, JGroupedObservable[K1, U]]) => {
3859-
val jo = jgo1.map[(K1, Observable[U])](new Func1[JGroupedObservable[K1, U], (K1, Observable[U])]() {
3860-
override def call(jgo2: JGroupedObservable[K1, U]): (K1, Observable[U]) = (jgo2.getKey, toScalaObservable[U](jgo2))
3861-
})
3862-
(jgo1.getKey, toScalaObservable[(K1, Observable[U])](jo))
3863-
}
3864-
}
3865-
}
3866-
38673811
/**
38683812
* Returns an Observable that counts the total number of items emitted by the source Observable and emits this count as a 64-bit Long.
38693813
*

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2393,50 +2393,6 @@ public final static <T> Observable<Observable<T>> parallelMerge(Observable<Obser
23932393
return OperatorParallelMerge.parallelMerge(source, parallelObservables, scheduler);
23942394
}
23952395

2396-
/**
2397-
* Pivots a sequence of {@code GroupedObservable}s emitted by an {@code Observable} so as to swap the group
2398-
* and and the set on which their items are grouped.
2399-
* <p>
2400-
* <img width="640" height="580" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.png" alt="">
2401-
* <p>
2402-
* For example an {@code Observable} such as this =&gt;
2403-
*
2404-
* {@code Observable<GroupedObservable<String, GroupedObservable<Boolean, Integer>>>}:
2405-
* <ul>
2406-
* <li>o1.odd: 1, 3, 5, 7, 9 on Thread 1</li>
2407-
* <li>o1.even: 2, 4, 6, 8, 10 on Thread 1</li>
2408-
* <li>o2.odd: 11, 13, 15, 17, 19 on Thread 2</li>
2409-
* <li>o2.even: 12, 14, 16, 18, 20 on Thread 2</li>
2410-
* </ul>
2411-
* is pivoted to become this =&gt;
2412-
*
2413-
* {@code Observable<GroupedObservable<Boolean, GroupedObservable<String, Integer>>>}:
2414-
* <ul>
2415-
* <li>odd.o1: 1, 3, 5, 7, 9 on Thread 1</li>
2416-
* <li>odd.o2: 11, 13, 15, 17, 19 on Thread 2</li>
2417-
* <li>even.o1: 2, 4, 6, 8, 10 on Thread 1</li>
2418-
* <li>even.o2: 12, 14, 16, 18, 20 on Thread 2</li>
2419-
* </ul>
2420-
* <p>
2421-
* <img width="640" height="1140" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/pivot.ex.png" alt="">
2422-
* <p>
2423-
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
2424-
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
2425-
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
2426-
* discard their buffers by applying an operator like {@link #take}{@code (0)} to them.
2427-
* <p>
2428-
* {@code pivot} does not operate by default on a particular {@link Scheduler}.
2429-
*
2430-
* @param groups
2431-
the {@link GroupedObservable} to pivot
2432-
* @return an {@code Observable} containing a stream of nested {@code GroupedObservable}s with swapped
2433-
* inner-outer keys.
2434-
* @since 0.17
2435-
*/
2436-
public static final <K1, K2, T> Observable<GroupedObservable<K2, GroupedObservable<K1, T>>> pivot(Observable<GroupedObservable<K1, GroupedObservable<K2, T>>> groups) {
2437-
return groups.lift(new OperatorPivot<K1, K2, T>());
2438-
}
2439-
24402396
/**
24412397
* Returns an Observable that emits a sequence of Integers within a specified range.
24422398
* <p>

0 commit comments

Comments
 (0)