Skip to content

chore: migrate to invoke_with_args for datetime functions #14876

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions datafusion/functions/src/datetime/current_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ impl ScalarUDFImpl for CurrentDateFunc {
Ok(Date32)
}

-fn invoke_batch(
+fn invoke_with_args(
 &self,
-_args: &[ColumnarValue],
-_number_rows: usize,
+_args: datafusion_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
internal_err!(
"invoke should not be called on a simplified current_date() function"
Expand Down
5 changes: 2 additions & 3 deletions datafusion/functions/src/datetime/current_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@ impl ScalarUDFImpl for CurrentTimeFunc {
Ok(Time64(Nanosecond))
}

-fn invoke_batch(
+fn invoke_with_args(
 &self,
-_args: &[ColumnarValue],
-_number_rows: usize,
+_args: datafusion_expr::ScalarFunctionArgs,
) -> Result<ColumnarValue> {
internal_err!(
"invoke should not be called on a simplified current_time() function"
Expand Down
179 changes: 105 additions & 74 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ impl ScalarUDFImpl for DateBinFunc {
}
}

-fn invoke_batch(
+fn invoke_with_args(
 &self,
-args: &[ColumnarValue],
-_number_rows: usize,
+args: datafusion_expr::ScalarFunctionArgs,
 ) -> Result<ColumnarValue> {
+let args = &args.args;
if args.len() == 2 {
// Default to unix EPOCH
let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Expand Down Expand Up @@ -514,10 +514,9 @@ mod tests {
use chrono::TimeDelta;

#[test]
-#[allow(deprecated)] // TODO migrate UDF invoke from invoke_batch
 fn test_date_bin() {
-let res = DateBinFunc::new().invoke_batch(
-&[
+let mut args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
Expand All @@ -527,14 +526,16 @@ mod tests {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert!(res.is_ok());

let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
let batch_len = timestamps.len();
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
Expand All @@ -544,12 +545,14 @@ mod tests {
ColumnarValue::Array(timestamps),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-batch_len,
-);
+number_rows: batch_len,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert!(res.is_ok());

-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
Expand All @@ -558,13 +561,15 @@ mod tests {
))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert!(res.is_ok());

// stride supports month-day-nano
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
IntervalMonthDayNano {
months: 0,
Expand All @@ -575,46 +580,53 @@ mod tests {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert!(res.is_ok());

//
// Fallible test cases
//

// invalid number of arguments
-let res = DateBinFunc::new().invoke_batch(
-&[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
milliseconds: 1,
},
)))],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN expected two or three arguments"
);

// stride: invalid type
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
);

// stride: invalid value
-let res = DateBinFunc::new().invoke_batch(
-&[
+
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
Expand All @@ -624,60 +636,69 @@ mod tests {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN stride must be non-zero"
);

// stride: overflow of day-time interval
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime::MAX,
))),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN stride argument is too large"
);

// stride: overflow of month-day-nano interval
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN stride argument is too large"
);

// stride: month intervals
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
);

// origin: invalid type
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
Expand All @@ -687,15 +708,17 @@ mod tests {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
);

-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
Expand All @@ -705,8 +728,10 @@ mod tests {
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert!(res.is_ok());

// unsupported array type for stride
Expand All @@ -720,14 +745,16 @@ mod tests {
})
.collect::<IntervalDayTimeArray>(),
);
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Array(intervals),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
],
-1,
-);
+number_rows: 1,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
Expand All @@ -736,8 +763,8 @@ mod tests {
// unsupported array type for origin
let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
let batch_len = timestamps.len();
-let res = DateBinFunc::new().invoke_batch(
-&[
+args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
IntervalDayTime {
days: 0,
Expand All @@ -747,8 +774,10 @@ mod tests {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
ColumnarValue::Array(timestamps),
],
-batch_len,
-);
+number_rows: batch_len,
+return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
+};
+let res = DateBinFunc::new().invoke_with_args(args);
assert_eq!(
res.err().unwrap().strip_backtrace(),
"This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
Expand Down Expand Up @@ -864,20 +893,22 @@ mod tests {
.collect::<TimestampNanosecondArray>()
.with_timezone_opt(tz_opt.clone());
let batch_len = input.len();
-#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
-let result = DateBinFunc::new()
-.invoke_batch(
-&[
-ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
-ColumnarValue::Array(Arc::new(input)),
-ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
-Some(string_to_timestamp_nanos(origin).unwrap()),
-tz_opt.clone(),
-)),
-],
-batch_len,
-)
-.unwrap();
+let args = datafusion_expr::ScalarFunctionArgs {
+args: vec![
+ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
+ColumnarValue::Array(Arc::new(input)),
+ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
+Some(string_to_timestamp_nanos(origin).unwrap()),
+tz_opt.clone(),
+)),
+],
+number_rows: batch_len,
+return_type: &DataType::Timestamp(
+TimeUnit::Nanosecond,
+tz_opt.clone(),
+),
+};
+let result = DateBinFunc::new().invoke_with_args(args).unwrap();
if let ColumnarValue::Array(result) = result {
assert_eq!(
result.data_type(),
Expand Down
Loading