Skip to content

Commit 378772c

Browse files
Merge pull request #959 from rickbw/toFuture-cancellation
OperationToFuture must throw CancellationException on get() if cancelled
2 parents 1abaaf2 + 9f990ca commit 378772c

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

rxjava-core/src/main/java/rx/operators/OperationToFuture.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.CancellationException;
1819
import java.util.concurrent.CountDownLatch;
1920
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.Future;
@@ -120,6 +121,9 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
120121
private T getValue() throws ExecutionException {
121122
if (error.get() != null) {
122123
throw new ExecutionException("Observable onError", error.get());
124+
} else if (cancelled) {
125+
// Contract of Future.get() requires us to throw this:
126+
throw new CancellationException("Subscription unsubscribed");
123127
} else {
124128
return value.get();
125129
}

rxjava-core/src/test/java/rx/operators/OperationToFutureTest.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,23 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static rx.operators.OperationToFuture.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
20+
import static org.junit.Assert.fail;
21+
import static rx.operators.OperationToFuture.toFuture;
2022

2123
import java.util.List;
24+
import java.util.concurrent.CancellationException;
2225
import java.util.concurrent.ExecutionException;
2326
import java.util.concurrent.Future;
27+
import java.util.concurrent.TimeUnit;
2428

2529
import org.junit.Test;
2630

2731
import rx.Observable;
2832
import rx.Observable.OnSubscribeFunc;
2933
import rx.Observer;
34+
import rx.Subscriber;
3035
import rx.Subscription;
3136
import rx.subscriptions.Subscriptions;
3237

@@ -77,6 +82,34 @@ public Subscription onSubscribe(Observer<? super String> observer) {
7782
}
7883
}
7984

85+
@Test(expected=CancellationException.class)
86+
public void testGetAfterCancel() throws Exception {
87+
Observable<String> obs = Observable.create(new OperationNeverComplete<String>());
88+
Future<String> f = toFuture(obs);
89+
boolean cancelled = f.cancel(true);
90+
assertTrue(cancelled); // because OperationNeverComplete never does
91+
f.get(); // Future.get() docs require this to throw
92+
}
93+
94+
@Test(expected=CancellationException.class)
95+
public void testGetWithTimeoutAfterCancel() throws Exception {
96+
Observable<String> obs = Observable.create(new OperationNeverComplete<String>());
97+
Future<String> f = toFuture(obs);
98+
boolean cancelled = f.cancel(true);
99+
assertTrue(cancelled); // because OperationNeverComplete never does
100+
f.get(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // Future.get() docs require this to throw
101+
}
102+
103+
/**
104+
* Emits no observations. Used to simulate a long-running asynchronous operation.
105+
*/
106+
private static class OperationNeverComplete<T> implements Observable.OnSubscribe<T> {
107+
@Override
108+
public void call(Subscriber<? super T> unused) {
109+
// do nothing
110+
}
111+
}
112+
80113
private static class TestException extends RuntimeException {
81114
private static final long serialVersionUID = 1L;
82115
}

0 commit comments

Comments
 (0)