38
38
import reactor .util .annotation .Nullable ;
39
39
40
40
import java .util .concurrent .atomic .AtomicBoolean ;
41
+ import java .util .concurrent .atomic .AtomicReference ;
41
42
42
43
import static io .r2dbc .postgresql .PostgresqlResult .toResult ;
43
44
@@ -55,19 +56,31 @@ final class PostgresqlCopyIn {
55
56
Mono <Long > copy (String sql , Publisher <? extends Publisher <ByteBuf >> stdin ) {
56
57
57
58
ExceptionFactory exceptionFactory = ExceptionFactory .withSql (sql );
58
-
59
+ AtomicReference < CopyData > toReleaseOnError = new AtomicReference <>();
59
60
return Flux .from (stdin )
60
61
.<FrontendMessage >concatMap (data -> {
61
62
62
63
CompositeByteBuf composite = this .context .getClient ().getByteBufAllocator ().compositeBuffer ();
63
64
64
65
return Flux .from (data )
65
- .reduce (composite , (l , r ) -> l .addComponent (true , r ))
66
+ .reduce (composite , (l , r ) -> {
67
+ return l .addComponent (true , r );
68
+ })
66
69
.map (CopyData ::new )
70
+ .doOnNext (toReleaseOnError ::set )
67
71
.doOnDiscard (ReferenceCounted .class , ReferenceCountUtil ::release );
68
72
69
73
}).concatWithValues (CopyDone .INSTANCE ).startWith (new Query (sql ))
70
- .as (messages -> copyIn (exceptionFactory , messages ));
74
+ .as (messages -> copyIn (exceptionFactory , messages ))
75
+ .doFinally (signalType -> {
76
+
77
+ CopyData copyData = toReleaseOnError .get ();
78
+ if (copyData != null ) {
79
+ if (copyData .refCnt () > 0 ) {
80
+ copyData .release ();
81
+ }
82
+ }
83
+ });
71
84
}
72
85
73
86
private Mono <Long > copyIn (ExceptionFactory exceptionFactory , Flux <FrontendMessage > copyDataMessages ) {
0 commit comments