fix: default values for native_datafusion scan #1756

Merged: 18 commits, May 23, 2025
42 changes: 41 additions & 1 deletion native/core/src/execution/planner.rs
@@ -1108,6 +1108,42 @@ impl PhysicalPlanner {
.map(|expr| self.create_expr(expr, Arc::clone(&required_schema)))
.collect();

let default_values: Option<HashMap<usize, ScalarValue>> = if !scan
.default_values
.is_empty()
{
// We have default values. Extract the two lists (same length) of values and
// indexes in the schema, and then create a HashMap to use in the SchemaMapper.
let default_values: Result<Vec<ScalarValue>, DataFusionError> = scan
.default_values
.iter()
.map(|expr| {
let literal = self.create_expr(expr, Arc::clone(&required_schema))?;
let df_literal = literal
.as_any()
.downcast_ref::<DataFusionLiteral>()
.ok_or_else(|| {
GeneralError("Expected literal of default value.".to_string())
})?;
Ok(df_literal.value().clone())
})
.collect();
let default_values = default_values?;
let default_values_indexes: Vec<usize> = scan
.default_values_indexes
.iter()
.map(|offset| *offset as usize)
.collect();
Some(
default_values_indexes
.into_iter()
.zip(default_values)
.collect(),
)
} else {
None
};

// Get one file from the list of files
let one_file = scan
.file_partitions
@@ -1145,6 +1181,7 @@ impl PhysicalPlanner {
file_groups,
Some(projection_vector),
Some(data_filters?),
default_values,
scan.session_timezone.as_str(),
)?;
Ok((
@@ -3157,7 +3194,10 @@ mod tests {

let source = Arc::new(
ParquetSource::default().with_schema_adapter_factory(Arc::new(
SparkSchemaAdapterFactory::new(SparkParquetOptions::new(EvalMode::Ansi, "", false)),
SparkSchemaAdapterFactory::new(
SparkParquetOptions::new(EvalMode::Ansi, "", false),
None,
),
)),
);

1 change: 1 addition & 0 deletions native/core/src/parquet/mod.rs
@@ -715,6 +715,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
file_groups,
None,
data_filters,
None,
Contributor Author:

As far as I can tell, missing columns for native_iceberg_compat are handled elsewhere and the DataSourceExec will never know about them.

Contributor:

It's handled in the ConstantColumnReader, which is shared between native_comet and native_iceberg_compat.
Also see ResolveDefaultColumns.getExistenceDefaultValues. I'm not quite sure what the difference between existence default values and plain default values is.

Contributor:

From the Spark javadoc:

org.apache.spark.sql.catalyst.util.ResolveDefaultColumns

def constantFoldCurrentDefaultsToExistDefaults(tableSchema: StructType, statementType: String): StructType

Finds "current default" expressions in CREATE/REPLACE TABLE columns and constant-folds them.

The results are stored in the "exists default" metadata of the same columns. For example, in the event of this statement:

CREATE TABLE T(a INT, b INT DEFAULT 5 + 5)

This method constant-folds the "current default" value, stored in the CURRENT_DEFAULT metadata of 
the "b" column, to "10", storing the result in the "exists default" value within the EXISTS_DEFAULT 
metadata of that same column. Meanwhile the "current default" metadata of this "b" column retains its
original value of "5 + 5".

The reason for constant-folding the EXISTS_DEFAULT is to make the end-user visible behavior the same, 
after executing an ALTER TABLE ADD COLUMNS command with DEFAULT value, as if the system had 
performed an exhaustive backfill of the provided value to all previously existing rows in the table instead. 

We choose to avoid doing such a backfill because it would be a time-consuming and costly operation. 
Instead, we elect to store the EXISTS_DEFAULT in the column metadata for future reference when 
querying data out of the data source. In turn, each data source then takes responsibility to provide the 
constant-folded value in the EXISTS_DEFAULT metadata for such columns where the value is not 
present in storage.

I'll assume that the default values you get are the 'existence' defaults.
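
To make this concrete, here is a minimal sketch (not part of the PR) of the map the native side ends up holding for the javadoc example above, assuming column "b" sits at index 1 of the required schema and its constant-folded exists default is 10:

use datafusion::scalar::ScalarValue;
use std::collections::HashMap;

fn main() {
    // The (default value, schema index) pairs serialized by the Spark side are zipped
    // into a map keyed by the field's position in the required schema.
    let default_values: HashMap<usize, ScalarValue> =
        HashMap::from([(1, ScalarValue::Int32(Some(10)))]);

    // When the SchemaMapper finds that field 1 is absent from a Parquet file, it
    // materializes this scalar as a full column instead of an all-null column.
    if let Some(value) = default_values.get(&1) {
        println!("exists default for column b: {value}");
    }
}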

session_timezone.as_str(),
)?;

10 changes: 7 additions & 3 deletions native/core/src/parquet/parquet_exec.rs
@@ -28,8 +28,10 @@ use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::physical_expr::expressions::BinaryExpr;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::scalar::ScalarValue;
use datafusion_comet_spark_expr::EvalMode;
use itertools::Itertools;
use std::collections::HashMap;
use std::sync::Arc;

/// Initializes a DataSourceExec plan with a ParquetSource. This may be used by either the
@@ -61,12 +63,14 @@ pub(crate) fn init_datasource_exec(
file_groups: Vec<Vec<PartitionedFile>>,
projection_vector: Option<Vec<usize>>,
data_filters: Option<Vec<Arc<dyn PhysicalExpr>>>,
default_values: Option<HashMap<usize, ScalarValue>>,
session_timezone: &str,
) -> Result<Arc<DataSourceExec>, ExecutionError> {
let (table_parquet_options, spark_parquet_options) = get_options(session_timezone);
let mut parquet_source = ParquetSource::new(table_parquet_options).with_schema_adapter_factory(
Arc::new(SparkSchemaAdapterFactory::new(spark_parquet_options)),
);
let mut parquet_source =
ParquetSource::new(table_parquet_options).with_schema_adapter_factory(Arc::new(
SparkSchemaAdapterFactory::new(spark_parquet_options, default_values),
Contributor Author:

We can discuss if it makes more sense to stick default_values inside of the SparkParquetOptions struct.

Contributor:

I don't think it makes sense to do that, even though it might make the code a little bit simpler. default_values are not exactly options. But I'm not going to argue if you choose to do it that way.

));
// Create a conjunctive form of the vector because ParquetExecBuilder takes
// a single expression
if let Some(data_filters) = data_filters {
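
To make the new call shape concrete, here is a minimal usage sketch (not taken from the PR) of the two-argument SparkSchemaAdapterFactory::new introduced above; per the discussion, the defaults travel alongside SparkParquetOptions rather than inside it. The crate-internal module paths are assumed from the files touched in this diff:

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::scalar::ScalarValue;
use datafusion_comet_spark_expr::EvalMode;

use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;

fn build_adapter_factory() -> Arc<SparkSchemaAdapterFactory> {
    let options = SparkParquetOptions::new(EvalMode::Ansi, "", false);
    // Field index 2 of the required schema gets the literal "hello" when it is
    // missing from an older Parquet file.
    let defaults: HashMap<usize, ScalarValue> =
        HashMap::from([(2, ScalarValue::Utf8(Some("hello".to_string())))]);
    Arc::new(SparkSchemaAdapterFactory::new(options, Some(defaults)))
}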
5 changes: 0 additions & 5 deletions native/core/src/parquet/parquet_support.rs
@@ -60,9 +60,6 @@ pub struct SparkParquetOptions {
pub allow_incompat: bool,
/// Support casting unsigned ints to signed ints (used by Parquet SchemaAdapter)
pub allow_cast_unsigned_ints: bool,
/// We also use the cast logic for adapting Parquet schemas, so this flag is used
/// for that use case
pub is_adapting_schema: bool,
Contributor Author:

This is dead code from when we used the cast logic (and CastOptions) to handle Parquet type conversion.

/// Whether to always represent decimals using 128 bits. If false, the native reader may represent decimals using 32 or 64 bits, depending on the precision.
pub use_decimal_128: bool,
/// Whether to read dates/timestamps that were written in the legacy hybrid Julian + Gregorian calendar as it is. If false, throw exceptions instead. If the spark type is TimestampNTZ, this should be true.
@@ -78,7 +75,6 @@ impl SparkParquetOptions {
timezone: timezone.to_string(),
allow_incompat,
allow_cast_unsigned_ints: false,
is_adapting_schema: false,
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
@@ -91,7 +87,6 @@
timezone: "".to_string(),
allow_incompat,
allow_cast_unsigned_ints: false,
is_adapting_schema: false,
use_decimal_128: false,
use_legacy_date_timestamp_or_ntz: false,
case_sensitive: false,
62 changes: 46 additions & 16 deletions native/core/src/parquet/schema_adapter.rs
@@ -18,11 +18,13 @@
//! Custom schema adapter that uses Spark-compatible conversions

use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};
use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion::common::ColumnStatistics;
use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;
use std::collections::HashMap;
use std::sync::Arc;

/// An implementation of DataFusion's `SchemaAdapterFactory` that uses a Spark-compatible
@@ -31,12 +33,17 @@ pub struct SparkSchemaAdapterFactory {
pub struct SparkSchemaAdapterFactory {
/// Spark cast options
parquet_options: SparkParquetOptions,
default_values: Option<HashMap<usize, ScalarValue>>,
}

impl SparkSchemaAdapterFactory {
pub fn new(options: SparkParquetOptions) -> Self {
pub fn new(
options: SparkParquetOptions,
default_values: Option<HashMap<usize, ScalarValue>>,
) -> Self {
Self {
parquet_options: options,
default_values,
}
}
}
@@ -56,6 +63,7 @@ impl SchemaAdapterFactory for SparkSchemaAdapterFactory {
Box::new(SparkSchemaAdapter {
required_schema,
parquet_options: self.parquet_options.clone(),
default_values: self.default_values.clone(),
})
}
}
@@ -69,6 +77,7 @@ pub struct SparkSchemaAdapter {
required_schema: SchemaRef,
/// Spark cast options
parquet_options: SparkParquetOptions,
default_values: Option<HashMap<usize, ScalarValue>>,
}

impl SchemaAdapter for SparkSchemaAdapter {
@@ -134,6 +143,7 @@ impl SchemaAdapter for SparkSchemaAdapter {
required_schema: Arc::<Schema>::clone(&self.required_schema),
field_mappings,
parquet_options: self.parquet_options.clone(),
default_values: self.default_values.clone(),
}),
projection,
))
@@ -158,16 +168,7 @@ impl SchemaAdapter for SparkSchemaAdapter {
/// out of the execution of this query. Thus `map_batch` uses
/// `projected_table_schema` as it can only operate on the projected fields.
///
/// [`map_partial_batch`] is used to create a RecordBatch with a schema that
/// can be used for Parquet predicate pushdown, meaning that it may contain
/// fields which are not in the projected schema (as the fields that parquet
/// pushdown filters operate can be completely distinct from the fields that are
/// projected (output) out of the ParquetExec). `map_partial_batch` thus uses
/// `table_schema` to create the resulting RecordBatch (as it could be operating
/// on any fields in the schema).
///
/// [`map_batch`]: Self::map_batch
/// [`map_partial_batch`]: Self::map_partial_batch
#[derive(Debug)]
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion
Expand All @@ -181,6 +182,7 @@ pub struct SchemaMapping {
field_mappings: Vec<Option<usize>>,
/// Spark cast options
parquet_options: SparkParquetOptions,
default_values: Option<HashMap<usize, ScalarValue>>,
}

impl SchemaMapper for SchemaMapping {
@@ -197,15 +199,43 @@ impl SchemaMapper for SchemaMapping {
// go through each field in the projected schema
.fields()
.iter()
.enumerate()
// and zip it with the index that maps fields from the projected table schema to the
// projected file schema in `batch`
.zip(&self.field_mappings)
// and for each one...
.map(|(field, file_idx)| {
.map(|((field_idx, field), file_idx)| {
file_idx.map_or_else(
// If this field only exists in the table, and not in the file, then we know
// that it's null, so just return that.
|| Ok(new_null_array(field.data_type(), batch_rows)),
Contributor Author:

Got rid of instantiating an entire null array in favor of a single null scalar value for the column.
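
For comparison, a minimal standalone sketch of the two constructions (the old new_null_array call versus the scalar route used below), assuming an Int32 field and a three-row batch; both produce the same all-null column:

use arrow::array::new_null_array;
use arrow::datatypes::DataType;
use datafusion::common::Result;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;

fn main() -> Result<()> {
    let batch_rows = 3;

    // Previous approach: build the null array for the missing column directly.
    let old = new_null_array(&DataType::Int32, batch_rows);

    // New approach: start from a single null scalar and expand it to the batch length.
    let cv = ColumnarValue::Scalar(ScalarValue::try_new_null(&DataType::Int32)?);
    let new = cv.into_array(batch_rows)?;

    assert_eq!(old.as_ref(), new.as_ref());
    Ok(())
}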

// If this field only exists in the table, and not in the file, then we need to
// populate a default value for it.
|| {
if self.default_values.is_some() {
// We have a map of default values, see if this field is in there.
if let Some(value) =
self.default_values.as_ref().unwrap().get(&field_idx)
// Default value exists, construct a column from it.
{
let cv = if field.data_type() == &value.data_type() {
ColumnarValue::Scalar(value.clone())
} else {
// Data types don't match. This can happen when default values
// are stored by Spark in a format different than the column's
// type (e.g., INT32 when the column is DATE32)
spark_parquet_convert(
ColumnarValue::Scalar(value.clone()),
field.data_type(),
&self.parquet_options,
)?
};
return cv.into_array(batch_rows);
}
}
// Construct an entire column of nulls. We use the Scalar representation
// for better performance.
let cv =
ColumnarValue::Scalar(ScalarValue::try_new_null(field.data_type())?);
cv.into_array(batch_rows)
},
// However, if it does exist in both, then try to cast it to the correct output
// type
|batch_idx| {
@@ -316,7 +346,7 @@ mod test {

let parquet_source = Arc::new(
ParquetSource::new(TableParquetOptions::new()).with_schema_adapter_factory(Arc::new(
SparkSchemaAdapterFactory::new(spark_parquet_options),
SparkSchemaAdapterFactory::new(spark_parquet_options, None),
)),
);

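A second hedged sketch for the type-mismatch branch in map_batch above (a default stored as INT32 for a DATE32 column); it assumes spark_parquet_convert accepts this pairing, as the comment in the diff suggests, and uses the crate-internal path imported by schema_adapter.rs:

use arrow::array::ArrayRef;
use arrow::datatypes::DataType;
use datafusion::common::Result;
use datafusion::physical_plan::ColumnarValue;
use datafusion::scalar::ScalarValue;
use datafusion_comet_spark_expr::EvalMode;

use crate::parquet::parquet_support::{spark_parquet_convert, SparkParquetOptions};

fn date_default_column(batch_rows: usize) -> Result<ArrayRef> {
    let options = SparkParquetOptions::new(EvalMode::Ansi, "", false);
    // Spark stored the exists default as an INT32 (days since the epoch), but the
    // required schema declares the column as DATE32, so convert before expanding.
    let stored = ColumnarValue::Scalar(ScalarValue::Int32(Some(19_000)));
    let converted = spark_parquet_convert(stored, &DataType::Date32, &options)?;
    converted.into_array(batch_rows)
}
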
2 changes: 2 additions & 0 deletions native/proto/src/proto/operator.proto
@@ -91,6 +91,8 @@ message NativeScan {
repeated SparkFilePartition file_partitions = 7;
repeated int64 projection_vector = 8;
string session_timezone = 9;
repeated spark.spark_expression.Expr default_values = 10;
repeated int64 default_values_indexes = 11;
}

message Projection {
27 changes: 23 additions & 4 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, Normalize
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getExistenceDefaultValues
import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution
@@ -2302,6 +2303,24 @@ object QueryPlanSerde extends Logging with CometExprShim {
nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
}

val possibleDefaultValues = getExistenceDefaultValues(scan.requiredSchema)
if (possibleDefaultValues.exists(_ != null)) {
// Our schema has default values. Serialize two lists, one with the default values
// and another with the indexes in the schema so the native side can map missing
// columns to these default values.
val (defaultValues, indexes) = possibleDefaultValues.zipWithIndex
.filter { case (expr, _) => expr != null }
.map { case (expr, index) =>
// ResolveDefaultColumnsUtil.getExistenceDefaultValues has evaluated these
// expressions and they should now just be literals.
(Literal(expr), index.toLong.asInstanceOf[java.lang.Long])
}
.unzip
nativeScanBuilder.addAllDefaultValues(
defaultValues.flatMap(exprToProto(_, scan.output)).toIterable.asJava)
nativeScanBuilder.addAllDefaultValuesIndexes(indexes.toIterable.asJava)
}

// TODO: modify CometNativeScan to generate the file partitions without instantiating RDD.
scan.inputRDD match {
case rdd: DataSourceRDD =>
@@ -2326,18 +2345,18 @@
val requiredSchema = schema2Proto(scan.requiredSchema.fields)
val dataSchema = schema2Proto(scan.relation.dataSchema.fields)

val data_schema_idxs = scan.requiredSchema.fields.map(field => {
val dataSchemaIndexes = scan.requiredSchema.fields.map(field => {
Contributor Author:

Just fixing incorrectly formatted variable names as I find them.

scan.relation.dataSchema.fieldIndex(field.name)
})
val partition_schema_idxs = Array
val partitionSchemaIndexes = Array
.range(
scan.relation.dataSchema.fields.length,
scan.relation.dataSchema.length + scan.relation.partitionSchema.fields.length)

val projection_vector = (data_schema_idxs ++ partition_schema_idxs).map(idx =>
val projectionVector = (dataSchemaIndexes ++ partitionSchemaIndexes).map(idx =>
idx.toLong.asInstanceOf[java.lang.Long])

nativeScanBuilder.addAllProjectionVector(projection_vector.toIterable.asJava)
nativeScanBuilder.addAllProjectionVector(projectionVector.toIterable.asJava)

// In `CometScanRule`, we ensure partitionSchema is supported.
assert(partitionSchema.length == scan.relation.partitionSchema.fields.length)
@@ -57,6 +57,15 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

test("parquet default values") {
withTable("t1") {
sql("create table t1(col1 boolean) using parquet")
sql("insert into t1 values(true)")
sql("alter table t1 add column col2 string default 'hello'")
checkSparkAnswerAndOperator("select * from t1")
}
}

test("coalesce should return correct datatype") {
Seq(true, false).foreach { dictionaryEnabled =>
withTempDir { dir =>