Skip to content

Commit fc6e7c1

Browse files
nantunes authored and
Nirnay Roy
committed
fix(avro): Respect projection order in Avro reader (apache#15840)
Fixed issue in the Avro reader that caused queries to fail when columns were reordered in the SELECT statement. The reader now correctly: 1. Builds arrays in the order specified in the projection 2. Creates a properly ordered schema matching the projection Previously when selecting columns in a different order than the original schema (e.g., `SELECT timestamp, username FROM avro_table`), the reader would produce error due to type mismatches between the data arrays and the expected schema. Fixes apache#15839
1 parent ead5764 commit fc6e7c1

File tree

3 files changed

+98
-42
lines changed

3 files changed

+98
-42
lines changed

datafusion/datasource-avro/src/avro_to_arrow/arrow_array_reader.rs

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use arrow::buffer::{Buffer, MutableBuffer};
3333
use arrow::datatypes::{
3434
ArrowDictionaryKeyType, ArrowNumericType, ArrowPrimitiveType, DataType, Date32Type,
3535
Date64Type, Field, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type,
36-
Int8Type, Schema, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
36+
Int8Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
3737
Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType,
3838
TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type,
3939
UInt8Type,
@@ -56,23 +56,17 @@ type RecordSlice<'a> = &'a [&'a Vec<(String, Value)>];
5656
pub struct AvroArrowArrayReader<'a, R: Read> {
5757
reader: AvroReader<'a, R>,
5858
schema: SchemaRef,
59-
projection: Option<Vec<String>>,
6059
schema_lookup: BTreeMap<String, usize>,
6160
}
6261

6362
impl<R: Read> AvroArrowArrayReader<'_, R> {
64-
pub fn try_new(
65-
reader: R,
66-
schema: SchemaRef,
67-
projection: Option<Vec<String>>,
68-
) -> Result<Self> {
63+
pub fn try_new(reader: R, schema: SchemaRef) -> Result<Self> {
6964
let reader = AvroReader::new(reader)?;
7065
let writer_schema = reader.writer_schema().clone();
7166
let schema_lookup = Self::schema_lookup(writer_schema)?;
7267
Ok(Self {
7368
reader,
7469
schema,
75-
projection,
7670
schema_lookup,
7771
})
7872
}
@@ -175,20 +169,9 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
175169
};
176170

177171
let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
178-
let projection = self.projection.clone().unwrap_or_default();
179-
let arrays =
180-
self.build_struct_array(&rows, "", self.schema.fields(), &projection);
181-
let projected_fields = if projection.is_empty() {
182-
self.schema.fields().clone()
183-
} else {
184-
projection
185-
.iter()
186-
.filter_map(|name| self.schema.column_with_name(name))
187-
.map(|(_, field)| field.clone())
188-
.collect()
189-
};
190-
let projected_schema = Arc::new(Schema::new(projected_fields));
191-
Some(arrays.and_then(|arr| RecordBatch::try_new(projected_schema, arr)))
172+
let arrays = self.build_struct_array(&rows, "", self.schema.fields());
173+
174+
Some(arrays.and_then(|arr| RecordBatch::try_new(Arc::clone(&self.schema), arr)))
192175
}
193176

194177
fn build_boolean_array(&self, rows: RecordSlice, col_name: &str) -> ArrayRef {
@@ -615,7 +598,7 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
615598
let sub_parent_field_name =
616599
format!("{}.{}", parent_field_name, list_field.name());
617600
let arrays =
618-
self.build_struct_array(&rows, &sub_parent_field_name, fields, &[])?;
601+
self.build_struct_array(&rows, &sub_parent_field_name, fields)?;
619602
let data_type = DataType::Struct(fields.clone());
620603
ArrayDataBuilder::new(data_type)
621604
.len(rows.len())
@@ -645,20 +628,14 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
645628
/// The function does not construct the StructArray as some callers would want the child arrays.
646629
///
647630
/// *Note*: The function is recursive, and will read nested structs.
648-
///
649-
/// If `projection` is not empty, then all values are returned. The first level of projection
650-
/// occurs at the `RecordBatch` level. No further projection currently occurs, but would be
651-
/// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
652631
fn build_struct_array(
653632
&self,
654633
rows: RecordSlice,
655634
parent_field_name: &str,
656635
struct_fields: &Fields,
657-
projection: &[String],
658636
) -> ArrowResult<Vec<ArrayRef>> {
659637
let arrays: ArrowResult<Vec<ArrayRef>> = struct_fields
660638
.iter()
661-
.filter(|field| projection.is_empty() || projection.contains(field.name()))
662639
.map(|field| {
663640
let field_path = if parent_field_name.is_empty() {
664641
field.name().to_string()
@@ -840,12 +817,8 @@ impl<R: Read> AvroArrowArrayReader<'_, R> {
840817
}
841818
})
842819
.collect::<Vec<&Vec<(String, Value)>>>();
843-
let arrays = self.build_struct_array(
844-
&struct_rows,
845-
&field_path,
846-
fields,
847-
&[],
848-
)?;
820+
let arrays =
821+
self.build_struct_array(&struct_rows, &field_path, fields)?;
849822
// construct a struct array's data in order to set null buffer
850823
let data_type = DataType::Struct(fields.clone());
851824
let data = ArrayDataBuilder::new(data_type)

datafusion/datasource-avro/src/avro_to_arrow/reader.rs

Lines changed: 80 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use super::arrow_array_reader::AvroArrowArrayReader;
19-
use arrow::datatypes::SchemaRef;
19+
use arrow::datatypes::{Fields, SchemaRef};
2020
use arrow::error::Result as ArrowResult;
2121
use arrow::record_batch::RecordBatch;
2222
use datafusion_common::Result;
@@ -133,19 +133,35 @@ impl<R: Read> Reader<'_, R> {
133133
///
134134
/// If reading a `File`, you can customise the Reader, such as to enable schema
135135
/// inference, use `ReaderBuilder`.
136+
///
137+
/// If projection is provided, it uses a schema with only the fields in the projection, respecting their order.
138+
/// Only the first level of projection is handled. No further projection currently occurs, but would be
139+
/// useful if plucking values from a struct, e.g. getting `a.b.c.e` from `a.b.c.{d, e}`.
136140
pub fn try_new(
137141
reader: R,
138142
schema: SchemaRef,
139143
batch_size: usize,
140144
projection: Option<Vec<String>>,
141145
) -> Result<Self> {
146+
let projected_schema = projection.as_ref().filter(|p| !p.is_empty()).map_or_else(
147+
|| Arc::clone(&schema),
148+
|proj| {
149+
Arc::new(arrow::datatypes::Schema::new(
150+
proj.iter()
151+
.filter_map(|name| {
152+
schema.column_with_name(name).map(|(_, f)| f.clone())
153+
})
154+
.collect::<Fields>(),
155+
))
156+
},
157+
);
158+
142159
Ok(Self {
143160
array_reader: AvroArrowArrayReader::try_new(
144161
reader,
145-
Arc::clone(&schema),
146-
projection,
162+
Arc::clone(&projected_schema),
147163
)?,
148-
schema,
164+
schema: projected_schema,
149165
batch_size,
150166
})
151167
}
@@ -179,10 +195,13 @@ mod tests {
179195
use arrow::datatypes::{DataType, Field};
180196
use std::fs::File;
181197

182-
fn build_reader(name: &str) -> Reader<File> {
198+
fn build_reader(name: &str, projection: Option<Vec<String>>) -> Reader<File> {
183199
let testdata = datafusion_common::test_util::arrow_test_data();
184200
let filename = format!("{testdata}/avro/{name}");
185-
let builder = ReaderBuilder::new().read_schema().with_batch_size(64);
201+
let mut builder = ReaderBuilder::new().read_schema().with_batch_size(64);
202+
if let Some(projection) = projection {
203+
builder = builder.with_projection(projection);
204+
}
186205
builder.build(File::open(filename).unwrap()).unwrap()
187206
}
188207

@@ -195,7 +214,7 @@ mod tests {
195214

196215
#[test]
197216
fn test_avro_basic() {
198-
let mut reader = build_reader("alltypes_dictionary.avro");
217+
let mut reader = build_reader("alltypes_dictionary.avro", None);
199218
let batch = reader.next().unwrap().unwrap();
200219

201220
assert_eq!(11, batch.num_columns());
@@ -281,4 +300,58 @@ mod tests {
281300
assert_eq!(1230768000000000, col.value(0));
282301
assert_eq!(1230768060000000, col.value(1));
283302
}
303+
304+
#[test]
305+
fn test_avro_with_projection() {
306+
// Test projection to filter and reorder columns
307+
let projection = Some(vec![
308+
"string_col".to_string(),
309+
"double_col".to_string(),
310+
"bool_col".to_string(),
311+
]);
312+
let mut reader = build_reader("alltypes_dictionary.avro", projection);
313+
let batch = reader.next().unwrap().unwrap();
314+
315+
// Only 3 columns should be present (not all 11)
316+
assert_eq!(3, batch.num_columns());
317+
assert_eq!(2, batch.num_rows());
318+
319+
let schema = reader.schema();
320+
let batch_schema = batch.schema();
321+
assert_eq!(schema, batch_schema);
322+
323+
// Verify columns are in the order specified in projection
324+
// First column should be string_col (was at index 9 in original)
325+
assert_eq!("string_col", schema.field(0).name());
326+
assert_eq!(&DataType::Binary, schema.field(0).data_type());
327+
let col = batch
328+
.column(0)
329+
.as_any()
330+
.downcast_ref::<BinaryArray>()
331+
.unwrap();
332+
assert_eq!("0".as_bytes(), col.value(0));
333+
assert_eq!("1".as_bytes(), col.value(1));
334+
335+
// Second column should be double_col (was at index 7 in original)
336+
assert_eq!("double_col", schema.field(1).name());
337+
assert_eq!(&DataType::Float64, schema.field(1).data_type());
338+
let col = batch
339+
.column(1)
340+
.as_any()
341+
.downcast_ref::<Float64Array>()
342+
.unwrap();
343+
assert_eq!(0.0, col.value(0));
344+
assert_eq!(10.1, col.value(1));
345+
346+
// Third column should be bool_col (was at index 1 in original)
347+
assert_eq!("bool_col", schema.field(2).name());
348+
assert_eq!(&DataType::Boolean, schema.field(2).data_type());
349+
let col = batch
350+
.column(2)
351+
.as_any()
352+
.downcast_ref::<BooleanArray>()
353+
.unwrap();
354+
assert!(col.value(0));
355+
assert!(!col.value(1));
356+
}
284357
}

datafusion/sqllogictest/test_files/avro.slt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,3 +253,13 @@ physical_plan
253253
04)------AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
254254
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
255255
06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, file_type=avro
256+
257+
# test column projection order from avro file
258+
query ITII
259+
SELECT id, string_col, int_col, bigint_col FROM alltypes_plain ORDER BY id LIMIT 5
260+
----
261+
0 0 0 0
262+
1 1 1 10
263+
2 0 0 0
264+
3 1 1 10
265+
4 0 0 0

0 commit comments

Comments (0)