Skip to content

Commit d2b1b58

Browse files
authored
Merge pull request #33902 from teskje/persist-defer-register-schema
persist: register schema at write time, not writer open time
2 parents dd3402f + 990ffbb commit d2b1b58

File tree

9 files changed

+152
-113
lines changed

9 files changed

+152
-113
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
231231
Arc::clone(&pubsub_sender),
232232
));
233233

234-
// We need a PersistClient to open a write handle so we can append an empty batch.
235234
let persist_client = PersistClient::new(
236235
cfg,
237236
blob,
@@ -259,21 +258,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
259258
diagnostics,
260259
)
261260
.await?;
262-
263-
if !write_handle.upper().is_empty() {
264-
let empty_batch: Vec<(
265-
(crate::cli::inspect::K, crate::cli::inspect::V),
266-
u64,
267-
i64,
268-
)> = vec![];
269-
let lower = write_handle.upper().clone();
270-
let upper = Antichain::new();
271-
272-
let result = write_handle.append(empty_batch, lower, upper).await?;
273-
if let Err(err) = result {
274-
anyhow::bail!("failed to force downgrade upper, {err:?}");
275-
}
276-
}
261+
write_handle.advance_upper(&Antichain::new()).await;
277262
}
278263

279264
if force_downgrade_since {

src/persist-client/src/internal/apply.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,22 @@ where
254254
})
255255
}
256256

257+
/// Returns the ID of the given schema, if known at the current state.
258+
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
259+
self.state
260+
.read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
261+
// The common case is that the requested schema is a recent one, so as a minor
262+
// optimization, do this search in reverse order.
263+
let mut schemas = state.collections.schemas.iter().rev();
264+
schemas
265+
.find(|(_, x)| {
266+
K::decode_schema(&x.key) == *key_schema
267+
&& V::decode_schema(&x.val) == *val_schema
268+
})
269+
.map(|(id, _)| *id)
270+
})
271+
}
272+
257273
/// Returns whether the current's state `since` and `upper` are both empty.
258274
///
259275
/// Due to sharing state with other handles, successive reads to this fn or any other may

src/persist-client/src/internal/encoding.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
6565
/// A key and value `Schema` of data written to a batch or shard.
6666
#[derive(Debug)]
6767
pub struct Schemas<K: Codec, V: Codec> {
68-
// TODO: Remove the Option once this finishes rolling out and all shards
69-
// have a registered schema.
7068
/// Id under which this schema is registered in the shard's schema registry,
7169
/// if any.
7270
pub id: Option<SchemaId>,

src/persist-client/src/internal/machine.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,11 @@ where
700700
self.applier.latest_schema()
701701
}
702702

703+
/// Returns the ID of the given schema, if known at the current state.
704+
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
705+
self.applier.find_schema(key_schema, val_schema)
706+
}
707+
703708
/// See [crate::PersistClient::compare_and_evolve_schema].
704709
///
705710
/// TODO: Unify this with [Self::register_schema]?

src/persist-client/src/lib.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
2424
use itertools::Itertools;
2525
use mz_build_info::{BuildInfo, build_info};
2626
use mz_dyncfg::ConfigSet;
27-
use mz_ore::{instrument, soft_assert_or_log};
27+
use mz_ore::instrument;
2828
use mz_persist::location::{Blob, Consensus, ExternalError};
2929
use mz_persist_types::schema::SchemaId;
3030
use mz_persist_types::{Codec, Codec64, Opaque};
@@ -490,10 +490,6 @@ impl PersistClient {
490490
///
491491
/// Use this to save latency and a bit of persist traffic if you're just
492492
/// going to immediately drop or expire the [ReadHandle].
493-
///
494-
/// The `_schema` parameter is currently unused, but should be an object
495-
/// that represents the schema of the data in the shard. This will be required
496-
/// in the future.
497493
#[instrument(level = "debug", fields(shard = %shard_id))]
498494
pub async fn open_writer<K, V, T, D>(
499495
&self,
@@ -511,23 +507,11 @@ impl PersistClient {
511507
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
512508
let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
513509

514-
// TODO: Because schemas are ordered, as part of the persist schema
515-
// changes work, we probably want to build some way to allow persist
516-
// users to control the order. For example, maybe a
517-
// `PersistClient::compare_and_append_schema(current_schema_id,
518-
// next_schema)`. Presumably this would then be passed in to open_writer
519-
// instead of us implicitly registering it here.
520-
// NB: The overwhelming common case is that this schema is already
521-
// registered. In this case, the cmd breaks early and nothing is
522-
// written to (or read from) CRDB.
523-
let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
524-
maintenance.start_performing(&machine, &gc);
525-
soft_assert_or_log!(
526-
schema_id.is_some(),
527-
"unable to register schemas {:?} {:?}",
528-
key_schema,
529-
val_schema,
530-
);
510+
// We defer registering the schema until write time, to allow opening
511+
// write handles in a "read-only" mode where they don't implicitly
512+
// modify persist state. But it might already be registered, in which
513+
// case we can fetch its ID.
514+
let schema_id = machine.find_schema(&*key_schema, &*val_schema);
531515

532516
let writer_id = WriterId::new();
533517
let schemas = Schemas {
@@ -1992,7 +1976,6 @@ mod tests {
19921976
#[mz_persist_proc::test(tokio::test)]
19931977
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
19941978
async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
1995-
const EMPTY: &[(((), ()), u64, i64)] = &[];
19961979
let persist_client = new_test_client(&dyncfgs).await;
19971980

19981981
let shard_id = ShardId::new();
@@ -2006,11 +1989,7 @@ mod tests {
20061989
// Advance since and upper to empty, which is a pre-requisite for
20071990
// finalization/tombstoning.
20081991
let () = read.downgrade_since(&Antichain::new()).await;
2009-
let () = write
2010-
.compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
2011-
.await
2012-
.expect("usage should be valid")
2013-
.expect("upper should match");
1992+
let () = write.advance_upper(&Antichain::new()).await;
20141993

20151994
let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
20161995
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
@@ -2047,7 +2026,6 @@ mod tests {
20472026
#[mz_persist_proc::test(tokio::test)]
20482027
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
20492028
async fn finalize_shard(dyncfgs: ConfigUpdates) {
2050-
const EMPTY: &[(((), ()), u64, i64)] = &[];
20512029
const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
20522030
let persist_client = new_test_client(&dyncfgs).await;
20532031

@@ -2069,11 +2047,7 @@ mod tests {
20692047
// Advance since and upper to empty, which is a pre-requisite for
20702048
// finalization/tombstoning.
20712049
let () = read.downgrade_since(&Antichain::new()).await;
2072-
let () = write
2073-
.compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
2074-
.await
2075-
.expect("usage should be valid")
2076-
.expect("upper should match");
2050+
let () = write.advance_upper(&Antichain::new()).await;
20772051

20782052
let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
20792053
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())

src/persist-client/src/schema.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ mod tests {
605605
let schema0 = StringsSchema(vec![false]);
606606
let schema1 = StringsSchema(vec![false, true]);
607607

608-
let write0 = client
608+
let mut write0 = client
609609
.open_writer::<Strings, (), u64, i64>(
610610
shard_id,
611611
Arc::new(schema0.clone()),
@@ -614,6 +614,8 @@ mod tests {
614614
)
615615
.await
616616
.unwrap();
617+
618+
write0.ensure_schema_registered().await;
617619
assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));
618620

619621
// Not backward compatible (yet... we don't support dropping a column at

src/persist-client/src/write.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,31 @@ where
221221
self.write_schemas.id
222222
}
223223

224+
/// Registers the write schema, if it isn't already registered.
225+
///
226+
/// # Panics
227+
///
228+
/// This method expects that either the shard doesn't yet have any schema registered, or one of
229+
/// the registered schemas is the same as the write schema. If all registered schemas are
230+
/// different from the write schema, it panics.
231+
pub async fn ensure_schema_registered(&mut self) -> SchemaId {
232+
let Schemas { id, key, val } = &self.write_schemas;
233+
234+
if let Some(id) = id {
235+
return *id;
236+
}
237+
238+
let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
239+
maintenance.start_performing(&self.machine, &self.gc);
240+
241+
let Some(schema_id) = schema_id else {
242+
panic!("unable to register schemas: {key:?} {val:?}");
243+
};
244+
245+
self.write_schemas.id = Some(schema_id);
246+
schema_id
247+
}
248+
224249
/// A cached version of the shard-global `upper` frontier.
225250
///
226251
/// This is the most recent upper discovered by this handle. It is
@@ -256,6 +281,50 @@ where
256281
&self.upper
257282
}
258283

284+
/// Advance the shard's upper by the given frontier.
285+
///
286+
/// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
287+
///
288+
/// In contrast to the various compare-and-append methods, this method does not require the
289+
/// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
290+
/// schema when creating a writer just to advance a shard upper.
291+
pub async fn advance_upper(&mut self, target: &Antichain<T>) {
292+
// We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
293+
// already beyond the target.
294+
let mut lower = self.shared_upper().clone();
295+
296+
while !PartialOrder::less_equal(target, &lower) {
297+
let since = Antichain::from_elem(T::minimum());
298+
let desc = Description::new(lower.clone(), target.clone(), since);
299+
let batch = HollowBatch::empty(desc);
300+
301+
let heartbeat_timestamp = (self.cfg.now)();
302+
let res = self
303+
.machine
304+
.compare_and_append(
305+
&batch,
306+
&self.writer_id,
307+
&self.debug_state,
308+
heartbeat_timestamp,
309+
)
310+
.await;
311+
312+
use CompareAndAppendRes::*;
313+
let new_upper = match res {
314+
Success(_seq_no, maintenance) => {
315+
maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
316+
batch.desc.upper().clone()
317+
}
318+
UpperMismatch(_seq_no, actual_upper) => actual_upper,
319+
InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
320+
InlineBackpressure => unreachable!("batch was empty"),
321+
};
322+
323+
self.upper.clone_from(&new_upper);
324+
lower = new_upper;
325+
}
326+
}
327+
259328
/// Applies `updates` to this shard and downgrades this handle's upper to
260329
/// `upper`.
261330
///
@@ -507,6 +576,9 @@ where
507576
}
508577
}
509578

579+
// Before we append any data, we require a registered write schema.
580+
self.ensure_schema_registered().await;
581+
510582
let lower = expected_upper.clone();
511583
let upper = new_upper;
512584
let since = Antichain::from_elem(T::minimum());

src/storage-client/src/storage_collections.rs

Lines changed: 7 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3248,40 +3248,23 @@ async fn finalize_shards_task<T>(
32483248
Some(shard_id)
32493249
} else {
32503250
debug!(%shard_id, "finalizing shard");
3251-
let finalize = || async move {
3251+
let finalize = || async move {
32523252
// TODO: thread the global ID into the shard finalization WAL
32533253
let diagnostics = Diagnostics::from_purpose("finalizing shards");
32543254

3255-
let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
3256-
let (key_schema, val_schema) = match schemas {
3257-
Some((_, key_schema, val_schema)) => (key_schema, val_schema),
3258-
None => (RelationDesc::empty(), UnitSchema),
3259-
};
3260-
3261-
let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
3255+
// We only use the writer to advance the upper, so using a dummy schema is
3256+
// fine.
32623257
let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
32633258
persist_client
32643259
.open_writer(
32653260
shard_id,
3266-
Arc::new(key_schema),
3267-
Arc::new(val_schema),
3261+
Arc::new(RelationDesc::empty()),
3262+
Arc::new(UnitSchema),
32683263
diagnostics,
32693264
)
32703265
.await
32713266
.expect("invalid persist usage");
3272-
3273-
let upper = write_handle.upper();
3274-
3275-
if !upper.is_empty() {
3276-
let append = write_handle
3277-
.append(empty_batch, upper.clone(), Antichain::new())
3278-
.await?;
3279-
3280-
if let Err(e) = append {
3281-
warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
3282-
return Ok(());
3283-
}
3284-
}
3267+
write_handle.advance_upper(&Antichain::new()).await;
32853268
write_handle.expire().await;
32863269

32873270
if force_downgrade_since {
@@ -3317,9 +3300,7 @@ async fn finalize_shards_task<T>(
33173300
.compare_and_downgrade_since(&epoch, (&epoch, &new_since))
33183301
.await;
33193302
if let Err(e) = downgrade {
3320-
warn!(
3321-
"tried to finalize a shard with an advancing epoch: {e:?}"
3322-
);
3303+
warn!("tried to finalize a shard with an advancing epoch: {e:?}");
33233304
return Ok(());
33243305
}
33253306
// Not available now, so finalization is broken.

0 commit comments

Comments
 (0)