Merged
17 changes: 1 addition & 16 deletions src/persist-client/src/cli/admin.rs
@@ -231,7 +231,6 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
Arc::clone(&pubsub_sender),
));

- // We need a PersistClient to open a write handle so we can append an empty batch.
let persist_client = PersistClient::new(
cfg,
blob,
@@ -259,21 +258,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
diagnostics,
)
.await?;
-
- if !write_handle.upper().is_empty() {
- let empty_batch: Vec<(
- (crate::cli::inspect::K, crate::cli::inspect::V),
- u64,
- i64,
- )> = vec![];
- let lower = write_handle.upper().clone();
- let upper = Antichain::new();
-
- let result = write_handle.append(empty_batch, lower, upper).await?;
- if let Err(err) = result {
- anyhow::bail!("failed to force downgrade upper, {err:?}");
- }
- }
+ write_handle.advance_upper(&Antichain::new()).await;
}

if force_downgrade_since {
16 changes: 16 additions & 0 deletions src/persist-client/src/internal/apply.rs
@@ -254,6 +254,22 @@ where
})
}

+ /// Returns the ID of the given schema, if known at the current state.
+ pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
+ self.state
+ .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
+ // The common case is that the requested schema is a recent one, so as a minor
+ // optimization, do this search in reverse order.
+ let mut schemas = state.collections.schemas.iter().rev();
+ schemas
+ .find(|(_, x)| {
+ K::decode_schema(&x.key) == *key_schema
+ && V::decode_schema(&x.val) == *val_schema
+ })
+ .map(|(id, _)| *id)
+ })
+ }
+
/// Returns whether the current state's `since` and `upper` are both empty.
///
/// Due to sharing state with other handles, successive reads to this fn or any other may
2 changes: 0 additions & 2 deletions src/persist-client/src/internal/encoding.rs
@@ -65,8 +65,6 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
/// A key and value `Schema` of data written to a batch or shard.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
- // TODO: Remove the Option once this finishes rolling out and all shards
- // have a registered schema.
Comment on lines -68 to -69 (Contributor Author):
Removed this TODO because we need the Option now to support WriteHandles that haven't yet registered their schema.

/// Id under which this schema is registered in the shard's schema registry,
/// if any.
pub id: Option<SchemaId>,
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/machine.rs
@@ -700,6 +700,11 @@ where
self.applier.latest_schema()
}

+ /// Returns the ID of the given schema, if known at the current state.
+ pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
+ self.applier.find_schema(key_schema, val_schema)
+ }
+
/// See [crate::PersistClient::compare_and_evolve_schema].
///
/// TODO: Unify this with [Self::register_schema]?
42 changes: 8 additions & 34 deletions src/persist-client/src/lib.rs
@@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
- use mz_ore::{instrument, soft_assert_or_log};
+ use mz_ore::instrument;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
@@ -490,10 +490,6 @@ impl PersistClient {
///
/// Use this to save latency and a bit of persist traffic if you're just
/// going to immediately drop or expire the [ReadHandle].
- ///
- /// The `_schema` parameter is currently unused, but should be an object
- /// that represents the schema of the data in the shard. This will be required
- /// in the future.
#[instrument(level = "debug", fields(shard = %shard_id))]
pub async fn open_writer<K, V, T, D>(
&self,
@@ -511,23 +507,11 @@
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));

- // TODO: Because schemas are ordered, as part of the persist schema
- // changes work, we probably want to build some way to allow persist
- // users to control the order. For example, maybe a
- // `PersistClient::compare_and_append_schema(current_schema_id,
- // next_schema)`. Presumably this would then be passed in to open_writer
- // instead of us implicitly registering it here.
- // NB: The overwhelming common case is that this schema is already
- // registered. In this case, the cmd breaks early and nothing is
- // written to (or read from) CRDB.
- let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
- maintenance.start_performing(&machine, &gc);
- soft_assert_or_log!(
- schema_id.is_some(),
- "unable to register schemas {:?} {:?}",
- key_schema,
- val_schema,
- );
+ // We defer registering the schema until write time, to allow opening
+ // write handles in a "read-only" mode where they don't implicitly
+ // modify persist state. But it might already be registered, in which
+ // case we can fetch its ID.
Comment (Contributor):
I was a little nervous about how this changed the semantics of the schema_id method... but it turns out that's only used by txn-wal in what looks like a safe pattern. Still a possible footgun, but hopefully one we can mitigate in the future...

+ let schema_id = machine.find_schema(&*key_schema, &*val_schema);

let writer_id = WriterId::new();
let schemas = Schemas {
Expand Down Expand Up @@ -1992,7 +1976,6 @@ mod tests {
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
- const EMPTY: &[(((), ()), u64, i64)] = &[];
let persist_client = new_test_client(&dyncfgs).await;

let shard_id = ShardId::new();
@@ -2006,11 +1989,7 @@
// Advance since and upper to empty, which is a pre-requisite for
// finalization/tombstoning.
let () = read.downgrade_since(&Antichain::new()).await;
- let () = write
- .compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
- .await
- .expect("usage should be valid")
- .expect("upper should match");
+ let () = write.advance_upper(&Antichain::new()).await;

let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
Expand Down Expand Up @@ -2047,7 +2026,6 @@ mod tests {
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn finalize_shard(dyncfgs: ConfigUpdates) {
- const EMPTY: &[(((), ()), u64, i64)] = &[];
const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
let persist_client = new_test_client(&dyncfgs).await;

@@ -2069,11 +2047,7 @@
// Advance since and upper to empty, which is a pre-requisite for
// finalization/tombstoning.
let () = read.downgrade_since(&Antichain::new()).await;
- let () = write
- .compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
- .await
- .expect("usage should be valid")
- .expect("upper should match");
+ let () = write.advance_upper(&Antichain::new()).await;

let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
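Taken together, these lib.rs changes mean `open_writer` no longer registers a schema on the shard. For illustration, a minimal sketch of the new open flow (hypothetical usage borrowing the test helpers above; it assumes `schema_id()` surfaces `write_schemas.id`):

```rust
let client = new_test_client(&dyncfgs).await;
let shard_id = ShardId::new();
let mut write = client
    .open_writer::<(), (), u64, i64>(
        shard_id,
        Arc::new(UnitSchema),
        Arc::new(UnitSchema),
        Diagnostics::for_tests(),
    )
    .await
    .expect("codec mismatch");
// Opening no longer touches consensus: on a fresh shard, nothing is
// registered until the first append (or an explicit registration call).
assert_eq!(write.schema_id(), None);
```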
4 changes: 3 additions & 1 deletion src/persist-client/src/schema.rs
@@ -605,7 +605,7 @@ mod tests {
let schema0 = StringsSchema(vec![false]);
let schema1 = StringsSchema(vec![false, true]);

- let write0 = client
+ let mut write0 = client
.open_writer::<Strings, (), u64, i64>(
shard_id,
Arc::new(schema0.clone()),
@@ -614,6 +614,8 @@
)
.await
.unwrap();
+
+ write0.ensure_schema_registered().await;
assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));

// Not backward compatible (yet... we don't support dropping a column at
72 changes: 72 additions & 0 deletions src/persist-client/src/write.rs
@@ -221,6 +221,31 @@ where
self.write_schemas.id
}

+ /// Registers the write schema, if it isn't already registered.
+ ///
+ /// # Panics
+ ///
+ /// This method expects that either the shard doesn't yet have any schema registered, or one of
+ /// the registered schemas is the same as the write schema. If all registered schemas are
+ /// different from the write schema, it panics.
+ pub async fn ensure_schema_registered(&mut self) -> SchemaId {
+ let Schemas { id, key, val } = &self.write_schemas;
+
+ if let Some(id) = id {
+ return *id;
+ }
+
+ let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
+ maintenance.start_performing(&self.machine, &self.gc);
+
+ let Some(schema_id) = schema_id else {
+ panic!("unable to register schemas: {key:?} {val:?}");
+ };
Comment on lines +241 to +243 (Contributor Author):
Note that previously in open_writer we only soft-asserted that schema registration was successful. That seems dangerous to me. Presumably writing to a shard with an incompatible schema would lead to all kinds of issues? Perhaps it was fine because the CaA logic also performs validation.
Since ensure_schema_registered is only called when we know we will need the schema, it seems fine to panic here. Also fine to return an Option like register_schema does and then unwrap at the call site. Lmk which you prefer!

Reply (Contributor):
> That seems dangerous to me.

Maybe! Note that prior to this stuff being added, the schema could change ~arbitrarily without Persist noticing, so it was a strict improvement in safety. But anyways I don't think this assert has tripped at all in the year or so it's been in prod, so it seems safe to strengthen it now.
+
+ self.write_schemas.id = Some(schema_id);
+ schema_id
+ }
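A usage sketch of the new method (assuming the `write` handle from the sketch above): registration is idempotent, so repeated calls return the cached ID without another consensus round trip.

```rust
// The first call registers the write schema (or finds it already
// registered) and caches the resulting ID on the handle.
let id = write.ensure_schema_registered().await;
// Later calls short-circuit on the cached ID.
assert_eq!(write.ensure_schema_registered().await, id);
assert_eq!(write.schema_id(), Some(id));
```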

/// A cached version of the shard-global `upper` frontier.
///
/// This is the most recent upper discovered by this handle. It is
Expand Down Expand Up @@ -256,6 +281,50 @@ where
&self.upper
}

+ /// Advance the shard's upper to the given frontier.
+ ///
+ /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
+ ///
+ /// In contrast to the various compare-and-append methods, this method does not require the
+ /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
+ /// schema when creating a writer just to advance a shard upper.
+ pub async fn advance_upper(&mut self, target: &Antichain<T>) {
+ // We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
+ // already beyond the target.
+ let mut lower = self.shared_upper().clone();
+
+ while !PartialOrder::less_equal(target, &lower) {
+ let since = Antichain::from_elem(T::minimum());
+ let desc = Description::new(lower.clone(), target.clone(), since);
+ let batch = HollowBatch::empty(desc);
+
+ let heartbeat_timestamp = (self.cfg.now)();
+ let res = self
+ .machine
+ .compare_and_append(
+ &batch,
+ &self.writer_id,
+ &self.debug_state,
+ heartbeat_timestamp,
+ )
+ .await;
+
+ use CompareAndAppendRes::*;
+ let new_upper = match res {
+ Success(_seq_no, maintenance) => {
+ maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
+ batch.desc.upper().clone()
+ }
+ UpperMismatch(_seq_no, actual_upper) => actual_upper,
+ InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
+ InlineBackpressure => unreachable!("batch was empty"),
+ };
+
+ self.upper.clone_from(&new_upper);
+ lower = new_upper;
+ }
+ }
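For illustration, a sketch of the intended call patterns (again assuming a `write` handle over `u64` timestamps):

```rust
// A no-op if the shard's upper is already at or past the target.
write.advance_upper(&Antichain::from_elem(5u64)).await;
// Advancing to the empty frontier seals the shard, as the finalization
// paths in this PR do; no registered write schema is required first.
write.advance_upper(&Antichain::new()).await;
```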

/// Applies `updates` to this shard and downgrades this handle's upper to
/// `upper`.
///
Expand Down Expand Up @@ -507,6 +576,9 @@ where
}
}

+ // Before we append any data, we require a registered write schema.
+ self.ensure_schema_registered().await;
+
let lower = expected_upper.clone();
let upper = new_upper;
let since = Antichain::from_elem(T::minimum());
33 changes: 7 additions & 26 deletions src/storage-client/src/storage_collections.rs
@@ -3248,40 +3248,23 @@ async fn finalize_shards_task<T>(
Some(shard_id)
} else {
debug!(%shard_id, "finalizing shard");
let finalize = || async move {
// TODO: thread the global ID into the shard finalization WAL
let diagnostics = Diagnostics::from_purpose("finalizing shards");

- let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
- let (key_schema, val_schema) = match schemas {
- Some((_, key_schema, val_schema)) => (key_schema, val_schema),
- None => (RelationDesc::empty(), UnitSchema),
- };
-
- let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
+ // We only use the writer to advance the upper, so using a dummy schema is
+ // fine.
let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
persist_client
.open_writer(
shard_id,
- Arc::new(key_schema),
- Arc::new(val_schema),
+ Arc::new(RelationDesc::empty()),
+ Arc::new(UnitSchema),
diagnostics,
)
.await
.expect("invalid persist usage");
-
- let upper = write_handle.upper();
-
- if !upper.is_empty() {
- let append = write_handle
- .append(empty_batch, upper.clone(), Antichain::new())
- .await?;
-
- if let Err(e) = append {
- warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
- return Ok(());
- }
- }
+ write_handle.advance_upper(&Antichain::new()).await;
write_handle.expire().await;

if force_downgrade_since {
Expand Down Expand Up @@ -3317,9 +3300,7 @@ async fn finalize_shards_task<T>(
.compare_and_downgrade_since(&epoch, (&epoch, &new_since))
.await;
if let Err(e) = downgrade {
- warn!(
- "tried to finalize a shard with an advancing epoch: {e:?}"
- );
+ warn!("tried to finalize a shard with an advancing epoch: {e:?}");
return Ok(());
}
// Not available now, so finalization is broken.