Skip to content

Commit 82c634f

Browse files
Merge pull request #1459 from benjchristensen/remove-onSetProducer
Remove Subscriber.onSetProducer
2 parents 936e185 + 30a3f2b commit 82c634f

18 files changed

+82
-340
lines changed

rxjava-core/src/main/java/rx/Subscriber.java

+2-12
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public void onStart() {
103103
* {@code Long.MAX_VALUE} if you want the Observable to emit items at its own pace
104104
* @since 0.20
105105
*/
106-
public final void request(long n) {
106+
protected final void request(long n) {
107107
Producer shouldRequest = null;
108108
synchronized (this) {
109109
if (p != null) {
@@ -118,23 +118,13 @@ public final void request(long n) {
118118
}
119119
}
120120

121-
/**
122-
* @warn javadoc description missing
123-
* @return
124-
* @since 0.20
125-
*/
126-
protected Producer onSetProducer(Producer producer) {
127-
return producer;
128-
}
129-
130121
/**
131122
* @warn javadoc description missing
132123
* @warn param producer not described
133124
* @param producer
134125
* @since 0.20
135126
*/
136-
public final void setProducer(Producer producer) {
137-
producer = onSetProducer(producer);
127+
public void setProducer(Producer producer) {
138128
long toRequest;
139129
boolean setProducer = false;
140130
synchronized (this) {

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ public Boolean call(InnerSubscriber<T> s) {
339339
// TODO we may want to store this in s.emitted and only request if above batch
340340
// reset this since we have requested them all
341341
s.emitted = 0;
342-
s.request(emitted);
342+
s.requestMore(emitted);
343343
}
344344
if (emitted == r) {
345345
// we emitted as many as were requested so stop the forEach loop
@@ -494,6 +494,10 @@ public void onCompleted() {
494494
}
495495
}
496496

497+
public void requestMore(long n) {
498+
request(n);
499+
}
500+
497501
private void emit(T t, boolean complete) {
498502
boolean drain = false;
499503
boolean enqueue = true;

rxjava-core/src/main/java/rx/internal/operators/OperatorSkip.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public void onNext(T t) {
6262
}
6363

6464
@Override
65-
protected Producer onSetProducer(final Producer producer) {
66-
return new Producer() {
65+
public void setProducer(final Producer producer) {
66+
child.setProducer(new Producer() {
6767

6868
@Override
6969
public void request(long n) {
@@ -75,7 +75,7 @@ public void request(long n) {
7575
producer.request(n + (toSkip - skipped));
7676
}
7777
}
78-
};
78+
});
7979
}
8080

8181
};

rxjava-core/src/main/java/rx/internal/operators/OperatorSubscribeOn.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ public void onNext(T t) {
7777
}
7878

7979
@Override
80-
protected Producer onSetProducer(final Producer producer) {
81-
return new Producer() {
80+
public void setProducer(final Producer producer) {
81+
subscriber.setProducer(new Producer() {
8282

8383
@Override
8484
public void request(final long n) {
@@ -97,7 +97,7 @@ public void call() {
9797
}
9898
}
9999

100-
};
100+
});
101101
}
102102

103103
});

rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java

+3-19
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ public void onNext(T i) {
7474
* We want to adjust the requested values based on the `take` count.
7575
*/
7676
@Override
77-
protected Producer onSetProducer(final Producer producer) {
78-
return new Producer() {
77+
public void setProducer(final Producer producer) {
78+
child.setProducer(new Producer() {
7979

8080
@Override
8181
public void request(long n) {
@@ -86,7 +86,7 @@ public void request(long n) {
8686
producer.request(c);
8787
}
8888
}
89-
};
89+
});
9090
}
9191

9292
};
@@ -107,22 +107,6 @@ public void request(long n) {
107107
*/
108108
child.add(parent);
109109

110-
/**
111-
* Since we decoupled the subscription chain but want the request to flow through, we reconnect the producer here.
112-
*/
113-
child.setProducer(new Producer() {
114-
115-
@Override
116-
public void request(long n) {
117-
if (n < 0) {
118-
// request up the limit that has been set, no point in asking for more, even if synchronous
119-
parent.request(limit);
120-
} else {
121-
parent.request(n);
122-
}
123-
}
124-
});
125-
126110
return parent;
127111
}
128112

rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ void tick() {
276276
}
277277
if (emitted > THRESHOLD) {
278278
for (Object obj : observers) {
279-
((InnerSubscriber) obj).request(emitted);
279+
((InnerSubscriber) obj).requestMore(emitted);
280280
}
281281
emitted = 0;
282282
}
@@ -298,6 +298,10 @@ final class InnerSubscriber extends Subscriber {
298298
public void onStart() {
299299
request(RxRingBuffer.SIZE);
300300
}
301+
302+
public void requestMore(long n) {
303+
request(n);
304+
}
301305

302306
@SuppressWarnings("unchecked")
303307
@Override

rxjava-core/src/main/java/rx/observers/TestSubscriber.java

+8
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,14 @@ public void onNext(T t) {
106106
lastSeenThread = Thread.currentThread();
107107
testObserver.onNext(t);
108108
}
109+
110+
/**
111+
* Allow calling the protected {@link #request(long)} from unit tests.
112+
* @param n
113+
*/
114+
public void requestMore(long n) {
115+
request(n);
116+
}
109117

110118
/**
111119
* Get the sequence of items observed by this Subscriber, as an ordered {@link List}.

rxjava-core/src/test/java/rx/SubscriberTest.java

-212
Original file line numberDiff line numberDiff line change
@@ -317,218 +317,6 @@ public void request(long n) {
317317
assertEquals(3, requested.get());
318318
}
319319

320-
@Test
321-
public void testSetProducerFromOperator() {
322-
final AtomicLong requested1 = new AtomicLong();
323-
final AtomicLong requested2 = new AtomicLong();
324-
final AtomicReference<Producer> producer1 = new AtomicReference<Producer>();
325-
final AtomicReference<Producer> producer2 = new AtomicReference<Producer>();
326-
final AtomicReference<Producer> gotProducer = new AtomicReference<Producer>();
327-
Observable.create(new OnSubscribe<Integer>() {
328-
329-
@Override
330-
public void call(final Subscriber<? super Integer> s) {
331-
Producer p1 = new Producer() {
332-
int index = 0;
333-
334-
@Override
335-
public void request(long n) {
336-
requested1.set(n);
337-
System.out.println("onSubscribe => requested: " + n);
338-
for (int i = 0; i < n; i++) {
339-
s.onNext(index++);
340-
}
341-
}
342-
343-
};
344-
producer1.set(p1);
345-
s.setProducer(p1);
346-
}
347-
348-
}).lift(new Operator<Integer, Integer>() {
349-
350-
@Override
351-
public Subscriber<? super Integer> call(final Subscriber<? super Integer> child) {
352-
353-
Producer p2 = new Producer() {
354-
355-
@Override
356-
public void request(long n) {
357-
System.out.println("lift => requested: " + n);
358-
requested2.set(n);
359-
}
360-
361-
};
362-
producer2.set(p2);
363-
child.setProducer(p2);
364-
365-
return new Subscriber<Integer>(child) {
366-
367-
// we request "5" and this decouples the Producer chain while retaining the Subscription chain
368-
@Override
369-
public void onStart() {
370-
request(5);
371-
}
372-
373-
@Override
374-
public void onCompleted() {
375-
}
376-
377-
@Override
378-
public void onError(Throwable e) {
379-
}
380-
381-
@Override
382-
public void onNext(Integer t) {
383-
}
384-
385-
};
386-
}
387-
388-
}).subscribe(new Subscriber<Integer>() {
389-
390-
@Override
391-
public void onStart() {
392-
request(1);
393-
}
394-
395-
@Override
396-
public void onCompleted() {
397-
398-
}
399-
400-
@Override
401-
public void onError(Throwable e) {
402-
403-
}
404-
405-
@Override
406-
public void onNext(Integer t) {
407-
System.out.println(t);
408-
request(1);
409-
}
410-
411-
@Override
412-
protected Producer onSetProducer(Producer producer) {
413-
gotProducer.set(producer);
414-
return producer;
415-
}
416-
417-
});
418-
419-
if (gotProducer.get() != producer2.get()) {
420-
throw new IllegalStateException("Expecting the producer from lift");
421-
}
422-
assertEquals(requested1.get(), 5);
423-
assertEquals(requested2.get(), 1);
424-
}
425-
426-
@Test
427-
public void testSetProducerFromOperatorWithUnsafeSubscribe() {
428-
final AtomicLong requested1 = new AtomicLong();
429-
final AtomicLong requested2 = new AtomicLong();
430-
final AtomicReference<Producer> producer1 = new AtomicReference<Producer>();
431-
final AtomicReference<Producer> producer2 = new AtomicReference<Producer>();
432-
final AtomicReference<Producer> gotProducer = new AtomicReference<Producer>();
433-
Observable.create(new OnSubscribe<Integer>() {
434-
435-
@Override
436-
public void call(final Subscriber<? super Integer> s) {
437-
Producer p1 = new Producer() {
438-
int index = 0;
439-
440-
@Override
441-
public void request(long n) {
442-
requested1.set(n);
443-
System.out.println("onSubscribe => requested: " + n);
444-
for (int i = 0; i < n; i++) {
445-
s.onNext(index++);
446-
}
447-
}
448-
449-
};
450-
producer1.set(p1);
451-
s.setProducer(p1);
452-
}
453-
454-
}).lift(new Operator<Integer, Integer>() {
455-
456-
@Override
457-
public Subscriber<? super Integer> call(final Subscriber<? super Integer> child) {
458-
459-
Producer p2 = new Producer() {
460-
461-
@Override
462-
public void request(long n) {
463-
System.out.println("lift => requested: " + n);
464-
requested2.set(n);
465-
}
466-
467-
};
468-
producer2.set(p2);
469-
child.setProducer(p2);
470-
471-
return new Subscriber<Integer>(child) {
472-
473-
// we request "5" and this decouples the Producer chain while retaining the Subscription chain
474-
@Override
475-
public void onStart() {
476-
request(5);
477-
}
478-
479-
@Override
480-
public void onCompleted() {
481-
}
482-
483-
@Override
484-
public void onError(Throwable e) {
485-
}
486-
487-
@Override
488-
public void onNext(Integer t) {
489-
}
490-
491-
};
492-
}
493-
494-
}).unsafeSubscribe(new Subscriber<Integer>() {
495-
496-
@Override
497-
public void onStart() {
498-
request(1);
499-
}
500-
501-
@Override
502-
public void onCompleted() {
503-
504-
}
505-
506-
@Override
507-
public void onError(Throwable e) {
508-
509-
}
510-
511-
@Override
512-
public void onNext(Integer t) {
513-
System.out.println(t);
514-
request(1);
515-
}
516-
517-
@Override
518-
protected Producer onSetProducer(Producer producer) {
519-
gotProducer.set(producer);
520-
return producer;
521-
}
522-
523-
});
524-
525-
if (gotProducer.get() != producer2.get()) {
526-
throw new IllegalStateException("Expecting the producer from lift");
527-
}
528-
assertEquals(requested1.get(), 5);
529-
assertEquals(requested2.get(), 1);
530-
}
531-
532320
@Test
533321
public void testOnStartCalledOnceViaSubscribe() {
534322
final AtomicInteger c = new AtomicInteger();

0 commit comments

Comments
 (0)