Skip to content

Commit 04664bf

Browse files
committed
persist: register schema at write time, not writer open time
This commit changes persist clients to defer calling `register_schema` on a shard from write handle creation time to the time of the first append operation. The rationale is that we don't need to enforce a schema if we are not attempting to write to a shard. The plan is for 0dt upgrades to make good use of the new, more lenient behavior. Read-only environments can open write handles with evolved schemas without having to durably write down the new schemas. This will allow us to back out of version upgrades without the risk of permanently poisoning the persist state for lower versions.
1 parent 2c28522 commit 04664bf

File tree

7 files changed

+125
-59
lines changed

7 files changed

+125
-59
lines changed

src/persist-client/src/internal/apply.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,22 @@ where
254254
})
255255
}
256256

257+
/// See [crate::PersistClient::find_schema].
258+
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
259+
self.state
260+
.read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
261+
// The common case is that the requested schema is a recent one, so as a minor
262+
// optimization, do this search in reverse order.
263+
let mut schemas = state.collections.schemas.iter().rev();
264+
schemas
265+
.find(|(_, x)| {
266+
K::decode_schema(&x.key) == *key_schema
267+
&& V::decode_schema(&x.val) == *val_schema
268+
})
269+
.map(|(id, _)| *id)
270+
})
271+
}
272+
257273
/// Returns whether the current's state `since` and `upper` are both empty.
258274
///
259275
/// Due to sharing state with other handles, successive reads to this fn or any other may

src/persist-client/src/internal/encoding.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
6565
/// A key and value `Schema` of data written to a batch or shard.
6666
#[derive(Debug)]
6767
pub struct Schemas<K: Codec, V: Codec> {
68-
// TODO: Remove the Option once this finishes rolling out and all shards
69-
// have a registered schema.
7068
/// Id under which this schema is registered in the shard's schema registry,
7169
/// if any.
7270
pub id: Option<SchemaId>,

src/persist-client/src/internal/machine.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,11 @@ where
700700
self.applier.latest_schema()
701701
}
702702

703+
/// See [crate::PersistClient::find_schema].
704+
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
705+
self.applier.find_schema(key_schema, val_schema)
706+
}
707+
703708
/// See [crate::PersistClient::compare_and_evolve_schema].
704709
///
705710
/// TODO: Unify this with [Self::register_schema]?

src/persist-client/src/lib.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
2424
use itertools::Itertools;
2525
use mz_build_info::{BuildInfo, build_info};
2626
use mz_dyncfg::ConfigSet;
27-
use mz_ore::{instrument, soft_assert_or_log};
27+
use mz_ore::instrument;
2828
use mz_persist::location::{Blob, Consensus, ExternalError};
2929
use mz_persist_types::schema::SchemaId;
3030
use mz_persist_types::{Codec, Codec64, Opaque};
@@ -490,10 +490,6 @@ impl PersistClient {
490490
///
491491
/// Use this to save latency and a bit of persist traffic if you're just
492492
/// going to immediately drop or expire the [ReadHandle].
493-
///
494-
/// The `_schema` parameter is currently unused, but should be an object
495-
/// that represents the schema of the data in the shard. This will be required
496-
/// in the future.
497493
#[instrument(level = "debug", fields(shard = %shard_id))]
498494
pub async fn open_writer<K, V, T, D>(
499495
&self,
@@ -511,23 +507,11 @@ impl PersistClient {
511507
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
512508
let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
513509

514-
// TODO: Because schemas are ordered, as part of the persist schema
515-
// changes work, we probably want to build some way to allow persist
516-
// users to control the order. For example, maybe a
517-
// `PersistClient::compare_and_append_schema(current_schema_id,
518-
// next_schema)`. Presumably this would then be passed in to open_writer
519-
// instead of us implicitly registering it here.
520-
// NB: The overwhelming common case is that this schema is already
521-
// registered. In this case, the cmd breaks early and nothing is
522-
// written to (or read from) CRDB.
523-
let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
524-
maintenance.start_performing(&machine, &gc);
525-
soft_assert_or_log!(
526-
schema_id.is_some(),
527-
"unable to register schemas {:?} {:?}",
528-
key_schema,
529-
val_schema,
530-
);
510+
// We defer registering the schema until write time, to allow opening
511+
// write handles in a "read-only" mode where they don't implicitly
512+
// modify persist state. But it might already be registered, in which
513+
// case we can fetch its ID.
514+
let schema_id = machine.find_schema(&*key_schema, &*val_schema);
531515

532516
let writer_id = WriterId::new();
533517
let schemas = Schemas {
@@ -718,6 +702,26 @@ impl PersistClient {
718702
Ok(machine.latest_schema())
719703
}
720704

705+
/// Returns the ID of the given schema, if known at the current state.
706+
pub async fn find_schema<K, V, T, D>(
707+
&self,
708+
shard_id: ShardId,
709+
key_schema: &K::Schema,
710+
val_schema: &V::Schema,
711+
diagnostics: Diagnostics,
712+
) -> Result<Option<SchemaId>, InvalidUsage<T>>
713+
where
714+
K: Debug + Codec,
715+
V: Debug + Codec,
716+
T: Timestamp + Lattice + Codec64 + Sync,
717+
D: Semigroup + Codec64 + Send + Sync,
718+
{
719+
let machine = self
720+
.make_machine::<K, V, T, D>(shard_id, diagnostics)
721+
.await?;
722+
Ok(machine.find_schema(key_schema, val_schema))
723+
}
724+
721725
/// Registers a new latest schema for the given shard.
722726
///
723727
/// This new schema must be [backward_compatible] with all previous schemas

src/persist-client/src/schema.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ mod tests {
605605
let schema0 = StringsSchema(vec![false]);
606606
let schema1 = StringsSchema(vec![false, true]);
607607

608-
let write0 = client
608+
let mut write0 = client
609609
.open_writer::<Strings, (), u64, i64>(
610610
shard_id,
611611
Arc::new(schema0.clone()),
@@ -614,6 +614,8 @@ mod tests {
614614
)
615615
.await
616616
.unwrap();
617+
618+
write0.ensure_schema_registered().await;
617619
assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));
618620

619621
// Not backward compatible (yet... we don't support dropping a column at

src/persist-client/src/write.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,31 @@ where
221221
self.write_schemas.id
222222
}
223223

224+
/// Registers the write schema, if it isn't already registered.
225+
///
226+
/// # Panics
227+
///
228+
/// This method expects that either the shard doesn't yet have any schema registered, or one of
229+
/// the registered schemas is the same as the write schema. If all registered schemas are
230+
/// different from the write schema, it panics.
231+
pub async fn ensure_schema_registered(&mut self) -> SchemaId {
232+
let Schemas { id, key, val } = &self.write_schemas;
233+
234+
if let Some(id) = id {
235+
return *id;
236+
}
237+
238+
let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
239+
maintenance.start_performing(&self.machine, &self.gc);
240+
241+
let Some(schema_id) = schema_id else {
242+
panic!("unable to register schemas: {key:?} {val:?}");
243+
};
244+
245+
self.write_schemas.id = Some(schema_id);
246+
schema_id
247+
}
248+
224249
/// A cached version of the shard-global `upper` frontier.
225250
///
226251
/// This is the most recent upper discovered by this handle. It is
@@ -507,6 +532,16 @@ where
507532
}
508533
}
509534

535+
// If we are writing any data, we require a registered write schema.
536+
//
537+
// We allow appending empty batches without a registered schema, because
538+
// some clients expect to be able to advance the write frontier without
539+
// knowing the schema. For example, shard finalization may not always
540+
// know the schemas of shards to be finalized.
541+
if batches.iter().any(|b| b.batch.len > 0) && self.schema_id().is_none() {
542+
self.ensure_schema_registered().await;
543+
}
544+
510545
let lower = expected_upper.clone();
511546
let upper = new_upper;
512547
let since = Antichain::from_elem(T::minimum());

src/txn-wal/src/txns.rs

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,15 @@ where
214214
) -> Result<Tidy, T> {
215215
let op = &Arc::clone(&self.metrics).register;
216216
op.run(async {
217-
let data_writes = data_writes.into_iter().collect::<Vec<_>>();
217+
let mut data_writes = data_writes.into_iter().collect::<Vec<_>>();
218+
219+
// The txns system requires that all participating data shards have a
220+
// schema registered. Importantly, we must register a data shard's
221+
// schema _before_ we publish it to the txns shard.
222+
for data_write in &mut data_writes {
223+
data_write.ensure_schema_registered().await;
224+
}
225+
218226
let updates = data_writes
219227
.iter()
220228
.map(|data_write| {
@@ -285,45 +293,47 @@ where
285293
}
286294
}
287295
for data_write in data_writes {
288-
let new_schema_id = data_write.schema_id();
289-
290296
// If we already have a write handle for a newer version of a table, don't replace
291297
// it! Currently we only support adding columns to tables with a default value, so
292298
// the latest/newest schema will always be the most complete.
293299
//
294-
// TODO(alter_table): Revist when we support dropping columns.
300+
// TODO(alter_table): Revisit when we support dropping columns.
295301
match self.datas.data_write_for_commit.get(&data_write.shard_id()) {
296302
None => {
297303
self.datas
298304
.data_write_for_commit
299305
.insert(data_write.shard_id(), DataWriteCommit(data_write));
300306
}
301307
Some(previous) => {
302-
match (previous.schema_id(), new_schema_id) {
303-
(Some(previous_id), None) => {
304-
mz_ore::soft_panic_or_log!(
305-
"tried registering a WriteHandle replacing one with a SchemaId prev_schema_id: {:?} shard_id: {:?}",
306-
previous_id,
307-
previous.shard_id(),
308-
);
309-
},
310-
(Some(previous_id), Some(new_id)) if previous_id > new_id => {
311-
mz_ore::soft_panic_or_log!(
312-
"tried registering a WriteHandle with an older SchemaId prev_schema_id: {:?} new_schema_id: {:?} shard_id: {:?}",
313-
previous_id,
314-
new_id,
315-
previous.shard_id(),
316-
);
317-
},
318-
(previous_schema_id, new_schema_id) => {
319-
if previous_schema_id.is_none() && new_schema_id.is_none() {
320-
tracing::warn!("replacing WriteHandle without any SchemaIds to reason about");
321-
} else {
322-
tracing::info!(?previous_schema_id, ?new_schema_id, shard_id = ?previous.shard_id(), "replacing WriteHandle");
323-
}
324-
self.datas.data_write_for_commit.insert(data_write.shard_id(), DataWriteCommit(data_write));
325-
}
308+
let new_schema_id = data_write.schema_id().expect("ensured above");
309+
310+
if let Some(prev_schema_id) = previous.schema_id()
311+
&& prev_schema_id > new_schema_id
312+
{
313+
mz_ore::soft_panic_or_log!(
314+
"tried registering a WriteHandle with an older SchemaId; \
315+
prev_schema_id: {} new_schema_id: {} shard_id: {}",
316+
prev_schema_id,
317+
new_schema_id,
318+
previous.shard_id(),
319+
);
320+
continue;
321+
} else if previous.schema_id().is_none() {
322+
mz_ore::soft_panic_or_log!(
323+
"encountered data shard without a schema; shard_id: {}",
324+
previous.shard_id(),
325+
);
326326
}
327+
328+
tracing::info!(
329+
prev_schema_id = ?previous.schema_id(),
330+
?new_schema_id,
331+
shard_id = %previous.shard_id(),
332+
"replacing WriteHandle"
333+
);
334+
self.datas
335+
.data_write_for_commit
336+
.insert(data_write.shard_id(), DataWriteCommit(data_write));
327337
}
328338
}
329339
}
@@ -756,12 +766,8 @@ where
756766
.expect("codecs have not changed");
757767
let (key_schema, val_schema) = match schemas {
758768
Some((_, key_schema, val_schema)) => (Arc::new(key_schema), Arc::new(val_schema)),
759-
// - For new shards we will always have at least one schema
760-
// registered by the time we reach this point, because that
761-
// happens at txn-registration time.
762-
// - For pre-existing shards, every txns shard will have had
763-
// open_writer called on it at least once in the previous release,
764-
// so the schema should exist.
769+
// We will always have at least one schema registered by the time we reach this point,
770+
// because that is ensured at txn-registration time.
765771
None => unreachable!("data shard {} should have a schema", data_id),
766772
};
767773
let wrapped = self

0 commit comments

Comments
 (0)