Skip to content

Commit 18e54f2

Browse files
onlyjackfrost and alamb authored
chore: migrate to invoke_with_args for datetime functions (#14876)
* migrate to invoke_with_args for datetime functions * Fix clippy --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent ce14fbc commit 18e54f2

13 files changed

+399
-314
lines changed

datafusion/functions/src/datetime/current_date.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -81,10 +81,9 @@ impl ScalarUDFImpl for CurrentDateFunc {
8181
Ok(Date32)
8282
}
8383

84-
fn invoke_batch(
84+
fn invoke_with_args(
8585
&self,
86-
_args: &[ColumnarValue],
87-
_number_rows: usize,
86+
_args: datafusion_expr::ScalarFunctionArgs,
8887
) -> Result<ColumnarValue> {
8988
internal_err!(
9089
"invoke should not be called on a simplified current_date() function"

datafusion/functions/src/datetime/current_time.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -78,10 +78,9 @@ impl ScalarUDFImpl for CurrentTimeFunc {
7878
Ok(Time64(Nanosecond))
7979
}
8080

81-
fn invoke_batch(
81+
fn invoke_with_args(
8282
&self,
83-
_args: &[ColumnarValue],
84-
_number_rows: usize,
83+
_args: datafusion_expr::ScalarFunctionArgs,
8584
) -> Result<ColumnarValue> {
8685
internal_err!(
8786
"invoke should not be called on a simplified current_time() function"

datafusion/functions/src/datetime/date_bin.rs

Lines changed: 105 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -187,11 +187,11 @@ impl ScalarUDFImpl for DateBinFunc {
187187
}
188188
}
189189

190-
fn invoke_batch(
190+
fn invoke_with_args(
191191
&self,
192-
args: &[ColumnarValue],
193-
_number_rows: usize,
192+
args: datafusion_expr::ScalarFunctionArgs,
194193
) -> Result<ColumnarValue> {
194+
let args = &args.args;
195195
if args.len() == 2 {
196196
// Default to unix EPOCH
197197
let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
@@ -514,10 +514,9 @@ mod tests {
514514
use chrono::TimeDelta;
515515

516516
#[test]
517-
#[allow(deprecated)] // TODO migrate UDF invoke from invoke_batch
518517
fn test_date_bin() {
519-
let res = DateBinFunc::new().invoke_batch(
520-
&[
518+
let mut args = datafusion_expr::ScalarFunctionArgs {
519+
args: vec![
521520
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
522521
IntervalDayTime {
523522
days: 0,
@@ -527,14 +526,16 @@ mod tests {
527526
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
528527
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
529528
],
530-
1,
531-
);
529+
number_rows: 1,
530+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
531+
};
532+
let res = DateBinFunc::new().invoke_with_args(args);
532533
assert!(res.is_ok());
533534

534535
let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
535536
let batch_len = timestamps.len();
536-
let res = DateBinFunc::new().invoke_batch(
537-
&[
537+
args = datafusion_expr::ScalarFunctionArgs {
538+
args: vec![
538539
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
539540
IntervalDayTime {
540541
days: 0,
@@ -544,12 +545,14 @@ mod tests {
544545
ColumnarValue::Array(timestamps),
545546
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
546547
],
547-
batch_len,
548-
);
548+
number_rows: batch_len,
549+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
550+
};
551+
let res = DateBinFunc::new().invoke_with_args(args);
549552
assert!(res.is_ok());
550553

551-
let res = DateBinFunc::new().invoke_batch(
552-
&[
554+
args = datafusion_expr::ScalarFunctionArgs {
555+
args: vec![
553556
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
554557
IntervalDayTime {
555558
days: 0,
@@ -558,13 +561,15 @@ mod tests {
558561
))),
559562
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
560563
],
561-
1,
562-
);
564+
number_rows: 1,
565+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
566+
};
567+
let res = DateBinFunc::new().invoke_with_args(args);
563568
assert!(res.is_ok());
564569

565570
// stride supports month-day-nano
566-
let res = DateBinFunc::new().invoke_batch(
567-
&[
571+
args = datafusion_expr::ScalarFunctionArgs {
572+
args: vec![
568573
ColumnarValue::Scalar(ScalarValue::IntervalMonthDayNano(Some(
569574
IntervalMonthDayNano {
570575
months: 0,
@@ -575,46 +580,53 @@ mod tests {
575580
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
576581
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
577582
],
578-
1,
579-
);
583+
number_rows: 1,
584+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
585+
};
586+
let res = DateBinFunc::new().invoke_with_args(args);
580587
assert!(res.is_ok());
581588

582589
//
583590
// Fallible test cases
584591
//
585592

586593
// invalid number of arguments
587-
let res = DateBinFunc::new().invoke_batch(
588-
&[ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
594+
args = datafusion_expr::ScalarFunctionArgs {
595+
args: vec![ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
589596
IntervalDayTime {
590597
days: 0,
591598
milliseconds: 1,
592599
},
593600
)))],
594-
1,
595-
);
601+
number_rows: 1,
602+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
603+
};
604+
let res = DateBinFunc::new().invoke_with_args(args);
596605
assert_eq!(
597606
res.err().unwrap().strip_backtrace(),
598607
"Execution error: DATE_BIN expected two or three arguments"
599608
);
600609

601610
// stride: invalid type
602-
let res = DateBinFunc::new().invoke_batch(
603-
&[
611+
args = datafusion_expr::ScalarFunctionArgs {
612+
args: vec![
604613
ColumnarValue::Scalar(ScalarValue::IntervalYearMonth(Some(1))),
605614
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
606615
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
607616
],
608-
1,
609-
);
617+
number_rows: 1,
618+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
619+
};
620+
let res = DateBinFunc::new().invoke_with_args(args);
610621
assert_eq!(
611622
res.err().unwrap().strip_backtrace(),
612623
"Execution error: DATE_BIN expects stride argument to be an INTERVAL but got Interval(YearMonth)"
613624
);
614625

615626
// stride: invalid value
616-
let res = DateBinFunc::new().invoke_batch(
617-
&[
627+
628+
args = datafusion_expr::ScalarFunctionArgs {
629+
args: vec![
618630
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
619631
IntervalDayTime {
620632
days: 0,
@@ -624,60 +636,69 @@ mod tests {
624636
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
625637
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
626638
],
627-
1,
628-
);
639+
number_rows: 1,
640+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
641+
};
642+
643+
let res = DateBinFunc::new().invoke_with_args(args);
629644
assert_eq!(
630645
res.err().unwrap().strip_backtrace(),
631646
"Execution error: DATE_BIN stride must be non-zero"
632647
);
633648

634649
// stride: overflow of day-time interval
635-
let res = DateBinFunc::new().invoke_batch(
636-
&[
650+
args = datafusion_expr::ScalarFunctionArgs {
651+
args: vec![
637652
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
638653
IntervalDayTime::MAX,
639654
))),
640655
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
641656
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
642657
],
643-
1,
644-
);
658+
number_rows: 1,
659+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
660+
};
661+
let res = DateBinFunc::new().invoke_with_args(args);
645662
assert_eq!(
646663
res.err().unwrap().strip_backtrace(),
647664
"Execution error: DATE_BIN stride argument is too large"
648665
);
649666

650667
// stride: overflow of month-day-nano interval
651-
let res = DateBinFunc::new().invoke_batch(
652-
&[
668+
args = datafusion_expr::ScalarFunctionArgs {
669+
args: vec![
653670
ColumnarValue::Scalar(ScalarValue::new_interval_mdn(0, i32::MAX, 1)),
654671
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
655672
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
656673
],
657-
1,
658-
);
674+
number_rows: 1,
675+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
676+
};
677+
let res = DateBinFunc::new().invoke_with_args(args);
659678
assert_eq!(
660679
res.err().unwrap().strip_backtrace(),
661680
"Execution error: DATE_BIN stride argument is too large"
662681
);
663682

664683
// stride: month intervals
665-
let res = DateBinFunc::new().invoke_batch(
666-
&[
684+
args = datafusion_expr::ScalarFunctionArgs {
685+
args: vec![
667686
ColumnarValue::Scalar(ScalarValue::new_interval_mdn(1, 1, 1)),
668687
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
669688
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
670689
],
671-
1,
672-
);
690+
number_rows: 1,
691+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
692+
};
693+
let res = DateBinFunc::new().invoke_with_args(args);
673694
assert_eq!(
674695
res.err().unwrap().strip_backtrace(),
675696
"This feature is not implemented: DATE_BIN stride does not support combination of month, day and nanosecond intervals"
676697
);
677698

678699
// origin: invalid type
679-
let res = DateBinFunc::new().invoke_batch(
680-
&[
700+
args = datafusion_expr::ScalarFunctionArgs {
701+
args: vec![
681702
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
682703
IntervalDayTime {
683704
days: 0,
@@ -687,15 +708,17 @@ mod tests {
687708
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
688709
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
689710
],
690-
1,
691-
);
711+
number_rows: 1,
712+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
713+
};
714+
let res = DateBinFunc::new().invoke_with_args(args);
692715
assert_eq!(
693716
res.err().unwrap().strip_backtrace(),
694717
"Execution error: DATE_BIN expects origin argument to be a TIMESTAMP with nanosecond precision but got Timestamp(Microsecond, None)"
695718
);
696719

697-
let res = DateBinFunc::new().invoke_batch(
698-
&[
720+
args = datafusion_expr::ScalarFunctionArgs {
721+
args: vec![
699722
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
700723
IntervalDayTime {
701724
days: 0,
@@ -705,8 +728,10 @@ mod tests {
705728
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(Some(1), None)),
706729
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
707730
],
708-
1,
709-
);
731+
number_rows: 1,
732+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
733+
};
734+
let res = DateBinFunc::new().invoke_with_args(args);
710735
assert!(res.is_ok());
711736

712737
// unsupported array type for stride
@@ -720,14 +745,16 @@ mod tests {
720745
})
721746
.collect::<IntervalDayTimeArray>(),
722747
);
723-
let res = DateBinFunc::new().invoke_batch(
724-
&[
748+
args = datafusion_expr::ScalarFunctionArgs {
749+
args: vec![
725750
ColumnarValue::Array(intervals),
726751
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
727752
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
728753
],
729-
1,
730-
);
754+
number_rows: 1,
755+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
756+
};
757+
let res = DateBinFunc::new().invoke_with_args(args);
731758
assert_eq!(
732759
res.err().unwrap().strip_backtrace(),
733760
"This feature is not implemented: DATE_BIN only supports literal values for the stride argument, not arrays"
@@ -736,8 +763,8 @@ mod tests {
736763
// unsupported array type for origin
737764
let timestamps = Arc::new((1..6).map(Some).collect::<TimestampNanosecondArray>());
738765
let batch_len = timestamps.len();
739-
let res = DateBinFunc::new().invoke_batch(
740-
&[
766+
args = datafusion_expr::ScalarFunctionArgs {
767+
args: vec![
741768
ColumnarValue::Scalar(ScalarValue::IntervalDayTime(Some(
742769
IntervalDayTime {
743770
days: 0,
@@ -747,8 +774,10 @@ mod tests {
747774
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(1), None)),
748775
ColumnarValue::Array(timestamps),
749776
],
750-
batch_len,
751-
);
777+
number_rows: batch_len,
778+
return_type: &DataType::Timestamp(TimeUnit::Nanosecond, None),
779+
};
780+
let res = DateBinFunc::new().invoke_with_args(args);
752781
assert_eq!(
753782
res.err().unwrap().strip_backtrace(),
754783
"This feature is not implemented: DATE_BIN only supports literal values for the origin argument, not arrays"
@@ -864,20 +893,22 @@ mod tests {
864893
.collect::<TimestampNanosecondArray>()
865894
.with_timezone_opt(tz_opt.clone());
866895
let batch_len = input.len();
867-
#[allow(deprecated)] // TODO migrate UDF invoke to invoke_batch
868-
let result = DateBinFunc::new()
869-
.invoke_batch(
870-
&[
871-
ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
872-
ColumnarValue::Array(Arc::new(input)),
873-
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
874-
Some(string_to_timestamp_nanos(origin).unwrap()),
875-
tz_opt.clone(),
876-
)),
877-
],
878-
batch_len,
879-
)
880-
.unwrap();
896+
let args = datafusion_expr::ScalarFunctionArgs {
897+
args: vec![
898+
ColumnarValue::Scalar(ScalarValue::new_interval_dt(1, 0)),
899+
ColumnarValue::Array(Arc::new(input)),
900+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
901+
Some(string_to_timestamp_nanos(origin).unwrap()),
902+
tz_opt.clone(),
903+
)),
904+
],
905+
number_rows: batch_len,
906+
return_type: &DataType::Timestamp(
907+
TimeUnit::Nanosecond,
908+
tz_opt.clone(),
909+
),
910+
};
911+
let result = DateBinFunc::new().invoke_with_args(args).unwrap();
881912
if let ColumnarValue::Array(result) = result {
882913
assert_eq!(
883914
result.data_type(),

0 commit comments

Comments
 (0)