Skip to content

Commit d91e3c7

Browse files
alambcomphead
authored andcommitted
Implement tree explain for DataSourceExec (apache#15029)
* Implement tree explain for DataSourceExec * improve test * Apply suggestions from code review Co-authored-by: Oleks V <[email protected]> * fmt --------- Co-authored-by: Oleks V <[email protected]>
1 parent bff838f commit d91e3c7

File tree

6 files changed

+237
-44
lines changed

6 files changed

+237
-44
lines changed

datafusion/datasource-csv/src/source.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,8 +617,13 @@ impl FileSource for CsvSource {
617617
fn file_type(&self) -> &str {
618618
"csv"
619619
}
620-
fn fmt_extra(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
621-
write!(f, ", has_header={}", self.has_header)
620+
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
621+
match t {
622+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
623+
write!(f, ", has_header={}", self.has_header)
624+
}
625+
DisplayFormatType::TreeRender => Ok(()),
626+
}
622627
}
623628
}
624629

datafusion/datasource-parquet/src/source.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -554,14 +554,11 @@ impl FileSource for ParquetSource {
554554

555555
fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
556556
match t {
557-
DisplayFormatType::Default
558-
| DisplayFormatType::Verbose
559-
| DisplayFormatType::TreeRender => {
557+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
560558
let predicate_string = self
561559
.predicate()
562560
.map(|p| format!(", predicate={p}"))
563561
.unwrap_or_default();
564-
565562
let pruning_predicate_string = self
566563
.pruning_predicate()
567564
.map(|pre| {
@@ -581,6 +578,12 @@ impl FileSource for ParquetSource {
581578

582579
write!(f, "{}{}", predicate_string, pruning_predicate_string)
583580
}
581+
DisplayFormatType::TreeRender => {
582+
if let Some(predicate) = self.predicate() {
583+
writeln!(f, "predicate={predicate}")?;
584+
}
585+
Ok(())
586+
}
584587
}
585588
}
586589
}

datafusion/datasource/src/file_scan_config.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ impl DataSource for FileScanConfig {
218218
self.fmt_file_source(t, f)
219219
}
220220
DisplayFormatType::TreeRender => {
221-
// TODO: collect info
221+
writeln!(f, "format={}", self.file_source.file_type())?;
222+
self.file_source.fmt_extra(t, f)?;
223+
let num_files = self.file_groups.iter().map(Vec::len).sum::<usize>();
224+
writeln!(f, "files={num_files}")?;
222225
Ok(())
223226
}
224227
}

datafusion/datasource/src/memory.rs

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -425,25 +425,17 @@ impl DataSource for MemorySourceConfig {
425425
}
426426
}
427427
DisplayFormatType::TreeRender => {
428-
let partition_sizes: Vec<_> =
429-
self.partitions.iter().map(|b| b.len()).collect();
430-
writeln!(f, "partition_sizes={:?}", partition_sizes)?;
431-
432-
if let Some(output_ordering) = self.sort_information.first() {
433-
writeln!(f, "output_ordering={}", output_ordering)?;
434-
}
435-
436-
let eq_properties = self.eq_properties();
437-
let constraints = eq_properties.constraints();
438-
if !constraints.is_empty() {
439-
writeln!(f, "constraints={}", constraints)?;
440-
}
441-
442-
if let Some(limit) = self.fetch {
443-
writeln!(f, "fetch={}", limit)?;
444-
}
445-
446-
write!(f, "partitions={}", partition_sizes.len())
428+
let total_rows = self.partitions.iter().map(|b| b.len()).sum::<usize>();
429+
let total_bytes: usize = self
430+
.partitions
431+
.iter()
432+
.flatten()
433+
.map(|batch| batch.get_array_memory_size())
434+
.sum();
435+
writeln!(f, "format=memory")?;
436+
writeln!(f, "rows={total_rows}")?;
437+
writeln!(f, "bytes={total_bytes}")?;
438+
Ok(())
447439
}
448440
}
449441
}

datafusion/datasource/src/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ pub trait DataSource: Send + Sync + Debug {
5252
context: Arc<TaskContext>,
5353
) -> datafusion_common::Result<SendableRecordBatchStream>;
5454
fn as_any(&self) -> &dyn Any;
55+
/// Format this source for display in explain plans
5556
fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result;
5657

5758
/// Return a copy of this DataSource with a new partitioning scheme
@@ -103,7 +104,7 @@ impl DisplayAs for DataSourceExec {
103104
DisplayFormatType::Default | DisplayFormatType::Verbose => {
104105
write!(f, "DataSourceExec: ")?;
105106
}
106-
DisplayFormatType::TreeRender => write!(f, "")?,
107+
DisplayFormatType::TreeRender => {}
107108
}
108109
self.data_source.fmt_as(t, f)
109110
}

datafusion/sqllogictest/test_files/explain_tree.slt

Lines changed: 206 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,36 @@ STORED AS PARQUET
5454
LOCATION 'test_files/scratch/explain_tree/table2.parquet';
5555

5656

57-
# table3: Memoru
57+
# table3: Memory
5858
statement ok
5959
CREATE TABLE table3 as select * from table1;
6060

61+
# table4: JSON
62+
query I
63+
COPY (SELECT * from table1)
64+
TO 'test_files/scratch/explain_tree/table4.json'
65+
----
66+
3
67+
68+
statement ok
69+
CREATE EXTERNAL TABLE table4
70+
STORED AS JSON
71+
LOCATION 'test_files/scratch/explain_tree/table4.json';
72+
73+
# table5: ARROW
74+
query I
75+
COPY (SELECT * from table1)
76+
TO 'test_files/scratch/explain_tree/table5.arrow'
77+
----
78+
3
79+
80+
statement ok
81+
CREATE EXTERNAL TABLE table5
82+
STORED AS ARROW
83+
LOCATION 'test_files/scratch/explain_tree/table5.arrow';
84+
85+
86+
6187
######## Begin Queries ########
6288

6389
# Filter
@@ -83,7 +109,10 @@ physical_plan
83109
12)└─────────────┬─────────────┘
84110
13)┌─────────────┴─────────────┐
85111
14)│ DataSourceExec │
86-
15)└───────────────────────────┘
112+
15)│ -------------------- │
113+
16)│ files: 1 │
114+
17)│ format: csv │
115+
18)└───────────────────────────┘
87116

88117
# Aggregate
89118
query TT
@@ -110,7 +139,10 @@ physical_plan
110139
15)└─────────────┬─────────────┘
111140
16)┌─────────────┴─────────────┐
112141
17)│ DataSourceExec │
113-
18)└───────────────────────────┘
142+
18)│ -------------------- │
143+
19)│ files: 1 │
144+
20)│ format: csv │
145+
21)└───────────────────────────┘
114146

115147
# 2 Joins
116148
query TT
@@ -139,7 +171,10 @@ physical_plan
139171
15)└─────────────┬─────────────┘└─────────────┬─────────────┘
140172
16)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
141173
17)│ DataSourceExec ││ DataSourceExec │
142-
18)└───────────────────────────┘└───────────────────────────┘
174+
18)│ -------------------- ││ -------------------- │
175+
19)│ files: 1 ││ files: 1 │
176+
20)│ format: csv ││ format: parquet │
177+
21)└───────────────────────────┘└───────────────────────────┘
143178

144179
# 3 Joins
145180
query TT
@@ -175,18 +210,22 @@ physical_plan
175210
13)┌─────────────┴─────────────┐┌─────────────┴─────────────┐┌─────────────┴─────────────┐
176211
14)│ CoalesceBatchesExec ││ CoalesceBatchesExec ││ DataSourceExec │
177212
15)│ ││ ││ -------------------- │
178-
16)│ ││ ││ partition_sizes: [1] │
179-
17)│ ││ ││ partitions: 1 │
180-
18)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
181-
19)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
182-
20)│ RepartitionExec ││ RepartitionExec │
183-
21)└─────────────┬─────────────┘└─────────────┬─────────────┘
184-
22)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
185-
23)│ RepartitionExec ││ RepartitionExec │
186-
24)└─────────────┬─────────────┘└─────────────┬─────────────┘
187-
25)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
188-
26)│ DataSourceExec ││ DataSourceExec │
189-
27)└───────────────────────────┘└───────────────────────────┘
213+
16)│ ││ ││ bytes: 1560 │
214+
17)│ ││ ││ format: memory │
215+
18)│ ││ ││ rows: 1 │
216+
19)└─────────────┬─────────────┘└─────────────┬─────────────┘└───────────────────────────┘
217+
20)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
218+
21)│ RepartitionExec ││ RepartitionExec │
219+
22)└─────────────┬─────────────┘└─────────────┬─────────────┘
220+
23)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
221+
24)│ RepartitionExec ││ RepartitionExec │
222+
25)└─────────────┬─────────────┘└─────────────┬─────────────┘
223+
26)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
224+
27)│ DataSourceExec ││ DataSourceExec │
225+
28)│ -------------------- ││ -------------------- │
226+
29)│ files: 1 ││ files: 1 │
227+
30)│ format: csv ││ format: parquet │
228+
31)└───────────────────────────┘└───────────────────────────┘
190229

191230
# Long Filter (demonstrate what happens with wrapping)
192231
query TT
@@ -213,9 +252,153 @@ physical_plan
213252
12)└─────────────┬─────────────┘
214253
13)┌─────────────┴─────────────┐
215254
14)│ DataSourceExec │
216-
15)└───────────────────────────┘
255+
15)│ -------------------- │
256+
16)│ files: 1 │
257+
17)│ format: csv │
258+
18)└───────────────────────────┘
259+
260+
# Query with filter on csv
261+
query TT
262+
explain SELECT int_col FROM table1 WHERE string_col != 'foo';
263+
----
264+
logical_plan
265+
01)Projection: table1.int_col
266+
02)--Filter: table1.string_col != Utf8("foo")
267+
03)----TableScan: table1 projection=[int_col, string_col], partial_filters=[table1.string_col != Utf8("foo")]
268+
physical_plan
269+
01)┌───────────────────────────┐
270+
02)│ CoalesceBatchesExec │
271+
03)└─────────────┬─────────────┘
272+
04)┌─────────────┴─────────────┐
273+
05)│ FilterExec │
274+
06)│ -------------------- │
275+
07)│ predicate: │
276+
08)│ string_col@1 != foo │
277+
09)└─────────────┬─────────────┘
278+
10)┌─────────────┴─────────────┐
279+
11)│ RepartitionExec │
280+
12)└─────────────┬─────────────┘
281+
13)┌─────────────┴─────────────┐
282+
14)│ DataSourceExec │
283+
15)│ -------------------- │
284+
16)│ files: 1 │
285+
17)│ format: csv │
286+
18)└───────────────────────────┘
217287

218288

289+
# Query with filter on parquet
290+
query TT
291+
explain SELECT int_col FROM table2 WHERE string_col != 'foo';
292+
----
293+
logical_plan
294+
01)Projection: table2.int_col
295+
02)--Filter: table2.string_col != Utf8View("foo")
296+
03)----TableScan: table2 projection=[int_col, string_col], partial_filters=[table2.string_col != Utf8View("foo")]
297+
physical_plan
298+
01)┌───────────────────────────┐
299+
02)│ CoalesceBatchesExec │
300+
03)└─────────────┬─────────────┘
301+
04)┌─────────────┴─────────────┐
302+
05)│ FilterExec │
303+
06)│ -------------------- │
304+
07)│ predicate: │
305+
08)│ string_col@1 != foo │
306+
09)└─────────────┬─────────────┘
307+
10)┌─────────────┴─────────────┐
308+
11)│ RepartitionExec │
309+
12)└─────────────┬─────────────┘
310+
13)┌─────────────┴─────────────┐
311+
14)│ DataSourceExec │
312+
15)│ -------------------- │
313+
16)│ files: 1 │
314+
17)│ format: parquet │
315+
18)│ │
316+
19)│ predicate: │
317+
20)│ string_col@1 != foo │
318+
21)└───────────────────────────┘
319+
320+
# Query with filter on memory
321+
query TT
322+
explain SELECT int_col FROM table3 WHERE string_col != 'foo';
323+
----
324+
logical_plan
325+
01)Projection: table3.int_col
326+
02)--Filter: table3.string_col != Utf8("foo")
327+
03)----TableScan: table3 projection=[int_col, string_col]
328+
physical_plan
329+
01)┌───────────────────────────┐
330+
02)│ CoalesceBatchesExec │
331+
03)└─────────────┬─────────────┘
332+
04)┌─────────────┴─────────────┐
333+
05)│ FilterExec │
334+
06)│ -------------------- │
335+
07)│ predicate: │
336+
08)│ string_col@1 != foo │
337+
09)└─────────────┬─────────────┘
338+
10)┌─────────────┴─────────────┐
339+
11)│ DataSourceExec │
340+
12)│ -------------------- │
341+
13)│ bytes: 1560 │
342+
14)│ format: memory │
343+
15)│ rows: 1 │
344+
16)└───────────────────────────┘
345+
346+
# Query with filter on json
347+
query TT
348+
explain SELECT int_col FROM table4 WHERE string_col != 'foo';
349+
----
350+
logical_plan
351+
01)Projection: table4.int_col
352+
02)--Filter: table4.string_col != Utf8("foo")
353+
03)----TableScan: table4 projection=[int_col, string_col], partial_filters=[table4.string_col != Utf8("foo")]
354+
physical_plan
355+
01)┌───────────────────────────┐
356+
02)│ CoalesceBatchesExec │
357+
03)└─────────────┬─────────────┘
358+
04)┌─────────────┴─────────────┐
359+
05)│ FilterExec │
360+
06)│ -------------------- │
361+
07)│ predicate: │
362+
08)│ string_col@1 != foo │
363+
09)└─────────────┬─────────────┘
364+
10)┌─────────────┴─────────────┐
365+
11)│ RepartitionExec │
366+
12)└─────────────┬─────────────┘
367+
13)┌─────────────┴─────────────┐
368+
14)│ DataSourceExec │
369+
15)│ -------------------- │
370+
16)│ files: 1 │
371+
17)│ format: json │
372+
18)└───────────────────────────┘
373+
374+
# Query with filter on arrow
375+
query TT
376+
explain SELECT int_col FROM table5 WHERE string_col != 'foo';
377+
----
378+
logical_plan
379+
01)Projection: table5.int_col
380+
02)--Filter: table5.string_col != Utf8("foo")
381+
03)----TableScan: table5 projection=[int_col, string_col], partial_filters=[table5.string_col != Utf8("foo")]
382+
physical_plan
383+
01)┌───────────────────────────┐
384+
02)│ CoalesceBatchesExec │
385+
03)└─────────────┬─────────────┘
386+
04)┌─────────────┴─────────────┐
387+
05)│ FilterExec │
388+
06)│ -------------------- │
389+
07)│ predicate: │
390+
08)│ string_col@1 != foo │
391+
09)└─────────────┬─────────────┘
392+
10)┌─────────────┴─────────────┐
393+
11)│ RepartitionExec │
394+
12)└─────────────┬─────────────┘
395+
13)┌─────────────┴─────────────┐
396+
14)│ DataSourceExec │
397+
15)│ -------------------- │
398+
16)│ files: 1 │
399+
17)│ format: arrow │
400+
18)└───────────────────────────┘
401+
219402
# cleanup
220403
statement ok
221404
drop table table1;
@@ -225,3 +408,9 @@ drop table table2;
225408

226409
statement ok
227410
drop table table3;
411+
412+
statement ok
413+
drop table table4;
414+
415+
statement ok
416+
drop table table5;

0 commit comments

Comments
 (0)