
Commit 69a5c78

feat: allow injecting a custom join_set tracer to avoid a dependency on the tracing crate
1 parent 703aa32 commit 69a5c78
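
This commit removes the optional `tracing` cargo feature and instead lets the embedding application inject a tracer at runtime: `datafusion::common::runtime::set_join_set_tracer` accepts one function that wraps every future spawned onto DataFusion's `JoinSet`s and one that wraps every blocking closure. The condensed sketch below mirrors the usage added in `datafusion-examples/examples/tracing.rs` (shown in the diff further down); the bare `main` wrapper is only illustrative.

```rust
use datafusion::common::runtime::set_join_set_tracer;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::any::Any;
use tracing::{Instrument, Span};

/// Wrap each spawned future so it keeps running inside the caller's current span.
fn instrument_future(
    fut: BoxFuture<'static, Box<dyn Any + Send>>,
) -> BoxFuture<'static, Box<dyn Any + Send>> {
    fut.in_current_span().boxed()
}

/// Wrap each spawned blocking closure so it executes inside the caller's current span.
fn instrument_block(
    f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
    let span = Span::current();
    Box::new(move || span.in_scope(|| f()))
}

fn main() {
    // Install the hooks once, before DataFusion spawns any tasks.
    set_join_set_tracer(instrument_future, instrument_block)
        .expect("Failed to set tracer");
}
```

Because the hooks are passed in by the caller, `datafusion-common-runtime` itself no longer needs the optional `tracing`/`tracing-futures` dependencies or the `tracing` cargo feature, which the diffs below remove.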

File tree: 9 files changed (+246 / -174 lines)


Cargo.lock

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default.

README.md

Lines changed: 0 additions & 1 deletion
@@ -129,7 +129,6 @@ Optional features:
 - `backtrace`: include backtrace information in error messages
 - `pyarrow`: conversions between PyArrow and DataFusion types
 - `serde`: enable arrow-schema's `serde` feature
-- `tracing`: propagates the current span across thread boundaries
 
 [apache avro]: https://avro.apache.org/
 [apache parquet]: https://parquet.apache.org/

datafusion-examples/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 dashmap = { workspace = true }
 # note only use main datafusion crate for examples
-datafusion = { workspace = true, default-features = true, features = ["avro", "tracing"] }
+datafusion = { workspace = true, default-features = true, features = ["avro"] }
 datafusion-proto = { workspace = true }
 env_logger = { workspace = true }
 futures = { workspace = true }

datafusion-examples/examples/tracing.rs

Lines changed: 77 additions & 77 deletions
@@ -15,113 +15,113 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! This example demonstrates the trace feature in DataFusion’s runtime.
-//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those
-//! created during repartitioning or when reading Parquet files) are instrumented
-//! with the current tracing span, allowing to propagate any existing tracing context.
+//! This example demonstrates the tracing injection feature for the DataFusion runtime.
+//! Tasks spawned on new threads behave differently depending on whether a tracer is injected.
+//! The log output clearly distinguishes the two cases.
 //!
-//! In this example we create a session configured to use multiple partitions,
-//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file),
-//! and run a query that should trigger parallel execution on multiple threads.
-//! We wrap the entire query execution within a custom span and log messages.
-//! By inspecting the tracing output, we should see that the tasks spawned
-//! internally inherit the span context.
-
-use arrow::util::pretty::pretty_format_batches;
-use datafusion::arrow::record_batch::RecordBatch;
+//! # Expected Log Output
+//!
+//! When **no tracer** is injected, logs from tasks running on `tokio-runtime-worker` threads
+//! will _not_ include the `run_instrumented_query` span:
+//!
+//! ```text
+//! 10:29:40.714 INFO main ThreadId(01) tracing: ***** RUNNING WITHOUT INJECTED TRACER *****
+//! 10:29:40.714 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
+//! 10:29:40.728 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
+//! 10:29:40.743 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 6 ms
+//! 10:29:40.759 DEBUG tokio-runtime-worker ThreadId(03) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
+//! 10:29:40.758 DEBUG tokio-runtime-worker ThreadId(04) datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
+//! 10:29:40.771 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
+//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** WITHOUT tracer: Non-main tasks did NOT inherit the `run_instrumented_query` span *****
+//! ```
+//!
+//! When a tracer **is** injected, tasks spawned on non‑main threads _do_ inherit the span:
+//!
+//! ```text
+//! 10:29:40.772 INFO main ThreadId(01) tracing: Injecting custom tracer...
+//! 10:29:40.772 INFO main ThreadId(01) tracing: ***** RUNNING WITH INJECTED TRACER *****
+//! 10:29:40.772 INFO main ThreadId(01) run_instrumented_query: tracing: Starting query execution
+//! 10:29:40.775 INFO main ThreadId(01) run_instrumented_query: tracing: Executing SQL query sql="SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"
+//! 10:29:40.784 DEBUG main ThreadId(01) run_instrumented_query: datafusion_optimizer::optimizer: Optimizer took 7 ms
+//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(03) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
+//! 10:29:40.801 DEBUG tokio-runtime-worker ThreadId(04) run_instrumented_query: datafusion_physical_plan::aggregates::row_hash: Creating GroupedHashAggregateStream
+//! 10:29:40.809 INFO main ThreadId(01) run_instrumented_query: tracing: Query complete: 6 batches returned
+//! 10:29:40.809 INFO main ThreadId(01) tracing: ***** WITH tracer: Non-main tasks DID inherit the `run_instrumented_query` span *****
+//! ```
+
+use datafusion::common::runtime::set_join_set_tracer;
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use datafusion::test_util::parquet_test_data;
+use futures::future::BoxFuture;
+use futures::FutureExt;
+use std::any::Any;
 use std::sync::Arc;
-use tracing::{info, instrument, Level};
+use tracing::{info, instrument, Instrument, Level, Span};
 
 #[tokio::main]
 async fn main() -> Result<()> {
-    // Initialize a tracing subscriber that prints to stdout.
+    // Initialize tracing subscriber with thread info.
     tracing_subscriber::fmt()
         .with_thread_ids(true)
         .with_thread_names(true)
         .with_max_level(Level::DEBUG)
         .init();
 
-    log::info!("Starting example, this log is not captured by tracing");
+    // Run query WITHOUT tracer injection.
+    info!("***** RUNNING WITHOUT INJECTED TRACER *****");
+    run_instrumented_query().await?;
+    info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****");
 
-    // execute the query within a tracing span
-    let result = run_instrumented_query().await;
+    // Inject custom tracer so tasks run in the current span.
+    info!("Injecting custom tracer...");
+    set_join_set_tracer(instrument_future, instrument_block)
+        .expect("Failed to set tracer");
+
+    // Run query WITH tracer injection.
+    info!("***** RUNNING WITH INJECTED TRACER *****");
+    run_instrumented_query().await?;
+    info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****");
+
+    Ok(())
+}
 
-    info!(
-        "Finished example. Check the logs above for tracing span details showing \
-        that tasks were spawned within the 'run_instrumented_query' span on different threads."
-    );
+/// Instruments a boxed future to run in the current span.
+fn instrument_future(
+    fut: BoxFuture<'static, Box<dyn Any + Send>>,
+) -> BoxFuture<'static, Box<dyn Any + Send>> {
+    fut.in_current_span().boxed()
+}
 
-    result
+/// Instruments a boxed blocking closure to execute within the current span.
+fn instrument_block(
+    f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
+) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
+    let span = Span::current();
+    Box::new(move || span.in_scope(|| f()))
 }
 
 #[instrument(level = "info")]
 async fn run_instrumented_query() -> Result<()> {
-    info!("Starting query execution within the custom tracing span");
+    info!("Starting query execution");
 
-    // The default session will set the number of partitions to `std::thread::available_parallelism()`.
     let ctx = SessionContext::new();
-
-    // Get the path to the test parquet data.
     let test_data = parquet_test_data();
-    // Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file.
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions::new(Arc::new(file_format))
         .with_file_extension("alltypes_tiny_pages_plain.parquet");
 
-    info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}");
-
-    // Register a listing table using an absolute URL.
     let table_path = format!("file://{test_data}/");
-    ctx.register_listing_table(
-        "alltypes",
-        &table_path,
-        listing_options.clone(),
-        None,
-        None,
-    )
-    .await
-    .expect("register_listing_table failed");
-
-    info!("Registered Parquet table 'alltypes' from {table_path}");
-
-    // Run a query that will trigger parallel execution on multiple threads.
-    let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col
-        FROM (
-            SELECT bool_col, date_string_col, string_col FROM alltypes
-            UNION ALL
-            SELECT bool_col, date_string_col, string_col FROM alltypes
-        ) AS t
-        GROUP BY bool_col, date_string_col, string_col
-        ORDER BY 1,2,3,4 DESC
-        LIMIT 5;";
-    info!(%sql, "Executing SQL query");
-    let df = ctx.sql(sql).await?;
-
-    let results: Vec<RecordBatch> = df.collect().await?;
-    info!("Query execution complete");
-
-    // Print out the results and tracing output.
-    datafusion::common::assert_batches_eq!(
-        [
-            "+----------+----------+-----------------+------------+",
-            "| count(*) | bool_col | date_string_col | string_col |",
-            "+----------+----------+-----------------+------------+",
-            "| 2        | false    | 01/01/09        | 9          |",
-            "| 2        | false    | 01/01/09        | 7          |",
-            "| 2        | false    | 01/01/09        | 5          |",
-            "| 2        | false    | 01/01/09        | 3          |",
-            "| 2        | false    | 01/01/09        | 1          |",
-            "+----------+----------+-----------------+------------+",
-        ],
-        &results
-    );
-
-    info!("Query results:\n{}", pretty_format_batches(&results)?);
-
+    info!("Registering table 'alltypes' from {}", table_path);
+    ctx.register_listing_table("alltypes", &table_path, listing_options, None, None)
+        .await
+        .expect("Failed to register table");
+
+    let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col";
+    info!(sql, "Executing SQL query");
+    let result = ctx.sql(sql).await?.collect().await?;
+    info!("Query complete: {} batches returned", result.len());
     Ok(())
 }
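
Since the hooks are plain functions over boxed futures and closures, the injected backend does not have to be the `tracing` crate at all, which is what decouples the runtime from it. The following is a hypothetical sketch, not part of this commit, that logs task boundaries with the `log` crate instead; the hook names and messages are invented for illustration.

```rust
use datafusion::common::runtime::set_join_set_tracer;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::any::Any;

/// Hypothetical hook: log when each spawned future starts and finishes.
fn log_future(
    fut: BoxFuture<'static, Box<dyn Any + Send>>,
) -> BoxFuture<'static, Box<dyn Any + Send>> {
    async move {
        log::debug!("JoinSet task started");
        let out = fut.await;
        log::debug!("JoinSet task finished");
        out
    }
    .boxed()
}

/// Hypothetical hook: log around each spawned blocking closure.
fn log_block(
    f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
    Box::new(move || {
        log::debug!("blocking JoinSet task started");
        f()
    })
}

fn main() {
    env_logger::init();
    // The setter returns a `Result`, so a failure (for example an already
    // installed tracer) surfaces through `expect`, as in the example above.
    set_join_set_tracer(log_future, log_block).expect("Failed to set tracer");
}
```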

datafusion/common-runtime/Cargo.toml

Lines changed: 0 additions & 5 deletions
@@ -34,17 +34,12 @@ all-features = true
 [lints]
 workspace = true
 
-[features]
-tracing = ["dep:tracing", "dep:tracing-futures"]
-
 [lib]
 name = "datafusion_common_runtime"
 
 [dependencies]
 log = { workspace = true }
 tokio = { workspace = true }
-tracing = { version = "0.1", optional = true }
-tracing-futures = { version = "0.2", optional = true }
 
 [dev-dependencies]
 tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }

datafusion/common-runtime/src/common.rs

Lines changed: 8 additions & 2 deletions
@@ -53,15 +53,21 @@ impl<R: 'static> SpawnedTask<R> {
     }
 
     /// Joins the task, returning the result of join (`Result<R, JoinError>`).
-    pub async fn join(mut self) -> Result<R, JoinError> {
+    pub async fn join(mut self) -> Result<R, JoinError>
+    where
+        R: Send,
+    {
         self.inner
             .join_next()
             .await
             .expect("`SpawnedTask` instance always contains exactly 1 task")
     }
 
     /// Joins the task and unwinds the panic if it happens.
-    pub async fn join_unwind(self) -> Result<R, JoinError> {
+    pub async fn join_unwind(self) -> Result<R, JoinError>
+    where
+        R: Send,
+    {
         self.join().await.map_err(|e| {
             // `JoinError` can be caused either by panic or cancellation. We have to handle panics:
             if e.is_panic() {
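
The new `R: Send` bounds on `join` and `join_unwind` follow from routing spawned work through the injected tracer, whose hooks pass results around as `Box<dyn Any + Send>`. A minimal sketch of a call site, assuming `SpawnedTask` and its `spawn` constructor are re-exported from `datafusion-common-runtime` as in previous releases:

```rust
use datafusion_common_runtime::SpawnedTask;

#[tokio::main]
async fn main() {
    // `u64` is `Send`, so the result satisfies the new `R: Send` bound on `join`.
    let task = SpawnedTask::spawn(async { 1u64 + 1 });
    let result = task.join().await.expect("task was cancelled or panicked");
    assert_eq!(result, 2);
}
```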
