Skip to content

Commit 69c6796

Browse files
committed
persist: provide WriteHandle::advance_upper
... and use it to enable shard finalization without a known shard schema.
1 parent 04664bf commit 69c6796

File tree

4 files changed

+56
-63
lines changed

4 files changed

+56
-63
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
231231
Arc::clone(&pubsub_sender),
232232
));
233233

234-
// We need a PersistClient to open a write handle so we can append an empty batch.
235234
let persist_client = PersistClient::new(
236235
cfg,
237236
blob,
@@ -259,21 +258,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
259258
diagnostics,
260259
)
261260
.await?;
262-
263-
if !write_handle.upper().is_empty() {
264-
let empty_batch: Vec<(
265-
(crate::cli::inspect::K, crate::cli::inspect::V),
266-
u64,
267-
i64,
268-
)> = vec![];
269-
let lower = write_handle.upper().clone();
270-
let upper = Antichain::new();
271-
272-
let result = write_handle.append(empty_batch, lower, upper).await?;
273-
if let Err(err) = result {
274-
anyhow::bail!("failed to force downgrade upper, {err:?}");
275-
}
276-
}
261+
write_handle.advance_upper(&Antichain::new()).await;
277262
}
278263

279264
if force_downgrade_since {

src/persist-client/src/lib.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,7 +1996,6 @@ mod tests {
19961996
#[mz_persist_proc::test(tokio::test)]
19971997
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
19981998
async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
1999-
const EMPTY: &[(((), ()), u64, i64)] = &[];
20001999
let persist_client = new_test_client(&dyncfgs).await;
20012000

20022001
let shard_id = ShardId::new();
@@ -2010,11 +2009,7 @@ mod tests {
20102009
// Advance since and upper to empty, which is a pre-requisite for
20112010
// finalization/tombstoning.
20122011
let () = read.downgrade_since(&Antichain::new()).await;
2013-
let () = write
2014-
.compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
2015-
.await
2016-
.expect("usage should be valid")
2017-
.expect("upper should match");
2012+
let () = write.advance_upper(&Antichain::new()).await;
20182013

20192014
let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
20202015
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
@@ -2051,7 +2046,6 @@ mod tests {
20512046
#[mz_persist_proc::test(tokio::test)]
20522047
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
20532048
async fn finalize_shard(dyncfgs: ConfigUpdates) {
2054-
const EMPTY: &[(((), ()), u64, i64)] = &[];
20552049
const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
20562050
let persist_client = new_test_client(&dyncfgs).await;
20572051

@@ -2073,11 +2067,7 @@ mod tests {
20732067
// Advance since and upper to empty, which is a pre-requisite for
20742068
// finalization/tombstoning.
20752069
let () = read.downgrade_since(&Antichain::new()).await;
2076-
let () = write
2077-
.compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
2078-
.await
2079-
.expect("usage should be valid")
2080-
.expect("upper should match");
2070+
let () = write.advance_upper(&Antichain::new()).await;
20812071

20822072
let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
20832073
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())

src/persist-client/src/write.rs

Lines changed: 46 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,50 @@ where
281281
&self.upper
282282
}
283283

284+
/// Advance the shard's upper by the given frontier.
285+
///
286+
/// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
287+
///
288+
/// In contrast to the various compare-and-append methods, this method does not require the
289+
/// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
290+
/// schema when creating a writer just to advance a shard upper.
291+
pub async fn advance_upper(&mut self, target: &Antichain<T>) {
292+
// We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
293+
// already beyond the target.
294+
let mut lower = self.shared_upper().clone();
295+
296+
while !PartialOrder::less_equal(target, &lower) {
297+
let since = Antichain::from_elem(T::minimum());
298+
let desc = Description::new(lower.clone(), target.clone(), since);
299+
let batch = HollowBatch::empty(desc);
300+
301+
let heartbeat_timestamp = (self.cfg.now)();
302+
let res = self
303+
.machine
304+
.compare_and_append(
305+
&batch,
306+
&self.writer_id,
307+
&self.debug_state,
308+
heartbeat_timestamp,
309+
)
310+
.await;
311+
312+
use CompareAndAppendRes::*;
313+
let new_upper = match res {
314+
Success(_seq_no, maintenance) => {
315+
maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
316+
batch.desc.upper().clone()
317+
}
318+
UpperMismatch(_seq_no, actual_upper) => actual_upper,
319+
InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
320+
InlineBackpressure => unreachable!("batch was empty"),
321+
};
322+
323+
self.upper.clone_from(&new_upper);
324+
lower = new_upper;
325+
}
326+
}
327+
284328
/// Applies `updates` to this shard and downgrades this handle's upper to
285329
/// `upper`.
286330
///
@@ -532,15 +576,8 @@ where
532576
}
533577
}
534578

535-
// If we are writing any data, we require a registered write schema.
536-
//
537-
// We allow appending empty batches without a registered schema, because
538-
// some clients expect to be able to advance the write frontier without
539-
// knowing the schema. For example, shard finalization may not always
540-
// know the schemas of shards to be finalized.
541-
if batches.iter().any(|b| b.batch.len > 0) && self.schema_id().is_none() {
542-
self.ensure_schema_registered().await;
543-
}
579+
// Before we append any data, we require a registered write schema.
580+
self.ensure_schema_registered().await;
544581

545582
let lower = expected_upper.clone();
546583
let upper = new_upper;

src/storage-client/src/storage_collections.rs

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3248,40 +3248,23 @@ async fn finalize_shards_task<T>(
32483248
Some(shard_id)
32493249
} else {
32503250
debug!(%shard_id, "finalizing shard");
3251-
let finalize = || async move {
3251+
let finalize = || async move {
32523252
// TODO: thread the global ID into the shard finalization WAL
32533253
let diagnostics = Diagnostics::from_purpose("finalizing shards");
32543254

3255-
let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
3256-
let (key_schema, val_schema) = match schemas {
3257-
Some((_, key_schema, val_schema)) => (key_schema, val_schema),
3258-
None => (RelationDesc::empty(), UnitSchema),
3259-
};
3260-
3261-
let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
3255+
// We only use the writer to advance the upper, so using a dummy schema is
3256+
// fine.
32623257
let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
32633258
persist_client
32643259
.open_writer(
32653260
shard_id,
3266-
Arc::new(key_schema),
3267-
Arc::new(val_schema),
3261+
Arc::new(RelationDesc::empty()),
3262+
Arc::new(UnitSchema),
32683263
diagnostics,
32693264
)
32703265
.await
32713266
.expect("invalid persist usage");
3272-
3273-
let upper = write_handle.upper();
3274-
3275-
if !upper.is_empty() {
3276-
let append = write_handle
3277-
.append(empty_batch, upper.clone(), Antichain::new())
3278-
.await?;
3279-
3280-
if let Err(e) = append {
3281-
warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
3282-
return Ok(());
3283-
}
3284-
}
3267+
write_handle.advance_upper(&Antichain::new()).await;
32853268
write_handle.expire().await;
32863269

32873270
if force_downgrade_since {
@@ -3317,9 +3300,7 @@ async fn finalize_shards_task<T>(
33173300
.compare_and_downgrade_since(&epoch, (&epoch, &new_since))
33183301
.await;
33193302
if let Err(e) = downgrade {
3320-
warn!(
3321-
"tried to finalize a shard with an advancing epoch: {e:?}"
3322-
);
3303+
warn!("tried to finalize a shard with an advancing epoch: {e:?}");
33233304
return Ok(());
33243305
}
33253306
// Not available now, so finalization is broken.

0 commit comments

Comments
 (0)