@@ -19,6 +19,7 @@ use std::{
19
19
} ,
20
20
fmt:: Write ,
21
21
future:: Future ,
22
+ iter,
22
23
ops:: Bound ,
23
24
pin:: Pin ,
24
25
sync:: {
@@ -119,6 +120,7 @@ use metrics::write_persistence_global_timer;
119
120
use mysql_async:: Row ;
120
121
use serde:: Deserialize ;
121
122
use serde_json:: Value as JsonValue ;
123
+ use smallvec:: SmallVec ;
122
124
123
125
use crate :: {
124
126
chunks:: smart_chunks,
@@ -971,40 +973,43 @@ impl<RT: Runtime> PersistenceReader for MySqlReader<RT> {
971
973
let ids: Vec < _ > = ids. into_iter ( ) . collect ( ) ;
972
974
973
975
let mut result = BTreeMap :: new ( ) ;
974
- let mut results = vec ! [ ] ;
975
976
976
977
for chunk in smart_chunks ( & ids) {
977
- let mut params = vec ! [ ] ;
978
- for DocumentPrevTsQuery { id, ts, prev_ts } in chunk {
979
- params. push ( i64:: from ( * ts) . into ( ) ) ;
978
+ let mut params = Vec :: with_capacity ( chunk. len ( ) * 3 ) ;
979
+ let mut id_ts_to_query: HashMap <
980
+ ( InternalDocumentId , Timestamp ) ,
981
+ SmallVec < [ DocumentPrevTsQuery ; 1 ] > ,
982
+ > = HashMap :: with_capacity ( chunk. len ( ) ) ;
983
+ for q @ & DocumentPrevTsQuery { id, ts : _, prev_ts } in chunk {
980
984
params. push ( internal_id_param ( id. table ( ) . 0 ) . into ( ) ) ;
981
- params. push ( internal_doc_id_param ( * id) . into ( ) ) ;
982
- params. push ( i64:: from ( * prev_ts) . into ( ) ) ;
985
+ params. push ( internal_doc_id_param ( id) . into ( ) ) ;
986
+ params. push ( i64:: from ( prev_ts) . into ( ) ) ;
987
+ // the underlying query does not care about `ts` and will
988
+ // deduplicate, so create a map from DB results back to queries
989
+ id_ts_to_query. entry ( ( id, prev_ts) ) . or_default ( ) . push ( * q) ;
983
990
}
984
991
let result_stream = client
985
992
. query_stream ( exact_rev_chunk ( chunk. len ( ) ) , params, chunk. len ( ) )
986
993
. await ?;
987
994
pin_mut ! ( result_stream) ;
988
- while let Some ( result) = result_stream. try_next ( ) . await ? {
989
- results. push ( result) ;
995
+ while let Some ( row) = result_stream. try_next ( ) . await ? {
996
+ let ( prev_ts, id, maybe_doc, prev_prev_ts) = self . row_to_document ( row) ?;
997
+ let entry = DocumentLogEntry {
998
+ ts : prev_ts,
999
+ id,
1000
+ value : maybe_doc,
1001
+ prev_ts : prev_prev_ts,
1002
+ } ;
1003
+ let original_queries = id_ts_to_query
1004
+ . get ( & ( id, prev_ts) )
1005
+ . context ( "exact_rev_chunk query returned an unasked row" ) ?;
1006
+ for ( entry, & q) in
1007
+ iter:: repeat_n ( entry, original_queries. len ( ) ) . zip ( original_queries)
1008
+ {
1009
+ anyhow:: ensure!( result. insert( q, entry) . is_none( ) ) ;
1010
+ }
990
1011
}
991
1012
}
992
- for row in results. into_iter ( ) {
993
- let ts: i64 = row. get ( 6 ) . unwrap ( ) ;
994
- let ts = Timestamp :: try_from ( ts) ?;
995
- let ( prev_ts, id, maybe_doc, prev_prev_ts) = self . row_to_document ( row) ?;
996
- anyhow:: ensure!( result
997
- . insert(
998
- DocumentPrevTsQuery { id, ts, prev_ts } ,
999
- DocumentLogEntry {
1000
- ts: prev_ts,
1001
- id,
1002
- value: maybe_doc,
1003
- prev_ts: prev_prev_ts,
1004
- }
1005
- )
1006
- . is_none( ) ) ;
1007
- }
1008
1013
1009
1014
if let Some ( min_ts) = ids. iter ( ) . map ( |DocumentPrevTsQuery { ts, .. } | * ts) . min ( ) {
1010
1015
// Validate retention after finding documents
@@ -1700,23 +1705,18 @@ D.ts = I2.ts AND D.table_id = I2.table_id AND D.id = I2.document_id
1700
1705
static EXACT_REV_CHUNK_QUERIES : LazyLock < HashMap < usize , String > > = LazyLock :: new ( || {
1701
1706
smart_chunk_sizes ( )
1702
1707
. map ( |chunk_size| {
1703
- let select = r#"
1704
- SELECT id, ts, table_id, json_value, deleted, prev_ts, ? as query_ts
1708
+ let where_clause = iter:: repeat ( "(table_id = ? AND id = ? AND ts = ?)" )
1709
+ . take ( chunk_size)
1710
+ . join ( " OR " ) ;
1711
+ (
1712
+ chunk_size,
1713
+ format ! (
1714
+ "SELECT id, ts, table_id, json_value, deleted, prev_ts
1705
1715
FROM @db_name.documents FORCE INDEX (PRIMARY)
1706
- WHERE table_id = ? AND id = ? AND ts = ?
1707
- ORDER BY ts ASC, table_id ASC, id ASC
1708
- "# ;
1709
- let queries = ( 1 ..=chunk_size)
1710
- . map ( |i| format ! ( "q{i} AS ({select})" ) )
1711
- . join ( ", " ) ;
1712
- let union_all = ( 1 ..=chunk_size)
1713
- . map ( |i| {
1714
- format ! (
1715
- "SELECT id, ts, table_id, json_value, deleted, prev_ts, query_ts FROM q{i}"
1716
- )
1717
- } )
1718
- . join ( " UNION ALL " ) ;
1719
- ( chunk_size, format ! ( "WITH {queries} {union_all}" ) )
1716
+ WHERE {where_clause}
1717
+ ORDER BY ts ASC, table_id ASC, id ASC"
1718
+ ) ,
1719
+ )
1720
1720
} )
1721
1721
. collect ( )
1722
1722
} ) ;
0 commit comments