Skip to content

Commit dcf5ad9

Browse files
committed
fix ignore uppers for empty batch sets
1 parent 58edfee commit dcf5ad9

File tree

3 files changed

+52
-18
lines changed

3 files changed

+52
-18
lines changed

src/persist-client/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,6 +1187,7 @@ mod tests {
11871187
append_upper: Antichain::from_elem(5),
11881188
},
11891189
);
1190+
11901191
let batch = write0
11911192
.batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
11921193
.await
@@ -1203,6 +1204,7 @@ mod tests {
12031204
append_upper: Antichain::from_elem(3),
12041205
},
12051206
);
1207+
12061208
let batch = write0
12071209
.batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
12081210
.await

src/persist-client/src/write.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,26 @@ where
463463
where
464464
D: Send + Sync,
465465
{
466+
let mut max_upper: Option<&Antichain<T>> = None;
467+
let mut min_lower: Option<&Antichain<T>> = None;
468+
466469
for batch in batches.iter() {
470+
if let Some(current_max_upper) = max_upper {
471+
if batch.upper().gt(current_max_upper) {
472+
max_upper = Some(batch.upper());
473+
}
474+
} else {
475+
max_upper = Some(batch.upper());
476+
}
477+
478+
if let Some(current_min_lower) = min_lower {
479+
if batch.lower().lt(current_min_lower) {
480+
min_lower = Some(batch.lower());
481+
}
482+
} else {
483+
min_lower = Some(batch.lower());
484+
}
485+
467486
if self.machine.shard_id() != batch.shard_id() {
468487
return Err(InvalidUsage::BatchNotFromThisShard {
469488
batch_shard: batch.shard_id(),
@@ -484,6 +503,18 @@ where
484503

485504
let lower = expected_upper.clone();
486505
let upper = new_upper;
506+
507+
if let (Some(max_upper), Some(min_lower)) = (max_upper, min_lower) {
508+
if max_upper.lt(&upper) || min_lower.gt(&lower) {
509+
return Err(InvalidUsage::InvalidBatchBounds {
510+
batch_lower: min_lower.clone(),
511+
batch_upper: max_upper.clone(),
512+
append_lower: lower.clone(),
513+
append_upper: upper.clone(),
514+
});
515+
}
516+
}
517+
487518
let since = Antichain::from_elem(T::minimum());
488519
let desc = Description::new(lower, upper, since);
489520

src/storage/src/render/persist_sink.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ use mz_persist_client::cache::PersistClientCache;
107107
use mz_persist_client::error::UpperMismatch;
108108
use mz_persist_types::codec_impls::UnitSchema;
109109
use mz_persist_types::{Codec, Codec64};
110-
use mz_repr::{Diff, GlobalId, Row};
110+
use mz_repr::{Diff, GlobalId, Row, TimestampManipulation};
111111
use mz_storage_types::controller::CollectionMetadata;
112112
use mz_storage_types::errors::DataflowError;
113113
use mz_storage_types::sources::SourceData;
@@ -116,12 +116,12 @@ use mz_timely_util::builder_async::{
116116
Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
117117
};
118118
use serde::{Deserialize, Serialize};
119+
use timely::PartialOrder;
119120
use timely::container::CapacityContainerBuilder;
120121
use timely::dataflow::channels::pact::{Exchange, Pipeline};
121122
use timely::dataflow::operators::{Broadcast, Capability, CapabilitySet, Inspect};
122123
use timely::dataflow::{Scope, Stream};
123124
use timely::progress::{Antichain, Timestamp};
124-
use timely::{Container, PartialOrder};
125125
use tokio::sync::Semaphore;
126126
use tracing::trace;
127127

@@ -1111,21 +1111,6 @@ where
11111111
}
11121112
});
11131113

1114-
let mut done_batches_iter = done_batches.iter();
1115-
1116-
let Some(first_batch_description) = done_batches_iter.next() else {
1117-
upper_cap_set.downgrade(current_upper.borrow().iter());
1118-
continue;
1119-
};
1120-
1121-
let batch_upper = first_batch_description.1.clone();
1122-
1123-
let mut batch_lower = if let Some(last_batch_description) = done_batches_iter.last() {
1124-
last_batch_description.0.clone()
1125-
} else {
1126-
first_batch_description.0.clone()
1127-
};
1128-
11291114
let mut batches: Vec<FinishedBatch> = vec![];
11301115
let mut batch_metrics: Vec<BatchMetrics> = vec![];
11311116

@@ -1136,7 +1121,23 @@ where
11361121
batch_metrics.push(batch.batch_metrics);
11371122
}
11381123

1139-
let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::<Vec<_>>();
1124+
let mut batch_upper = Antichain::from_elem(mz_repr::Timestamp::minimum());
1125+
let mut batch_lower = Antichain::from_elem(mz_repr::Timestamp::maximum());
1126+
1127+
let mut to_append = batches
1128+
.iter_mut()
1129+
.map(|b| {
1130+
if batch_upper.lt(b.batch.upper()) {
1131+
batch_upper = b.batch.upper().clone();
1132+
}
1133+
1134+
if batch_lower.gt(b.batch.lower()) {
1135+
batch_lower = b.batch.lower().clone();
1136+
}
1137+
1138+
&mut b.batch
1139+
})
1140+
.collect::<Vec<_>>();
11401141

11411142
loop {
11421143
let result = {

0 commit comments

Comments
 (0)