Skip to content

Commit a235276

Browse files
authored
Add range table function (#14830)
* extract name * extract inclusive * range table function
1 parent e799097 commit a235276

File tree

3 files changed

+207
-21
lines changed

3 files changed

+207
-21
lines changed

datafusion/functions-table/src/generate_series.rs

Lines changed: 83 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,19 @@ use std::sync::Arc;
3434
#[derive(Debug, Clone)]
3535
enum GenSeriesArgs {
3636
/// ContainsNull signifies that at least one argument (start, end, step) was null, so no series will be generated.
37-
ContainsNull,
37+
ContainsNull {
38+
include_end: bool,
39+
name: &'static str,
40+
},
3841
/// AllNotNullArgs holds the start, end, and step values for generating the series when all arguments are not null.
39-
AllNotNullArgs { start: i64, end: i64, step: i64 },
42+
AllNotNullArgs {
43+
start: i64,
44+
end: i64,
45+
step: i64,
46+
/// Indicates whether the end value should be included in the series.
47+
include_end: bool,
48+
name: &'static str,
49+
},
4050
}
4151

4252
/// Table that generates a series of integers from `start` (inclusive) to `end`, incrementing by step; `end` is inclusive for `generate_series` and exclusive for `range`
@@ -57,15 +67,26 @@ struct GenerateSeriesState {
5767

5868
/// Tracks current position when generating table
5969
current: i64,
70+
/// Indicates whether the end value should be included in the series.
71+
include_end: bool,
72+
name: &'static str,
6073
}
6174

6275
impl GenerateSeriesState {
6376
fn reach_end(&self, val: i64) -> bool {
6477
if self.step > 0 {
65-
return val > self.end;
78+
if self.include_end {
79+
return val > self.end;
80+
} else {
81+
return val >= self.end;
82+
}
6683
}
6784

68-
val < self.end
85+
if self.include_end {
86+
val < self.end
87+
} else {
88+
val <= self.end
89+
}
6990
}
7091
}
7192

@@ -74,8 +95,8 @@ impl fmt::Display for GenerateSeriesState {
7495
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
7596
write!(
7697
f,
77-
"generate_series: start={}, end={}, batch_size={}",
78-
self.start, self.end, self.batch_size
98+
"{}: start={}, end={}, batch_size={}",
99+
self.name, self.start, self.end, self.batch_size
79100
)
80101
}
81102
}
@@ -124,21 +145,31 @@ impl TableProvider for GenerateSeriesTable {
124145

125146
let state = match self.args {
126147
// if args have null, then return 0 row
127-
GenSeriesArgs::ContainsNull => GenerateSeriesState {
148+
GenSeriesArgs::ContainsNull { include_end, name } => GenerateSeriesState {
128149
schema: self.schema.clone(),
129150
start: 0,
130151
end: 0,
131152
step: 1,
132153
current: 1,
133154
batch_size,
155+
include_end,
156+
name,
134157
},
135-
GenSeriesArgs::AllNotNullArgs { start, end, step } => GenerateSeriesState {
158+
GenSeriesArgs::AllNotNullArgs {
159+
start,
160+
end,
161+
step,
162+
include_end,
163+
name,
164+
} => GenerateSeriesState {
136165
schema: self.schema.clone(),
137166
start,
138167
end,
139168
step,
140169
current: start,
141170
batch_size,
171+
include_end,
172+
name,
142173
},
143174
};
144175

@@ -150,12 +181,15 @@ impl TableProvider for GenerateSeriesTable {
150181
}
151182

152183
#[derive(Debug)]
153-
pub struct GenerateSeriesFunc {}
184+
struct GenerateSeriesFuncImpl {
185+
name: &'static str,
186+
include_end: bool,
187+
}
154188

155-
impl TableFunctionImpl for GenerateSeriesFunc {
189+
impl TableFunctionImpl for GenerateSeriesFuncImpl {
156190
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
157191
if exprs.is_empty() || exprs.len() > 3 {
158-
return plan_err!("generate_series function requires 1 to 3 arguments");
192+
return plan_err!("{} function requires 1 to 3 arguments", self.name);
159193
}
160194

161195
let mut normalize_args = Vec::new();
@@ -177,7 +211,10 @@ impl TableFunctionImpl for GenerateSeriesFunc {
177211
// contain null
178212
return Ok(Arc::new(GenerateSeriesTable {
179213
schema,
180-
args: GenSeriesArgs::ContainsNull,
214+
args: GenSeriesArgs::ContainsNull {
215+
include_end: self.include_end,
216+
name: self.name,
217+
},
181218
}));
182219
}
183220

@@ -186,7 +223,7 @@ impl TableFunctionImpl for GenerateSeriesFunc {
186223
[start, end] => (*start, *end, 1),
187224
[start, end, step] => (*start, *end, *step),
188225
_ => {
189-
return plan_err!("generate_series function requires 1 to 3 arguments");
226+
return plan_err!("{} function requires 1 to 3 arguments", self.name);
190227
}
191228
};
192229

@@ -204,7 +241,39 @@ impl TableFunctionImpl for GenerateSeriesFunc {
204241

205242
Ok(Arc::new(GenerateSeriesTable {
206243
schema,
207-
args: GenSeriesArgs::AllNotNullArgs { start, end, step },
244+
args: GenSeriesArgs::AllNotNullArgs {
245+
start,
246+
end,
247+
step,
248+
include_end: self.include_end,
249+
name: self.name,
250+
},
208251
}))
209252
}
210253
}
254+
255+
#[derive(Debug)]
256+
pub struct GenerateSeriesFunc {}
257+
258+
impl TableFunctionImpl for GenerateSeriesFunc {
259+
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
260+
let impl_func = GenerateSeriesFuncImpl {
261+
name: "generate_series",
262+
include_end: true,
263+
};
264+
impl_func.call(exprs)
265+
}
266+
}
267+
268+
#[derive(Debug)]
269+
pub struct RangeFunc {}
270+
271+
impl TableFunctionImpl for RangeFunc {
272+
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
273+
let impl_func = GenerateSeriesFuncImpl {
274+
name: "range",
275+
include_end: false,
276+
};
277+
impl_func.call(exprs)
278+
}
279+
}

datafusion/functions-table/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::sync::Arc;
2828

2929
/// Returns all default table functions
3030
pub fn all_default_table_functions() -> Vec<Arc<TableFunction>> {
31-
vec![generate_series()]
31+
vec![generate_series(), range()]
3232
}
3333

3434
/// Creates a singleton instance of a table function
@@ -55,3 +55,4 @@ macro_rules! create_udtf_function {
5555
}
5656

5757
create_udtf_function!(generate_series::GenerateSeriesFunc, "generate_series");
58+
create_udtf_function!(generate_series::RangeFunc, "range");

datafusion/sqllogictest/test_files/table_functions.slt

Lines changed: 122 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ SELECT SUM(v1) FROM generate_series(1, 5) t1(v1)
6868
query I
6969
SELECT * FROM generate_series(6, -1, -2)
7070
----
71-
6
72-
4
73-
2
74-
0
71+
6
72+
4
73+
2
74+
0
7575

7676
query I
7777
SELECT * FROM generate_series(6, 66, 666)
7878
----
79-
6
79+
6
8080

8181

8282

@@ -120,7 +120,7 @@ SELECT v1 + 10 FROM (SELECT * FROM generate_series(1, 3) t1(v1))
120120

121121
# Test generate_series with JOIN
122122
query II rowsort
123-
SELECT a.v1, b.v1
123+
SELECT a.v1, b.v1
124124
FROM generate_series(1, 3) a(v1)
125125
JOIN generate_series(2, 4) b(v1)
126126
ON a.v1 = b.v1 - 1
@@ -187,3 +187,119 @@ SELECT generate_series(1, t1.end) FROM generate_series(3, 5) as t1(end)
187187
[1, 2, 3, 4, 5]
188188
[1, 2, 3, 4]
189189
[1, 2, 3]
190+
191+
# Test range table function
192+
query I
193+
SELECT * FROM range(6)
194+
----
195+
0
196+
1
197+
2
198+
3
199+
4
200+
5
201+
202+
203+
204+
query I rowsort
205+
SELECT * FROM range(1, 5)
206+
----
207+
1
208+
2
209+
3
210+
4
211+
212+
query I rowsort
213+
SELECT * FROM range(1, 1)
214+
----
215+
216+
query I rowsort
217+
SELECT * FROM range(3, 6)
218+
----
219+
3
220+
4
221+
5
222+
223+
# number of generated rows > batch_size
224+
query I
225+
SELECT count(v1) FROM range(-66666,66666) t1(v1)
226+
----
227+
133332
228+
229+
query I rowsort
230+
SELECT SUM(v1) FROM range(1, 5) t1(v1)
231+
----
232+
10
233+
234+
query I
235+
SELECT * FROM range(6, -1, -2)
236+
----
237+
6
238+
4
239+
2
240+
0
241+
242+
query I
243+
SELECT * FROM range(6, 66, 666)
244+
----
245+
6
246+
247+
248+
249+
#
250+
# Test range with null arguments
251+
#
252+
253+
query I
254+
SELECT * FROM range(NULL, 5)
255+
----
256+
257+
query I
258+
SELECT * FROM range(1, NULL)
259+
----
260+
261+
query I
262+
SELECT * FROM range(NULL, NULL)
263+
----
264+
265+
query I
266+
SELECT * FROM range(1, 5, NULL)
267+
----
268+
269+
270+
query TT
271+
EXPLAIN SELECT * FROM range(1, 5)
272+
----
273+
logical_plan TableScan: tmp_table projection=[value]
274+
physical_plan LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
275+
276+
#
277+
# Test range with invalid arguments
278+
#
279+
280+
query error DataFusion error: Error during planning: start is bigger than end, but increment is positive: cannot generate infinite series
281+
SELECT * FROM range(5, 1)
282+
283+
query error DataFusion error: Error during planning: start is smaller than end, but increment is negative: cannot generate infinite series
284+
SELECT * FROM range(-6, 6, -1)
285+
286+
query error DataFusion error: Error during planning: step cannot be zero
287+
SELECT * FROM range(-6, 6, 0)
288+
289+
query error DataFusion error: Error during planning: start is bigger than end, but increment is positive: cannot generate infinite series
290+
SELECT * FROM range(6, -6, 1)
291+
292+
293+
statement error DataFusion error: Error during planning: range function requires 1 to 3 arguments
294+
SELECT * FROM range(1, 2, 3, 4)
295+
296+
297+
statement error DataFusion error: Error during planning: First argument must be an integer literal
298+
SELECT * FROM range('foo', 'bar')
299+
300+
# UDF and UDTF `range` can be used simultaneously
301+
query ? rowsort
302+
SELECT range(1, t1.end) FROM range(3, 5) as t1(end)
303+
----
304+
[1, 2, 3]
305+
[1, 2]

0 commit comments

Comments
 (0)