Skip to content

Commit dc63e53

Browse files
authored
1.x: remove ObjectPool, code style cleanups (#4846)
2 parents aa1c4ed + 077c4d0 commit dc63e53

File tree

114 files changed

+607
-1065
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

114 files changed

+607
-1065
lines changed

src/main/java/rx/BackpressureOverflow.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
@Beta
2727
public final class BackpressureOverflow {
2828

29+
private BackpressureOverflow() {
30+
throw new IllegalStateException("No instances!");
31+
}
32+
2933
/**
3034
* Signal a MissingBackressureException due to lack of requests.
3135
*/
@@ -65,10 +69,10 @@ public interface Strategy {
6569
/**
6670
* Drop oldest items from the buffer making room for newer ones.
6771
*/
68-
static class DropOldest implements BackpressureOverflow.Strategy {
72+
static final class DropOldest implements BackpressureOverflow.Strategy {
6973
static final DropOldest INSTANCE = new DropOldest();
7074

71-
private DropOldest() {}
75+
private DropOldest() { }
7276

7377
@Override
7478
public boolean mayAttemptDrop() {
@@ -80,10 +84,10 @@ public boolean mayAttemptDrop() {
8084
* Drop most recent items, but not {@code onError} nor unsubscribe from source
8185
* (as {code OperatorOnBackpressureDrop}).
8286
*/
83-
static class DropLatest implements BackpressureOverflow.Strategy {
87+
static final class DropLatest implements BackpressureOverflow.Strategy {
8488
static final DropLatest INSTANCE = new DropLatest();
8589

86-
private DropLatest() {}
90+
private DropLatest() { }
8791

8892
@Override
8993
public boolean mayAttemptDrop() {
@@ -94,11 +98,11 @@ public boolean mayAttemptDrop() {
9498
/**
9599
* {@code onError} a MissingBackpressureException and unsubscribe from source.
96100
*/
97-
static class Error implements BackpressureOverflow.Strategy {
101+
static final class Error implements BackpressureOverflow.Strategy {
98102

99103
static final Error INSTANCE = new Error();
100104

101-
private Error() {}
105+
private Error() { }
102106

103107
@Override
104108
public boolean mayAttemptDrop() throws MissingBackpressureException {

src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3470,7 +3470,7 @@ public static Observable<Integer> range(int start, int count) {
34703470
if (start > Integer.MAX_VALUE - count + 1) {
34713471
throw new IllegalArgumentException("start + count can not exceed Integer.MAX_VALUE");
34723472
}
3473-
if(count == 1) {
3473+
if (count == 1) {
34743474
return Observable.just(start);
34753475
}
34763476
return Observable.create(new OnSubscribeRange(start, start + (count - 1)));
@@ -11742,7 +11742,7 @@ public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Intege
1174211742
* @return an Observable that emits the items emitted by the source Observable in sorted order
1174311743
*/
1174411744
@Experimental
11745-
public final Observable<T> sorted(){
11745+
public final Observable<T> sorted() {
1174611746
return toSortedList().flatMapIterable(UtilityFunctions.<List<T>>identity());
1174711747
}
1174811748

src/main/java/rx/Scheduler.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,9 @@ public long now() {
212212
*
213213
* <pre>
214214
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
215-
* // use merge max concurrent to limit the number of concurrent
216-
* // callbacks two at a time
217-
* return Completable.merge(Observable.merge(workers), 2);
215+
* // use merge max concurrent to limit the number of concurrent
216+
* // callbacks two at a time
217+
* return Completable.merge(Observable.merge(workers), 2);
218218
* });
219219
* </pre>
220220
* <p>
@@ -230,9 +230,9 @@ public long now() {
230230
*
231231
* <pre>
232232
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
233-
* // use merge max concurrent to limit the number of concurrent
234-
* // Observables two at a time
235-
* return Completable.merge(Observable.merge(workers, 2));
233+
* // use merge max concurrent to limit the number of concurrent
234+
* // Observables two at a time
235+
* return Completable.merge(Observable.merge(workers, 2));
236236
* });
237237
* </pre>
238238
*
@@ -243,10 +243,10 @@ public long now() {
243243
*
244244
* <pre>
245245
* Scheduler slowScheduler = Schedulers.computation().when(workers -> {
246-
* // use concatenate to make each worker happen one at a time.
247-
* return Completable.concat(workers.map(actions -> {
248-
* // delay the starting of the next worker by 1 second.
249-
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
246+
* // use concatenate to make each worker happen one at a time.
247+
* return Completable.concat(workers.map(actions -> {
248+
* // delay the starting of the next worker by 1 second.
249+
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
250250
* }));
251251
* });
252252
* </pre>

src/main/java/rx/exceptions/CompositeException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public synchronized Throwable getCause() { // NOPMD
141141

142142
List<Throwable> listOfCauses = getListOfCauses(e);
143143
// check if any of them have been seen before
144-
for(Throwable child : listOfCauses) {
144+
for (Throwable child : listOfCauses) {
145145
if (seenCauses.contains(child)) {
146146
// already seen this outer Throwable so skip
147147
e = new RuntimeException("Duplicate found in causal chain so cropping to prevent loop ...");
@@ -288,7 +288,7 @@ private List<Throwable> getListOfCauses(Throwable ex) {
288288
if (root == null || root == ex) {
289289
return list;
290290
} else {
291-
while(true) {
291+
while (true) {
292292
list.add(root);
293293
Throwable cause = root.getCause();
294294
if (cause == null || cause == root) {
@@ -311,7 +311,7 @@ private Throwable getRootCause(Throwable e) {
311311
if (root == null || root == e) {
312312
return e;
313313
} else {
314-
while(true) {
314+
while (true) {
315315
Throwable cause = root.getCause();
316316
if (cause == null || cause == root) {
317317
return root;

src/main/java/rx/internal/operators/BufferUntilSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void call() {
102102
}
103103
}
104104
if (win) {
105-
while(true) {
105+
while (true) {
106106
Object o;
107107
while ((o = state.buffer.poll()) != null) {
108108
NotificationLite.accept(state.get(), o);

src/main/java/rx/internal/operators/OnSubscribeAmb.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* Given multiple {@link Observable}s, propagates the one that first emits an item.
3030
* @param <T> the value type
3131
*/
32-
public final class OnSubscribeAmb<T> implements OnSubscribe<T>{
32+
public final class OnSubscribeAmb<T> implements OnSubscribe<T> {
3333
//give default access instead of private as a micro-optimization
3434
//for access from anonymous classes below
3535
final Iterable<? extends Observable<? extends T>> sources;
@@ -336,12 +336,12 @@ private boolean isSelected() {
336336
}
337337

338338
@SuppressWarnings("serial")
339-
static final class Selection<T> extends AtomicReference<AmbSubscriber<T>> {
339+
static final class Selection<T> extends AtomicReference<AmbSubscriber<T>> {
340340
final Collection<AmbSubscriber<T>> ambSubscribers = new ConcurrentLinkedQueue<AmbSubscriber<T>>();
341341

342342
public void unsubscribeLosers() {
343343
AmbSubscriber<T> winner = get();
344-
if(winner != null) {
344+
if (winner != null) {
345345
unsubscribeOthers(winner);
346346
}
347347
}
@@ -437,7 +437,7 @@ public void request(long n) {
437437
}
438438

439439
static <T> void unsubscribeAmbSubscribers(Collection<AmbSubscriber<T>> ambSubscribers) {
440-
if(!ambSubscribers.isEmpty()) {
440+
if (!ambSubscribers.isEmpty()) {
441441
for (AmbSubscriber<T> other : ambSubscribers) {
442442
other.unsubscribe();
443443
}

src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
public final class OnSubscribeAutoConnect<T> extends AtomicInteger implements OnSubscribe<T> {
3434
// AtomicInteger aspect of `this` represents the number of clients
3535

36-
final ConnectableObservable<? extends T> source;
36+
final ConnectableObservable<? extends T> source;
3737
final int numberOfSubscribers;
3838
final Action1<? super Subscription> connection;
3939

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ public Observable<?> call(Observable<? extends Notification<?>> ts) {
7878

7979
@Override
8080
public Notification<?> call(Notification<?> terminalNotification) {
81-
if(count == 0) {
81+
if (count == 0) {
8282
return terminalNotification;
8383
}
8484

8585
num++;
86-
if(num <= count) {
86+
if (num <= count) {
8787
return Notification.createOnNext(num);
8888
} else {
8989
return terminalNotification;
@@ -153,7 +153,7 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count) {
153153
}
154154

155155
public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
156-
if(count == 0) {
156+
if (count == 0) {
157157
return Observable.empty();
158158
}
159159
if (count < 0) {

src/main/java/rx/internal/operators/OnSubscribeToMultimap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void onStart() {
155155

156156
@Override
157157
public void onNext(T t) {
158-
if (done){
158+
if (done) {
159159
return;
160160
}
161161
try {

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
* @param <V>
4444
* the value type of the groups
4545
*/
46-
public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservable<K, V>, T>{
46+
public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservable<K, V>, T> {
4747
final Func1<? super T, ? extends K> keySelector;
4848
final Func1<? super T, ? extends V> valueSelector;
4949
final int bufferSize;

src/main/java/rx/internal/operators/OperatorMulticast.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void call() {
112112
guardedSubscription = gs.get();
113113

114114
// register any subscribers that are waiting with this new subject
115-
for(final Subscriber<? super R> s : waitingForConnect) {
115+
for (final Subscriber<? super R> s : waitingForConnect) {
116116
subject.unsafeSubscribe(new Subscriber<R>(s) {
117117
@Override
118118
public void onNext(R t) {

src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void onNext(T t) {
9999
requested.decrementAndGet();
100100
} else {
101101
// item dropped
102-
if(onDrop != null) {
102+
if (onDrop != null) {
103103
try {
104104
onDrop.call(t);
105105
} catch (Throwable e) {

src/main/java/rx/internal/operators/OperatorPublish.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import rx.*;
2222
import rx.exceptions.*;
2323
import rx.functions.*;
24-
import rx.internal.util.*;
24+
import rx.internal.util.RxRingBuffer;
25+
import rx.internal.util.atomic.SpscAtomicArrayQueue;
2526
import rx.internal.util.unsafe.*;
2627
import rx.observables.ConnectableObservable;
2728
import rx.subscriptions.Subscriptions;
@@ -244,7 +245,7 @@ static final class PublishSubscriber<T> extends Subscriber<T> implements Subscri
244245
public PublishSubscriber(AtomicReference<PublishSubscriber<T>> current) {
245246
this.queue = UnsafeAccess.isUnsafeAvailable()
246247
? new SpscArrayQueue<Object>(RxRingBuffer.SIZE)
247-
: new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
248+
: new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
248249

249250
this.producers = new AtomicReference<InnerProducer[]>(EMPTY);
250251
this.current = current;

src/main/java/rx/internal/operators/OperatorSwitch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void call() {
113113
clearProducer();
114114
}
115115
}));
116-
child.setProducer(new Producer(){
116+
child.setProducer(new Producer() {
117117

118118
@Override
119119
public void request(long n) {

src/main/java/rx/internal/operators/OperatorTake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void setProducer(final Producer producer) {
9797

9898
@Override
9999
public void request(long n) {
100-
if (n >0 && !completed) {
100+
if (n > 0 && !completed) {
101101
// because requests may happen concurrently use a CAS loop to
102102
// ensure we only request as much as needed, no more no less
103103
while (true) {

src/main/java/rx/internal/schedulers/CachedThreadScheduler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import rx.subscriptions.*;
2525

2626
public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
27-
private static final long KEEP_ALIVE_TIME = 60;
27+
private static final long KEEP_ALIVE_TIME;
2828
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
2929

3030
static final ThreadWorker SHUTDOWN_THREADWORKER;
@@ -41,6 +41,8 @@ public final class CachedThreadScheduler extends Scheduler implements SchedulerL
4141

4242
NONE = new CachedWorkerPool(null, 0, null);
4343
NONE.shutdown();
44+
45+
KEEP_ALIVE_TIME = Integer.getInteger("rx.io-scheduler.keepalive", 60);
4446
}
4547

4648
static final class CachedWorkerPool {

src/main/java/rx/internal/schedulers/SchedulerWhen.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@
6363
*
6464
* <pre>
6565
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
66-
* // use merge max concurrent to limit the number of concurrent
67-
* // callbacks two at a time
68-
* return Completable.merge(Observable.merge(workers), 2);
66+
* // use merge max concurrent to limit the number of concurrent
67+
* // callbacks two at a time
68+
* return Completable.merge(Observable.merge(workers), 2);
6969
* });
7070
* </pre>
7171
* <p>
@@ -81,9 +81,9 @@
8181
*
8282
* <pre>
8383
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
84-
* // use merge max concurrent to limit the number of concurrent
85-
* // Observables two at a time
86-
* return Completable.merge(Observable.merge(workers, 2));
84+
* // use merge max concurrent to limit the number of concurrent
85+
* // Observables two at a time
86+
* return Completable.merge(Observable.merge(workers, 2));
8787
* });
8888
* </pre>
8989
*
@@ -94,11 +94,11 @@
9494
*
9595
* <pre>
9696
* Scheduler slowScheduler = Schedulers.computation().when(workers -> {
97-
* // use concatenate to make each worker happen one at a time.
98-
* return Completable.concat(workers.map(actions -> {
99-
* // delay the starting of the next worker by 1 second.
100-
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
101-
* }));
97+
* // use concatenate to make each worker happen one at a time.
98+
* return Completable.concat(workers.map(actions -> {
99+
* // delay the starting of the next worker by 1 second.
100+
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
101+
* }));
102102
* });
103103
* </pre>
104104
*/

src/main/java/rx/internal/util/IndexedRingBuffer.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,6 @@ public final class IndexedRingBuffer<E> implements Subscription {
5252
/* package for unit testing */final AtomicInteger index = new AtomicInteger();
5353
/* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger();
5454

55-
private static final ObjectPool<IndexedRingBuffer<?>> POOL = new ObjectPool<IndexedRingBuffer<?>>() {
56-
57-
@Override
58-
protected IndexedRingBuffer<?> createObject() {
59-
return new IndexedRingBuffer<Object>();
60-
}
61-
62-
};
63-
6455
/* package for unit testing */static final int SIZE;
6556

6657
// default size of ring buffer
@@ -255,9 +246,8 @@ protected IndexedRingBuffer<?> createObject() {
255246
SIZE = defaultSize;
256247
}
257248

258-
@SuppressWarnings("unchecked")
259249
public static <T> IndexedRingBuffer<T> getInstance() {
260-
return (IndexedRingBuffer<T>) POOL.borrowObject();
250+
return new IndexedRingBuffer<T>();
261251
}
262252

263253
/**
@@ -283,7 +273,6 @@ public void releaseToPool() {
283273

284274
index.set(0);
285275
removedIndex.set(0);
286-
POOL.returnObject(this);
287276
}
288277

289278
@Override

0 commit comments

Comments
 (0)