Skip to content

Commit ed21c9e

Browse files
Merge pull request #1886 from akarnokd/MergeFix
Buffer with time and merge fix
2 parents d6cbf59 + b0aeb62 commit ed21c9e

File tree

3 files changed

+42
-6
lines changed

3 files changed

+42
-6
lines changed

src/main/java/rx/internal/operators/OperatorBufferWithTime.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,19 @@ public OperatorBufferWithTime(long timespan, long timeshift, TimeUnit unit, int
7272
@Override
7373
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
7474
final Worker inner = scheduler.createWorker();
75-
child.add(inner);
75+
SerializedSubscriber<List<T>> serialized = new SerializedSubscriber<List<T>>(child);
7676

7777
if (timespan == timeshift) {
78-
ExactSubscriber bsub = new ExactSubscriber(new SerializedSubscriber<List<T>>(child), inner);
78+
ExactSubscriber bsub = new ExactSubscriber(serialized, inner);
79+
bsub.add(inner);
80+
child.add(bsub);
7981
bsub.scheduleExact();
8082
return bsub;
8183
}
8284

83-
InexactSubscriber bsub = new InexactSubscriber(new SerializedSubscriber<List<T>>(child), inner);
85+
InexactSubscriber bsub = new InexactSubscriber(serialized, inner);
86+
bsub.add(inner);
87+
child.add(bsub);
8488
bsub.startNewChunk();
8589
bsub.scheduleChunk();
8690
return bsub;
@@ -94,7 +98,6 @@ final class InexactSubscriber extends Subscriber<T> {
9498
/** Guarded by this. */
9599
boolean done;
96100
public InexactSubscriber(Subscriber<? super List<T>> child, Worker inner) {
97-
super(child);
98101
this.child = child;
99102
this.inner = inner;
100103
this.chunks = new LinkedList<List<T>>();
@@ -219,7 +222,6 @@ final class ExactSubscriber extends Subscriber<T> {
219222
/** Guarded by this. */
220223
boolean done;
221224
public ExactSubscriber(Subscriber<? super List<T>> child, Worker inner) {
222-
super(child);
223225
this.child = child;
224226
this.inner = inner;
225227
this.chunk = new ArrayList<T>();

src/main/java/rx/internal/operators/OperatorMerge.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ public void request(long n) {
501501
REQUESTED.getAndAdd(this, n);
502502
if (ms.drainQueuesIfNeeded()) {
503503
boolean sendComplete = false;
504-
synchronized (this) {
504+
synchronized (ms) {
505505
if (ms.wip == 0 && ms.scalarValueQueue != null && ms.scalarValueQueue.isEmpty()) {
506506
sendComplete = true;
507507
}

src/test/java/rx/internal/operators/OperatorBufferTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -981,4 +981,38 @@ public void onNext(List<Integer> t) {
981981
});
982982
assertEquals(Long.MAX_VALUE, requested.get());
983983
}
984+
@Test(timeout = 3000)
985+
public void testBufferWithTimeDoesntUnsubscribeDownstream() throws InterruptedException {
986+
@SuppressWarnings("unchecked")
987+
final Observer<Object> o = mock(Observer.class);
988+
989+
990+
final CountDownLatch cdl = new CountDownLatch(1);
991+
Subscriber<Object> s = new Subscriber<Object>() {
992+
@Override
993+
public void onNext(Object t) {
994+
o.onNext(t);
995+
}
996+
@Override
997+
public void onError(Throwable e) {
998+
o.onError(e);
999+
cdl.countDown();
1000+
}
1001+
@Override
1002+
public void onCompleted() {
1003+
o.onCompleted();
1004+
cdl.countDown();
1005+
}
1006+
};
1007+
1008+
Observable.range(1, 1).delay(1, TimeUnit.SECONDS).buffer(2, TimeUnit.SECONDS).unsafeSubscribe(s);
1009+
1010+
cdl.await();
1011+
1012+
verify(o).onNext(Arrays.asList(1));
1013+
verify(o).onCompleted();
1014+
verify(o, never()).onError(any(Throwable.class));
1015+
1016+
assertFalse(s.isUnsubscribed());
1017+
}
9841018
}

0 commit comments

Comments
 (0)