Skip to content

Commit 0a8872f

Browse files
committed
feat: initial instrumentation of scan plan with traces and metrics
1 parent b1a4447 commit 0a8872f

File tree

10 files changed

+95
-3
lines changed

10 files changed

+95
-3
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ indicatif = "0.17"
8383
itertools = "0.13"
8484
linkedbytes = "0.1.8"
8585
metainfo = "0.7.14"
86+
metrics = "0.24"
8687
mimalloc = "0.1.46"
8788
mockito = "1"
8889
motore-macros = "0.4.3"

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ expect-test = { workspace = true }
6767
fnv = { workspace = true }
6868
futures = { workspace = true }
6969
itertools = { workspace = true }
70+
metrics = { workspace = true }
7071
moka = { version = "0.12.10", features = ["future"] }
7172
murmur3 = { workspace = true }
7273
num-bigint = { workspace = true }
@@ -87,6 +88,7 @@ serde_with = { workspace = true }
8788
strum = { workspace = true, features = ["derive"] }
8889
thrift = { workspace = true }
8990
tokio = { workspace = true, optional = false, features = ["sync"] }
91+
tracing = { workspace = true }
9092
typed-builder = { workspace = true }
9193
url = { workspace = true }
9294
uuid = { workspace = true }

crates/iceberg/src/expr/visitors/expression_evaluator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ impl ExpressionEvaluator {
4343
/// the provided [`DataFile`]'s partition [`Struct`]. Used by [`TableScan`]
4444
/// to see if this [`DataFile`] could possibly contain data that matches
4545
/// the scan's filter.
46+
#[tracing::instrument(skip_all)]
4647
pub(crate) fn eval(&self, data_file: &DataFile) -> Result<bool> {
4748
let mut visitor = ExpressionEvaluatorVisitor::new(data_file.partition());
4849

crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ impl<'a> InclusiveMetricsEvaluator<'a> {
3939
/// provided [`DataFile`]'s metrics. Used by [`TableScan`] to
4040
/// see if this `DataFile` contains data that could match
4141
/// the scan's filter.
42+
#[tracing::instrument(skip_all)]
4243
pub(crate) fn eval(
4344
filter: &'a BoundPredicate,
4445
data_file: &'a DataFile,

crates/iceberg/src/io/object_cache.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ impl ObjectCache {
8585

8686
/// Retrieves an Arc [`Manifest`] from the cache
8787
/// or retrieves one from FileIO and parses it if not present
88+
#[tracing::instrument(skip_all)]
8889
pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result<Arc<Manifest>> {
8990
if self.cache_disabled {
9091
return manifest_file

crates/iceberg/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,4 @@ pub mod writer;
9090

9191
mod delete_vector;
9292
pub mod puffin;
93+
mod traced_stream;

crates/iceberg/src/scan/context.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ pub(crate) struct PlanContext {
144144
}
145145

146146
impl PlanContext {
147+
#[tracing::instrument(skip_all)]
147148
pub(crate) async fn get_manifest_list(&self) -> Result<Arc<ManifestList>> {
148149
self.object_cache
149150
.as_ref()
@@ -172,6 +173,12 @@ impl PlanContext {
172173
Ok(partition_filter)
173174
}
174175

176+
#[tracing::instrument(
177+
skip_all,
178+
fields(
179+
manifest_list.len = manifest_list.entries().len(),
180+
)
181+
)]
175182
pub(crate) fn build_manifest_file_context_iter(
176183
&self,
177184
manifest_list: Arc<ManifestList>,
@@ -191,16 +198,20 @@ impl PlanContext {
191198
.get(manifest_file.partition_spec_id, predicate.clone())
192199
.eval(&manifest_file)?
193200
{
201+
tracing::trace!(file_path = manifest_file.manifest_path, "iceberg.scan.manifest_file.skipped");
202+
metrics::counter!("iceberg.scan.manifest_file.skipped", "reason" => "partition").increment(1);
194203
return Ok(None); // Skip this file.
195204
}
196205
Some(predicate)
197206
} else {
198207
None
199208
};
200209

210+
tracing::trace!(file_path = manifest_file.manifest_path, "iceberg.scan.manifest_file.included");
211+
metrics::counter!("iceberg.scan.manifest_file.included").increment(1);
212+
201213
let context = self
202214
.create_manifest_file_context(manifest_file, partition_bound_predicate)?;
203-
204215
Ok(Some(context))
205216
})()
206217
.transpose()

crates/iceberg/src/scan/mod.rs

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::expr::{Bind, BoundPredicate, Predicate};
3737
use crate::io::FileIO;
3838
use crate::spec::{DataContentType, SnapshotRef};
3939
use crate::table::Table;
40+
use crate::traced_stream::TracedStream;
4041
use crate::utils::available_parallelism;
4142
use crate::{Error, ErrorKind, Result};
4243

@@ -330,7 +331,11 @@ pub struct TableScan {
330331
impl TableScan {
331332
/// Returns a stream of [`FileScanTask`]s.
332333
pub async fn plan_files(&self) -> Result<FileScanTaskStream> {
334+
let span = tracing::trace_span!("plan_files");
335+
let _entered = span.enter();
336+
333337
let Some(plan_context) = self.plan_context.as_ref() else {
338+
tracing::debug!("file plan requested for a table with no snapshots");
334339
return Ok(Box::pin(futures::stream::empty()));
335340
};
336341

@@ -351,7 +356,7 @@ impl TableScan {
351356

352357
let delete_file_index = Arc::new(delete_file_index);
353358

354-
Ok(TableScan::process_manifest_contexts(
359+
let stream = TableScan::process_manifest_contexts(
355360
data_contexts,
356361
self.concurrency_limit_manifest_files,
357362
self.concurrency_limit_manifest_entries,
@@ -360,7 +365,9 @@ impl TableScan {
360365
async move { Self::process_data_manifest_entry(ctx, delete_file_index) }
361366
},
362367
)
363-
.boxed())
368+
.boxed();
369+
370+
Ok(Box::pin(TracedStream::new(stream, span.clone())))
364371
}
365372

366373
/// Returns an [`ArrowRecordBatchStream`].
@@ -419,19 +426,27 @@ impl TableScan {
419426
.try_filter_map(|opt_task| async move { Ok(opt_task) })
420427
}
421428

429+
#[tracing::instrument(skip_all, fields(file_path))]
422430
fn process_data_manifest_entry(
423431
manifest_entry_context: Result<ManifestEntryContext>,
424432
delete_file_index: Arc<DeleteFileIndex>,
425433
) -> Result<Option<FileScanTask>> {
426434
let manifest_entry_context = manifest_entry_context?;
435+
tracing::Span::current().record(
436+
"file_path",
437+
manifest_entry_context.manifest_entry.file_path(),
438+
);
427439

428440
// skip processing this manifest entry if it has been marked as deleted
429441
if !manifest_entry_context.manifest_entry.is_alive() {
442+
metrics::counter!("iceberg.scan.data_file.skipped", "reason" => "not_alive")
443+
.increment(1);
430444
return Ok(None);
431445
}
432446

433447
// abort the plan if we encounter a manifest entry for a delete file
434448
if manifest_entry_context.manifest_entry.content_type() != DataContentType::Data {
449+
tracing::error!("Encountered an entry for a delete file in a data file manifest");
435450
return Err(Error::new(
436451
ErrorKind::FeatureUnsupported,
437452
"Encountered an entry for a delete file in a data file manifest",
@@ -455,6 +470,8 @@ impl TableScan {
455470
// skip any data file whose partition data indicates that it can't contain
456471
// any data that matches this scan's filter
457472
if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
473+
metrics::counter!("iceberg.scan.data_file.skipped", "reason" => "partition")
474+
.increment(1);
458475
return Ok(None);
459476
}
460477

@@ -464,30 +481,41 @@ impl TableScan {
464481
manifest_entry_context.manifest_entry.data_file(),
465482
false,
466483
)? {
484+
metrics::counter!("iceberg.scan.data_file.skipped", "reason" => "file_metrics")
485+
.increment(1);
467486
return Ok(None);
468487
}
469488
}
470489

471490
// congratulations! the manifest entry has made its way through the
472491
// entire plan without getting filtered out. Create a corresponding
473492
// FileScanTask and push it to the result stream
493+
metrics::counter!("iceberg.scan.data_file.included").increment(1);
474494
Ok(Some(
475495
manifest_entry_context.into_file_scan_task(delete_file_index)?,
476496
))
477497
}
478498

499+
#[tracing::instrument(skip_all, fields(file_path))]
479500
fn process_delete_manifest_entry(
480501
manifest_entry_context: Result<ManifestEntryContext>,
481502
) -> Result<Option<DeleteFileContext>> {
482503
let manifest_entry_context = manifest_entry_context?;
504+
tracing::Span::current().record(
505+
"file_path",
506+
manifest_entry_context.manifest_entry.file_path(),
507+
);
483508

484509
// skip processing this manifest entry if it has been marked as deleted
485510
if !manifest_entry_context.manifest_entry.is_alive() {
511+
metrics::counter!("iceberg.scan.delete_file.skipped", "reason" => "not_alive")
512+
.increment(1);
486513
return Ok(None);
487514
}
488515

489516
// abort the plan if we encounter a manifest entry that is not for a delete file
490517
if manifest_entry_context.manifest_entry.content_type() == DataContentType::Data {
518+
tracing::error!("Encountered an entry for a data file in a delete manifest");
491519
return Err(Error::new(
492520
ErrorKind::FeatureUnsupported,
493521
"Encountered an entry for a data file in a delete manifest",
@@ -506,10 +534,13 @@ impl TableScan {
506534
// skip any data file whose partition data indicates that it can't contain
507535
// any data that matches this scan's filter
508536
if !expression_evaluator.eval(manifest_entry_context.manifest_entry.data_file())? {
537+
metrics::counter!("iceberg.scan.delete_file.skipped", "reason" => "partition")
538+
.increment(1);
509539
return Ok(None);
510540
}
511541
}
512542

543+
metrics::counter!("iceberg.scan.delete_file.included").increment(1);
513544
Ok(Some(DeleteFileContext {
514545
manifest_entry: manifest_entry_context.manifest_entry.clone(),
515546
partition_spec_id: manifest_entry_context.partition_spec_id,

crates/iceberg/src/traced_stream.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
4+
use futures::Stream;
5+
use tracing::Span;
6+
7+
pub struct TracedStream<S> {
8+
stream: S,
9+
_span: Span,
10+
}
11+
12+
impl<S> TracedStream<S> {
13+
pub fn new(stream: S, span: Span) -> Self {
14+
Self {
15+
stream,
16+
_span: span,
17+
}
18+
}
19+
}
20+
21+
impl<S> Stream for TracedStream<S>
22+
where S: Stream + Unpin
23+
{
24+
type Item = S::Item;
25+
26+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
27+
let this = self.get_mut();
28+
let _entered = this._span.enter();
29+
Pin::new(&mut this.stream).poll_next(cx)
30+
}
31+
}

0 commit comments

Comments
 (0)