Skip to content

Commit 41c0a07

Browse files
authored
2.x: Fix Observable.concatMapEager queueing of source items (#5609)
1 parent 8c60d9d commit 41c0a07

File tree

3 files changed

+72
-1
lines changed

3 files changed

+72
-1
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableConcatMapEager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.internal.functions.ObjectHelper;
2525
import io.reactivex.internal.fuseable.*;
2626
import io.reactivex.internal.observers.*;
27+
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2728
import io.reactivex.internal.util.*;
2829
import io.reactivex.plugins.RxJavaPlugins;
2930

@@ -129,7 +130,7 @@ public void onSubscribe(Disposable d) {
129130
}
130131
}
131132

132-
queue = QueueDrainHelper.createQueue(prefetch);
133+
queue = new SpscLinkedArrayQueue<T>(prefetch);
133134

134135
actual.onSubscribe(this);
135136
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,4 +1193,39 @@ public Flowable<Integer> apply(Integer i) throws Exception {
11931193
.assertResult(1, 2, 3, 4, 5)
11941194
;
11951195
}
1196+
1197+
@Test
1198+
@SuppressWarnings("unchecked")
1199+
public void maxConcurrencyOf2() {
1200+
List<Integer>[] list = new ArrayList[100];
1201+
for (int i = 0; i < 100; i++) {
1202+
List<Integer> lst = new ArrayList<Integer>();
1203+
list[i] = lst;
1204+
for (int k = 1; k <= 10; k++) {
1205+
lst.add((i) * 10 + k);
1206+
}
1207+
}
1208+
1209+
Flowable.range(1, 1000)
1210+
.buffer(10)
1211+
.concatMapEager(new Function<List<Integer>, Flowable<List<Integer>>>() {
1212+
@Override
1213+
public Flowable<List<Integer>> apply(List<Integer> v)
1214+
throws Exception {
1215+
return Flowable.just(v)
1216+
.subscribeOn(Schedulers.io())
1217+
.doOnNext(new Consumer<List<Integer>>() {
1218+
@Override
1219+
public void accept(List<Integer> v)
1220+
throws Exception {
1221+
Thread.sleep(new Random().nextInt(20));
1222+
}
1223+
});
1224+
}
1225+
}
1226+
, 2, 3)
1227+
.test()
1228+
.awaitDone(5, TimeUnit.SECONDS)
1229+
.assertResult(list);
1230+
}
11961231
}

src/test/java/io/reactivex/internal/operators/observable/ObservableConcatMapEagerTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,4 +1001,39 @@ public ObservableSource<Integer> apply(Integer i) throws Exception {
10011001
.assertResult(1, 2, 3, 4, 5)
10021002
;
10031003
}
1004+
1005+
@Test
1006+
@SuppressWarnings("unchecked")
1007+
public void maxConcurrencyOf2() {
1008+
List<Integer>[] list = new ArrayList[100];
1009+
for (int i = 0; i < 100; i++) {
1010+
List<Integer> lst = new ArrayList<Integer>();
1011+
list[i] = lst;
1012+
for (int k = 1; k <= 10; k++) {
1013+
lst.add((i) * 10 + k);
1014+
}
1015+
}
1016+
1017+
Observable.range(1, 1000)
1018+
.buffer(10)
1019+
.concatMapEager(new Function<List<Integer>, ObservableSource<List<Integer>>>() {
1020+
@Override
1021+
public ObservableSource<List<Integer>> apply(List<Integer> v)
1022+
throws Exception {
1023+
return Observable.just(v)
1024+
.subscribeOn(Schedulers.io())
1025+
.doOnNext(new Consumer<List<Integer>>() {
1026+
@Override
1027+
public void accept(List<Integer> v)
1028+
throws Exception {
1029+
Thread.sleep(new Random().nextInt(20));
1030+
}
1031+
});
1032+
}
1033+
}
1034+
, 2, 3)
1035+
.test()
1036+
.awaitDone(5, TimeUnit.SECONDS)
1037+
.assertResult(list);
1038+
}
10041039
}

0 commit comments

Comments
 (0)