@@ -22,7 +22,7 @@ mod test {
22
22
use datafusion:: prelude:: SessionContext ;
23
23
use datafusion_catalog:: TableProvider ;
24
24
use datafusion_common:: stats:: Precision ;
25
- use datafusion_common:: { ScalarValue , Statistics } ;
25
+ use datafusion_common:: { ColumnStatistics , ScalarValue , Statistics } ;
26
26
use datafusion_execution:: config:: SessionConfig ;
27
27
use datafusion_expr_common:: operator:: Operator ;
28
28
use datafusion_physical_expr:: expressions:: { binary, lit, Column } ;
@@ -35,6 +35,18 @@ mod test {
35
35
use datafusion_physical_plan:: ExecutionPlan ;
36
36
use std:: sync:: Arc ;
37
37
38
+ /// Creates a test table with statistics from the test data directory.
39
+ ///
40
+ /// This function:
41
+ /// - Creates an external table from './tests/data/test_statistics_per_partition'
42
+ /// - If we set the `target_partition` to `2, the data contains 2 partitions, each with 2 rows
43
+ /// - Each partition has an "id" column (INT) with the following values:
44
+ /// - First partition: [3, 4]
45
+ /// - Second partition: [1, 2]
46
+ /// - Each row is 110 bytes in size
47
+ ///
48
+ /// @param target_partition Optional parameter to set the target partitions
49
+ /// @return ExecutionPlan representing the scan of the table with statistics
38
50
async fn generate_listing_table_with_statistics (
39
51
target_partition : Option < usize > ,
40
52
) -> Arc < dyn ExecutionPlan > {
@@ -63,45 +75,52 @@ mod test {
63
75
. unwrap ( )
64
76
}
65
77
66
- fn check_unchanged_statistics ( statistics : Vec < Statistics > ) {
67
- // Check the statistics of each partition
68
- for stat in & statistics {
69
- assert_eq ! ( stat. num_rows, Precision :: Exact ( 2 ) ) ;
70
- // First column (id) should have non-null values
71
- assert_eq ! ( stat. column_statistics[ 0 ] . null_count, Precision :: Exact ( 0 ) ) ;
78
+ /// Helper function to create expected statistics for a partition with Int32 column
79
+ fn create_partition_statistics (
80
+ num_rows : usize ,
81
+ total_byte_size : usize ,
82
+ min_value : i32 ,
83
+ max_value : i32 ,
84
+ include_date_column : bool ,
85
+ ) -> Statistics {
86
+ let mut column_stats = vec ! [ ColumnStatistics {
87
+ null_count: Precision :: Exact ( 0 ) ,
88
+ max_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( max_value) ) ) ,
89
+ min_value: Precision :: Exact ( ScalarValue :: Int32 ( Some ( min_value) ) ) ,
90
+ sum_value: Precision :: Absent ,
91
+ distinct_count: Precision :: Absent ,
92
+ } ] ;
93
+
94
+ if include_date_column {
95
+ column_stats. push ( ColumnStatistics {
96
+ null_count : Precision :: Absent ,
97
+ max_value : Precision :: Absent ,
98
+ min_value : Precision :: Absent ,
99
+ sum_value : Precision :: Absent ,
100
+ distinct_count : Precision :: Absent ,
101
+ } ) ;
72
102
}
73
103
74
- // Verify specific id values for each partition
75
- assert_eq ! (
76
- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
77
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
78
- ) ;
79
- assert_eq ! (
80
- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
81
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
82
- ) ;
83
- assert_eq ! (
84
- statistics[ 1 ] . column_statistics[ 0 ] . max_value,
85
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
86
- ) ;
87
- assert_eq ! (
88
- statistics[ 1 ] . column_statistics[ 0 ] . min_value,
89
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
90
- ) ;
104
+ Statistics {
105
+ num_rows : Precision :: Exact ( num_rows) ,
106
+ total_byte_size : Precision :: Exact ( total_byte_size) ,
107
+ column_statistics : column_stats,
108
+ }
91
109
}
92
110
93
111
#[ tokio:: test]
94
112
async fn test_statistics_by_partition_of_data_source ( ) -> datafusion_common:: Result < ( ) >
95
113
{
96
114
let scan = generate_listing_table_with_statistics ( Some ( 2 ) ) . await ;
97
115
let statistics = scan. statistics_by_partition ( ) ?;
116
+ let expected_statistic_partition_1 =
117
+ create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
118
+ let expected_statistic_partition_2 =
119
+ create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
98
120
// Check the statistics of each partition
99
121
assert_eq ! ( statistics. len( ) , 2 ) ;
100
- for stat in & statistics {
101
- assert_eq ! ( stat. column_statistics. len( ) , 2 ) ;
102
- assert_eq ! ( stat. total_byte_size, Precision :: Exact ( 110 ) ) ;
103
- }
104
- check_unchanged_statistics ( statistics) ;
122
+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
123
+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
105
124
Ok ( ( ) )
106
125
}
107
126
@@ -114,11 +133,14 @@ mod test {
114
133
vec ! [ ( Arc :: new( Column :: new( "id" , 0 ) ) , "id" . to_string( ) ) ] ;
115
134
let projection = ProjectionExec :: try_new ( exprs, scan) ?;
116
135
let statistics = projection. statistics_by_partition ( ) ?;
117
- for stat in & statistics {
118
- assert_eq ! ( stat. column_statistics. len( ) , 1 ) ;
119
- assert_eq ! ( stat. total_byte_size, Precision :: Exact ( 8 ) ) ;
120
- }
121
- check_unchanged_statistics ( statistics) ;
136
+ let expected_statistic_partition_1 =
137
+ create_partition_statistics ( 2 , 8 , 3 , 4 , false ) ;
138
+ let expected_statistic_partition_2 =
139
+ create_partition_statistics ( 2 , 8 , 1 , 2 , false ) ;
140
+ // Check the statistics of each partition
141
+ assert_eq ! ( statistics. len( ) , 2 ) ;
142
+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
143
+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
122
144
Ok ( ( ) )
123
145
}
124
146
@@ -138,55 +160,20 @@ mod test {
138
160
) ;
139
161
let mut sort_exec = Arc :: new ( sort. clone ( ) ) ;
140
162
let statistics = sort_exec. statistics_by_partition ( ) ?;
163
+ let expected_statistic_partition =
164
+ create_partition_statistics ( 4 , 220 , 1 , 4 , true ) ;
141
165
assert_eq ! ( statistics. len( ) , 1 ) ;
142
- assert_eq ! ( statistics[ 0 ] . num_rows, Precision :: Exact ( 4 ) ) ;
143
- assert_eq ! ( statistics[ 0 ] . column_statistics. len( ) , 2 ) ;
144
- assert_eq ! ( statistics[ 0 ] . total_byte_size, Precision :: Exact ( 220 ) ) ;
145
- assert_eq ! (
146
- statistics[ 0 ] . column_statistics[ 0 ] . null_count,
147
- Precision :: Exact ( 0 )
148
- ) ;
149
- assert_eq ! (
150
- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
151
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
152
- ) ;
153
- assert_eq ! (
154
- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
155
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
156
- ) ;
166
+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition) ;
167
+
157
168
sort_exec = Arc :: new ( sort. with_preserve_partitioning ( true ) ) ;
169
+ let expected_statistic_partition_1 =
170
+ create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
171
+ let expected_statistic_partition_2 =
172
+ create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
158
173
let statistics = sort_exec. statistics_by_partition ( ) ?;
159
174
assert_eq ! ( statistics. len( ) , 2 ) ;
160
- assert_eq ! ( statistics[ 0 ] . num_rows, Precision :: Exact ( 2 ) ) ;
161
- assert_eq ! ( statistics[ 1 ] . num_rows, Precision :: Exact ( 2 ) ) ;
162
- assert_eq ! ( statistics[ 0 ] . column_statistics. len( ) , 2 ) ;
163
- assert_eq ! ( statistics[ 1 ] . column_statistics. len( ) , 2 ) ;
164
- assert_eq ! ( statistics[ 0 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
165
- assert_eq ! ( statistics[ 1 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
166
- assert_eq ! (
167
- statistics[ 0 ] . column_statistics[ 0 ] . null_count,
168
- Precision :: Exact ( 0 )
169
- ) ;
170
- assert_eq ! (
171
- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
172
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
173
- ) ;
174
- assert_eq ! (
175
- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
176
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
177
- ) ;
178
- assert_eq ! (
179
- statistics[ 1 ] . column_statistics[ 0 ] . null_count,
180
- Precision :: Exact ( 0 )
181
- ) ;
182
- assert_eq ! (
183
- statistics[ 1 ] . column_statistics[ 0 ] . max_value,
184
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
185
- ) ;
186
- assert_eq ! (
187
- statistics[ 1 ] . column_statistics[ 0 ] . min_value,
188
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
189
- ) ;
175
+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
176
+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
190
177
Ok ( ( ) )
191
178
}
192
179
@@ -202,48 +189,33 @@ mod test {
202
189
) ?;
203
190
let filter: Arc < dyn ExecutionPlan > =
204
191
Arc :: new ( FilterExec :: try_new ( predicate, scan) ?) ;
205
- let _full_statistics = filter. statistics ( ) ?;
206
- // The full statistics is invalid, at least, we can improve the selectivity estimation of the filter
207
- /*
208
- Statistics {
209
- num_rows: Inexact(0),
210
- total_byte_size: Inexact(0),
211
- column_statistics: [
212
- ColumnStatistics {
213
- null_count: Exact(0),
214
- max_value: Exact(NULL),
215
- min_value: Exact(NULL),
216
- sum_value: Exact(NULL),
217
- distinct_count: Exact(0),
218
- },
219
- ColumnStatistics {
220
- null_count: Exact(0),
221
- max_value: Exact(NULL),
222
- min_value: Exact(NULL),
223
- sum_value: Exact(NULL),
224
- distinct_count: Exact(0),
225
- },
226
- ],
227
- }
228
- */
192
+ let full_statistics = filter. statistics ( ) ?;
193
+ let expected_full_statistic = Statistics {
194
+ num_rows : Precision :: Inexact ( 0 ) ,
195
+ total_byte_size : Precision :: Inexact ( 0 ) ,
196
+ column_statistics : vec ! [
197
+ ColumnStatistics {
198
+ null_count: Precision :: Exact ( 0 ) ,
199
+ max_value: Precision :: Exact ( ScalarValue :: Null ) ,
200
+ min_value: Precision :: Exact ( ScalarValue :: Null ) ,
201
+ sum_value: Precision :: Exact ( ScalarValue :: Null ) ,
202
+ distinct_count: Precision :: Exact ( 0 ) ,
203
+ } ,
204
+ ColumnStatistics {
205
+ null_count: Precision :: Exact ( 0 ) ,
206
+ max_value: Precision :: Exact ( ScalarValue :: Null ) ,
207
+ min_value: Precision :: Exact ( ScalarValue :: Null ) ,
208
+ sum_value: Precision :: Exact ( ScalarValue :: Null ) ,
209
+ distinct_count: Precision :: Exact ( 0 ) ,
210
+ } ,
211
+ ] ,
212
+ } ;
213
+ assert_eq ! ( full_statistics, expected_full_statistic) ;
214
+
229
215
let statistics = filter. statistics_by_partition ( ) ?;
230
- // Also the statistics of each partition is also invalid due to above
231
- // But we can ensure the current behavior by tests
232
216
assert_eq ! ( statistics. len( ) , 2 ) ;
233
- for stat in & statistics {
234
- assert_eq ! ( stat. column_statistics. len( ) , 2 ) ;
235
- assert_eq ! ( stat. total_byte_size, Precision :: Inexact ( 0 ) ) ;
236
- assert_eq ! ( stat. num_rows, Precision :: Inexact ( 0 ) ) ;
237
- assert_eq ! ( stat. column_statistics[ 0 ] . null_count, Precision :: Exact ( 0 ) ) ;
238
- assert_eq ! (
239
- stat. column_statistics[ 0 ] . max_value,
240
- Precision :: Exact ( ScalarValue :: Null )
241
- ) ;
242
- assert_eq ! (
243
- stat. column_statistics[ 0 ] . min_value,
244
- Precision :: Exact ( ScalarValue :: Null )
245
- ) ;
246
- }
217
+ assert_eq ! ( statistics[ 0 ] , expected_full_statistic) ;
218
+ assert_eq ! ( statistics[ 1 ] , expected_full_statistic) ;
247
219
Ok ( ( ) )
248
220
}
249
221
@@ -254,63 +226,18 @@ mod test {
254
226
let statistics = union_exec. statistics_by_partition ( ) ?;
255
227
// Check that we have 4 partitions (2 from each scan)
256
228
assert_eq ! ( statistics. len( ) , 4 ) ;
257
-
229
+ let expected_statistic_partition_1 =
230
+ create_partition_statistics ( 2 , 110 , 3 , 4 , true ) ;
231
+ let expected_statistic_partition_2 =
232
+ create_partition_statistics ( 2 , 110 , 1 , 2 , true ) ;
258
233
// Verify first partition (from first scan)
259
- assert_eq ! ( statistics[ 0 ] . num_rows, Precision :: Exact ( 2 ) ) ;
260
- assert_eq ! ( statistics[ 0 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
261
- assert_eq ! ( statistics[ 0 ] . column_statistics. len( ) , 2 ) ;
262
- assert_eq ! (
263
- statistics[ 0 ] . column_statistics[ 0 ] . null_count,
264
- Precision :: Exact ( 0 )
265
- ) ;
266
- assert_eq ! (
267
- statistics[ 0 ] . column_statistics[ 0 ] . max_value,
268
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
269
- ) ;
270
- assert_eq ! (
271
- statistics[ 0 ] . column_statistics[ 0 ] . min_value,
272
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
273
- ) ;
274
-
234
+ assert_eq ! ( statistics[ 0 ] , expected_statistic_partition_1) ;
275
235
// Verify second partition (from first scan)
276
- assert_eq ! ( statistics[ 1 ] . num_rows, Precision :: Exact ( 2 ) ) ;
277
- assert_eq ! ( statistics[ 1 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
278
- assert_eq ! (
279
- statistics[ 1 ] . column_statistics[ 0 ] . null_count,
280
- Precision :: Exact ( 0 )
281
- ) ;
282
- assert_eq ! (
283
- statistics[ 1 ] . column_statistics[ 0 ] . max_value,
284
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
285
- ) ;
286
- assert_eq ! (
287
- statistics[ 1 ] . column_statistics[ 0 ] . min_value,
288
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
289
- ) ;
290
-
236
+ assert_eq ! ( statistics[ 1 ] , expected_statistic_partition_2) ;
291
237
// Verify third partition (from second scan - same as first partition)
292
- assert_eq ! ( statistics[ 2 ] . num_rows, Precision :: Exact ( 2 ) ) ;
293
- assert_eq ! ( statistics[ 2 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
294
- assert_eq ! (
295
- statistics[ 2 ] . column_statistics[ 0 ] . max_value,
296
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 4 ) ) )
297
- ) ;
298
- assert_eq ! (
299
- statistics[ 2 ] . column_statistics[ 0 ] . min_value,
300
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 3 ) ) )
301
- ) ;
302
-
238
+ assert_eq ! ( statistics[ 2 ] , expected_statistic_partition_1) ;
303
239
// Verify fourth partition (from second scan - same as second partition)
304
- assert_eq ! ( statistics[ 3 ] . num_rows, Precision :: Exact ( 2 ) ) ;
305
- assert_eq ! ( statistics[ 3 ] . total_byte_size, Precision :: Exact ( 110 ) ) ;
306
- assert_eq ! (
307
- statistics[ 3 ] . column_statistics[ 0 ] . max_value,
308
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 2 ) ) )
309
- ) ;
310
- assert_eq ! (
311
- statistics[ 3 ] . column_statistics[ 0 ] . min_value,
312
- Precision :: Exact ( ScalarValue :: Int32 ( Some ( 1 ) ) )
313
- ) ;
240
+ assert_eq ! ( statistics[ 3 ] , expected_statistic_partition_2) ;
314
241
315
242
Ok ( ( ) )
316
243
}
0 commit comments