Skip to content

Commit a6fdfff

Browse files
geoffreyclaudeNirnay Roy
authored and
Nirnay Roy
committed
feat: Add tracing regression tests (apache#15673)
1 parent c5a885b commit a6fdfff

File tree

4 files changed

+378
-0
lines changed

4 files changed

+378
-0
lines changed

datafusion/core/tests/core_integration.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ mod serde;
5151
/// Run all tests that are found in the `catalog` directory
5252
mod catalog;
5353

54+
/// Run all tests that are found in the `tracing` directory
55+
mod tracing;
56+
5457
#[cfg(test)]
5558
#[ctor::ctor]
5659
fn init() {
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::any::Any;
19+
use std::collections::VecDeque;
20+
use std::ops::Deref;
21+
use std::sync::{Arc, LazyLock};
22+
23+
use datafusion_common::{HashMap, HashSet};
24+
use datafusion_common_runtime::{set_join_set_tracer, JoinSetTracer};
25+
use futures::future::BoxFuture;
26+
use tokio::sync::{Mutex, MutexGuard};
27+
28+
/// Initializes the global join set tracer with the asserting tracer.
29+
/// Call this function before spawning any tasks that should be traced.
30+
pub fn init_asserting_tracer() {
31+
set_join_set_tracer(ASSERTING_TRACER.deref())
32+
.expect("Failed to initialize asserting tracer");
33+
}
34+
35+
/// Verifies that the current task has a traceable ancestry back to "root".
36+
///
37+
/// The function performs a breadth-first search (BFS) in the global spawn graph:
38+
/// - It starts at the current task and follows parent links.
39+
/// - If it reaches the "root" task, the ancestry is valid.
40+
/// - If a task is missing from the graph, it panics.
41+
///
42+
/// Note: Tokio task IDs are unique only while a task is active.
43+
/// Once a task completes, its ID may be reused.
44+
pub async fn assert_traceability() {
45+
// Acquire the spawn graph lock.
46+
let spawn_graph = acquire_spawn_graph().await;
47+
48+
// Start BFS with the current task.
49+
let mut tasks_to_check = VecDeque::from(vec![current_task()]);
50+
51+
while let Some(task_id) = tasks_to_check.pop_front() {
52+
if task_id == "root" {
53+
// Ancestry reached the root.
54+
continue;
55+
}
56+
// Obtain parent tasks, panicking if the task is not present.
57+
let parents = spawn_graph
58+
.get(&task_id)
59+
.expect("Task ID not found in spawn graph");
60+
// Queue each parent for checking.
61+
for parent in parents {
62+
tasks_to_check.push_back(parent.clone());
63+
}
64+
}
65+
}
66+
67+
/// Tracer that maintains a graph of task ancestry for tracing purposes.
68+
///
69+
/// For each task, it records a set of parent task IDs to ensure that every
70+
/// asynchronous task can be traced back to "root".
71+
struct AssertingTracer {
72+
/// An asynchronous map from task IDs to their parent task IDs.
73+
spawn_graph: Arc<Mutex<HashMap<String, HashSet<String>>>>,
74+
}
75+
76+
/// Lazily initialized global instance of `AssertingTracer`.
77+
static ASSERTING_TRACER: LazyLock<AssertingTracer> = LazyLock::new(AssertingTracer::new);
78+
79+
impl AssertingTracer {
80+
/// Creates a new `AssertingTracer` with an empty spawn graph.
81+
fn new() -> Self {
82+
Self {
83+
spawn_graph: Arc::default(),
84+
}
85+
}
86+
}
87+
88+
/// Returns the current task's ID as a string, or "root" if unavailable.
89+
///
90+
/// Tokio guarantees task IDs are unique only among active tasks,
91+
/// so completed tasks may have their IDs reused.
92+
fn current_task() -> String {
93+
tokio::task::try_id()
94+
.map(|id| format!("{id}"))
95+
.unwrap_or_else(|| "root".to_string())
96+
}
97+
98+
/// Asynchronously locks and returns the spawn graph.
99+
///
100+
/// The returned guard allows inspection or modification of task ancestry.
101+
async fn acquire_spawn_graph<'a>() -> MutexGuard<'a, HashMap<String, HashSet<String>>> {
102+
ASSERTING_TRACER.spawn_graph.lock().await
103+
}
104+
105+
/// Registers the current task as a child of `parent_id` in the spawn graph.
106+
async fn register_task(parent_id: String) {
107+
acquire_spawn_graph()
108+
.await
109+
.entry(current_task())
110+
.or_insert_with(HashSet::new)
111+
.insert(parent_id);
112+
}
113+
114+
impl JoinSetTracer for AssertingTracer {
115+
/// Wraps an asynchronous future to record its parent task before execution.
116+
fn trace_future(
117+
&self,
118+
fut: BoxFuture<'static, Box<dyn Any + Send>>,
119+
) -> BoxFuture<'static, Box<dyn Any + Send>> {
120+
// Capture the parent task ID.
121+
let parent_id = current_task();
122+
Box::pin(async move {
123+
// Register the parent-child relationship.
124+
register_task(parent_id).await;
125+
// Execute the wrapped future.
126+
fut.await
127+
})
128+
}
129+
130+
/// Wraps a blocking closure to record its parent task before execution.
131+
fn trace_block(
132+
&self,
133+
f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>,
134+
) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> {
135+
let parent_id = current_task();
136+
Box::new(move || {
137+
// Synchronously record the task relationship.
138+
futures::executor::block_on(register_task(parent_id));
139+
f()
140+
})
141+
}
142+
}

datafusion/core/tests/tracing/mod.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! # JoinSetTracer Integration Tests
19+
//!
20+
//! These are smoke tests that verify `JoinSetTracer` can be correctly injected into DataFusion.
21+
//!
22+
//! They run a SQL query that reads Parquet data and performs an aggregation,
23+
//! which causes DataFusion to spawn multiple tasks.
24+
//! The object store is wrapped to assert that every task can be traced back to the root.
25+
//!
26+
//! These tests don't cover all edge cases, but they should fail if changes to
27+
//! DataFusion's task spawning break tracing.
28+
29+
mod asserting_tracer;
30+
mod traceable_object_store;
31+
32+
use asserting_tracer::init_asserting_tracer;
33+
use datafusion::datasource::file_format::parquet::ParquetFormat;
34+
use datafusion::datasource::listing::ListingOptions;
35+
use datafusion::prelude::*;
36+
use datafusion::test_util::parquet_test_data;
37+
use datafusion_common::assert_contains;
38+
use datafusion_common_runtime::SpawnedTask;
39+
use log::info;
40+
use object_store::local::LocalFileSystem;
41+
use std::sync::Arc;
42+
use traceable_object_store::traceable_object_store;
43+
use url::Url;
44+
45+
/// Combined test that first verifies the query panics when no tracer is registered,
46+
/// then initializes the tracer and confirms the query runs successfully.
47+
///
48+
/// Using a single test function prevents global tracer leakage between tests.
49+
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
50+
async fn test_tracer_injection() {
51+
// Without initializing the tracer, run the query.
52+
// Spawn the query in a separate task so we can catch its panic.
53+
info!("Running query without tracer");
54+
// The absence of the tracer should cause the task to panic inside the `TraceableObjectStore`.
55+
let untraced_result = SpawnedTask::spawn(run_query()).join().await;
56+
if let Err(e) = untraced_result {
57+
// Check if the error message contains the expected error.
58+
assert!(e.is_panic(), "Expected a panic, but got: {:?}", e);
59+
assert_contains!(e.to_string(), "Task ID not found in spawn graph");
60+
info!("Caught expected panic: {}", e);
61+
} else {
62+
panic!("Expected the task to panic, but it completed successfully");
63+
};
64+
65+
// Initialize the asserting tracer and run the query.
66+
info!("Initializing tracer and re-running query");
67+
init_asserting_tracer();
68+
SpawnedTask::spawn(run_query()).join().await.unwrap(); // Should complete without panics or errors.
69+
}
70+
71+
/// Executes a sample task-spawning SQL query using a traceable object store.
72+
async fn run_query() {
73+
info!("Starting query execution");
74+
75+
// Create a new session context
76+
let ctx = SessionContext::new();
77+
78+
// Get the test data directory
79+
let test_data = parquet_test_data();
80+
81+
// Define a Parquet file format with pruning enabled
82+
let file_format = ParquetFormat::default().with_enable_pruning(true);
83+
84+
// Set listing options for the parquet file with a specific extension
85+
let listing_options = ListingOptions::new(Arc::new(file_format))
86+
.with_file_extension("alltypes_tiny_pages_plain.parquet");
87+
88+
// Wrap the local file system in a traceable object store to verify task traceability.
89+
let local_fs = Arc::new(LocalFileSystem::new());
90+
let traceable_store = traceable_object_store(local_fs);
91+
92+
// Register the traceable object store with a test URL.
93+
let url = Url::parse("test://").unwrap();
94+
ctx.register_object_store(&url, traceable_store.clone());
95+
96+
// Register a listing table from the test data directory.
97+
let table_path = format!("test://{}/", test_data);
98+
ctx.register_listing_table("alltypes", &table_path, listing_options, None, None)
99+
.await
100+
.expect("Failed to register table");
101+
102+
// Define and execute an SQL query against the registered table, which should
103+
// spawn multiple tasks due to the aggregation and parquet file read.
104+
let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col";
105+
let result_batches = ctx.sql(sql).await.unwrap().collect().await.unwrap();
106+
107+
info!("Query complete: {} batches returned", result_batches.len());
108+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Object store implementation used for testing
19+
20+
use crate::tracing::asserting_tracer::assert_traceability;
21+
use futures::stream::BoxStream;
22+
use object_store::{
23+
path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta,
24+
ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
25+
};
26+
use std::fmt::{Debug, Display, Formatter};
27+
use std::sync::Arc;
28+
29+
/// Returns an `ObjectStore` that asserts it can trace its calls back to the root tokio task.
30+
pub fn traceable_object_store(
31+
object_store: Arc<dyn ObjectStore>,
32+
) -> Arc<dyn ObjectStore> {
33+
Arc::new(TraceableObjectStore::new(object_store))
34+
}
35+
36+
/// An object store that asserts it can trace all its calls back to the root tokio task.
37+
#[derive(Debug)]
38+
struct TraceableObjectStore {
39+
inner: Arc<dyn ObjectStore>,
40+
}
41+
42+
impl TraceableObjectStore {
43+
fn new(inner: Arc<dyn ObjectStore>) -> Self {
44+
Self { inner }
45+
}
46+
}
47+
48+
impl Display for TraceableObjectStore {
49+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
50+
Display::fmt(&self.inner, f)
51+
}
52+
}
53+
54+
/// All trait methods are forwarded to the inner object store,
55+
/// after asserting they can trace their calls back to the root tokio task.
56+
#[async_trait::async_trait]
57+
impl ObjectStore for TraceableObjectStore {
58+
async fn put_opts(
59+
&self,
60+
location: &Path,
61+
payload: PutPayload,
62+
opts: PutOptions,
63+
) -> object_store::Result<PutResult> {
64+
assert_traceability().await;
65+
self.inner.put_opts(location, payload, opts).await
66+
}
67+
68+
async fn put_multipart_opts(
69+
&self,
70+
location: &Path,
71+
opts: PutMultipartOpts,
72+
) -> object_store::Result<Box<dyn MultipartUpload>> {
73+
assert_traceability().await;
74+
self.inner.put_multipart_opts(location, opts).await
75+
}
76+
77+
async fn get_opts(
78+
&self,
79+
location: &Path,
80+
options: GetOptions,
81+
) -> object_store::Result<GetResult> {
82+
assert_traceability().await;
83+
self.inner.get_opts(location, options).await
84+
}
85+
86+
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
87+
assert_traceability().await;
88+
self.inner.head(location).await
89+
}
90+
91+
async fn delete(&self, location: &Path) -> object_store::Result<()> {
92+
assert_traceability().await;
93+
self.inner.delete(location).await
94+
}
95+
96+
fn list(
97+
&self,
98+
prefix: Option<&Path>,
99+
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
100+
futures::executor::block_on(assert_traceability());
101+
self.inner.list(prefix)
102+
}
103+
104+
async fn list_with_delimiter(
105+
&self,
106+
prefix: Option<&Path>,
107+
) -> object_store::Result<ListResult> {
108+
assert_traceability().await;
109+
self.inner.list_with_delimiter(prefix).await
110+
}
111+
112+
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
113+
assert_traceability().await;
114+
self.inner.copy(from, to).await
115+
}
116+
117+
async fn copy_if_not_exists(
118+
&self,
119+
from: &Path,
120+
to: &Path,
121+
) -> object_store::Result<()> {
122+
assert_traceability().await;
123+
self.inner.copy_if_not_exists(from, to).await
124+
}
125+
}

0 commit comments

Comments
 (0)