feat: implement Spark size function for arrays and maps #19592
Hi @Jefffrey,
Implements the Spark-compatible `size` function that returns the number of elements in an array or the number of key-value pairs in a map.
- Supports List, LargeList, FixedSizeList, and Map types
- Returns NULL for NULL input (modern Spark 3.0+ behavior)
- Returns Int32 to match Spark's IntegerType
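The semantics listed above can be sketched on plain Rust values. This is only an illustration, not the PR's implementation: `spark_size` is a hypothetical helper, `None` stands in for SQL NULL, and a map is modelled as a slice of key-value pairs.

```rust
/// Hypothetical helper (not part of the PR): element count of one nullable
/// collection value. `None` models SQL NULL, and the result is `i32` to
/// mirror Spark's IntegerType.
fn spark_size<T>(value: Option<&[T]>) -> Option<i32> {
    // NULL in, NULL out; otherwise the number of elements (or map entries).
    value.map(|items| items.len() as i32)
}
```

For example, `spark_size(Some(&[1, 2, 3][..]))` gives `Some(3)`, and `spark_size::<i32>(None)` gives `None`.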
ce4e96c to
9b9d5ed
Jefffrey left a comment:
Thanks for picking this up; I've left some comments.
    pub fn new() -> Self {
        Self {
            signature: Signature::one_of(
                vec![TypeSignature::Any(1)],
This type signature is too wide; I recommend exploring the signature API we provide, and related functions, to implement a stricter signature.
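To illustrate why a one-argument "any type" signature is too wide, here is a toy model. `Kind` and `ToySignature` are hypothetical stand-ins and are not DataFusion's `DataType`, `Signature`, or `TypeSignature`; the accepted kinds simply mirror the `match` in the PR's own type check.

```rust
/// Simplified stand-in for Arrow's `DataType` (illustration only).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    List,
    LargeList,
    FixedSizeList,
    Map,
    Null,
    Int32,
    Utf8,
}

/// Toy signature model; an assumption for this sketch, not a real API.
enum ToySignature {
    /// Accepts `n` arguments of any kind (analogous in spirit to `Any(n)`).
    Any(usize),
    /// Accepts exactly one argument whose kind is in the list.
    OneArgOf(Vec<Kind>),
}

impl ToySignature {
    fn accepts(&self, args: &[Kind]) -> bool {
        match self {
            ToySignature::Any(n) => args.len() == *n,
            ToySignature::OneArgOf(kinds) => {
                args.len() == 1 && kinds.contains(&args[0])
            }
        }
    }
}

/// The wide signature: argument count is checked, argument type is not.
fn wide() -> ToySignature {
    ToySignature::Any(1)
}

/// A stricter signature: both count and kind are checked up front.
fn strict() -> ToySignature {
    ToySignature::OneArgOf(vec![
        Kind::List,
        Kind::LargeList,
        Kind::FixedSizeList,
        Kind::Map,
        Kind::Null,
    ])
}
```

In this model `wide()` accepts an `Int32` argument, so the function body has to reject it later, whereas `strict()` rejects it during signature matching and the manual type check becomes unnecessary.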
    if args.arg_fields.len() != 1 {
        return plan_err!("size expects exactly 1 argument");
    }

    let input_field = &args.arg_fields[0];

    match input_field.data_type() {
        DataType::List(_)
        | DataType::LargeList(_)
        | DataType::FixedSizeList(_, _)
        | DataType::Map(_, _)
        | DataType::Null => {}
        dt => {
            return plan_err!(
                "size function requires array or map types, got: {}",
                dt
            );
        }
    }

    let mut out_nullable = input_field.is_nullable();

    let scala_null_present = args
        .scalar_arguments
        .iter()
        .any(|opt_s| opt_s.is_some_and(|sv| sv.is_null()));
    if scala_null_present {
        out_nullable = true;
    }

    Ok(Arc::new(Field::new(
        self.name(),
        DataType::Int32,
        out_nullable,
    )))
Suggested change:

    Ok(Arc::new(Field::new(
        self.name(),
        DataType::Int32,
        args.arg_fields[0].is_nullable(),
    )))

- We don't need to check argument count; signature guards this for us
- If we had a stricter signature then we wouldn't need to check input types either
- It's redundant to check for scalar nulls, because if the input were a scalar null then the field would already be nullable
    if args.args.len() != 1 {
        return plan_err!("size expects exactly 1 argument");
    }
Suggested change: remove this check, since the signature already guards the argument count.
    let list_array = array.as_list::<i32>();
    let mut builder = Int32Builder::with_capacity(list_array.len());
    for i in 0..list_array.len() {
        if list_array.is_null(i) {
            builder.append_null();
        } else {
            let len = list_array.value(i).len();
            builder.append_value(len as i32)
        }
    }

    Ok(Arc::new(builder.finish()))
I feel we can greatly simplify this if we use a unary kernel instead of a separate builder 🤔
Or even utilize the OffsetBuffer inside to grab the lengths, e.g.
https://docs.rs/arrow/latest/arrow/buffer/struct.OffsetBuffer.html#method.lengths
We could even try to re-use the arrow kernel here:
https://docs.rs/arrow/latest/arrow/compute/kernels/length/fn.length.html
It doesn't handle maps, though; but we can at least see how it handles lists internally and perhaps replicate that code for maps here.
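The offset-based idea from this thread can be sketched without any Arrow types. This is a minimal illustration under stated assumptions: `sizes_from_offsets` is a hypothetical helper, the offsets are a plain `&[i32]` with one more entry than there are rows, and validity is a plain `&[bool]` rather than Arrow's null buffer. Because Arrow map arrays also store `i32` offsets, the same computation should carry over to maps; a fixed-size list has no offsets, since every row has the same length.

```rust
/// Hypothetical helper: per-row sizes from an offsets buffer.
///
/// `offsets` holds `rows + 1` non-decreasing entries, so row `i` spans
/// `offsets[i]..offsets[i + 1]`. `validity`, when present, marks which
/// rows are non-null and must have one entry per row.
fn sizes_from_offsets(
    offsets: &[i32],
    validity: Option<&[bool]>,
) -> Vec<Option<i32>> {
    offsets
        .windows(2)
        .enumerate()
        .map(|(row, pair)| {
            // A missing validity mask means every row is valid.
            let is_valid = validity.map_or(true, |mask| mask[row]);
            // Length is the difference of adjacent offsets; NULL rows stay NULL.
            is_valid.then(|| pair[1] - pair[0])
        })
        .collect()
}
```

Compared with the builder loop, this never materialises each row's child array; it only subtracts adjacent offsets and carries the null mask through.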
    #[test]
    fn test_size_nullability() {
        let size_fn = SparkSize::new();

        // Non-nullable list input -> non-nullable output
        let non_nullable_list = Arc::new(Field::new(
            "col",
Personally I'd remove this test; the amount of code it requires doesn't really justify the value I see it bringing 🤔
    #[test]
    fn test_size_with_null_scalar() {
        let size_fn = SparkSize::new();

        let non_nullable_list = Arc::new(Field::new(
            "col",
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
            false,
        ));

        // With null scalar argument
The tests from here onwards should be moved to SLTs (sqllogictests).
Here is Comet's implementation of
Which issue does this PR close?
- `size` function #5338.
- `datafusion-spark` Spark Compatible Functions #15914.

Rationale for this change
The size function is a commonly used Spark SQL function that returns the number of elements in an array or the number of key-value pairs in a map.
What changes are included in this PR?
Implement Spark-compatible size function in the datafusion-spark crate:
Are these changes tested?
Yes:
Are there any user-facing changes?
Yes, new size function available in the Spark crate.