Improve error messages if schema hint mismatches with parquet schema #7481

Merged · 5 commits · May 26, 2025
4 changes: 2 additions & 2 deletions arrow-array/src/record_batch.rs
@@ -1073,8 +1073,8 @@ mod tests {

let a = Int64Array::from(vec![1, 2, 3, 4, 5]);

let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]);
assert!(batch.is_err());
let err = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a)]).unwrap_err();
Contributor Author: drive by cleanup

assert_eq!(err.to_string(), "Invalid argument error: column types must match schema types, expected Int32 but found Int64 at column index 0");
}

#[test]
7 changes: 7 additions & 0 deletions parquet/src/arrow/arrow_reader/filter.rs
@@ -18,6 +18,7 @@
use crate::arrow::ProjectionMask;
use arrow_array::{BooleanArray, RecordBatch};
use arrow_schema::ArrowError;
use std::fmt::{Debug, Formatter};

/// A predicate operating on [`RecordBatch`]
///
@@ -174,6 +175,12 @@ pub struct RowFilter {
pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
}

impl Debug for RowFilter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "RowFilter {{ {} predicates: }}", self.predicates.len())
Contributor Author: ArrowPredicate doesn't implement Debug
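For illustration, a minimal sketch of what this Debug output looks like in practice, assuming the ArrowPredicateFn and ProjectionMask APIs re-exported from this module; since the boxed predicates are not Debug, only their count is reported:

```rust
use arrow_array::BooleanArray;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;

fn debug_output_example() {
    // A pass-through predicate that keeps every row.
    let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
        Ok(BooleanArray::from(vec![true; batch.num_rows()]))
    });
    let filter = RowFilter::new(vec![Box::new(predicate)]);

    // The manual Debug impl above prints only the number of predicates.
    assert_eq!(format!("{filter:?}"), "RowFilter { 1 predicates: }");
}
```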

}
}

impl RowFilter {
/// Create a new [`RowFilter`] from an array of [`ArrowPredicate`]
pub fn new(predicates: Vec<Box<dyn ArrowPredicate>>) -> Self {
156 changes: 127 additions & 29 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -23,6 +23,7 @@ use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
@@ -113,6 +114,24 @@ pub struct ArrowReaderBuilder<T> {
pub(crate) offset: Option<usize>,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ArrowReaderBuilder<T>")
.field("input", &self.input)
.field("metadata", &self.metadata)
.field("schema", &self.schema)
.field("fields", &self.fields)
.field("batch_size", &self.batch_size)
.field("row_groups", &self.row_groups)
.field("projection", &self.projection)
.field("filter", &self.filter)
.field("selection", &self.selection)
.field("limit", &self.limit)
.field("offset", &self.offset)
.finish()
}
}

impl<T> ArrowReaderBuilder<T> {
pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
Self {
@@ -520,37 +539,55 @@ impl ArrowReaderMetadata {
// parquet_to_arrow_field_levels is expected to throw an error if the schemas have
// different lengths, but we check here to be safe.
if inferred_len != supplied_len {
Err(arrow_err!(format!(
"incompatible arrow schema, expected {} columns received {}",
return Err(arrow_err!(format!(
"Incompatible supplied Arrow schema: expected {} columns received {}",
inferred_len, supplied_len
)))
} else {
let diff_fields: Vec<_> = supplied_schema
.fields()
.iter()
.zip(fields.iter())
.filter_map(|(field1, field2)| {
if field1 != field2 {
Some(field1.name().clone())
} else {
None
}
})
.collect();
)));
}

if !diff_fields.is_empty() {
Err(ParquetError::ArrowError(format!(
"incompatible arrow schema, the following fields could not be cast: [{}]",
diff_fields.join(", ")
)))
} else {
Ok(Self {
metadata,
schema: supplied_schema,
fields: field_levels.levels.map(Arc::new),
})
let mut errors = Vec::new();
Contributor Author:

Improving this message is the point of the PR.

I also relaxed the check slightly, so this will now allow the fields to differ in metadata where previously this would return an error. There is no test coverage for mismatched schemas.

FYI @paleolimbot in case you have any wisdom to share here.

Member:

Hmm... this would mean that extension types can be cast implicitly to their storage (or perhaps the opposite, depending on which field metadata takes precedence). It is probably safer to fail, but not the end of the world because those errors will show up later (an error matching a signature if the extension metadata is dropped, or an error parsing bytes if unexpected content was given an extension type by accident). A true "user defined type" solution for DataFusion would be a place to handle this properly in some future (field_common(field_a, field_b) -> Field, field_cast(array, array_field, common_field) -> ArrayRef, or something).
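For illustration only, a rough sketch of the hypothetical DataFusion-side helpers mentioned above; these functions do not exist today, and the names and signatures are simply the ones suggested in the comment:

```rust
use arrow_array::ArrayRef;
use arrow_schema::{ArrowError, Field};

/// Hypothetical: reconcile two fields (including extension metadata) into a
/// common field that both sides can be cast to, instead of failing outright.
fn field_common(field_a: &Field, field_b: &Field) -> Result<Field, ArrowError> {
    // Sketch only: a real implementation would decide which metadata wins,
    // or error on genuinely conflicting extension metadata.
    unimplemented!()
}

/// Hypothetical: cast `array`, described by `array_field`, to `common_field`,
/// adding or dropping extension metadata as required.
fn field_cast(
    array: ArrayRef,
    array_field: &Field,
    common_field: &Field,
) -> Result<ArrayRef, ArrowError> {
    unimplemented!()
}
```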

Contributor Author:

I think relaxing the check means that a user could supply the reader a schema that has metadata not present in the file, and the reader will then read RecordBatches that carry that metadata.

I agree field_cast is the longer-term right thing to do in DataFusion.

In arrow-rs, I think that field "casting" happens during the reading of parquet.

Member:

Probably the only correctness issue would be if the supplied schema had conflicting extension metadata (e.g., unit: m vs unit: cm). I am not sure that the current Parquet reader ever produces extension metadata (does it read the ARROW:i_forget_the_exact_name key and deserialize the schema?), so perhaps not an issue as long as somebody remembers this when it does.
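A minimal sketch (hypothetical field name and metadata) of the kind of conflict described above; with the metadata check added later in this hunk, such a pair is reported as a mismatch rather than one unit silently winning:

```rust
use std::collections::HashMap;
use arrow_schema::{DataType, Field};

fn conflicting_metadata_example() {
    // The supplied schema hint says the column is in meters...
    let supplied = Field::new("distance", DataType::Float64, false)
        .with_metadata(HashMap::from([("unit".to_string(), "m".to_string())]));
    // ...while the (hypothetical) file-derived field says centimeters.
    let inferred = Field::new("distance", DataType::Float64, false)
        .with_metadata(HashMap::from([("unit".to_string(), "cm".to_string())]));

    // The restored check compares the metadata maps directly, so this pair
    // triggers a "metadata mismatch for field distance" error.
    assert_ne!(supplied.metadata(), inferred.metadata());
}
```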

Contributor Author (alamb, May 8, 2025):

I can put the metadata check back in, perhaps, and we can relax it when necessary.

I mostly was being lazy to avoid writing a test for it.

let field_iter = supplied_schema.fields().iter().zip(fields.iter());

for (field1, field2) in field_iter {
if field1.data_type() != field2.data_type() {
errors.push(format!(
"data type mismatch for field {}: requested {:?} but found {:?}",
field1.name(),
field1.data_type(),
field2.data_type()
));
}
if field1.is_nullable() != field2.is_nullable() {
Member: Can a non-nullable field be cast to a nullable one?

Contributor Author:

In theory yes, but the current check prevents it, and I would like to concentrate on improving the error messages rather than puzzling out the implications of a behavior change.

I will also update this PR to restore the metadata checking as well (with a nicer message).

Contributor Author:

I added a metadata check, but I couldn't write a test for it due to

errors.push(format!(
"nullability mismatch for field {}: expected {:?} but found {:?}",
field1.name(),
field1.is_nullable(),
field2.is_nullable()
));
}
if field1.metadata() != field2.metadata() {
errors.push(format!(
"metadata mismatch for field {}: expected {:?} but found {:?}",
field1.name(),
field1.metadata(),
field2.metadata()
));
}
}

if !errors.is_empty() {
let message = errors.join(", ");
return Err(ParquetError::ArrowError(format!(
"Incompatible supplied Arrow schema: {message}",
)));
}

Ok(Self {
metadata,
schema: supplied_schema,
fields: field_levels.levels.map(Arc::new),
})
}

/// Returns a reference to the [`ParquetMetaData`] for this parquet file
@@ -573,6 +610,12 @@ impl ArrowReaderMetadata {
/// A newtype used within [`ReaderOptionsBuilder`] to distinguish sync readers from async
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("SyncReader").field(&self.0).finish()
}
}

/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file
///
/// For an async API see [`crate::arrow::async_reader::ParquetRecordBatchStreamBuilder`]
@@ -3392,7 +3435,7 @@ mod tests {
Field::new("col2_valid", ArrowDataType::Int32, false),
Field::new("col3_invalid", ArrowDataType::Int32, false),
])),
"Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
"Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
Contributor Author: this is a pretty good example of the before/after error messages. I would feel much better trying to debug the new message than the old.

);
}

@@ -3430,10 +3473,65 @@ mod tests {
false,
),
])),
"Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
"Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
requested Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }]) \
but found Struct([Field { name: \"nested1_valid\", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"nested1_invalid\", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }])",
);
}

/// Return parquet data with a single column of utf8 strings
fn utf8_parquet() -> Bytes {
let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
let props = None;
// write parquet file with non nullable strings
let mut parquet_data = vec![];
let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
Bytes::from(parquet_data)
}

#[test]
fn test_schema_error_bad_types() {
// verify incompatible schemas error on read
let parquet_data = utf8_parquet();

// Ask to read it back with an incompatible schema (int vs string)
let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
"column1",
arrow::datatypes::DataType::Int32,
false,
)]));

// read it back out
let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
let err =
ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
.unwrap_err();
assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8")
}

#[test]
fn test_schema_error_bad_nullability() {
// verify incompatible schemas error on read
let parquet_data = utf8_parquet();

// Ask to read it back with an incompatible schema (nullability mismatch)
let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
"column1",
arrow::datatypes::DataType::Utf8,
true,
)]));

// read it back out
let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
let err =
ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
.unwrap_err();
assert_eq!(err.to_string(), "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false")
}

#[test]
fn test_read_binary_as_utf8() {
let file = write_parquet_from_iter(vec![