Skip to content

Commit b3ec4e8

Browse files
feat: allowing injecting custom join_set tracer to avoid dependency on tracing crate
1 parent 55f04d2 commit b3ec4e8

File tree

8 files changed

+243
-132
lines changed

8 files changed

+243
-132
lines changed

Cargo.lock

Lines changed: 1 addition & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ Optional features:
129129
- `backtrace`: include backtrace information in error messages
130130
- `pyarrow`: conversions between PyArrow and DataFusion types
131131
- `serde`: enable arrow-schema's `serde` feature
132-
- `tracing`: propagates the current span across thread boundaries
133132

134133
[apache avro]: https://avro.apache.org/
135134
[apache parquet]: https://parquet.apache.org/

datafusion-examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ async-trait = { workspace = true }
6161
bytes = { workspace = true }
6262
dashmap = { workspace = true }
6363
# note only use main datafusion crate for examples
64-
datafusion = { workspace = true, default-features = true, features = ["tracing"] }
64+
datafusion = { workspace = true, default-features = true }
6565
datafusion-proto = { workspace = true }
6666
env_logger = { workspace = true }
6767
futures = { workspace = true }

datafusion-examples/examples/tracing.rs

Lines changed: 89 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -15,113 +15,125 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! This example demonstrates the trace feature in DataFusion’s runtime.
19-
//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those
20-
//! created during repartitioning or when reading Parquet files) are instrumented
21-
//! with the current tracing span, allowing to propagate any existing tracing context.
18+
//! This example demonstrates the tracing injection feature for the DataFusion runtime.
19+
//! Tasks spawned on new threads behave differently depending on whether a tracer is injected.
20+
//! The log output clearly distinguishes the two cases.
2221
//!
23-
//! In this example we create a session configured to use multiple partitions,
24-
//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file),
25-
//! and run a query that should trigger parallel execution on multiple threads.
26-
//! We wrap the entire query execution within a custom span and log messages.
27-
//! By inspecting the tracing output, we should see that the tasks spawned
28-
//! internally inherit the span context.
29-
30-
use arrow::util::pretty::pretty_format_batches;
31-
use datafusion::arrow::record_batch::RecordBatch;
22+
//! # Expected Log Output
23+
//!
24+
//! When **no tracer** is injected, logs from tasks running on `tokio-runtime-worker` threads
25+
//! will _not_ include the `run_instrumented_query` span:
26+
//!
27+
//! ```text
28+
//! 10:29:40.714 INFO main ThreadId(01) tracing: ***** RUNNING WITHOUT INJECTED TRACER *****
29+
//! 10:29:40.714 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
30+
//! 10:29:40.728 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
31+
//! 10:29:40.743 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 6 ms
32+
//! 10:29:40.759 DEBUG tokio-runtime-worker ThreadId(03) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
33+
//! 10:29:40.758 DEBUG tokio-runtime-worker ThreadId(04) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
34+
//! 10:29:40.771 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
35+
//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** WITHOUT tracer: Non-main tasks did NOT inherit the `run_instrumented_query` span *****
36+
//! ```
37+
//!
38+
//! When a tracer **is** injected, tasks spawned on non‑main threads _do_ inherit the span:
39+
//!
40+
//! ```text
41+
//! 10:29:40.772 INFO main ThreadId(01) tracing: Injecting custom tracer...
42+
//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** RUNNING WITH INJECTED TRACER *****
43+
//! 10:29:40.772 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
44+
//! 10:29:40.775 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
45+
//! 10:29:40.784 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 7 ms
46+
//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(03) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
47+
//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(04) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
48+
//! 10:29:40.809 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
49+
//! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: Non-main tasks DID inherit the `run_instrumented_query` span *****
50+
//! ```
51+
52+
use datafusion::common::runtime::{set_join_set_tracer, JoinSetTracer};
3253
use datafusion::datasource::file_format::parquet::ParquetFormat;
3354
use datafusion::datasource::listing::ListingOptions;
3455
use datafusion::error::Result;
3556
use datafusion::prelude::*;
3657
use datafusion::test_util::parquet_test_data;
58+
use futures::future::BoxFuture;
59+
use futures::FutureExt;
60+
use std::any::Any;
3761
use std::sync::Arc;
38-
use tracing::{info, instrument, Level};
62+
use tracing::{info, instrument, Instrument, Level, Span};
3963

4064
#[tokio::main]
4165
async fn main() -> Result<()> {
42-
// Initialize a tracing subscriber that prints to stdout.
66+
// Initialize tracing subscriber with thread info.
4367
tracing_subscriber::fmt()
4468
.with_thread_ids(true)
4569
.with_thread_names(true)
4670
.with_max_level(Level::DEBUG)
4771
.init();
4872

49-
log::info!("Starting example, this log is not captured by tracing");
73+
// Run query WITHOUT tracer injection.
74+
info!("***** RUNNING WITHOUT INJECTED TRACER *****");
75+
run_instrumented_query().await?;
76+
info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****");
77+
78+
// Inject custom tracer so tasks run in the current span.
79+
info!("Injecting custom tracer...");
80+
set_join_set_tracer(&SpanTracer).expect("Failed to set tracer");
5081

51-
// execute the query within a tracing span
52-
let result = run_instrumented_query().await;
82+
// Run query WITH tracer injection.
83+
info!("***** RUNNING WITH INJECTED TRACER *****");
84+
run_instrumented_query().await?;
85+
info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****");
5386

54-
info!(
55-
"Finished example. Check the logs above for tracing span details showing \
56-
that tasks were spawned within the 'run_instrumented_query' span on different threads."
57-
);
87+
Ok(())
88+
}
5889

59-
result
90+
/// A simple tracer that ensures any spawned task or blocking closure
91+
/// inherits the current span via `in_current_span`.
92+
struct SpanTracer;
93+
94+
/// Implement the `JoinSetTracer` trait so we can inject instrumentation
95+
/// for both async futures and blocking closures.
96+
impl JoinSetTracer for SpanTracer {
97+
/// Instruments a boxed future to run in the current span. The future's
98+
/// return type is erased to `Box<dyn Any + Send>`, which we simply
99+
/// run inside the `Span::current()` context.
100+
fn trace_future(
101+
&self,
102+
fut: BoxFuture<'static, Box<dyn Any + Send>>,
103+
) -> BoxFuture<'static, Box<dyn Any + Send>> {
104+
fut.in_current_span().boxed()
105+
}
106+
107+
/// Instruments a boxed blocking closure by running it inside the
108+
/// `Span::current()` context.
109+
fn trace_block(
110+
&self,
111+
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
112+
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
113+
let span = Span::current();
114+
Box::new(move || span.in_scope(|| f()))
115+
}
60116
}
61117

62118
#[instrument(level = "info")]
63119
async fn run_instrumented_query() -> Result<()> {
64-
info!("Starting query execution within the custom tracing span");
120+
info!("Starting query execution");
65121

66-
// The default session will set the number of partitions to `std::thread::available_parallelism()`.
67122
let ctx = SessionContext::new();
68-
69-
// Get the path to the test parquet data.
70123
let test_data = parquet_test_data();
71-
// Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file.
72124
let file_format = ParquetFormat::default().with_enable_pruning(true);
73125
let listing_options = ListingOptions::new(Arc::new(file_format))
74126
.with_file_extension("alltypes_tiny_pages_plain.parquet");
75127

76-
info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}");
77-
78-
// Register a listing table using an absolute URL.
79128
let table_path = format!("file://{test_data}/");
80-
ctx.register_listing_table(
81-
"alltypes",
82-
&table_path,
83-
listing_options.clone(),
84-
None,
85-
None,
86-
)
87-
.await
88-
.expect("register_listing_table failed");
89-
90-
info!("Registered Parquet table 'alltypes' from {table_path}");
91-
92-
// Run a query that will trigger parallel execution on multiple threads.
93-
let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col
94-
FROM (
95-
SELECT bool_col, date_string_col, string_col FROM alltypes
96-
UNION ALL
97-
SELECT bool_col, date_string_col, string_col FROM alltypes
98-
) AS t
99-
GROUP BY bool_col, date_string_col, string_col
100-
ORDER BY 1,2,3,4 DESC
101-
LIMIT 5;";
102-
info!(%sql, "Executing SQL query");
103-
let df = ctx.sql(sql).await?;
104-
105-
let results: Vec<RecordBatch> = df.collect().await?;
106-
info!("Query execution complete");
107-
108-
// Print out the results and tracing output.
109-
datafusion::common::assert_batches_eq!(
110-
[
111-
"+----------+----------+-----------------+------------+",
112-
"| count(*) | bool_col | date_string_col | string_col |",
113-
"+----------+----------+-----------------+------------+",
114-
"| 2 | false | 01/01/09 | 9 |",
115-
"| 2 | false | 01/01/09 | 7 |",
116-
"| 2 | false | 01/01/09 | 5 |",
117-
"| 2 | false | 01/01/09 | 3 |",
118-
"| 2 | false | 01/01/09 | 1 |",
119-
"+----------+----------+-----------------+------------+",
120-
],
121-
&results
122-
);
123-
124-
info!("Query results:\n{}", pretty_format_batches(&results)?);
125-
129+
info!("Registering table 'alltypes' from {}", table_path);
130+
ctx.register_listing_table("alltypes", &table_path, listing_options, None, None)
131+
.await
132+
.expect("Failed to register table");
133+
134+
let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col";
135+
info!(sql, "Executing SQL query");
136+
let result = ctx.sql(sql).await?.collect().await?;
137+
info!("Query complete: {} batches returned", result.len());
126138
Ok(())
127139
}

datafusion/common-runtime/Cargo.toml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,13 @@ all-features = true
3434
[lints]
3535
workspace = true
3636

37-
[features]
38-
tracing = ["dep:tracing", "dep:tracing-futures"]
39-
4037
[lib]
4138
name = "datafusion_common_runtime"
4239

4340
[dependencies]
41+
futures = { workspace = true }
4442
log = { workspace = true }
4543
tokio = { workspace = true }
46-
tracing = { version = "0.1", optional = true }
47-
tracing-futures = { version = "0.2", optional = true }
4844

4945
[dev-dependencies]
5046
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }

0 commit comments

Comments
 (0)