@@ -20,10 +20,10 @@ use differential_dataflow::Hashable;
2020use differential_dataflow:: difference:: Semigroup ;
2121use differential_dataflow:: lattice:: Lattice ;
2222use futures:: StreamExt ;
23- use mz_dyncfg:: { Config , ConfigSet , ConfigUpdates } ;
23+ use mz_dyncfg:: { Config , ConfigSet } ;
2424use mz_ore:: cast:: CastFrom ;
2525use mz_ore:: task:: JoinHandleExt ;
26- use mz_persist_client:: cfg:: { RetryParameters , USE_GLOBAL_TXN_CACHE_SOURCE } ;
26+ use mz_persist_client:: cfg:: RetryParameters ;
2727use mz_persist_client:: operators:: shard_source:: {
2828 ErrorHandler , FilterResult , SnapshotMode , shard_source,
2929} ;
@@ -48,7 +48,7 @@ use tracing::debug;
4848
4949use crate :: TxnsCodecDefault ;
5050use crate :: txn_cache:: TxnsCache ;
51- use crate :: txn_read:: { DataListenNext , DataRemapEntry , TxnsRead } ;
51+ use crate :: txn_read:: { DataRemapEntry , TxnsRead } ;
5252
5353/// An operator for translating physical data shard frontiers into logical ones.
5454///
@@ -96,7 +96,6 @@ pub fn txns_progress<K, V, T, D, P, C, F, G>(
9696 passthrough : Stream < G , P > ,
9797 name : & str ,
9898 ctx : & TxnsContext ,
99- worker_dyncfgs : & ConfigSet ,
10099 client_fn : impl Fn ( ) -> F ,
101100 txns_id : ShardId ,
102101 data_id : ShardId ,
@@ -116,32 +115,18 @@ where
116115 G : Scope < Timestamp = T > ,
117116{
118117 let unique_id = ( name, passthrough. scope ( ) . addr ( ) ) . hashed ( ) ;
119- let ( remap, source_button) = if USE_GLOBAL_TXN_CACHE_SOURCE . get ( worker_dyncfgs) {
120- txns_progress_source_global :: < K , V , T , D , P , C , G > (
121- passthrough. scope ( ) ,
122- name,
123- ctx. clone ( ) ,
124- client_fn ( ) ,
125- txns_id,
126- data_id,
127- as_of,
128- data_key_schema,
129- data_val_schema,
130- unique_id,
131- )
132- } else {
133- txns_progress_source_local :: < K , V , T , D , P , C , G > (
134- passthrough. scope ( ) ,
135- name,
136- client_fn ( ) ,
137- txns_id,
138- data_id,
139- as_of,
140- data_key_schema,
141- data_val_schema,
142- unique_id,
143- )
144- } ;
118+ let ( remap, source_button) = txns_progress_source_global :: < K , V , T , D , P , C , G > (
119+ passthrough. scope ( ) ,
120+ name,
121+ ctx. clone ( ) ,
122+ client_fn ( ) ,
123+ txns_id,
124+ data_id,
125+ as_of,
126+ data_key_schema,
127+ data_val_schema,
128+ unique_id,
129+ ) ;
145130 // Each of the `txns_frontiers` workers wants the full copy of the remap
146131 // information.
147132 let remap = remap. broadcast ( ) ;
@@ -156,110 +141,6 @@ where
156141 ( passthrough, vec ! [ source_button, frontiers_button] )
157142}
158143
159- /// An alternative implementation of [`txns_progress_source_global`] that opens
160- /// a new [`TxnsCache`] local to the operator.
161- fn txns_progress_source_local < K , V , T , D , P , C , G > (
162- scope : G ,
163- name : & str ,
164- client : impl Future < Output = PersistClient > + ' static ,
165- txns_id : ShardId ,
166- data_id : ShardId ,
167- as_of : T ,
168- data_key_schema : Arc < K :: Schema > ,
169- data_val_schema : Arc < V :: Schema > ,
170- unique_id : u64 ,
171- ) -> ( Stream < G , DataRemapEntry < T > > , PressOnDropButton )
172- where
173- K : Debug + Codec + Send + Sync ,
174- V : Debug + Codec + Send + Sync ,
175- T : Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync ,
176- D : Debug + Data + Semigroup + Ord + Codec64 + Send + Sync ,
177- P : Debug + Data ,
178- C : TxnsCodec + ' static ,
179- G : Scope < Timestamp = T > ,
180- {
181- let worker_idx = scope. index ( ) ;
182- let chosen_worker = usize:: cast_from ( name. hashed ( ) ) % scope. peers ( ) ;
183- let name = format ! ( "txns_progress_source({})" , name) ;
184- let mut builder = AsyncOperatorBuilder :: new ( name. clone ( ) , scope) ;
185- let name = format ! ( "{} [{}] {:.9}" , name, unique_id, data_id. to_string( ) ) ;
186- let ( remap_output, remap_stream) = builder. new_output ( ) ;
187-
188- let shutdown_button = builder. build ( move |capabilities| async move {
189- if worker_idx != chosen_worker {
190- return ;
191- }
192-
193- let [ mut cap] : [ _ ; 1 ] = capabilities. try_into ( ) . expect ( "one capability per output" ) ;
194- let client = client. await ;
195- let mut txns_cache = TxnsCache :: < T , C > :: open ( & client, txns_id, Some ( data_id) ) . await ;
196-
197- let _ = txns_cache. update_gt ( & as_of) . await ;
198- let mut subscribe = txns_cache. data_subscribe ( data_id, as_of. clone ( ) ) ;
199- let data_write = client
200- . open_writer :: < K , V , T , D > (
201- data_id,
202- Arc :: clone ( & data_key_schema) ,
203- Arc :: clone ( & data_val_schema) ,
204- Diagnostics :: from_purpose ( "data read physical upper" ) ,
205- )
206- . await
207- . expect ( "schema shouldn't change" ) ;
208- if let Some ( snapshot) = subscribe. snapshot . take ( ) {
209- snapshot. unblock_read ( data_write) . await ;
210- }
211-
212- debug ! ( "{} emitting {:?}" , name, subscribe. remap) ;
213- remap_output. give ( & cap, subscribe. remap . clone ( ) ) ;
214-
215- loop {
216- let _ = txns_cache. update_ge ( & subscribe. remap . logical_upper ) . await ;
217- cap. downgrade ( & subscribe. remap . logical_upper ) ;
218- let data_listen_next =
219- txns_cache. data_listen_next ( & subscribe. data_id , & subscribe. remap . logical_upper ) ;
220- debug ! (
221- "{} data_listen_next at {:?}: {:?}" ,
222- name, subscribe. remap. logical_upper, data_listen_next,
223- ) ;
224- match data_listen_next {
225- // We've caught up to the txns upper and we have to wait for it
226- // to advance before asking again.
227- //
228- // Note that we're asking again with the same input, but once
229- // the cache is past remap.logical_upper (as it will be after
230- // this update_gt call), we're guaranteed to get an answer.
231- DataListenNext :: WaitForTxnsProgress => {
232- let _ = txns_cache. update_gt ( & subscribe. remap . logical_upper ) . await ;
233- }
234- // The data shard got a write!
235- DataListenNext :: ReadDataTo ( new_upper) => {
236- // A write means both the physical and logical upper advance.
237- subscribe. remap = DataRemapEntry {
238- physical_upper : new_upper. clone ( ) ,
239- logical_upper : new_upper,
240- } ;
241- debug ! ( "{} emitting {:?}" , name, subscribe. remap) ;
242- remap_output. give ( & cap, subscribe. remap . clone ( ) ) ;
243- }
244- // We know there are no writes in `[logical_upper,
245- // new_progress)`, so advance our output frontier.
246- DataListenNext :: EmitLogicalProgress ( new_progress) => {
247- assert ! ( subscribe. remap. physical_upper < new_progress) ;
248- assert ! ( subscribe. remap. logical_upper < new_progress) ;
249-
250- subscribe. remap . logical_upper = new_progress;
251- // As mentioned in the docs on `DataRemapEntry`, we only
252- // emit updates when the physical upper changes (which
253- // happens to makes the protocol a tiny bit more
254- // remap-like).
255- debug ! ( "{} not emitting {:?}" , name, subscribe. remap) ;
256- }
257- }
258- }
259- } ) ;
260- ( remap_stream, shutdown_button. press_on_drop ( ) )
261- }
262-
263144/// TODO: I'd much prefer the communication protocol between the two operators
264145/// to be exactly remap as defined in the [reclocking design doc]. However, we
265146/// can't quite recover exactly the information necessary to construct that at
@@ -638,7 +519,6 @@ impl DataSubscribe {
638519 data_id : ShardId ,
639520 as_of : u64 ,
640521 until : Antichain < u64 > ,
641- use_global_txn_cache : bool ,
642522 ) -> Self {
643523 let mut worker = Worker :: new (
644524 WorkerConfig :: default ( ) ,
@@ -675,18 +555,11 @@ impl DataSubscribe {
675555 } )
676556 } ) ;
677557 let data_stream = data_stream. probe_with ( & data) ;
678- // We purposely do not use the `ConfigSet` in `client` so that
679- // different tests can set different values.
680- let config_set = ConfigSet :: default ( ) . add ( & USE_GLOBAL_TXN_CACHE_SOURCE ) ;
681- let mut updates = ConfigUpdates :: default ( ) ;
682- updates. add ( & USE_GLOBAL_TXN_CACHE_SOURCE , use_global_txn_cache) ;
683- updates. apply ( & config_set) ;
684558 let ( data_stream, mut txns_progress_token) =
685559 txns_progress :: < String , ( ) , u64 , i64 , _ , TxnsCodecDefault , _ , _ > (
686560 data_stream,
687561 name,
688562 & TxnsContext :: default ( ) ,
689- & config_set,
690563 || std:: future:: ready ( client. clone ( ) ) ,
691564 txns_id,
692565 data_id,
@@ -843,7 +716,6 @@ impl DataSubscribeTask {
843716 data_id,
844717 as_of,
845718 Antichain :: new ( ) ,
846- true ,
847719 ) ;
848720 let mut output = Vec :: new ( ) ;
849721 loop {
@@ -1071,7 +943,6 @@ mod tests {
1071943 d0,
1072944 3 ,
1073945 Antichain :: from_elem ( until) ,
1074- true ,
1075946 ) ;
1076947 // Manually step the dataflow, instead of going through the
1077948 // `DataSubscribe` helper because we're interested in all captured
0 commit comments