Skip to content

Implement tree explain for DataSourceExec #15029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions datafusion/datasource-csv/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,8 +617,13 @@ impl FileSource for CsvSource {
fn file_type(&self) -> &str {
"csv"
}
fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, ", has_header={}", self.has_header)
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, ", has_header={}", self.has_header)
}
DisplayFormatType::TreeRender => Ok(()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per the description of TreeRender:

/// TreeRender, displayed in the `tree` explain type.
///
/// This format is inspired by DuckDB's explain plans. The information
/// presented should be "user friendly", and contain only the most relevant
/// information for understanding a plan. It should NOT contain the same level
/// of detail information as the [`Self::Default`] format.
///
/// In this mode, each line contains a key=value pair.
/// Everything before the first `=` is treated as the key, and everything after the
/// first `=` is treated as the value.
///
/// For example, if the output of `TreeRender` is this:
/// ```text
/// partition_sizes=[1]
/// partitions=1
/// ```
///
/// It is rendered in the center of a box in the following way:
///
/// ```text
/// ┌───────────────────────────┐
/// │ DataSourceExec │
/// │ -------------------- │
/// │ partition_sizes: [1] │
/// │ partitions: 1 │
/// └───────────────────────────┘
/// ```

TreeRender mode should contain only the most relevant details needed to understand the high-level plan.

}
}
}

Expand Down
11 changes: 7 additions & 4 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,14 +554,11 @@ impl FileSource for ParquetSource {

fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default
| DisplayFormatType::Verbose
| DisplayFormatType::TreeRender => {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
let predicate_string = self
.predicate()
.map(|p| format!(", predicate={p}"))
.unwrap_or_default();

let pruning_predicate_string = self
.pruning_predicate()
.map(|pre| {
Expand All @@ -581,6 +578,12 @@ impl FileSource for ParquetSource {

write!(f, "{}{}", predicate_string, pruning_predicate_string)
}
DisplayFormatType::TreeRender => {
if let Some(predicate) = self.predicate() {
writeln!(f, "predicate={predicate}")?;
}
Ok(())
}
}
}
}
5 changes: 4 additions & 1 deletion datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ impl DataSource for FileScanConfig {
self.fmt_file_source(t, f)
}
DisplayFormatType::TreeRender => {
// TODO: collect info
writeln!(f, "format={}", self.file_source.file_type())?;
self.file_source.fmt_extra(t, f)?;
let num_files = self.file_groups.iter().map(Vec::len).sum::<usize>();
writeln!(f, "files={num_files}")?;
Ok(())
}
}
Expand Down
30 changes: 11 additions & 19 deletions datafusion/datasource/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,25 +425,17 @@ impl DataSource for MemorySourceConfig {
}
}
DisplayFormatType::TreeRender => {
let partition_sizes: Vec<_> =
self.partitions.iter().map(|b| b.len()).collect();
writeln!(f, "partition_sizes={:?}", partition_sizes)?;

if let Some(output_ordering) = self.sort_information.first() {
writeln!(f, "output_ordering={}", output_ordering)?;
}

let eq_properties = self.eq_properties();
let constraints = eq_properties.constraints();
if !constraints.is_empty() {
writeln!(f, "constraints={}", constraints)?;
}

if let Some(limit) = self.fetch {
writeln!(f, "fetch={}", limit)?;
}

write!(f, "partitions={}", partition_sizes.len())
let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, the previous version was too verbose, I think.

let total_bytes: usize = self
.partitions
.iter()
.flatten()
.map(|batch| batch.get_array_memory_size())
.sum();
writeln!(f, "format=memory")?;
writeln!(f, "rows={total_rows}")?;
writeln!(f, "bytes={total_bytes}")?;
Ok(())
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub trait DataSource: Send + Sync + Debug {
context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
/// Format this source for display in explain plans
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;

/// Return a copy of this DataSource with a new partitioning scheme
Expand Down Expand Up @@ -103,7 +104,7 @@ impl DisplayAs for DataSourceExec {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "DataSourceExec: ")?;
}
DisplayFormatType::TreeRender => write!(f, "")?,
DisplayFormatType::TreeRender => {}
}
self.data_source.fmt_as(t, f)
}
Expand Down
223 changes: 206 additions & 17 deletions datafusion/sqllogictest/test_files/explain_tree.slt
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,36 @@ STORED AS PARQUET
LOCATION 'test_files/scratch/explain_tree/table2.parquet';


# table3: Memoru
# table3: Memory
statement ok
CREATE TABLE table3 as select * from table1;

# table4: JSON
query I
COPY (SELECT * from table1)
TO 'test_files/scratch/explain_tree/table4.json'
----
3

statement ok
CREATE EXTERNAL TABLE table4
STORED AS JSON
LOCATION 'test_files/scratch/explain_tree/table4.json';

# table5: ARROW
query I
COPY (SELECT * from table1)
TO 'test_files/scratch/explain_tree/table5.arrow'
----
3

statement ok
CREATE EXTERNAL TABLE table5
STORED AS ARROW
LOCATION 'test_files/scratch/explain_tree/table5.arrow';



######## Begin Queries ########

# Filter
Expand All @@ -83,7 +109,10 @@ physical_plan
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
15)└───────────────────────────┘
15)│ -------------------- │
16)│ files: 1 │
17)│ format: csv │
18)└───────────────────────────┘

# Aggregate
query TT
Expand All @@ -110,7 +139,10 @@ physical_plan
15)└─────────────┬─────────────┘
16)┌─────────────┴─────────────┐
17)│ DataSourceExec │
18)└───────────────────────────┘
18)│ -------------------- │
19)│ files: 1 │
20)│ format: csv │
21)└───────────────────────────┘

# 2 Joins
query TT
Expand Down Expand Up @@ -139,7 +171,10 @@ physical_plan
15)└─────────────┬─────────────┘└─────────────┬─────────────┘
16)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
17)│ DataSourceExec ││ DataSourceExec │
18)└───────────────────────────┘└───────────────────────────┘
18)│ -------------------- ││ -------------------- │
19)│ files: 1 ││ files: 1 │
20)│ format: csv ││ format: parquet │
21)└───────────────────────────┘└───────────────────────────┘

# 3 Joins
query TT
Expand Down Expand Up @@ -175,18 +210,22 @@ physical_plan
13)┌─────────────┴─────────────┐┌─────────────┴─────────────┐┌─────────────┴─────────────┐
14)│ CoalesceBatchesExec ││ CoalesceBatchesExec ││ DataSourceExec │
15)│ ││ ││ -------------------- │
16)│ ││ ││ partition_sizes: [1] │
17)│ ││ ││ partitions: 1 │
18)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
19)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
20)│ RepartitionExec ││ RepartitionExec │
21)└─────────────┬─────────────┘└─────────────┬─────────────┘
22)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
23)│ RepartitionExec ││ RepartitionExec │
24)└─────────────┬─────────────┘└─────────────┬─────────────┘
25)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
26)│ DataSourceExec ││ DataSourceExec │
27)└───────────────────────────┘└───────────────────────────┘
16)│ ││ ││ bytes: 1560 │
17)│ ││ ││ format: memory │
18)│ ││ ││ rows: 1 │
19)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
20)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
21)│ RepartitionExec ││ RepartitionExec │
22)└─────────────┬─────────────┘└─────────────┬─────────────┘
23)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
24)│ RepartitionExec ││ RepartitionExec │
25)└─────────────┬─────────────┘└─────────────┬─────────────┘
26)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
27)│ DataSourceExec ││ DataSourceExec │
28)│ -------------------- ││ -------------------- │
29)│ files: 1 ││ files: 1 │
30)│ format: csv ││ format: parquet │
31)└───────────────────────────┘└───────────────────────────┘

# Long Filter (demonstrate what happens with wrapping)
query TT
Expand All @@ -213,9 +252,153 @@ physical_plan
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
15)└───────────────────────────┘
15)│ -------------------- │
16)│ files: 1 │
17)│ format: csv │
18)└───────────────────────────┘

# Query with filter on csv
query TT
explain SELECT int_col FROM table1 WHERE string_col != 'foo';
----
logical_plan
01)Projection: table1.int_col
02)--Filter: table1.string_col != Utf8("foo")
03)----TableScan: table1 projection=[int_col, string_col], partial_filters=[table1.string_col != Utf8("foo")]
physical_plan
01)┌───────────────────────────┐
02)│ CoalesceBatchesExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ FilterExec │
06)│ -------------------- │
07)│ predicate: │
08)│ string_col@1 != foo │
09)└─────────────┬─────────────┘
10)┌─────────────┴─────────────┐
11)│ RepartitionExec │
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
15)│ -------------------- │
16)│ files: 1 │
17)│ format: csv │
18)└───────────────────────────┘


# Query with filter on parquet
query TT
explain SELECT int_col FROM table2 WHERE string_col != 'foo';
----
logical_plan
01)Projection: table2.int_col
02)--Filter: table2.string_col != Utf8View("foo")
03)----TableScan: table2 projection=[int_col, string_col], partial_filters=[table2.string_col != Utf8View("foo")]
physical_plan
01)┌───────────────────────────┐
02)│ CoalesceBatchesExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ FilterExec │
06)│ -------------------- │
07)│ predicate: │
08)│ string_col@1 != foo │
09)└─────────────┬─────────────┘
10)┌─────────────┴─────────────┐
11)│ RepartitionExec │
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
15)│ -------------------- │
16)│ files: 1 │
17)│ format: parquet │
18)│ │
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why there is an extra newline here 🤔

19)│ predicate: │
20)│ string_col@1 != foo │
21)└───────────────────────────┘

# Query with filter on memory
query TT
explain SELECT int_col FROM table3 WHERE string_col != 'foo';
----
logical_plan
01)Projection: table3.int_col
02)--Filter: table3.string_col != Utf8("foo")
03)----TableScan: table3 projection=[int_col, string_col]
physical_plan
01)┌───────────────────────────┐
02)│ CoalesceBatchesExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ FilterExec │
06)│ -------------------- │
07)│ predicate: │
08)│ string_col@1 != foo │
09)└─────────────┬─────────────┘
10)┌─────────────┴─────────────┐
11)│ DataSourceExec │
12)│ -------------------- │
13)│ bytes: 1560 │
14)│ format: memory │
15)│ rows: 1 │
16)└───────────────────────────┘

# Query with filter on json
query TT
explain SELECT int_col FROM table4 WHERE string_col != 'foo';
----
logical_plan
01)Projection: table4.int_col
02)--Filter: table4.string_col != Utf8("foo")
03)----TableScan: table4 projection=[int_col, string_col], partial_filters=[table4.string_col != Utf8("foo")]
physical_plan
01)┌───────────────────────────┐
02)│ CoalesceBatchesExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ FilterExec │
06)│ -------------------- │
07)│ predicate: │
08)│ string_col@1 != foo │
09)└─────────────┬─────────────┘
10)┌─────────────┴─────────────┐
11)│ RepartitionExec │
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
15)│ -------------------- │
16)│ files: 1 │
17)│ format: json │
18)└───────────────────────────┘

# Query with filter on arrow
query TT
explain SELECT int_col FROM table5 WHERE string_col != 'foo';
----
logical_plan
01)Projection: table5.int_col
02)--Filter: table5.string_col != Utf8("foo")
03)----TableScan: table5 projection=[int_col, string_col], partial_filters=[table5.string_col != Utf8("foo")]
physical_plan
01)┌───────────────────────────┐
02)│ CoalesceBatchesExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ FilterExec │
06)│ -------------------- │
07)│ predicate: │
08)│ string_col@1 != foo │
09)└─────────────┬─────────────┘
10)┌─────────────┴─────────────┐
11)│ RepartitionExec │
12)└─────────────┬─────────────┘
13)┌─────────────┴─────────────┐
14)│ DataSourceExec │
15)│ -------------------- │
16)│ files: 1 │
17)│ format: arrow │
18)└───────────────────────────┘

# cleanup
statement ok
drop table table1;
Expand All @@ -225,3 +408,9 @@ drop table table2;

statement ok
drop table table3;

statement ok
drop table table4;

statement ok
drop table table5;