1 change: 0 additions & 1 deletion src/compute/src/render.rs
@@ -272,7 +272,6 @@ pub fn build_compute_dataflow<A: Allocate>(
*source_id,
Arc::clone(&compute_state.persist_clients),
&compute_state.txns_ctx,
&compute_state.worker_config,
source.storage_metadata.clone(),
read_schema,
dataflow.as_of.clone(),
1 change: 0 additions & 1 deletion src/compute/src/sink/materialized_view.rs
@@ -398,7 +398,6 @@ where
sink_id,
Arc::clone(&compute_state.persist_clients),
&compute_state.txns_ctx,
&compute_state.worker_config,
target,
None,
as_of,
8 changes: 0 additions & 8 deletions src/persist-client/src/cfg.rs
@@ -302,7 +302,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::cfg::USE_CRITICAL_SINCE_CATALOG)
.add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
.add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
.add(&crate::cfg::USE_GLOBAL_TXN_CACHE_SOURCE)
.add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS)
.add(&COMPACTION_HEURISTIC_MIN_INPUTS)
.add(&COMPACTION_HEURISTIC_MIN_PARTS)
@@ -465,13 +464,6 @@ pub const USE_CRITICAL_SINCE_SNAPSHOT: Config<bool> = Config::new(
"Use the critical since (instead of the overall since) when taking snapshots in the controller or in fast-path peeks.",
);

/// Migrate the persist source to use a process global txn cache.
pub const USE_GLOBAL_TXN_CACHE_SOURCE: Config<bool> = Config::new(
"use_global_txn_cache_source",
true,
"Use the process global txn cache (instead of an operator local one) in the Persist source.",
);

/// The maximum number of parts (s3 blobs) that [crate::batch::BatchBuilder]
/// will pipeline before back-pressuring [crate::batch::BatchBuilder::add]
/// calls on previous ones finishing.
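Aside for context: the removed flag followed the standard mz_dyncfg pattern that the remaining entries in all_dyncfgs still use. Below is a minimal, self-contained sketch of that pattern, using a hypothetical EXAMPLE_FLAG rather than any real Materialize config; the calls mirror the removed lines in cfg.rs and the removed test plumbing in operator.rs.

use mz_dyncfg::{Config, ConfigSet, ConfigUpdates};

// Hypothetical flag for illustration only; defined the same way the removed
// USE_GLOBAL_TXN_CACHE_SOURCE was.
pub const EXAMPLE_FLAG: Config<bool> = Config::new(
    "example_flag",
    true,
    "An illustrative boolean dyncfg with a default of true.",
);

fn main() {
    // Register the config in a set, as all_dyncfgs does for the real configs.
    let config_set = ConfigSet::default().add(&EXAMPLE_FLAG);

    // Override the default at runtime, as the removed DataSubscribe test
    // plumbing did with ConfigUpdates.
    let mut updates = ConfigUpdates::default();
    updates.add(&EXAMPLE_FLAG, false);
    updates.apply(&config_set);

    // Read the current value where an operator is rendered; the deleted code
    // in txns_progress did this to pick between the two source variants.
    if EXAMPLE_FLAG.get(&config_set) {
        println!("render variant A");
    } else {
        println!("render variant B");
    }
}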
5 changes: 0 additions & 5 deletions src/storage-operators/src/persist_source.rs
@@ -10,7 +10,6 @@
//! A source that reads from a persist shard.

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_dyncfg::ConfigSet;
use std::convert::Infallible;
use std::fmt::Debug;
use std::future::Future;
@@ -139,9 +138,6 @@ pub fn persist_source<G>(
source_id: GlobalId,
persist_clients: Arc<PersistClientCache>,
txns_ctx: &TxnsContext,
// In case we need to use a dyncfg to decide which operators to render in a
// dataflow.
worker_dyncfgs: &ConfigSet,
metadata: CollectionMetadata,
read_schema: Option<RelationDesc>,
as_of: Option<Antichain<Timestamp>>,
@@ -236,7 +232,6 @@ where
stream,
&source_id.to_string(),
txns_ctx,
worker_dyncfgs,
move || {
let (c, l) = (
Arc::clone(&persist_clients),
1 change: 0 additions & 1 deletion src/storage/src/render/sinks.rs
@@ -64,7 +64,6 @@ where
sink.from,
Arc::clone(&storage_state.persist_clients),
&storage_state.txns_ctx,
storage_state.storage_configuration.config_set(),
sink.from_storage_metadata.clone(),
None,
Some(sink.as_of.clone()),
3 changes: 1 addition & 2 deletions src/txn-wal/src/lib.rs
@@ -165,7 +165,7 @@
//! .apply(&mut txns).await;
//!
//! // Read data shard(s) at some `read_ts`.
//! let mut subscribe = DataSubscribe::new("example", client, txns_id, d1, 4, Antichain::new(), true);
//! let mut subscribe = DataSubscribe::new("example", client, txns_id, d1, 4, Antichain::new());
//! while subscribe.progress() <= 4 {
//! subscribe.step();
//! # tokio::task::yield_now().await;
@@ -796,7 +796,6 @@ mod tests {
data_id,
as_of,
Antichain::new(),
true,
);
data_subscribe.step_past(until - 1).await;
data_subscribe
159 changes: 15 additions & 144 deletions src/txn-wal/src/operator.rs
@@ -20,10 +20,10 @@ use differential_dataflow::Hashable;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::StreamExt;
use mz_dyncfg::{Config, ConfigSet, ConfigUpdates};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::cast::CastFrom;
use mz_ore::task::JoinHandleExt;
use mz_persist_client::cfg::{RetryParameters, USE_GLOBAL_TXN_CACHE_SOURCE};
use mz_persist_client::cfg::RetryParameters;
use mz_persist_client::operators::shard_source::{
ErrorHandler, FilterResult, SnapshotMode, shard_source,
};
@@ -48,7 +48,7 @@ use tracing::debug;

use crate::TxnsCodecDefault;
use crate::txn_cache::TxnsCache;
use crate::txn_read::{DataListenNext, DataRemapEntry, TxnsRead};
use crate::txn_read::{DataRemapEntry, TxnsRead};

/// An operator for translating physical data shard frontiers into logical ones.
///
@@ -96,7 +96,6 @@ pub fn txns_progress<K, V, T, D, P, C, F, G>(
passthrough: Stream<G, P>,
name: &str,
ctx: &TxnsContext,
worker_dyncfgs: &ConfigSet,
client_fn: impl Fn() -> F,
txns_id: ShardId,
data_id: ShardId,
@@ -116,32 +115,18 @@
G: Scope<Timestamp = T>,
{
let unique_id = (name, passthrough.scope().addr()).hashed();
let (remap, source_button) = if USE_GLOBAL_TXN_CACHE_SOURCE.get(worker_dyncfgs) {
txns_progress_source_global::<K, V, T, D, P, C, G>(
passthrough.scope(),
name,
ctx.clone(),
client_fn(),
txns_id,
data_id,
as_of,
data_key_schema,
data_val_schema,
unique_id,
)
} else {
txns_progress_source_local::<K, V, T, D, P, C, G>(
passthrough.scope(),
name,
client_fn(),
txns_id,
data_id,
as_of,
data_key_schema,
data_val_schema,
unique_id,
)
};
let (remap, source_button) = txns_progress_source_global::<K, V, T, D, P, C, G>(
passthrough.scope(),
name,
ctx.clone(),
client_fn(),
txns_id,
data_id,
as_of,
data_key_schema,
data_val_schema,
unique_id,
);
// Each of the `txns_frontiers` workers wants the full copy of the remap
// information.
let remap = remap.broadcast();
@@ -156,110 +141,6 @@
(passthrough, vec![source_button, frontiers_button])
}

/// An alternative implementation of [`txns_progress_source_global`] that opens
/// a new [`TxnsCache`] local to the operator.
fn txns_progress_source_local<K, V, T, D, P, C, G>(
scope: G,
name: &str,
client: impl Future<Output = PersistClient> + 'static,
txns_id: ShardId,
data_id: ShardId,
as_of: T,
data_key_schema: Arc<K::Schema>,
data_val_schema: Arc<V::Schema>,
unique_id: u64,
) -> (Stream<G, DataRemapEntry<T>>, PressOnDropButton)
where
K: Debug + Codec + Send + Sync,
V: Debug + Codec + Send + Sync,
T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
D: Debug + Data + Semigroup + Ord + Codec64 + Send + Sync,
P: Debug + Data,
C: TxnsCodec + 'static,
G: Scope<Timestamp = T>,
{
let worker_idx = scope.index();
let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();
let name = format!("txns_progress_source({})", name);
let mut builder = AsyncOperatorBuilder::new(name.clone(), scope);
let name = format!("{} [{}] {:.9}", name, unique_id, data_id.to_string());
let (remap_output, remap_stream) = builder.new_output();

let shutdown_button = builder.build(move |capabilities| async move {
if worker_idx != chosen_worker {
return;
}

let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
let client = client.await;
let mut txns_cache = TxnsCache::<T, C>::open(&client, txns_id, Some(data_id)).await;

let _ = txns_cache.update_gt(&as_of).await;
let mut subscribe = txns_cache.data_subscribe(data_id, as_of.clone());
let data_write = client
.open_writer::<K, V, T, D>(
data_id,
Arc::clone(&data_key_schema),
Arc::clone(&data_val_schema),
Diagnostics::from_purpose("data read physical upper"),
)
.await
.expect("schema shouldn't change");
if let Some(snapshot) = subscribe.snapshot.take() {
snapshot.unblock_read(data_write).await;
}

debug!("{} emitting {:?}", name, subscribe.remap);
remap_output.give(&cap, subscribe.remap.clone());

loop {
let _ = txns_cache.update_ge(&subscribe.remap.logical_upper).await;
cap.downgrade(&subscribe.remap.logical_upper);
let data_listen_next =
txns_cache.data_listen_next(&subscribe.data_id, &subscribe.remap.logical_upper);
debug!(
"{} data_listen_next at {:?}: {:?}",
name, subscribe.remap.logical_upper, data_listen_next,
);
match data_listen_next {
// We've caught up to the txns upper and we have to wait for it
// to advance before asking again.
//
// Note that we're asking again with the same input, but once
// the cache is past remap.logical_upper (as it will be after
// this update_gt call), we're guaranteed to get an answer.
DataListenNext::WaitForTxnsProgress => {
let _ = txns_cache.update_gt(&subscribe.remap.logical_upper).await;
}
// The data shard got a write!
DataListenNext::ReadDataTo(new_upper) => {
// A write means both the physical and logical upper advance.
subscribe.remap = DataRemapEntry {
physical_upper: new_upper.clone(),
logical_upper: new_upper,
};
debug!("{} emitting {:?}", name, subscribe.remap);
remap_output.give(&cap, subscribe.remap.clone());
}
// We know there are no writes in `[logical_upper,
// new_progress)`, so advance our output frontier.
DataListenNext::EmitLogicalProgress(new_progress) => {
assert!(subscribe.remap.physical_upper < new_progress);
assert!(subscribe.remap.logical_upper < new_progress);

subscribe.remap.logical_upper = new_progress;
// As mentioned in the docs on `DataRemapEntry`, we only
// emit updates when the physical upper changes (which
// happens to make the protocol a tiny bit more
// remap-like).
debug!("{} not emitting {:?}", name, subscribe.remap);
}
}
}
});
(remap_stream, shutdown_button.press_on_drop())
}
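For readers tracing the deleted operator above: the useful part to keep in mind is the frontier-advancement rule spelled out in its match arms. Here is a simplified, self-contained sketch of that rule, with plain u64 timestamps and local stand-ins for DataRemapEntry and DataListenNext rather than the real crate types.

// Local stand-ins for illustration; the real types live in crate::txn_read.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RemapEntry {
    physical_upper: u64,
    logical_upper: u64,
}

enum ListenNext {
    // Caught up to the txns upper; wait for it to advance and ask again.
    WaitForTxnsProgress,
    // The data shard got a write: both uppers advance to the new upper.
    ReadDataTo(u64),
    // No writes below the new progress: only the logical upper advances.
    EmitLogicalProgress(u64),
}

// Applies one protocol step; returns true if the updated entry should be
// emitted downstream (only when the physical upper changes).
fn apply(remap: &mut RemapEntry, next: ListenNext) -> bool {
    match next {
        ListenNext::WaitForTxnsProgress => false,
        ListenNext::ReadDataTo(new_upper) => {
            remap.physical_upper = new_upper;
            remap.logical_upper = new_upper;
            true
        }
        ListenNext::EmitLogicalProgress(new_progress) => {
            assert!(remap.physical_upper < new_progress);
            assert!(remap.logical_upper < new_progress);
            remap.logical_upper = new_progress;
            false
        }
    }
}

fn main() {
    let mut remap = RemapEntry { physical_upper: 4, logical_upper: 4 };
    // Logical-only progress: the entry advances but is not emitted.
    assert!(!apply(&mut remap, ListenNext::EmitLogicalProgress(5)));
    assert_eq!(remap, RemapEntry { physical_upper: 4, logical_upper: 5 });
    // A write at 7: both uppers advance and the entry is emitted.
    assert!(apply(&mut remap, ListenNext::ReadDataTo(7)));
    assert_eq!(remap, RemapEntry { physical_upper: 7, logical_upper: 7 });
}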

/// TODO: I'd much prefer the communication protocol between the two operators
/// to be exactly remap as defined in the [reclocking design doc]. However, we
/// can't quite recover exactly the information necessary to construct that at
@@ -638,7 +519,6 @@ impl DataSubscribe {
data_id: ShardId,
as_of: u64,
until: Antichain<u64>,
use_global_txn_cache: bool,
) -> Self {
let mut worker = Worker::new(
WorkerConfig::default(),
@@ -675,18 +555,11 @@
})
});
let data_stream = data_stream.probe_with(&data);
// We purposely do not use the `ConfigSet` in `client` so that
// different tests can set different values.
let config_set = ConfigSet::default().add(&USE_GLOBAL_TXN_CACHE_SOURCE);
let mut updates = ConfigUpdates::default();
updates.add(&USE_GLOBAL_TXN_CACHE_SOURCE, use_global_txn_cache);
updates.apply(&config_set);
let (data_stream, mut txns_progress_token) =
txns_progress::<String, (), u64, i64, _, TxnsCodecDefault, _, _>(
data_stream,
name,
&TxnsContext::default(),
&config_set,
|| std::future::ready(client.clone()),
txns_id,
data_id,
@@ -843,7 +716,6 @@ impl DataSubscribeTask {
data_id,
as_of,
Antichain::new(),
true,
);
let mut output = Vec::new();
loop {
@@ -1071,7 +943,6 @@ mod tests {
d0,
3,
Antichain::from_elem(until),
true,
);
// Manually step the dataflow, instead of going through the
// `DataSubscribe` helper because we're interested in all captured
1 change: 0 additions & 1 deletion src/txn-wal/src/txn_cache.rs
@@ -1183,7 +1183,6 @@ mod tests {
data_id,
as_of,
Antichain::new(),
true,
)
}
}
7 changes: 3 additions & 4 deletions src/txn-wal/src/txns.rs
@@ -1340,8 +1340,8 @@ mod tests {
debug!("stress update {:.9} to {}", data_id.to_string(), self.ts);
let _ = self.txns.txns_cache.update_ge(&self.ts).await;
}
4 => self.start_read(data_id, true),
5 => self.start_read(data_id, false),
4 => self.start_read(data_id),
5 => self.start_read(data_id),
_ => unreachable!(""),
}
debug!("stress {} step {} DONE ts={}", self.idx, self.step, self.ts);
@@ -1469,7 +1469,7 @@
.await
}

fn start_read(&mut self, data_id: ShardId, use_global_txn_cache: bool) {
fn start_read(&mut self, data_id: ShardId) {
debug!(
"stress start_read {:.9} at {}",
data_id.to_string(),
@@ -1490,7 +1490,6 @@
data_id,
as_of,
Antichain::new(),
use_global_txn_cache,
);
let data_id = format!("{:.9}", data_id.to_string());
let _guard = info_span!("read_worker", %data_id, as_of).entered();