
persist: expose API for writing/reading "free-standing" batches #32513

Merged
4 changes: 3 additions & 1 deletion src/persist-client/src/fetch.rs
@@ -429,13 +429,15 @@ where
/// Generally the state and lease are bundled together, as in [LeasedBatchPart]... but sometimes
/// it's necessary to handle them separately, so this struct is exposed as well. Handle with care.
#[derive(Clone, Debug)]
pub(crate) struct Lease(Arc<SeqNo>);
pub struct Lease(Arc<SeqNo>);

impl Lease {
/// Creates a new [Lease] that holds the given [SeqNo].
pub fn new(seqno: SeqNo) -> Self {
Self(Arc::new(seqno))
}

/// Returns the inner [SeqNo] of this [Lease].
#[cfg(test)]
pub fn seqno(&self) -> SeqNo {
*self.0
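For illustration, a minimal sketch of how a caller might use the now-public Lease type. The import paths and the multi-worker scenario are assumptions for this sketch, not part of the PR itself:

use mz_persist::location::SeqNo;
use mz_persist_client::fetch::Lease;

// Hold a lease on a seqno while handing parts to other tasks. Clones share the
// same Arc<SeqNo>, so whoever tracks outstanding leases can treat the seqno as
// held until every clone has been dropped.
fn lease_for_workers(seqno: SeqNo) -> (Lease, Lease) {
    let lease = Lease::new(seqno);
    let for_other_worker = lease.clone();
    (lease, for_other_worker)
}
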
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/encoding.rs
@@ -61,12 +61,17 @@ use crate::internal::trace::{
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::{PersistConfig, ShardId, WriterId, cfg};

/// A key and value `Schema` of data written to a batch or shard.
#[derive(Debug)]
pub struct Schemas<K: Codec, V: Codec> {
// TODO: Remove the Option once this finishes rolling out and all shards
// have a registered schema.
/// Id under which this schema is registered in the shard's schema registry,
/// if any.
pub id: Option<SchemaId>,
/// Key `Schema`.
pub key: Arc<K::Schema>,
/// Value `Schema`.
pub val: Arc<V::Schema>,
}

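A small sketch of constructing the now-public Schemas struct. StringSchema and UnitSchema from mz_persist_types::codec_impls are assumed here purely for illustration:

use std::sync::Arc;

use mz_persist_client::Schemas;
use mz_persist_types::codec_impls::{StringSchema, UnitSchema};

fn example_schemas() -> Schemas<String, ()> {
    Schemas {
        // None until a schema id has been registered for the shard.
        id: None,
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    }
}
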
6 changes: 3 additions & 3 deletions src/persist-client/src/internal/state.rs
@@ -869,7 +869,7 @@ impl<T: Ord> Ord for HollowBatch<T> {
}

impl<T: Timestamp + Codec64 + Sync> HollowBatch<T> {
pub fn part_stream<'a>(
pub(crate) fn part_stream<'a>(
&'a self,
shard_id: ShardId,
blob: &'a dyn Blob,
@@ -973,11 +973,11 @@ impl<T> HollowBatch<T> {
self.parts.iter().map(|x| x.inline_bytes()).sum()
}

pub fn is_empty(&self) -> bool {
pub(crate) fn is_empty(&self) -> bool {
self.parts.is_empty()
}

pub fn part_count(&self) -> usize {
pub(crate) fn part_count(&self) -> usize {
self.parts.len()
}

141 changes: 138 additions & 3 deletions src/persist-client/src/lib.rs
@@ -21,27 +21,33 @@ use std::sync::Arc;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::{instrument, soft_assert_or_log};
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use timely::progress::Timestamp;
use mz_proto::{IntoRustIfSome, ProtoType};
use semver::Version;
use timely::progress::{Antichain, Timestamp};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
use crate::cache::{PersistClientCache, StateCache};
use crate::cfg::PersistConfig;
use crate::critical::{CriticalReaderId, SinceHandle};
use crate::error::InvalidUsage;
use crate::fetch::{BatchFetcher, BatchFetcherConfig};
use crate::internal::compact::Compactor;
use crate::internal::encoding::{Schemas, parse_id};
use crate::internal::encoding::parse_id;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::{Machine, retry_external};
use crate::internal::state_versions::StateVersions;
use crate::metrics::Metrics;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION, ReadHandle};
use crate::read::{
Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::{WriteHandle, WriterId};
@@ -121,6 +121,8 @@ pub const BUILD_INFO: BuildInfo = build_info!();
// Re-export for convenience.
pub use mz_persist_types::{PersistLocation, ShardId};

pub use crate::internal::encoding::Schemas;

/// Additional diagnostic information used within Persist
/// e.g. for logging, metric labels, etc.
#[derive(Clone, Debug)]
@@ -539,6 +547,133 @@ impl PersistClient {
Ok(writer)
}

/// Returns a [BatchBuilder] that can be used to write a batch of updates to
/// blob storage which can then be appended to the given shard using
/// [WriteHandle::compare_and_append_batch] or [WriteHandle::append_batch],
/// or which can be read using [PersistClient::read_batches_consolidated].
///
/// The builder uses a bounded amount of memory, even when the number of
/// updates is very large. Individual records, however, should be small
/// enough that we can reasonably chunk them up: O(KB) is definitely fine,
/// O(MB) come talk to us.
#[instrument(level = "debug", fields(shard = %shard_id))]
pub async fn batch_builder<K, V, T, D>(
&self,
shard_id: ShardId,
write_schemas: Schemas<K, V>,
lower: Antichain<T>,
) -> BatchBuilder<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Ord + Codec64 + Send + Sync,
{
WriteHandle::builder_inner(
&self.cfg,
Arc::clone(&self.metrics),
self.metrics.shards.shard(&shard_id, "peek_stash"),
&self.metrics.user,
Arc::clone(&self.isolated_runtime),
Arc::clone(&self.blob),
shard_id,
write_schemas,
lower,
)
}
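
A hypothetical end-to-end sketch of the write side described in the docstring above. The BatchBuilder add/finish calls and Batch::into_transmittable_batch mirror the existing WriteHandle batch flow and are assumptions of this sketch, not signatures introduced by this PR:

use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::{PersistClient, Schemas, ShardId};
use timely::progress::Antichain;

async fn write_free_standing_batch(
    client: &PersistClient,
    shard_id: ShardId,
    schemas: Schemas<String, ()>,
    updates: Vec<((String, ()), u64, i64)>,
) -> ProtoBatch {
    // Lower bound of the batch; all updates must be at or beyond it.
    let lower = Antichain::from_elem(0u64);
    let mut builder = client
        .batch_builder::<String, (), u64, i64>(shard_id, schemas, lower)
        .await;

    for ((k, v), t, d) in updates {
        builder.add(&k, &v, &t, &d).await.expect("valid usage");
    }

    // The upper must be beyond every written timestamp (illustrative here).
    let batch = builder
        .finish(Antichain::from_elem(u64::MAX))
        .await
        .expect("valid usage");

    // Convert to a proto so the batch can be sent to another process, stored,
    // or later rehydrated via batch_from_transmittable_batch.
    batch.into_transmittable_batch()
}
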

/// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
/// to append it to the given shard or to read it via
/// [PersistClient::read_batches_consolidated].
Contributor:

This is somewhat risky API, since it would be pretty easy for us to decode the same ProtoBatch twice and thus have two batches that think they have unique ownership.

Maybe that's necessary, but it feels like it at least warrants a warning in the name or docstring...

Contributor Author:

This mirrors `pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D>`, but yeah I agree that the API is risky because of the double use. Unfortunately all bets are off when going from batches to proto batches and back, I think.

I'll add a warning in the docstring.

Contributor:

Yeah, that's fair! The danger of moving code... everything old is under scrutiny again.

Contributor Author:

Which is good, imo! 😅

///
/// CAUTION: This API allows turning a [ProtoBatch] into a [Batch] multiple
/// times, but if a batch is deleted the backing data goes away, so at that
/// point all in-memory copies of a batch become invalid and cannot be read
/// anymore.
pub fn batch_from_transmittable_batch<K, V, T, D>(
&self,
shard_id: &ShardId,
batch: ProtoBatch,
) -> Batch<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Ord + Codec64 + Send + Sync,
{
let batch_shard_id: ShardId = batch
.shard_id
.into_rust()
.expect("valid transmittable batch");
assert_eq!(&batch_shard_id, shard_id);

let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");

let ret = Batch {
batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
metrics: Arc::clone(&self.metrics),
shard_metrics,
version: Version::parse(&batch.version).expect("valid transmittable batch"),
batch: batch
.batch
.into_rust_if_some("ProtoBatch::batch")
.expect("valid transmittable batch"),
blob: Arc::clone(&self.blob),
_phantom: std::marker::PhantomData,
};

assert_eq!(&ret.shard_id(), shard_id);
ret
}
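
A short sketch of the roundtrip plus the ownership caveat from the CAUTION above; the concrete types and the Batch::delete cleanup call are illustrative assumptions:

use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::{PersistClient, ShardId};

async fn rehydrate_and_cleanup(
    client: &PersistClient,
    shard_id: ShardId,
    transmitted: ProtoBatch,
) {
    // Rehydrate exactly once: decoding the same ProtoBatch a second time would
    // yield a second Batch that also believes it owns the backing blobs.
    let batch: Batch<String, (), u64, i64> =
        client.batch_from_transmittable_batch(&shard_id, transmitted);

    // ... append the batch to the shard and/or read it via
    // read_batches_consolidated ...

    // Once nothing will read it anymore, delete the backing data. Per the
    // CAUTION above, any other in-memory copies become unreadable now.
    batch.delete().await;
}
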

/// Returns a [Cursor] for reading the given batches. Yielded updates are
/// consolidated if the given batches contain sorted runs, which is true
/// when they have been written using a [BatchBuilder].
///
/// To keep memory usage down when reading a snapshot that consolidates
/// well, this consolidates as it goes. However, note that only the
/// serialized data is consolidated: the deserialized data will only be
/// consolidated if your K/V codecs are one-to-one.
///
/// CAUTION: The caller needs to make sure that the given batches are
/// readable and they have to remain readable for the lifetime of the
/// returned [Cursor]. The caller is also responsible for the lifecycle of
/// the batches: once the cursor and the batches are no longer needed you
/// must call [Cursor::into_lease] to get back the batches and delete them.
#[allow(clippy::unused_async)]
pub async fn read_batches_consolidated<K, V, T, D>(
&mut self,
shard_id: ShardId,
as_of: Antichain<T>,
read_schemas: Schemas<K, V>,
batches: Vec<Batch<K, V, T, D>>,
should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
where
K: Debug + Codec + Ord,
V: Debug + Codec + Ord,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Ord + Codec64 + Send + Sync,
{
let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");

let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();

ReadHandle::read_batches_consolidated(
&self.cfg,
Arc::clone(&self.metrics),
shard_metrics,
self.metrics.read.snapshot.clone(),
Arc::clone(&self.blob),
shard_id,
as_of,
read_schemas,
&hollow_batches,
batches,
should_fetch_part,
)
}
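
A sketch of the read side, pairing read_batches_consolidated with Cursor::into_lease for cleanup. The drain loop over cursor.next() and the Batch::delete call are assumed to follow the existing cursor conventions:

use mz_persist_client::batch::Batch;
use mz_persist_client::{PersistClient, Schemas, ShardId};
use timely::progress::Antichain;

async fn read_and_cleanup(
    client: &mut PersistClient,
    shard_id: ShardId,
    schemas: Schemas<String, ()>,
    batches: Vec<Batch<String, (), u64, i64>>,
) {
    let as_of = Antichain::from_elem(0u64);
    let mut cursor = client
        .read_batches_consolidated(shard_id, as_of, schemas, batches, |_stats| true)
        .await
        .expect("as_of readable for the given batches");

    // Drain the cursor; each call yields a chunk of consolidated updates.
    while let Some(updates) = cursor.next().await {
        for update in updates {
            // ... consume the ((key, val), ts, diff) update ...
            let _ = update;
        }
    }

    // The batches themselves are the cursor's lease; take them back and delete
    // the backing data once nothing else will read them.
    for batch in cursor.into_lease() {
        batch.delete().await;
    }
}
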

/// Returns the requested schema, if known at the current state.
pub async fn get_schema<K, V, T, D>(
&self,
74 changes: 55 additions & 19 deletions src/persist-client/src/read.rs
@@ -43,7 +43,7 @@ use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, RetryParameters};
use crate::fetch::{FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart, fetch_leased_part};
use crate::internal::encoding::Schemas;
use crate::internal::machine::{ExpireFn, Machine};
use crate::internal::metrics::Metrics;
use crate::internal::metrics::{Metrics, ReadMetrics, ShardMetrics};
use crate::internal::state::{BatchPart, HollowBatch};
use crate::internal::watch::StateWatch;
use crate::iter::{Consolidator, StructuredSort};
@@ -865,22 +865,30 @@ pub(crate) struct UnexpiredReadHandleState {
/// client should call `next` until it returns `None`, which signals all data has been returned...
/// but it's also free to abandon the instance at any time if it eg. only needs a few entries.
#[derive(Debug)]
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
consolidator: CursorConsolidator<K, V, T, D>,
_lease: Lease,
read_schemas: Schemas<K, V>,
pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L = Lease> {
pub(crate) consolidator: CursorConsolidator<K, V, T, D>,
pub(crate) _lease: L,
pub(crate) read_schemas: Schemas<K, V>,
}

impl<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64, L> Cursor<K, V, T, D, L> {
/// Extracts and returns the lease from the cursor, allowing the caller to
/// do any necessary cleanup associated with the lease.
pub fn into_lease(self: Self) -> L {
self._lease
}
}
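
A tiny sketch of what the new generic lease parameter buys: the same helper can hand back either a seqno Lease (snapshot cursors) or the Vec of batches used by the free-standing read path. The helper name and bounds are assumptions for illustration:

use mz_persist_client::read::Cursor;
use mz_persist_types::{Codec, Codec64};
use timely::progress::Timestamp;

// Works for both instantiations: the snapshot path (L = Lease) and the
// free-standing batch path (L = Vec<Batch<K, V, T, D>>). The caller gets the
// lease back and is responsible for whatever cleanup it implies.
fn finish_read<K, V, T, D, L>(cursor: Cursor<K, V, T, D, L>) -> L
where
    K: Codec,
    V: Codec,
    T: Timestamp + Codec64,
    D: Codec64,
{
    cursor.into_lease()
}
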

#[derive(Debug)]
enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
pub(crate) enum CursorConsolidator<K: Codec, V: Codec, T: Timestamp + Codec64, D: Codec64> {
Structured {
consolidator: Consolidator<T, D, StructuredSort<K, V, T, D>>,
max_len: usize,
max_bytes: usize,
},
}

impl<K, V, T, D> Cursor<K, V, T, D>
impl<K, V, T, D, L> Cursor<K, V, T, D, L>
where
K: Debug + Codec + Ord,
V: Debug + Codec + Ord,
@@ -985,24 +993,52 @@ where
should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
) -> Result<Cursor<K, V, T, D>, Since<T>> {
let batches = self.machine.snapshot(&as_of).await?;
let lease = self.lease_seqno();

Self::read_batches_consolidated(
&self.cfg,
Arc::clone(&self.metrics),
Arc::clone(&self.machine.applier.shard_metrics),
self.metrics.read.snapshot.clone(),
Arc::clone(&self.blob),
self.shard_id(),
as_of,
self.read_schemas.clone(),
&batches,
lease,
should_fetch_part,
)
}

let context = format!("{}[as_of={:?}]", self.shard_id(), as_of.elements());
pub(crate) fn read_batches_consolidated<L>(
persist_cfg: &PersistConfig,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
read_metrics: ReadMetrics,
blob: Arc<dyn Blob>,
shard_id: ShardId,
as_of: Antichain<T>,
schemas: Schemas<K, V>,
batches: &[HollowBatch<T>],
lease: L,
should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
) -> Result<Cursor<K, V, T, D, L>, Since<T>> {
let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
let filter = FetchBatchFilter::Snapshot {
as_of: as_of.clone(),
};
let lease = self.lease_seqno();

let consolidator = {
let mut consolidator = Consolidator::new(
context,
self.shard_id(),
StructuredSort::new(self.read_schemas.clone()),
Arc::clone(&self.blob),
Arc::clone(&self.metrics),
Arc::clone(&self.machine.applier.shard_metrics),
self.metrics.read.snapshot.clone(),
shard_id,
StructuredSort::new(schemas.clone()),
blob,
metrics,
shard_metrics,
read_metrics,
filter,
COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
COMPACTION_MEMORY_BOUND_BYTES.get(persist_cfg),
);
for batch in batches {
for (meta, run) in batch.runs() {
@@ -1020,15 +1056,15 @@ where
// This default may end up consolidating more records than previously
// for cases like fast-path peeks, where only the first few entries are used.
// If this is a noticeable performance impact, thread the max-len in from the caller.
max_len: self.cfg.compaction_yield_after_n_updates,
max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
max_len: persist_cfg.compaction_yield_after_n_updates,
max_bytes: BLOB_TARGET_SIZE.get(persist_cfg).max(1),
}
};

Ok(Cursor {
consolidator,
_lease: lease,
read_schemas: self.read_schemas.clone(),
read_schemas: schemas,
})
}
