@@ -21,27 +21,39 @@ use std::sync::Arc;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
+ use differential_dataflow::trace::Description;
use mz_build_info::{BuildInfo, build_info};
use mz_dyncfg::ConfigSet;
use mz_ore::{instrument, soft_assert_or_log};
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
- use timely::progress::Timestamp;
+ use mz_proto::{IntoRustIfSome, ProtoType};
+ use semver::Version;
+ use timely::progress::{Antichain, Timestamp};

use crate::async_runtime::IsolatedRuntime;
+ use crate::batch::{
+     BATCH_DELETE_ENABLED, BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig,
+     BatchBuilderInternal, BatchParts, ProtoBatch,
+ };
use crate::cache::{PersistClientCache, StateCache};
- use crate::cfg::PersistConfig;
+ use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, PersistConfig};
use crate::critical::{CriticalReaderId, SinceHandle};
use crate::error::InvalidUsage;
- use crate::fetch::{BatchFetcher, BatchFetcherConfig};
- use crate::internal::compact::Compactor;
- use crate::internal::encoding::{Schemas, parse_id};
+ use crate::fetch::{BatchFetcher, BatchFetcherConfig, FetchBatchFilter, Lease};
+ use crate::internal::compact::{CompactConfig, Compactor};
+ use crate::internal::encoding::parse_id;
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::{Machine, retry_external};
+ use crate::internal::state::RunOrder;
use crate::internal::state_versions::StateVersions;
+ use crate::iter::{Consolidator, StructuredSort};
use crate::metrics::Metrics;
- use crate::read::{LeasedReaderId, READER_LEASE_DURATION, ReadHandle};
+ use crate::read::{
+     Cursor, CursorConsolidator, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle,
+     Since,
+ };
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::{WriteHandle, WriterId};
@@ -121,6 +133,9 @@ pub const BUILD_INFO: BuildInfo = build_info!();
// Re-export for convenience.
pub use mz_persist_types::{PersistLocation, ShardId};

+ pub use crate::internal::encoding::Schemas;
+ pub use crate::internal::state::HollowBatch;
+
/// Additional diagnostic information used within Persist
/// e.g. for logging, metric labels, etc.
#[derive(Clone, Debug)]
@@ -539,6 +554,188 @@ impl PersistClient {
        Ok(writer)
    }

+     /// Returns a [BatchBuilder] that can be used to write a batch of updates to
+     /// blob storage which can then be appended to the given shard using
+     /// [WriteHandle::compare_and_append_batch] or [WriteHandle::append_batch],
+     /// or which can be read using [PersistClient::read_batches_consolidated].
+     ///
+     /// The builder uses a bounded amount of memory, even when the number of
+     /// updates is very large. Individual records, however, should be small
+     /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
+     /// O(MB) come talk to us.
+     #[instrument(level = "debug", fields(shard = %shard_id))]
+     pub async fn batch_builder<K, V, T, D>(
+         &self,
+         shard_id: ShardId,
+         write_schemas: Schemas<K, V>,
+         lower: Antichain<T>,
+     ) -> Result<BatchBuilder<K, V, T, D>, InvalidUsage<T>>
+     where
+         K: Debug + Codec,
+         V: Debug + Codec,
+         T: Timestamp + Lattice + Codec64 + Sync,
+         D: Semigroup + Ord + Codec64 + Send + Sync,
+     {
+         let cfg = CompactConfig::new(&self.cfg, shard_id);
+         // WIP: Pass this in as an argument?
+         let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
+
+         let parts = if let Some(max_runs) = cfg.batch.max_runs {
+             BatchParts::new_compacting::<K, V, D>(
+                 cfg,
+                 Description::new(
+                     lower.clone(),
+                     Antichain::new(),
+                     Antichain::from_elem(T::minimum()),
+                 ),
+                 max_runs,
+                 Arc::clone(&self.metrics),
+                 shard_metrics,
+                 shard_id,
+                 Arc::clone(&self.blob),
+                 Arc::clone(&self.isolated_runtime),
+                 &self.metrics.user,
+                 write_schemas.clone(),
+             )
+         } else {
+             BatchParts::new_ordered(
+                 cfg.batch,
+                 RunOrder::Unordered,
+                 Arc::clone(&self.metrics),
+                 shard_metrics,
+                 shard_id,
+                 Arc::clone(&self.blob),
+                 Arc::clone(&self.isolated_runtime),
+                 &self.metrics.user,
+             )
+         };
+         let builder = BatchBuilderInternal::new(
+             BatchBuilderConfig::new(&self.cfg, shard_id),
+             parts,
+             Arc::clone(&self.metrics),
+             write_schemas.clone(),
+             Arc::clone(&self.blob),
+             shard_id,
+             self.cfg.build_version.clone(),
+         );
+         Ok(BatchBuilder::new(
+             builder,
+             Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
+         ))
+     }
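
A minimal usage sketch of the new builder, under stated assumptions: `String` keys/values, `u64` timestamps, `i64` diffs, a client and `Schemas` constructed elsewhere, and `BatchBuilder::add`/`finish` plus `Batch::into_transmittable_batch` behaving as in the rest of this crate. The helper name is hypothetical and error handling is collapsed to `expect`.

```rust
use timely::progress::Antichain;

use mz_persist_client::batch::ProtoBatch;
use mz_persist_client::{PersistClient, Schemas, ShardId};

// Hypothetical helper: stage a single update as a batch in blob storage and
// return it in transmittable form. Nothing is appended to the shard here.
async fn stage_one_update(
    client: &PersistClient,
    schemas: Schemas<String, String>,
    shard_id: ShardId,
) -> ProtoBatch {
    let lower = Antichain::from_elem(0u64);
    let mut builder = client
        .batch_builder::<String, String, u64, i64>(shard_id, schemas, lower)
        .await
        .expect("valid usage");
    builder
        .add(&"key".to_owned(), &"value".to_owned(), &0u64, &1i64)
        .await
        .expect("valid usage");
    // Seal the batch at its upper; it can later be appended via
    // compare_and_append_batch or read via read_batches_consolidated.
    let batch = builder
        .finish(Antichain::from_elem(1u64))
        .await
        .expect("valid usage");
    batch.into_transmittable_batch()
}
```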
+
+     /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
+     /// to append it to the given shard or to read it via
+     /// [PersistClient::read_batches_consolidated]
+     pub fn batch_from_transmittable_batch<K, V, T, D>(
+         &self,
+         shard_id: &ShardId,
+         batch: ProtoBatch,
+     ) -> Batch<K, V, T, D>
+     where
+         K: Debug + Codec,
+         V: Debug + Codec,
+         T: Timestamp + Lattice + Codec64 + Sync,
+         D: Semigroup + Ord + Codec64 + Send + Sync,
+     {
+         let batch_shard_id: ShardId = batch
+             .shard_id
+             .into_rust()
+             .expect("valid transmittable batch");
+         assert_eq!(&batch_shard_id, shard_id);
+
+         let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");
+
+         let ret = Batch {
+             batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
+             metrics: Arc::clone(&self.metrics),
+             shard_metrics,
+             version: Version::parse(&batch.version).expect("valid transmittable batch"),
+             batch: batch
+                 .batch
+                 .into_rust_if_some("ProtoBatch::batch")
+                 .expect("valid transmittable batch"),
+             blob: Arc::clone(&self.blob),
+             _phantom: std::marker::PhantomData,
+         };
+
+         assert_eq!(&ret.shard_id(), shard_id);
+         ret
+     }
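
A sketch of the receiving side, assuming a `ProtoBatch` like the one staged above arrived over the network; the helper name and the `String`/`u64`/`i64` type parameters are illustrative and must match how the batch was originally written.

```rust
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::{PersistClient, ShardId};

// Hypothetical receiving side: rehydrate a transmitted batch into a usable
// `Batch` handle for the same shard it was written against.
fn rehydrate(
    client: &PersistClient,
    shard_id: ShardId,
    transmitted: ProtoBatch,
) -> Batch<String, String, u64, i64> {
    // Panics if the ProtoBatch was written for a different shard.
    client.batch_from_transmittable_batch(&shard_id, transmitted)
}
```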
+
+     /// Returns a [Cursor] for reading the given batches. Yielded updates are
+     /// consolidated if the given batches contain sorted runs, which is true
+     /// when they have been written using a [BatchBuilder].
+     ///
+     /// To keep memory usage down when reading a snapshot that consolidates
+     /// well, this consolidates as it goes. However, note that only the
+     /// serialized data is consolidated: the deserialized data will only be
+     /// consolidated if your K/V codecs are one-to-one.
+     // WIP: Do we want to let callers inject sth like MFP here?
+     // WIP: This doesn't need async right now, but still might want it in the
+     // API to have the option in the future?
+     #[allow(clippy::unused_async)]
+     pub async fn read_batches_consolidated<K, V, T, D>(
+         &mut self,
+         shard_id: ShardId,
+         as_of: Antichain<T>,
+         read_schemas: Schemas<K, V>,
+         batches: &[Batch<K, V, T, D>],
+         should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
+     ) -> Result<Cursor<K, V, T, D>, Since<T>>
+     where
+         K: Debug + Codec,
+         V: Debug + Codec,
+         T: Timestamp + Lattice + Codec64 + Sync,
+         D: Semigroup + Ord + Codec64 + Send + Sync,
+     {
+         let context = format!("{}[as_of={:?}]", shard_id, as_of.elements());
+         let filter = FetchBatchFilter::Snapshot {
+             as_of: as_of.clone(),
+         };
+
+         let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
+
+         let consolidator = {
+             let mut consolidator = Consolidator::new(
+                 context,
+                 shard_id,
+                 StructuredSort::new(read_schemas.clone()),
+                 Arc::clone(&self.blob),
+                 Arc::clone(&self.metrics),
+                 Arc::clone(&shard_metrics),
+                 self.metrics.read.snapshot.clone(),
+                 filter,
+                 COMPACTION_MEMORY_BOUND_BYTES.get(&self.cfg),
+             );
+             for batch in batches {
+                 for (meta, run) in batch.batch.runs() {
+                     consolidator.enqueue_run(
+                         &batch.batch.desc,
+                         meta,
+                         run.into_iter()
+                             .filter(|p| should_fetch_part(p.stats()))
+                             .cloned(),
+                     );
+                 }
+             }
+             CursorConsolidator::Structured {
+                 consolidator,
+                 // This default may end up consolidating more records than previously
+                 // for cases like fast-path peeks, where only the first few entries are used.
+                 // If this is a noticeable performance impact, thread the max-len in from the caller.
+                 max_len: self.cfg.compaction_yield_after_n_updates,
+                 max_bytes: BLOB_TARGET_SIZE.get(&self.cfg).max(1),
+             }
+         };
+
+         Ok(Cursor {
+             consolidator,
+             _lease: Lease::default(),
+             read_schemas,
+         })
+     }
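
A sketch of draining staged batches back out through the new cursor, assuming `Cursor::next` yields chunks of decoded `((key, value), timestamp, diff)` updates as in the existing read path; the helper name, the always-true part filter, and the concrete types are illustrative.

```rust
use timely::progress::Antichain;

use mz_persist_client::batch::Batch;
use mz_persist_client::{PersistClient, Schemas, ShardId};

// Hypothetical reader: consume the staged batches, consolidated, at the
// timestamp they were written at. Types and schemas must match the writer.
async fn drain(
    client: &mut PersistClient,
    shard_id: ShardId,
    schemas: Schemas<String, String>,
    batches: Vec<Batch<String, String, u64, i64>>,
) {
    let as_of = Antichain::from_elem(0u64);
    let mut cursor = client
        .read_batches_consolidated(shard_id, as_of, schemas, &batches, |_stats| true)
        .await
        .expect("as_of in bounds");
    // Each call to `next` yields a consolidated chunk of decoded updates.
    while let Some(chunk) = cursor.next().await {
        for ((key, value), ts, diff) in chunk {
            println!("{:?} {:?} {} {}", key, value, ts, diff);
        }
    }
}
```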
+
    /// Returns the requested schema, if known at the current state.
    pub async fn get_schema<K, V, T, D>(
        &self,