Skip to content

Commit 19c0959

Browse files
authored
2.x: Fix publish().refCount() hang due to race (#6505)
* 2.x: Fix publish().refCount() hang due to race * Add more time to GC when detecting leaks. * Fix subscriber swap mistake in the Alt implementation
1 parent b763ffa commit 19c0959

16 files changed

+6262
-9
lines changed

src/main/java/io/reactivex/flowables/ConnectableFlowable.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,24 @@ public final Disposable connect() {
6868
return cc.disposable;
6969
}
7070

71+
/**
72+
* Apply a workaround for a race condition with the regular publish().refCount()
73+
* so that racing subscribers and refCount won't hang.
74+
*
75+
* @return the ConnectableFlowable to work with
76+
* @since 2.2.10
77+
*/
78+
private ConnectableFlowable<T> onRefCount() {
79+
if (this instanceof FlowablePublishClassic) {
80+
@SuppressWarnings("unchecked")
81+
FlowablePublishClassic<T> fp = (FlowablePublishClassic<T>) this;
82+
return RxJavaPlugins.onAssembly(
83+
new FlowablePublishAlt<T>(fp.publishSource(), fp.publishBufferSize())
84+
);
85+
}
86+
return this;
87+
}
88+
7189
/**
7290
* Returns a {@code Flowable} that stays connected to this {@code ConnectableFlowable} as long as there
7391
* is at least one subscription to this {@code ConnectableFlowable}.
@@ -89,7 +107,7 @@ public final Disposable connect() {
89107
@SchedulerSupport(SchedulerSupport.NONE)
90108
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
91109
public Flowable<T> refCount() {
92-
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this));
110+
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(onRefCount()));
93111
}
94112

95113
/**
@@ -216,7 +234,7 @@ public final Flowable<T> refCount(int subscriberCount, long timeout, TimeUnit un
216234
ObjectHelper.verifyPositive(subscriberCount, "subscriberCount");
217235
ObjectHelper.requireNonNull(unit, "unit is null");
218236
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
219-
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this, subscriberCount, timeout, unit, scheduler));
237+
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(onRefCount(), subscriberCount, timeout, unit, scheduler));
220238
}
221239

222240
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
* manner.
3434
* @param <T> the value type
3535
*/
36-
public final class FlowablePublish<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T> {
36+
public final class FlowablePublish<T> extends ConnectableFlowable<T>
37+
implements HasUpstreamPublisher<T>, FlowablePublishClassic<T> {
3738
/**
3839
* Indicates this child has been cancelled: the state is swapped in atomically and
3940
* will prevent the dispatch() to emit (too many) values to a terminated child subscriber.
@@ -77,6 +78,19 @@ public Publisher<T> source() {
7778
return source;
7879
}
7980

81+
/**
82+
* @return The internal buffer size of this FloawblePublish operator.
83+
*/
84+
@Override
85+
public int publishBufferSize() {
86+
return bufferSize;
87+
}
88+
89+
@Override
90+
public Publisher<T> publishSource() {
91+
return source;
92+
}
93+
8094
@Override
8195
protected void subscribeActual(Subscriber<? super T> s) {
8296
onSubscribe.subscribe(s);

0 commit comments

Comments
 (0)