@@ -1085,14 +1085,16 @@ where
1085
1085
. cloned ( )
1086
1086
. collect :: < Vec < _ > > ( ) ;
1087
1087
1088
- trace ! (
1088
+ info ! (
1089
1089
"persist_sink {collection_id}/{shard_id}: \
1090
- append_batches: in_flight: {:?}, \
1091
- done: {:?}, \
1090
+ append_batches: in_flight_descriptions size: {:?}, \
1091
+ in_flight_batches size: {:?}, \
1092
+ done size: {:?}, \
1092
1093
batch_frontier: {:?}, \
1093
1094
batch_description_frontier: {:?}",
1094
- in_flight_descriptions,
1095
- done_batches,
1095
+ in_flight_descriptions. len( ) ,
1096
+ in_flight_batches. len( ) ,
1097
+ done_batches. len( ) ,
1096
1098
batches_frontier,
1097
1099
batch_description_frontier
1098
1100
) ;
@@ -1112,6 +1114,7 @@ where
1112
1114
let mut done_batches_iter = done_batches. iter ( ) ;
1113
1115
1114
1116
let Some ( first_batch_description) = done_batches_iter. next ( ) else {
1117
+ upper_cap_set. downgrade ( current_upper. borrow ( ) . iter ( ) ) ;
1115
1118
continue ;
1116
1119
} ;
1117
1120
@@ -1135,7 +1138,7 @@ where
1135
1138
1136
1139
let mut to_append = batches. iter_mut ( ) . map ( |b| & mut b. batch ) . collect :: < Vec < _ > > ( ) ;
1137
1140
1138
- while to_append . len ( ) > 0 {
1141
+ loop {
1139
1142
let result = {
1140
1143
let maybe_err = if * read_only_rx. borrow ( ) {
1141
1144
@@ -1283,6 +1286,7 @@ where
1283
1286
to_append. clear ( ) ;
1284
1287
current_upper. borrow_mut ( ) . clone_from ( & batch_upper) ;
1285
1288
upper_cap_set. downgrade ( current_upper. borrow ( ) . iter ( ) ) ;
1289
+ break ;
1286
1290
}
1287
1291
Err ( mismatch) => {
1288
1292
// We tried to do a non-contiguous append, that won't work.
@@ -1324,15 +1328,17 @@ where
1324
1328
batch_lower = mismatch. current . clone ( ) ;
1325
1329
to_append = batches. iter_mut ( )
1326
1330
. sorted_by ( |l, r| Ord :: cmp ( & l. data_ts , & r. data_ts ) )
1327
- . map ( |batch| & mut batch. batch ) . collect :: < Vec < _ > > ( ) ;
1331
+ . map ( |batch| & mut batch. batch )
1332
+ . collect :: < Vec < _ > > ( ) ;
1328
1333
1329
1334
// Best-effort attempt to delete unneeded batches.
1330
1335
future:: join_all ( batch_delete_futures) . await ;
1331
1336
continue ;
1332
1337
} else {
1333
1338
// Best-effort attempt to delete unneeded batches.
1334
1339
future:: join_all ( batches. into_iter ( ) . map ( |b| b. batch . delete ( ) ) ) . await ;
1335
- current_upper. replace ( mismatch. current . clone ( ) ) ;
1340
+ current_upper. borrow_mut ( ) . clone_from ( & mismatch. current ) ;
1341
+ upper_cap_set. downgrade ( current_upper. borrow ( ) . iter ( ) ) ;
1336
1342
}
1337
1343
1338
1344
if bail_on_concurrent_modification {
0 commit comments