Skip to content

Commit a3f3f94

Browse files
authored
Merge pull request #4851 from akarnokd/SingleFromEmitter
1.x: add Single.fromEmitter
2 parents 745a922 + a825d70 commit a3f3f94

File tree

7 files changed

+497
-42
lines changed

7 files changed

+497
-42
lines changed

src/main/java/rx/Single.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,46 @@ public static <T> Single<T> fromCallable(final Callable<? extends T> func) {
573573
return create(new SingleFromCallable<T>(func));
574574
}
575575

576+
/**
577+
* Provides an API (in a cold Single) that bridges the Single-reactive world
578+
* with the callback-based world.
579+
* <p>The {@link SingleEmitter} allows registering a callback for
580+
* cancellation/unsubscription of a resource.
581+
* <p>
582+
* Example:
583+
* <pre><code>
584+
* Single.fromEmitter(emitter -&gt; {
585+
* Callback listener = new Callback() {
586+
* &#64;Override
587+
* public void onEvent(Event e) {
588+
* emitter.onSuccess(e.getData());
589+
* }
590+
*
591+
* &#64;Override
592+
* public void onFailure(Exception e) {
593+
* emitter.onError(e);
594+
* }
595+
* };
596+
*
597+
* AutoCloseable c = api.someMethod(listener);
598+
*
599+
* emitter.setCancellation(c::close);
600+
*
601+
* });
602+
* </code></pre>
603+
* <p>All of the SingleEmitter's methods are thread-safe and ensure the
604+
* Single's protocol are held.
605+
* @param <T> the success value type
606+
* @param producer the callback invoked for each incoming SingleSubscriber
607+
* @return the new Single instance
608+
* @since 1.2.3 - experimental (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
609+
*/
610+
@Experimental
611+
public static <T> Single<T> fromEmitter(Action1<SingleEmitter<T>> producer) {
612+
if (producer == null) { throw new NullPointerException("producer is null"); }
613+
return create(new SingleFromEmitter<T>(producer));
614+
}
615+
576616
/**
577617
* Returns a {@code Single} that emits a specified item.
578618
* <p>

src/main/java/rx/SingleEmitter.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import rx.annotations.Experimental;
19+
import rx.functions.Cancellable;
20+
21+
/**
22+
* Abstraction over a {@link SingleSubscriber} that gets either an onSuccess or onError
23+
* signal and allows registering an cancellation/unsubscription callback.
24+
* <p>
25+
* All methods are thread-safe; calling onSuccess or onError twice or one after the other has
26+
* no effect.
27+
* @since 1.2.3 - experimental (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
28+
*
29+
* @param <T> the success value type
30+
*/
31+
@Experimental
32+
public interface SingleEmitter<T> {
33+
34+
/**
35+
* Notifies the SingleSubscriber that the {@link Single} has completed successfully with
36+
* the given value.
37+
* <p>
38+
* If the {@link Single} calls this method, it will not thereafter call
39+
* {@link #onError}.
40+
*
41+
* @param t the success value
42+
*/
43+
void onSuccess(T t);
44+
45+
/**
46+
* Notifies the SingleSubscriber that the {@link Single} has experienced an error condition.
47+
* <p>
48+
* If the {@link Single} calls this method, it will not thereafter call
49+
* {@link #onSuccess}.
50+
*
51+
* @param t
52+
* the exception encountered by the Observable
53+
*/
54+
void onError(Throwable t);
55+
56+
/**
57+
* Sets a Subscription on this emitter; any previous Subscription
58+
* or Cancellation will be unsubscribed/cancelled.
59+
* @param s the subscription, null is allowed
60+
*/
61+
void setSubscription(Subscription s);
62+
63+
/**
64+
* Sets a Cancellable on this emitter; any previous Subscription
65+
* or Cancellation will be unsubscribed/cancelled.
66+
* @param c the cancellable resource, null is allowed
67+
*/
68+
void setCancellation(Cancellable c);
69+
70+
}

src/main/java/rx/internal/operators/CompletableFromEmitter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@
1919

2020
import rx.*;
2121
import rx.exceptions.Exceptions;
22-
import rx.functions.Action1;
23-
import rx.functions.Cancellable;
24-
import rx.internal.operators.OnSubscribeFromEmitter.CancellableSubscription;
25-
import rx.internal.subscriptions.SequentialSubscription;
22+
import rx.functions.*;
23+
import rx.internal.subscriptions.*;
2624
import rx.plugins.RxJavaHooks;
2725

2826
/**

src/main/java/rx/internal/operators/OnSubscribeFromEmitter.java

Lines changed: 3 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import rx.*;
2222
import rx.Observable.OnSubscribe;
23-
import rx.exceptions.*;
24-
import rx.functions.Action1;
25-
import rx.functions.Cancellable;
23+
import rx.exceptions.MissingBackpressureException;
24+
import rx.functions.*;
25+
import rx.internal.subscriptions.CancellableSubscription;
2626
import rx.internal.util.RxRingBuffer;
2727
import rx.internal.util.atomic.SpscUnboundedAtomicArrayQueue;
2828
import rx.internal.util.unsafe.*;
@@ -73,41 +73,6 @@ public void call(Subscriber<? super T> t) {
7373

7474
}
7575

76-
/**
77-
* A Subscription that wraps an Cancellable instance.
78-
*/
79-
static final class CancellableSubscription
80-
extends AtomicReference<Cancellable>
81-
implements Subscription {
82-
83-
/** */
84-
private static final long serialVersionUID = 5718521705281392066L;
85-
86-
public CancellableSubscription(Cancellable cancellable) {
87-
super(cancellable);
88-
}
89-
90-
@Override
91-
public boolean isUnsubscribed() {
92-
return get() == null;
93-
}
94-
95-
@Override
96-
public void unsubscribe() {
97-
if (get() != null) {
98-
Cancellable c = getAndSet(null);
99-
if (c != null) {
100-
try {
101-
c.cancel();
102-
} catch (Exception ex) {
103-
Exceptions.throwIfFatal(ex);
104-
RxJavaHooks.onError(ex);
105-
}
106-
}
107-
}
108-
}
109-
}
110-
11176
static abstract class BaseEmitter<T>
11277
extends AtomicLong
11378
implements Emitter<T>, Producer, Subscription {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
21+
import rx.*;
22+
import rx.Single.OnSubscribe;
23+
import rx.exceptions.Exceptions;
24+
import rx.functions.*;
25+
import rx.internal.subscriptions.*;
26+
import rx.plugins.RxJavaHooks;
27+
28+
/**
29+
* Calls an action with a SingleEmitter instance for each individual subscribers that
30+
* generates a terminal signal (eventually).
31+
*
32+
* @param <T> the success value type
33+
*/
34+
public final class SingleFromEmitter<T> implements OnSubscribe<T> {
35+
36+
final Action1<SingleEmitter<T>> producer;
37+
38+
public SingleFromEmitter(Action1<SingleEmitter<T>> producer) {
39+
this.producer = producer;
40+
}
41+
42+
@Override
43+
public void call(SingleSubscriber<? super T> t) {
44+
SingleEmitterImpl<T> parent = new SingleEmitterImpl<T>(t);
45+
t.add(parent);
46+
47+
try {
48+
producer.call(parent);
49+
} catch (Throwable ex) {
50+
Exceptions.throwIfFatal(ex);
51+
parent.onError(ex);
52+
}
53+
}
54+
55+
static final class SingleEmitterImpl<T>
56+
extends AtomicBoolean
57+
implements SingleEmitter<T>, Subscription {
58+
private static final long serialVersionUID = 8082834163465882809L;
59+
60+
final SingleSubscriber<? super T> actual;
61+
62+
final SequentialSubscription resource;
63+
64+
SingleEmitterImpl(SingleSubscriber<? super T> actual) {
65+
this.actual = actual;
66+
this.resource = new SequentialSubscription();
67+
}
68+
69+
@Override
70+
public void unsubscribe() {
71+
if (compareAndSet(false, true)) {
72+
resource.unsubscribe();
73+
}
74+
}
75+
76+
@Override
77+
public boolean isUnsubscribed() {
78+
return get();
79+
}
80+
81+
@Override
82+
public void onSuccess(T t) {
83+
if (compareAndSet(false, true)) {
84+
try {
85+
actual.onSuccess(t);
86+
} finally {
87+
resource.unsubscribe();
88+
}
89+
}
90+
}
91+
92+
@Override
93+
public void onError(Throwable t) {
94+
if (t == null) {
95+
t = new NullPointerException();
96+
}
97+
if (compareAndSet(false, true)) {
98+
try {
99+
actual.onError(t);
100+
} finally {
101+
resource.unsubscribe();
102+
}
103+
} else {
104+
RxJavaHooks.onError(t);
105+
}
106+
}
107+
108+
@Override
109+
public void setSubscription(Subscription s) {
110+
resource.update(s);
111+
}
112+
113+
@Override
114+
public void setCancellation(Cancellable c) {
115+
setSubscription(new CancellableSubscription(c));
116+
}
117+
}
118+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package rx.internal.subscriptions;
2+
3+
import java.util.concurrent.atomic.AtomicReference;
4+
5+
import rx.Subscription;
6+
import rx.exceptions.Exceptions;
7+
import rx.functions.Cancellable;
8+
import rx.plugins.RxJavaHooks;
9+
10+
/**
11+
* A Subscription that wraps an Cancellable instance.
12+
*/
13+
public final class CancellableSubscription
14+
extends AtomicReference<Cancellable>
15+
implements Subscription {
16+
17+
/** */
18+
private static final long serialVersionUID = 5718521705281392066L;
19+
20+
public CancellableSubscription(Cancellable cancellable) {
21+
super(cancellable);
22+
}
23+
24+
@Override
25+
public boolean isUnsubscribed() {
26+
return get() == null;
27+
}
28+
29+
@Override
30+
public void unsubscribe() {
31+
if (get() != null) {
32+
Cancellable c = getAndSet(null);
33+
if (c != null) {
34+
try {
35+
c.cancel();
36+
} catch (Exception ex) {
37+
Exceptions.throwIfFatal(ex);
38+
RxJavaHooks.onError(ex);
39+
}
40+
}
41+
}
42+
}
43+
}

0 commit comments

Comments
 (0)