
fix: default values for native_datafusion scan #1756


Merged: 18 commits, May 23, 2025

Conversation

@mbutrovich (Contributor) commented May 20, 2025

This change is only needed for native_datafusion, since native_iceberg_compat works at column granularity and already seems to populate default values correctly.

Which issue does this PR close?

Closes #1750.

Rationale for this change

What changes are included in this PR?

  • Serialize default values from the query plan, deserialize them on the native side, and embed them in the SchemaMapper (see the sketch after this list).
  • Default null columns now use a compact scalar representation for the column.
  • Refactored some old comments and unused fields in SparkParquetOptions.
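
The missing-column path can be sketched roughly as follows, assuming DataFusion's ScalarValue and arrow's new_null_array; the helper name fill_missing_column and its signature are illustrative, not the PR's actual API:

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::Field;
use datafusion::common::{Result, ScalarValue};

// Produce the column for a field that exists in the table schema but not in
// the Parquet file: repeat the recorded default when one exists, otherwise
// emit a typed all-null column.
fn fill_missing_column(
    field: &Field,
    default: Option<&ScalarValue>,
    num_rows: usize,
) -> Result<ArrayRef> {
    match default {
        // Expand the constant default to one value per row in the batch.
        Some(value) => value.to_array_of_size(num_rows),
        // No default recorded: all pre-existing rows read as null.
        None => Ok(new_null_array(field.data_type(), num_rows)),
    }
}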

How are these changes tested?

@andygrove's simple unit test, plus a new fuzz test that exercises default values for all primitive types.

@mbutrovich changed the title from "fix: default values for experimental native scans" to "fix: default values for experimental native_datafusion scan" on May 20, 2025
@mbutrovich mbutrovich marked this pull request as ready for review May 20, 2025 17:31
@@ -715,6 +715,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
file_groups,
None,
data_filters,
None,
@mbutrovich (author):

As far as I can tell, missing columns for native_iceberg_compat are handled elsewhere and the DataSourceExec will never know about them.

Contributor:

It's handled in the ConstantColumnReader which is shared between native_comet and native_iceberg_compat.
Also see ResolveDefaultColumns.getExistenceDefaultValues. Not quite sure what the difference between ExistenceDefaultValues and simply default values is.

Contributor:

From the Spark javadoc:

org.apache.spark.sql.catalyst.util.ResolveDefaultColumns

def constantFoldCurrentDefaultsToExistDefaults(tableSchema: StructType, statementType: String): StructType

Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.

The results are stored in the "exists default" metadata of the same columns. For example, in the event of this statement:

CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)

This method constant-folds the "current default" value, stored in the CURRENT_DEFAULT metadata of the "b" column, to "10", storing the result in the "exists default" value within the EXISTS_DEFAULT metadata of that same column. Meanwhile the "current default" metadata of this "b" column retains its original value of "5 + 5".

The reason for constant-folding the EXISTS_DEFAULT is to make the end-user visible behavior the same, after executing an ALTER TABLE ADD COLUMNS command with a DEFAULT value, as if the system had performed an exhaustive backfill of the provided value to all previously existing rows in the table instead.

We choose to avoid doing such a backfill because it would be a time-consuming and costly operation. Instead, we elect to store the EXISTS_DEFAULT in the column metadata for future reference when querying data out of the data source. In turn, each data source then takes responsibility to provide the constant-folded value in the EXISTS_DEFAULT metadata for such columns where the value is not present in storage.

I'll assume that the default values you get are the 'existence' defaults

);
let mut parquet_source =
    ParquetSource::new(table_parquet_options).with_schema_adapter_factory(Arc::new(
        SparkSchemaAdapterFactory::new(spark_parquet_options, default_values),
@mbutrovich (author):

We can discuss if it makes more sense to stick default_values inside of the SparkParquetOptions struct.

Contributor:

I don't think it makes sense to do that, even though it might make the code a little bit simpler. default_values are not exactly options. But I'm not going to argue if you choose to do it that way.

@@ -60,9 +60,6 @@ pub struct SparkParquetOptions {
pub allow_incompat: bool,
/// Support casting unsigned ints to signed ints (used by Parquet SchemaAdapter)
pub allow_cast_unsigned_ints: bool,
/// We also use the cast logic for adapting Parquet schemas, so this flag is used
/// for that use case
pub is_adapting_schema: bool,
@mbutrovich (author):

This is dead code from when we used the cast logic (and CastOptions) to handle Parquet type conversion.

file_idx.map_or_else(
// If this field only exists in the table, and not in the file, then we know
// that it's null, so just return that.
|| Ok(new_null_array(field.data_type(), batch_rows)),
@mbutrovich (author):

Got rid of instantiating an entire null array in favor of a single null value for the column; see the sketch below.
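
As a rough sketch of the difference, assuming DataFusion's ScalarValue API: ScalarValue::try_from(&DataType) yields a typed null scalar, which only needs to be expanded into a full column when a batch is actually assembled:

use arrow::datatypes::DataType;
use datafusion::common::{Result, ScalarValue};

fn main() -> Result<()> {
    // One typed null scalar (here ScalarValue::Int32(None)) stands in for the
    // whole column instead of an eagerly allocated null array.
    let null = ScalarValue::try_from(&DataType::Int32)?;
    assert!(null.is_null());
    // Materialize only at batch-build time (4 rows here).
    let column = null.to_array_of_size(4)?;
    assert_eq!(column.len(), 4);
    Ok(())
}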

@@ -2327,18 +2346,18 @@ object QueryPlanSerde extends Logging with CometExprShim {
val requiredSchema = schema2Proto(scan.requiredSchema.fields)
val dataSchema = schema2Proto(scan.relation.dataSchema.fields)

val data_schema_idxs = scan.requiredSchema.fields.map(field => {
val dataSchemaIndexes = scan.requiredSchema.fields.map(field => {
@mbutrovich (author):

Just fixing incorrectly formatted variable names as I find them.

@mbutrovich mbutrovich requested a review from andygrove May 20, 2025 19:41
@mbutrovich (author) commented:

Something else to look at for Spark 3.4...

select column with default value (native_comet, native shuffle) *** FAILED *** (449 milliseconds)
  org.apache.spark.sql.AnalysisException: Failed to execute ALTER TABLE ADD COLUMNS command because the destination table column col2 has a DEFAULT value with type ByteType, but the statement provided a value of incompatible type IntegerType

@codecov-commenter commented May 21, 2025

Codecov Report

Attention: Patch coverage is 85.71429% with 2 lines in your changes missing coverage. Please review.

Project coverage is 58.62%. Comparing base (f09f8af) to head (916b43b).
Report is 203 commits behind head on main.

Files with missing lines                                 Patch %   Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala    85.71%    0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1756      +/-   ##
============================================
+ Coverage     56.12%   58.62%   +2.49%     
- Complexity      976     1131     +155     
============================================
  Files           119      130      +11     
  Lines         11743    12673     +930     
  Branches       2251     2367     +116     
============================================
+ Hits           6591     7429     +838     
- Misses         4012     4058      +46     
- Partials       1140     1186      +46     


@andygrove (Member) left a review:

LGTM. Thanks @mbutrovich!

@mbutrovich changed the title from "fix: default values for experimental native_datafusion scan" to "fix: default values for native_datafusion scan" on May 21, 2025
@andygrove andygrove merged commit 9da11c5 into apache:main May 23, 2025
79 checks passed
Development

Successfully merging this pull request may close these issues.

[native_datafusion] No support for default values for Parquet columns