Skip to content

Fixed Migrate Datetime functions to invoke_with_args Issue 14705 #14792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions datafusion/functions/src/datetime/current_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,19 @@ impl ScalarUDFImpl for CurrentDateFunc {
Ok(Date32)
}

fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
internal_err!(
"invoke should not be called on a simplified current_date() function"
)
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The signature of the invoke_with_args should be:

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue>.

You can check the ScalarUDFImpl trait in the datafusion::expr::udf crate.

if !args.is_empty() {
return Err(DataFusionError::Execution(
"current_date() takes 0 arguments".to_string(),
));
}

let current_date = chrono::Utc::now().date_naive();
let epoch_days = current_date.num_days_from_ce() - 719163;
let array: ArrayRef = Arc::new(Date32Array::from_value(epoch_days, 1));
Ok(ColumnarValue::Array(array))
}

fn aliases(&self) -> &[String] {
&self.aliases
}
Expand Down
99 changes: 91 additions & 8 deletions datafusion/functions/src/datetime/current_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use arrow::datatypes::DataType;
use arrow::datatypes::DataType::Time64;
use arrow::datatypes::TimeUnit::Nanosecond;
use std::any::Any;
use chrono::Timelike;

use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
Expand Down Expand Up @@ -78,16 +79,20 @@ impl ScalarUDFImpl for CurrentTimeFunc {
Ok(Time64(Nanosecond))
}

fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
internal_err!(
"invoke should not be called on a simplified current_time() function"
)
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if !args.is_empty() {
return Err(DataFusionError::Execution(
"current_time() takes 0 arguments".to_string(),
));
}

let current_time = chrono::Utc::now().time();
let nanos_since_midnight = current_time.num_seconds_from_midnight() as i64 * 1_000_000_000
+ current_time.nanosecond() as i64;
let array: ArrayRef = Arc::new(Time64NanosecondArray::from_value(nanos_since_midnight, 1));
Ok(ColumnarValue::Array(array))
}

fn simplify(
&self,
_args: Vec<Expr>,
Expand All @@ -104,3 +109,81 @@ impl ScalarUDFImpl for CurrentTimeFunc {
self.doc()
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::{DataType, Field, Schema};
use object_store::memory::InMemory;
use std::sync::Arc;

async fn setup_test_catalog() -> Result<ListingCatalog> {
let store = Arc::new(InMemory::new());

// Create CSV test data
let csv_data = b"col1,col2\n1,a\n2,b\n";
store
.put(&object_store::path::Path::from("test.csv"), csv_data.to_vec())
.await
.unwrap();

// Create JSON test data
let json_data = b"[{\"name\":\"test\",\"value\":1}]";
store
.put(&object_store::path::Path::from("test.json"), json_data.to_vec())
.await
.unwrap();

Ok(ListingCatalog::new(store, String::new()))
}

#[tokio::test]
async fn test_schema_names() -> Result<()> {
let catalog = setup_test_catalog().await?;
let names = catalog.schema_names()?;
assert_eq!(names, vec!["default"]);
Ok(())
}

#[tokio::test]
async fn test_table_names() -> Result<()> {
let catalog = setup_test_catalog().await?;
let schema = catalog.schema("default").await?;
let mut names = schema.table_names()?;
names.sort();
assert_eq!(names, vec!["test.csv", "test.json"]);
Ok(())
}

#[tokio::test]
async fn test_table_exist() -> Result<()> {
let catalog = setup_test_catalog().await?;
let schema = catalog.schema("default").await?;
assert!(schema.table_exist("test.csv"));
assert!(schema.table_exist("test.json"));
assert!(!schema.table_exist("nonexistent.csv"));
Ok(())
}

#[tokio::test]
async fn test_csv_table_provider() -> Result<()> {
let catalog = setup_test_catalog().await?;
let schema = catalog.schema("default").await?;
let table = schema.table("test.csv").await?;

let table_schema = table.schema();
assert_eq!(table_schema.fields().len(), 2);
Ok(())
}

#[tokio::test]
async fn test_json_table_provider() -> Result<()> {
let catalog = setup_test_catalog().await?;
let schema = catalog.schema("default").await?;
let table = schema.table("test.json").await?;

let table_schema = table.schema();
assert!(table_schema.fields().len() > 0);
Ok(())
}
}
32 changes: 15 additions & 17 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,25 +187,23 @@ impl ScalarUDFImpl for DateBinFunc {
}
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
if args.len() == 2 {
// Default to unix EPOCH
let origin = ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(0),
Some("+00:00".into()),
));
date_bin_impl(&args[0], &args[1], &origin)
} else if args.len() == 3 {
date_bin_impl(&args[0], &args[1], &args[2])
} else {
exec_err!("DATE_BIN expected two or three arguments")
}
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 && args.len() != 3 {
return Err(DataFusionError::Execution(
"DATE_BIN expected two or three arguments".to_string(),
));
}

let (stride, array, origin) = match args.len() {
2 => (&args[0], &args[1], None),
3 => (&args[0], &args[1], Some(&args[2])),
_ => unreachable!(),
};

date_bin_impl(stride, array, origin.expect("Origin argument must be provided"))
}


fn output_ordering(&self, input: &[ExprProperties]) -> Result<SortProperties> {
// The DATE_BIN function preserves the order of its second argument.
let step = &input[0];
Expand Down
18 changes: 12 additions & 6 deletions datafusion/functions/src/datetime/date_part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,18 @@ impl ScalarUDFImpl for DatePartFunc {
)
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
let [part, array] = take_function_args(self.name(), args)?;
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return Err(DataFusionError::Execution(format!(
"{} function requires 2 arguments, got {}",
self.name(),
args.len()
)));
}

let [part, array] = args else {
return Err(DataFusionError::Internal("Expected 2 arguments".to_string()));
};

let part = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) = part {
v
Expand Down
16 changes: 10 additions & 6 deletions datafusion/functions/src/datetime/date_trunc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,16 @@ impl ScalarUDFImpl for DateTruncFunc {
}
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
let (granularity, array) = (&args[0], &args[1]);
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() !=2 {
return Err(DataFusionError::Execution(format!(
"{} function requires 2 arguments, got {}",
self.name(),
args.len()
)));
}

let (granularity, array) = (&args[0], &args[1]);

let granularity = if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(v))) =
granularity
Expand Down
20 changes: 8 additions & 12 deletions datafusion/functions/src/datetime/from_unixtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,14 @@ impl ScalarUDFImpl for FromUnixtimeFunc {
internal_err!("call return_type_from_args instead")
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
let len = args.len();
if len != 1 && len != 2 {
return exec_err!(
"from_unixtime function requires 1 or 2 argument, got {}",
args.len()
);
}
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let len = args.len();
if len != 1 && len != 2 {
return Err(DataFusionError::Execution(format!(
"from_unixtime function requires 1 or 2 arguments, got {}",
args.len()
)));
}

if args[0].data_type() != Int64 {
return exec_err!(
Expand Down
27 changes: 12 additions & 15 deletions datafusion/functions/src/datetime/make_date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,18 @@ impl ScalarUDFImpl for MakeDateFunc {
Ok(Date32)
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
// first, identify if any of the arguments is an Array. If yes, store its `len`,
// as any scalar will need to be converted to an array of len `len`.
let len = args
.iter()
.fold(Option::<usize>::None, |acc, arg| match arg {
ColumnarValue::Scalar(_) => acc,
ColumnarValue::Array(a) => Some(a.len()),
});

let [years, months, days] = take_function_args(self.name(), args)?;
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 3 {
return Err(DataFusionError::Execution(format!(
"{} function requires 3 arguments, got {}",
self.name(),
args.len()
)));
}

let [years, months, days] = args else {
return Err(DataFusionError::Internal("Expected 3 arguments".to_string()));
};

let years = years.cast_to(&Int32, None)?;
let months = months.cast_to(&Int32, None)?;
Expand Down
16 changes: 10 additions & 6 deletions datafusion/functions/src/datetime/now.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,18 @@ impl ScalarUDFImpl for NowFunc {
internal_err!("return_type_from_args should be called instead")
}

fn invoke_batch(
&self,
_args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
internal_err!("invoke should not be called on a simplified now() function")
fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if !args.is_empty() {
return Err(DataFusionError::Execution(
"now() takes 0 arguments".to_string(),
));
}

Err(DataFusionError::Internal(
"invoke should not be called on a simplified now() function".to_string(),
))
}

fn simplify(
&self,
_args: Vec<Expr>,
Expand Down
53 changes: 29 additions & 24 deletions datafusion/functions/src/datetime/to_char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,32 +135,37 @@ impl ScalarUDFImpl for ToCharFunc {
Ok(Utf8)
}

fn invoke_batch(
&self,
args: &[ColumnarValue],
_number_rows: usize,
) -> Result<ColumnarValue> {
let [date_time, format] = take_function_args(self.name(), args)?;

match format {
ColumnarValue::Scalar(ScalarValue::Utf8(None))
| ColumnarValue::Scalar(ScalarValue::Null) => {
_to_char_scalar(date_time.clone(), None)
}
// constant format
ColumnarValue::Scalar(ScalarValue::Utf8(Some(format))) => {
// invoke to_char_scalar with the known string, without converting to array
_to_char_scalar(date_time.clone(), Some(format))
}
ColumnarValue::Array(_) => _to_char_array(args),
_ => {
exec_err!(
"Format for `to_char` must be non-null Utf8, received {:?}",
format.data_type()
)
}
fn fn invoke_with_args(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
if args.len() != 2 {
return Err(DataFusionError::Execution(format!(
"{} function requires 2 arguments, got {}",
self.name(),
args.len()
)));
}

let [date_time, format] = args else {
return Err(DataFusionError::Internal("Expected 2 arguments".to_string()));
};

match format {
ColumnarValue::Scalar(ScalarValue::Utf8(None))
| ColumnarValue::Scalar(ScalarValue::Null) => {
_to_char_scalar(date_time.clone(), None)
}
ColumnarValue::Scalar(ScalarValue::Utf8(Some(format))) => {
_to_char_scalar(date_time.clone(), Some(format))
}
ColumnarValue::Array(_) => _to_char_array(args),
_ => {
Err(DataFusionError::Execution(format!(
"Format for `to_char` must be non-null Utf8, received {:?}",
format.data_type()
)))
}
}
}


fn aliases(&self) -> &[String] {
&self.aliases
Expand Down
Loading
Loading