21
21
import reactor .core .publisher .GroupedFlux ;
22
22
import reactor .core .publisher .Operators ;
23
23
import reactor .core .publisher .Signal ;
24
+ import reactor .util .context .Context ;
24
25
import reactor .util .function .Tuple2 ;
25
26
import reactor .util .function .Tuples ;
26
27
@@ -149,8 +150,8 @@ private Function<Flux<Tuple2<Integer, Message<byte[]>>>, Flux<Tuple2<Integer, Me
149
150
// Used to transform the publisher chain into one that doesn't forward cancel() calls once it has complete()d.
150
151
private Function <? super Publisher <Tuple2 <Integer , Message <byte []>>>, ? extends Publisher <Tuple2 <Integer , Message <byte []>>>> ignoreCancelsAfterComplete () {
151
152
return Operators .lift ((f , actual ) ->
152
- new CoreSubscriber <>() {
153
- AtomicBoolean completed = new AtomicBoolean () ;
153
+ new CoreSubscriber <Tuple2 < Integer , Message < byte []>> >() {
154
+ private volatile boolean completed ;
154
155
155
156
@ Override
156
157
public void onSubscribe (Subscription s ) {
@@ -162,7 +163,7 @@ public void request(long n) {
162
163
163
164
@ Override
164
165
public void cancel () {
165
- if (!completed . get () ) {
166
+ if (!completed ) {
166
167
s .cancel ();
167
168
}
168
169
}
@@ -181,9 +182,14 @@ public void onError(Throwable t) {
181
182
182
183
@ Override
183
184
public void onComplete () {
184
- completed . compareAndSet ( false , true ) ;
185
+ completed = true ;
185
186
actual .onComplete ();
186
187
}
188
+
189
+ @ Override
190
+ public Context currentContext () {
191
+ return actual .currentContext ();
192
+ }
187
193
});
188
194
}
189
195
0 commit comments