@@ -327,110 +327,106 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
327
327
is_virtual_table,
328
328
)
329
329
}
330
+ }
330
331
331
- fn start_index_range (
332
- & mut self ,
333
- request : IndexRangeRequest ,
334
- ) -> anyhow:: Result < Result < DeveloperIndexRangeResponse , RangeRequest > > {
335
- if request. interval . is_empty ( ) {
336
- return Ok ( Ok ( DeveloperIndexRangeResponse {
337
- page : vec ! [ ] ,
338
- cursor : CursorPosition :: End ,
339
- } ) ) ;
340
- }
332
+ fn start_index_range < RT : Runtime > (
333
+ tx : & mut Transaction < RT > ,
334
+ request : IndexRangeRequest ,
335
+ ) -> anyhow:: Result < Result < DeveloperIndexRangeResponse , RangeRequest > > {
336
+ if request. interval . is_empty ( ) {
337
+ return Ok ( Ok ( DeveloperIndexRangeResponse {
338
+ page : vec ! [ ] ,
339
+ cursor : CursorPosition :: End ,
340
+ } ) ) ;
341
+ }
342
+
343
+ let max_rows = cmp:: min ( request. max_rows , MAX_PAGE_SIZE ) ;
341
344
342
- let max_rows = cmp:: min ( request. max_rows , MAX_PAGE_SIZE ) ;
345
+ match request. stable_index_name {
346
+ StableIndexName :: Physical ( tablet_index_name) => {
347
+ let index_name = tablet_index_name
348
+ . clone ( )
349
+ . map_table ( & tx. table_mapping ( ) . tablet_to_name ( ) ) ?;
350
+ Ok ( Err ( RangeRequest {
351
+ index_name : tablet_index_name. clone ( ) ,
352
+ printable_index_name : index_name,
353
+ interval : request. interval . clone ( ) ,
354
+ order : request. order ,
355
+ max_size : max_rows,
356
+ } ) )
357
+ } ,
358
+ StableIndexName :: Virtual ( index_name, tablet_index_name) => {
359
+ log_virtual_table_query ( ) ;
360
+ Ok ( Err ( RangeRequest {
361
+ index_name : tablet_index_name. clone ( ) ,
362
+ printable_index_name : index_name. clone ( ) ,
363
+ interval : request. interval . clone ( ) ,
364
+ order : request. order ,
365
+ max_size : max_rows,
366
+ } ) )
367
+ } ,
368
+ StableIndexName :: Missing => Ok ( Ok ( DeveloperIndexRangeResponse {
369
+ page : vec ! [ ] ,
370
+ cursor : CursorPosition :: End ,
371
+ } ) ) ,
372
+ }
373
+ }
343
374
344
- match request. stable_index_name {
345
- StableIndexName :: Physical ( tablet_index_name) => {
346
- let index_name = tablet_index_name
347
- . clone ( )
348
- . map_table ( & self . tx . table_mapping ( ) . tablet_to_name ( ) ) ?;
349
- Ok ( Err ( RangeRequest {
350
- index_name : tablet_index_name. clone ( ) ,
351
- printable_index_name : index_name,
352
- interval : request. interval . clone ( ) ,
353
- order : request. order ,
354
- max_size : max_rows,
355
- } ) )
375
+ /// NOTE: returns a page of results. Callers must call record_read_document
376
+ /// for all documents returned from the index stream.
377
+ #[ minitrace:: trace]
378
+ #[ convex_macro:: instrument_future]
379
+ pub async fn index_range_batch < RT : Runtime > (
380
+ tx : & mut Transaction < RT > ,
381
+ requests : BTreeMap < BatchKey , IndexRangeRequest > ,
382
+ ) -> BTreeMap < BatchKey , anyhow:: Result < DeveloperIndexRangeResponse > > {
383
+ let batch_size = requests. len ( ) ;
384
+ let mut results = BTreeMap :: new ( ) ;
385
+ let mut fetch_requests = BTreeMap :: new ( ) ;
386
+ let mut virtual_table_versions = BTreeMap :: new ( ) ;
387
+ for ( batch_key, request) in requests {
388
+ if matches ! ( request. stable_index_name, StableIndexName :: Virtual ( _, _) ) {
389
+ virtual_table_versions. insert ( batch_key, request. version . clone ( ) ) ;
390
+ }
391
+ match start_index_range ( tx, request) {
392
+ Err ( e) => {
393
+ results. insert ( batch_key, Err ( e) ) ;
394
+ } ,
395
+ Ok ( Ok ( result) ) => {
396
+ results. insert ( batch_key, Ok ( result) ) ;
356
397
} ,
357
- StableIndexName :: Virtual ( index_name, tablet_index_name) => {
358
- log_virtual_table_query ( ) ;
359
- Ok ( Err ( RangeRequest {
360
- index_name : tablet_index_name. clone ( ) ,
361
- printable_index_name : index_name. clone ( ) ,
362
- interval : request. interval . clone ( ) ,
363
- order : request. order ,
364
- max_size : max_rows,
365
- } ) )
398
+ Ok ( Err ( request) ) => {
399
+ fetch_requests. insert ( batch_key, request) ;
366
400
} ,
367
- StableIndexName :: Missing => Ok ( Ok ( DeveloperIndexRangeResponse {
368
- page : vec ! [ ] ,
369
- cursor : CursorPosition :: End ,
370
- } ) ) ,
371
401
}
372
402
}
373
403
374
- /// NOTE: returns a page of results. Callers must call record_read_document
375
- /// for all documents returned from the index stream.
376
- #[ minitrace:: trace]
377
- #[ convex_macro:: instrument_future]
378
- pub async fn index_range_batch (
379
- & mut self ,
380
- requests : BTreeMap < BatchKey , IndexRangeRequest > ,
381
- ) -> BTreeMap < BatchKey , anyhow:: Result < DeveloperIndexRangeResponse > > {
382
- let batch_size = requests. len ( ) ;
383
- let mut results = BTreeMap :: new ( ) ;
384
- let mut fetch_requests = BTreeMap :: new ( ) ;
385
- let mut virtual_table_versions = BTreeMap :: new ( ) ;
386
- for ( batch_key, request) in requests {
387
- if matches ! ( request. stable_index_name, StableIndexName :: Virtual ( _, _) ) {
388
- virtual_table_versions. insert ( batch_key, request. version . clone ( ) ) ;
389
- }
390
- match self . start_index_range ( request) {
391
- Err ( e) => {
392
- results. insert ( batch_key, Err ( e) ) ;
393
- } ,
394
- Ok ( Ok ( result) ) => {
395
- results. insert ( batch_key, Ok ( result) ) ;
396
- } ,
397
- Ok ( Err ( request) ) => {
398
- fetch_requests. insert ( batch_key, request) ;
399
- } ,
400
- }
401
- }
402
-
403
- let fetch_results = self
404
- . tx
405
- . index
406
- . range_batch ( & mut self . tx . reads , fetch_requests)
407
- . await ;
404
+ let fetch_results = tx. index . range_batch ( & mut tx. reads , fetch_requests) . await ;
408
405
409
- for ( batch_key, fetch_result) in fetch_results {
410
- let virtual_table_version = virtual_table_versions. get ( & batch_key) . cloned ( ) ;
411
- let result = fetch_result. and_then ( |IndexRangeResponse { page, cursor } | {
412
- let developer_results = match virtual_table_version {
413
- Some ( version) => page
414
- . into_iter ( )
415
- . map ( |( key, doc, ts) | {
416
- let doc = VirtualTable :: new ( self . tx )
417
- . map_system_doc_to_virtual_doc ( doc, version. clone ( ) ) ?;
418
- anyhow:: Ok ( ( key, doc, ts) )
419
- } )
420
- . try_collect ( ) ?,
421
- None => page
422
- . into_iter ( )
423
- . map ( |( key, doc, ts) | ( key, doc. to_developer ( ) , ts) )
424
- . collect ( ) ,
425
- } ;
426
- anyhow:: Ok ( DeveloperIndexRangeResponse {
427
- page : developer_results,
428
- cursor,
429
- } )
430
- } ) ;
431
- results. insert ( batch_key, result) ;
432
- }
433
- assert_eq ! ( results. len( ) , batch_size) ;
434
- results
406
+ for ( batch_key, fetch_result) in fetch_results {
407
+ let virtual_table_version = virtual_table_versions. get ( & batch_key) . cloned ( ) ;
408
+ let result = fetch_result. and_then ( |IndexRangeResponse { page, cursor } | {
409
+ let developer_results = match virtual_table_version {
410
+ Some ( version) => page
411
+ . into_iter ( )
412
+ . map ( |( key, doc, ts) | {
413
+ let doc = VirtualTable :: new ( tx)
414
+ . map_system_doc_to_virtual_doc ( doc, version. clone ( ) ) ?;
415
+ anyhow:: Ok ( ( key, doc, ts) )
416
+ } )
417
+ . try_collect ( ) ?,
418
+ None => page
419
+ . into_iter ( )
420
+ . map ( |( key, doc, ts) | ( key, doc. to_developer ( ) , ts) )
421
+ . collect ( ) ,
422
+ } ;
423
+ anyhow:: Ok ( DeveloperIndexRangeResponse {
424
+ page : developer_results,
425
+ cursor,
426
+ } )
427
+ } ) ;
428
+ results. insert ( batch_key, result) ;
435
429
}
430
+ assert_eq ! ( results. len( ) , batch_size) ;
431
+ results
436
432
}
0 commit comments