@@ -37,6 +37,7 @@ use crate::expr::{Bind, BoundPredicate, Predicate};
37
37
use crate :: io:: FileIO ;
38
38
use crate :: spec:: { DataContentType , SnapshotRef } ;
39
39
use crate :: table:: Table ;
40
+ use crate :: traced_stream:: TracedStream ;
40
41
use crate :: utils:: available_parallelism;
41
42
use crate :: { Error , ErrorKind , Result } ;
42
43
@@ -330,7 +331,11 @@ pub struct TableScan {
330
331
impl TableScan {
331
332
/// Returns a stream of [`FileScanTask`]s.
332
333
pub async fn plan_files ( & self ) -> Result < FileScanTaskStream > {
334
+ let span = tracing:: trace_span!( "plan_files" ) ;
335
+ let _entered = span. enter ( ) ;
336
+
333
337
let Some ( plan_context) = self . plan_context . as_ref ( ) else {
338
+ tracing:: debug!( "file plan requested for a table with no snapshots" ) ;
334
339
return Ok ( Box :: pin ( futures:: stream:: empty ( ) ) ) ;
335
340
} ;
336
341
@@ -351,7 +356,7 @@ impl TableScan {
351
356
352
357
let delete_file_index = Arc :: new ( delete_file_index) ;
353
358
354
- Ok ( TableScan :: process_manifest_contexts (
359
+ let stream = TableScan :: process_manifest_contexts (
355
360
data_contexts,
356
361
self . concurrency_limit_manifest_files ,
357
362
self . concurrency_limit_manifest_entries ,
@@ -360,7 +365,9 @@ impl TableScan {
360
365
async move { Self :: process_data_manifest_entry ( ctx, delete_file_index) }
361
366
} ,
362
367
)
363
- . boxed ( ) )
368
+ . boxed ( ) ;
369
+
370
+ Ok ( Box :: pin ( TracedStream :: new ( stream, span. clone ( ) ) ) )
364
371
}
365
372
366
373
/// Returns an [`ArrowRecordBatchStream`].
@@ -419,19 +426,27 @@ impl TableScan {
419
426
. try_filter_map ( |opt_task| async move { Ok ( opt_task) } )
420
427
}
421
428
429
+ #[ tracing:: instrument( skip_all, fields( file_path) ) ]
422
430
fn process_data_manifest_entry (
423
431
manifest_entry_context : Result < ManifestEntryContext > ,
424
432
delete_file_index : Arc < DeleteFileIndex > ,
425
433
) -> Result < Option < FileScanTask > > {
426
434
let manifest_entry_context = manifest_entry_context?;
435
+ tracing:: Span :: current ( ) . record (
436
+ "file_path" ,
437
+ manifest_entry_context. manifest_entry . file_path ( ) ,
438
+ ) ;
427
439
428
440
// skip processing this manifest entry if it has been marked as deleted
429
441
if !manifest_entry_context. manifest_entry . is_alive ( ) {
442
+ metrics:: counter!( "iceberg.scan.data_file.skipped" , "reason" => "not_alive" )
443
+ . increment ( 1 ) ;
430
444
return Ok ( None ) ;
431
445
}
432
446
433
447
// abort the plan if we encounter a manifest entry for a delete file
434
448
if manifest_entry_context. manifest_entry . content_type ( ) != DataContentType :: Data {
449
+ tracing:: error!( "Encountered an entry for a delete file in a data file manifest" ) ;
435
450
return Err ( Error :: new (
436
451
ErrorKind :: FeatureUnsupported ,
437
452
"Encountered an entry for a delete file in a data file manifest" ,
@@ -455,6 +470,8 @@ impl TableScan {
455
470
// skip any data file whose partition data indicates that it can't contain
456
471
// any data that matches this scan's filter
457
472
if !expression_evaluator. eval ( manifest_entry_context. manifest_entry . data_file ( ) ) ? {
473
+ metrics:: counter!( "iceberg.scan.data_file.skipped" , "reason" => "partition" )
474
+ . increment ( 1 ) ;
458
475
return Ok ( None ) ;
459
476
}
460
477
@@ -464,30 +481,41 @@ impl TableScan {
464
481
manifest_entry_context. manifest_entry . data_file ( ) ,
465
482
false ,
466
483
) ? {
484
+ metrics:: counter!( "iceberg.scan.data_file.skipped" , "reason" => "file_metrics" )
485
+ . increment ( 1 ) ;
467
486
return Ok ( None ) ;
468
487
}
469
488
}
470
489
471
490
// congratulations! the manifest entry has made its way through the
472
491
// entire plan without getting filtered out. Create a corresponding
473
492
// FileScanTask and push it to the result stream
493
+ metrics:: counter!( "iceberg.scan.data_file.included" ) . increment ( 1 ) ;
474
494
Ok ( Some (
475
495
manifest_entry_context. into_file_scan_task ( delete_file_index) ?,
476
496
) )
477
497
}
478
498
499
+ #[ tracing:: instrument( skip_all, fields( file_path) ) ]
479
500
fn process_delete_manifest_entry (
480
501
manifest_entry_context : Result < ManifestEntryContext > ,
481
502
) -> Result < Option < DeleteFileContext > > {
482
503
let manifest_entry_context = manifest_entry_context?;
504
+ tracing:: Span :: current ( ) . record (
505
+ "file_path" ,
506
+ manifest_entry_context. manifest_entry . file_path ( ) ,
507
+ ) ;
483
508
484
509
// skip processing this manifest entry if it has been marked as deleted
485
510
if !manifest_entry_context. manifest_entry . is_alive ( ) {
511
+ metrics:: counter!( "iceberg.scan.delete_file.skipped" , "reason" => "not_alive" )
512
+ . increment ( 1 ) ;
486
513
return Ok ( None ) ;
487
514
}
488
515
489
516
// abort the plan if we encounter a manifest entry that is not for a delete file
490
517
if manifest_entry_context. manifest_entry . content_type ( ) == DataContentType :: Data {
518
+ tracing:: error!( "Encountered an entry for a data file in a delete manifest" ) ;
491
519
return Err ( Error :: new (
492
520
ErrorKind :: FeatureUnsupported ,
493
521
"Encountered an entry for a data file in a delete manifest" ,
@@ -506,10 +534,13 @@ impl TableScan {
506
534
// skip any data file whose partition data indicates that it can't contain
507
535
// any data that matches this scan's filter
508
536
if !expression_evaluator. eval ( manifest_entry_context. manifest_entry . data_file ( ) ) ? {
537
+ metrics:: counter!( "iceberg.scan.delete_file.skipped" , "reason" => "partition" )
538
+ . increment ( 1 ) ;
509
539
return Ok ( None ) ;
510
540
}
511
541
}
512
542
543
+ metrics:: counter!( "iceberg.scan.delete_file.included" ) . increment ( 1 ) ;
513
544
Ok ( Some ( DeleteFileContext {
514
545
manifest_entry : manifest_entry_context. manifest_entry . clone ( ) ,
515
546
partition_spec_id : manifest_entry_context. partition_spec_id ,
0 commit comments