Skip to content

Commit 71bfcae

Browse files
authored
3.x: Fix takeLast(time) last events time window calculation. (#6653)
1 parent cc690ff commit 71bfcae

File tree

4 files changed

+56
-4
lines changed

4 files changed

+56
-4
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimed.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ void drain() {
139139
final Observer<? super T> a = downstream;
140140
final SpscLinkedArrayQueue<Object> q = queue;
141141
final boolean delayError = this.delayError;
142+
final long timestampLimit = scheduler.now(unit) - time;
142143

143144
for (;;) {
144145
if (cancelled) {
@@ -171,7 +172,7 @@ void drain() {
171172
@SuppressWarnings("unchecked")
172173
T o = (T)q.poll();
173174

174-
if ((Long)ts < scheduler.now(unit) - time) {
175+
if ((Long)ts < timestampLimit) {
175176
continue;
176177
}
177178

src/test/java/io/reactivex/TimesteppingScheduler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,13 @@ public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
4242

4343
@Override
4444
public long now(TimeUnit unit) {
45-
return time++;
45+
return TimesteppingScheduler.this.now(unit);
4646
}
4747
}
4848

49-
long time;
49+
public long time;
50+
51+
public boolean stepEnabled;
5052

5153
@Override
5254
public Worker createWorker() {
@@ -55,6 +57,9 @@ public Worker createWorker() {
5557

5658
@Override
5759
public long now(TimeUnit unit) {
58-
return time++;
60+
if (stepEnabled) {
61+
return time++;
62+
}
63+
return time;
5964
}
6065
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeLastTimedTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,4 +336,27 @@ public Publisher<Object> apply(Flowable<Object> f) throws Exception {
336336
public void badRequest() {
337337
TestHelper.assertBadRequestReported(PublishProcessor.create().takeLast(1, TimeUnit.SECONDS));
338338
}
339+
340+
@Test
341+
public void lastWindowIsFixedInTime() {
342+
TimesteppingScheduler scheduler = new TimesteppingScheduler();
343+
scheduler.stepEnabled = false;
344+
345+
PublishProcessor<Integer> pp = PublishProcessor.create();
346+
347+
TestSubscriber<Integer> ts = pp
348+
.takeLast(2, TimeUnit.SECONDS, scheduler)
349+
.test();
350+
351+
pp.onNext(1);
352+
pp.onNext(2);
353+
pp.onNext(3);
354+
pp.onNext(4);
355+
356+
scheduler.stepEnabled = true;
357+
358+
pp.onComplete();
359+
360+
ts.assertResult(1, 2, 3, 4);
361+
}
339362
}

src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastTimedTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,27 @@ public void run() {
275275
TestHelper.race(r1, r2);
276276
}
277277
}
278+
279+
@Test
280+
public void lastWindowIsFixedInTime() {
281+
TimesteppingScheduler scheduler = new TimesteppingScheduler();
282+
scheduler.stepEnabled = false;
283+
284+
PublishSubject<Integer> ps = PublishSubject.create();
285+
286+
TestObserver<Integer> to = ps
287+
.takeLast(2, TimeUnit.SECONDS, scheduler)
288+
.test();
289+
290+
ps.onNext(1);
291+
ps.onNext(2);
292+
ps.onNext(3);
293+
ps.onNext(4);
294+
295+
scheduler.stepEnabled = true;
296+
297+
ps.onComplete();
298+
299+
to.assertResult(1, 2, 3, 4);
300+
}
278301
}

0 commit comments

Comments
 (0)