|
15 | 15 | // specific language governing permissions and limitations
|
16 | 16 | // under the License.
|
17 | 17 |
|
18 |
| -//! This example demonstrates the trace feature in DataFusion’s runtime. |
19 |
| -//! When the `tracing` feature is enabled, spawned tasks in DataFusion (such as those |
20 |
| -//! created during repartitioning or when reading Parquet files) are instrumented |
21 |
| -//! with the current tracing span, allowing to propagate any existing tracing context. |
22 |
| -//! |
23 |
| -//! In this example we create a session configured to use multiple partitions, |
24 |
| -//! register a Parquet table (based on the `alltypes_tiny_pages_plain.parquet` file), |
25 |
| -//! and run a query that should trigger parallel execution on multiple threads. |
26 |
| -//! We wrap the entire query execution within a custom span and log messages. |
27 |
| -//! By inspecting the tracing output, we should see that the tasks spawned |
28 |
| -//! internally inherit the span context. |
| 18 | +//! This example demonstrates the tracing injection feature for the DataFusion runtime. |
| 19 | +//! Tasks spawned on new threads behave differently depending on whether a tracer is |
| 20 | +//! injected. The log output clearly distinguishes the two cases. |
29 | 21 |
|
30 |
| -use arrow::util::pretty::pretty_format_batches; |
31 |
| -use datafusion::arrow::record_batch::RecordBatch; |
| 22 | +use datafusion::common::runtime::set_join_set_tracer; |
32 | 23 | use datafusion::datasource::file_format::parquet::ParquetFormat;
|
33 | 24 | use datafusion::datasource::listing::ListingOptions;
|
34 | 25 | use datafusion::error::Result;
|
35 | 26 | use datafusion::prelude::*;
|
36 | 27 | use datafusion::test_util::parquet_test_data;
|
| 28 | +use futures::future::BoxFuture; |
| 29 | +use futures::FutureExt; |
| 30 | +use std::any::Any; |
37 | 31 | use std::sync::Arc;
|
38 |
| -use tracing::{info, instrument, Level}; |
| 32 | +use tracing::{info, instrument, Instrument, Level, Span}; |
39 | 33 |
|
40 | 34 | #[tokio::main]
|
41 | 35 | async fn main() -> Result<()> {
|
42 |
| - // Initialize a tracing subscriber that prints to stdout. |
| 36 | + // Initialize tracing subscriber with thread info. |
43 | 37 | tracing_subscriber::fmt()
|
44 | 38 | .with_thread_ids(true)
|
45 | 39 | .with_thread_names(true)
|
46 | 40 | .with_max_level(Level::DEBUG)
|
47 | 41 | .init();
|
48 | 42 |
|
49 |
| - log::info!("Starting example, this log is not captured by tracing"); |
| 43 | + // Run query WITHOUT tracer injection. |
| 44 | + info!("***** RUNNING WITHOUT INJECTED TRACER *****"); |
| 45 | + run_instrumented_query().await?; |
| 46 | + info!("***** WITHOUT tracer: `tokio-runtime-worker` tasks did NOT inherit the `run_instrumented_query` span *****"); |
50 | 47 |
|
51 |
| - // execute the query within a tracing span |
52 |
| - let result = run_instrumented_query().await; |
| 48 | + // Inject custom tracer so tasks run in the current span. |
| 49 | + info!("Injecting custom tracer..."); |
| 50 | + set_join_set_tracer(instrument_future, instrument_block) |
| 51 | + .expect("Failed to set tracer"); |
53 | 52 |
|
54 |
| - info!( |
55 |
| - "Finished example. Check the logs above for tracing span details showing \ |
56 |
| -that tasks were spawned within the 'run_instrumented_query' span on different threads." |
57 |
| - ); |
| 53 | + // Run query WITH tracer injection. |
| 54 | + info!("***** RUNNING WITH INJECTED TRACER *****"); |
| 55 | + run_instrumented_query().await?; |
| 56 | + info!("***** WITH tracer: `tokio-runtime-worker` tasks DID inherit the `run_instrumented_query` span *****"); |
58 | 57 |
|
59 |
| - result |
| 58 | + Ok(()) |
| 59 | +} |
| 60 | + |
| 61 | +/// Instruments a boxed future to run in the current span. |
| 62 | +fn instrument_future( |
| 63 | + fut: BoxFuture<'static, Box<dyn Any + Send>>, |
| 64 | +) -> BoxFuture<'static, Box<dyn Any + Send>> { |
| 65 | + fut.in_current_span().boxed() |
| 66 | +} |
| 67 | + |
| 68 | +/// Instruments a boxed blocking closure to execute within the current span. |
| 69 | +fn instrument_block( |
| 70 | + f: Box<dyn FnOnce() -> Box<dyn Any + Send> + Send>, |
| 71 | +) -> Box<dyn FnOnce() -> Box<dyn Any + Send> + Send> { |
| 72 | + let span = Span::current(); |
| 73 | + Box::new(move || span.in_scope(|| f())) |
60 | 74 | }
|
61 | 75 |
|
62 | 76 | #[instrument(level = "info")]
|
63 | 77 | async fn run_instrumented_query() -> Result<()> {
|
64 |
| - info!("Starting query execution within the custom tracing span"); |
| 78 | + info!("Starting query execution"); |
65 | 79 |
|
66 |
| - // The default session will set the number of partitions to `std::thread::available_parallelism()`. |
67 | 80 | let ctx = SessionContext::new();
|
68 |
| - |
69 |
| - // Get the path to the test parquet data. |
70 | 81 | let test_data = parquet_test_data();
|
71 |
| - // Build listing options that pick up only the "alltypes_tiny_pages_plain.parquet" file. |
72 | 82 | let file_format = ParquetFormat::default().with_enable_pruning(true);
|
73 | 83 | let listing_options = ListingOptions::new(Arc::new(file_format))
|
74 | 84 | .with_file_extension("alltypes_tiny_pages_plain.parquet");
|
75 | 85 |
|
76 |
| - info!("Registering Parquet table 'alltypes' from {test_data} in {listing_options:?}"); |
77 |
| - |
78 |
| - // Register a listing table using an absolute URL. |
79 | 86 | let table_path = format!("file://{test_data}/");
|
80 |
| - ctx.register_listing_table( |
81 |
| - "alltypes", |
82 |
| - &table_path, |
83 |
| - listing_options.clone(), |
84 |
| - None, |
85 |
| - None, |
86 |
| - ) |
87 |
| - .await |
88 |
| - .expect("register_listing_table failed"); |
89 |
| - |
90 |
| - info!("Registered Parquet table 'alltypes' from {table_path}"); |
91 |
| - |
92 |
| - // Run a query that will trigger parallel execution on multiple threads. |
93 |
| - let sql = "SELECT COUNT(*), bool_col, date_string_col, string_col |
94 |
| - FROM ( |
95 |
| - SELECT bool_col, date_string_col, string_col FROM alltypes |
96 |
| - UNION ALL |
97 |
| - SELECT bool_col, date_string_col, string_col FROM alltypes |
98 |
| - ) AS t |
99 |
| - GROUP BY bool_col, date_string_col, string_col |
100 |
| - ORDER BY 1,2,3,4 DESC |
101 |
| - LIMIT 5;"; |
102 |
| - info!(%sql, "Executing SQL query"); |
103 |
| - let df = ctx.sql(sql).await?; |
104 |
| - |
105 |
| - let results: Vec<RecordBatch> = df.collect().await?; |
106 |
| - info!("Query execution complete"); |
107 |
| - |
108 |
| - // Print out the results and tracing output. |
109 |
| - datafusion::common::assert_batches_eq!( |
110 |
| - [ |
111 |
| - "+----------+----------+-----------------+------------+", |
112 |
| - "| count(*) | bool_col | date_string_col | string_col |", |
113 |
| - "+----------+----------+-----------------+------------+", |
114 |
| - "| 2 | false | 01/01/09 | 9 |", |
115 |
| - "| 2 | false | 01/01/09 | 7 |", |
116 |
| - "| 2 | false | 01/01/09 | 5 |", |
117 |
| - "| 2 | false | 01/01/09 | 3 |", |
118 |
| - "| 2 | false | 01/01/09 | 1 |", |
119 |
| - "+----------+----------+-----------------+------------+", |
120 |
| - ], |
121 |
| - &results |
122 |
| - ); |
123 |
| - |
124 |
| - info!("Query results:\n{}", pretty_format_batches(&results)?); |
125 |
| - |
| 87 | + info!("Registering table 'alltypes' from {}", table_path); |
| 88 | + ctx.register_listing_table("alltypes", &table_path, listing_options, None, None) |
| 89 | + .await |
| 90 | + .expect("Failed to register table"); |
| 91 | + |
| 92 | + let sql = "SELECT COUNT(*), string_col FROM alltypes GROUP BY string_col"; |
| 93 | + info!(sql, "Executing SQL query"); |
| 94 | + let result = ctx.sql(sql).await?.collect().await?; |
| 95 | + info!("Query complete: {} batches returned", result.len()); |
126 | 96 | Ok(())
|
127 | 97 | }
|
0 commit comments