File tree Expand file tree Collapse file tree 1 file changed +6
-0
lines changed
rxjava-core/src/main/java/rx/observers Expand file tree Collapse file tree 1 file changed +6
-0
lines changed Original file line number Diff line number Diff line change @@ -117,18 +117,23 @@ public void onNext(T t) {
117117 queue = new FastList ();
118118 }
119119 queue .add (t != null ? t : NULL_SENTINEL );
120+ // another thread is emitting so we add to the queue and return
120121 return ;
121122 }
123+ // we can emit
122124 emitting = true ;
125+ // reference to the list to drain before emitting our value
123126 list = queue ;
124127 queue = null ;
125128 }
126129
130+ // we only get here if we won the right to emit, otherwise we returned in the if(emitting) block above
127131 try {
128132 int iter = MAX_DRAIN_ITERATION ;
129133 do {
130134 drainQueue (list );
131135 if (iter == MAX_DRAIN_ITERATION ) {
136+ // after the first draining we emit our own value
132137 actual .onNext (t );
133138 }
134139 --iter ;
@@ -152,6 +157,7 @@ public void onNext(T t) {
152157 list = null ;
153158 }
154159 }
160+ // this will only drain if terminated (done here outside of synchronized block)
155161 drainQueue (list );
156162 }
157163 }
You can’t perform that action at this time.
0 commit comments