Skip to content

Commit 9f990ca

Browse files
committed
OperationToFuture must throw CancellationException on get() if cancelled.
The documentation for Future.get() requires the method to throw CancellationException if the Future was cancelled before the task completed. The Futures returned by OperationToFuture.toFuture() did not respect this contract. Now they do.
1 parent 5425b32 commit 9f990ca

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

rxjava-core/src/main/java/rx/operators/OperationToFuture.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.CancellationException;
1819
import java.util.concurrent.CountDownLatch;
1920
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.Future;
@@ -120,6 +121,9 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
120121
private T getValue() throws ExecutionException {
121122
if (error.get() != null) {
122123
throw new ExecutionException("Observable onError", error.get());
124+
} else if (cancelled) {
125+
// Contract of Future.get() requires us to throw this:
126+
throw new CancellationException("Subscription unsubscribed");
123127
} else {
124128
return value.get();
125129
}

rxjava-core/src/test/java/rx/operators/OperationToFutureTest.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,23 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static rx.operators.OperationToFuture.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
20+
import static org.junit.Assert.fail;
21+
import static rx.operators.OperationToFuture.toFuture;
2022

2123
import java.util.List;
24+
import java.util.concurrent.CancellationException;
2225
import java.util.concurrent.ExecutionException;
2326
import java.util.concurrent.Future;
27+
import java.util.concurrent.TimeUnit;
2428

2529
import org.junit.Test;
2630

2731
import rx.Observable;
2832
import rx.Observable.OnSubscribeFunc;
2933
import rx.Observer;
34+
import rx.Subscriber;
3035
import rx.Subscription;
3136
import rx.subscriptions.Subscriptions;
3237

@@ -77,6 +82,34 @@ public Subscription onSubscribe(Observer<? super String> observer) {
7782
}
7883
}
7984

85+
@Test(expected=CancellationException.class)
86+
public void testGetAfterCancel() throws Exception {
87+
Observable<String> obs = Observable.create(new OperationNeverComplete<String>());
88+
Future<String> f = toFuture(obs);
89+
boolean cancelled = f.cancel(true);
90+
assertTrue(cancelled); // because OperationNeverComplete never does
91+
f.get(); // Future.get() docs require this to throw
92+
}
93+
94+
@Test(expected=CancellationException.class)
95+
public void testGetWithTimeoutAfterCancel() throws Exception {
96+
Observable<String> obs = Observable.create(new OperationNeverComplete<String>());
97+
Future<String> f = toFuture(obs);
98+
boolean cancelled = f.cancel(true);
99+
assertTrue(cancelled); // because OperationNeverComplete never does
100+
f.get(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // Future.get() docs require this to throw
101+
}
102+
103+
/**
104+
* Emits no observations. Used to simulate a long-running asynchronous operation.
105+
*/
106+
private static class OperationNeverComplete<T> implements Observable.OnSubscribe<T> {
107+
@Override
108+
public void call(Subscriber<? super T> unused) {
109+
// do nothing
110+
}
111+
}
112+
80113
private static class TestException extends RuntimeException {
81114
private static final long serialVersionUID = 1L;
82115
}

0 commit comments

Comments
 (0)