Skip to content

Commit 98ae29a

Browse files
committed
Address Remaining PR Comments
1 parent 8e6face commit 98ae29a

File tree

3 files changed

+108
-160
lines changed

3 files changed

+108
-160
lines changed

arrow-avro/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ bzip2 = { version = "0.6.0", optional = true }
5656
xz = { version = "0.1", default-features = false, optional = true }
5757
crc = { version = "3.0", optional = true }
5858
uuid = "1.17"
59-
indexmap = "2"
59+
indexmap = "2.10"
6060

6161
[dev-dependencies]
6262
arrow-data = { workspace = true }

arrow-avro/src/reader/mod.rs

Lines changed: 66 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@
8888
8989
use crate::codec::{AvroField, AvroFieldBuilder};
9090
use crate::schema::{
91-
compare_schemas, Fingerprint, FingerprintAlgorithm, Schema as AvroSchema, SchemaStore,
92-
SINGLE_OBJECT_MAGIC,
91+
compare_schemas, generate_fingerprint, Fingerprint, FingerprintAlgorithm, Schema as AvroSchema,
92+
SchemaStore, SINGLE_OBJECT_MAGIC,
9393
};
9494
use arrow_array::{RecordBatch, RecordBatchReader};
9595
use arrow_schema::{ArrowError, SchemaRef};
@@ -140,13 +140,12 @@ pub struct Decoder {
140140
active_decoder: RecordDecoder,
141141
active_fingerprint: Option<Fingerprint>,
142142
batch_size: usize,
143-
decoded_rows: usize,
143+
remaining_capacity: usize,
144144
cache: IndexMap<Fingerprint, RecordDecoder>,
145145
max_cache_size: usize,
146146
reader_schema: Option<AvroSchema<'static>>,
147-
schema_store: Option<SchemaStore<'static>>,
147+
writer_schema_store: Option<SchemaStore<'static>>,
148148
utf8_view: bool,
149-
static_store_mode: bool,
150149
strict_mode: bool,
151150
pending_schema: Option<(Fingerprint, RecordDecoder)>,
152151
}
@@ -168,34 +167,43 @@ impl Decoder {
168167
///
169168
/// Returns the number of bytes consumed.
170169
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
170+
if self.active_fingerprint.is_none() && self.writer_schema_store.is_some() {
171+
if !data.starts_with(&SINGLE_OBJECT_MAGIC) {
172+
return Err(ArrowError::ParseError(
173+
"Expected single‑object encoding fingerprint prefix for first message \
174+
(writer_schema_store is set but active_fingerprint is None)"
175+
.into(),
176+
));
177+
}
178+
}
171179
let mut total_consumed = 0usize;
172-
let hash_type = self.schema_store.as_ref().map_or(
180+
let hash_type = self.writer_schema_store.as_ref().map_or(
173181
FingerprintAlgorithm::Rabin,
174182
SchemaStore::fingerprint_algorithm,
175183
);
176-
while total_consumed < data.len() && self.decoded_rows < self.batch_size {
184+
while total_consumed < data.len() && self.remaining_capacity > 0 {
177185
if let Some(prefix_bytes) = self.handle_prefix(&data[total_consumed..], hash_type)? {
178-
// Schema change detected (> 0) or there were insufficient bytes to read the next prefix (= 0).
179-
// If the former, the batch must end because the next record has a different schema.
180-
// If the latter, batch ends because the caller needs to fetch more bytes.
186+
// A batch is complete when its `remaining_capacity` is 0. It may be completed early if
187+
// a schema change is detected or there are insufficient bytes to read the next prefix.
188+
// A schema change requires a new batch.
181189
total_consumed += prefix_bytes;
182190
break;
183191
}
184192
let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
185193
total_consumed += n;
186-
self.decoded_rows += 1;
194+
self.remaining_capacity -= 1;
187195
}
188196
Ok(total_consumed)
189197
}
190198

191199
/// Produce a `RecordBatch` if at least one row is fully decoded, returning
192200
/// `Ok(None)` if no new rows are available.
193201
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
194-
if self.decoded_rows == 0 {
202+
if self.remaining_capacity == self.batch_size {
195203
return Ok(None);
196204
}
197205
let batch = self.active_decoder.flush()?;
198-
self.decoded_rows = 0;
206+
self.remaining_capacity = self.batch_size;
199207
// Apply a pending schema switch if one is staged
200208
if let Some((new_fingerprint, new_decoder)) = self.pending_schema.take() {
201209
// Cache the old decoder before replacing it
@@ -210,7 +218,6 @@ impl Decoder {
210218
self.active_decoder = new_decoder;
211219
}
212220
}
213-
self.evict_cache();
214221
Ok(Some(batch))
215222
}
216223

@@ -220,10 +227,7 @@ impl Decoder {
220227
buf: &[u8],
221228
hash_type: FingerprintAlgorithm,
222229
) -> Result<Option<usize>, ArrowError> {
223-
if self.schema_store.is_none()
224-
|| self.static_store_mode
225-
|| !buf.starts_with(&SINGLE_OBJECT_MAGIC)
226-
{
230+
if self.writer_schema_store.is_none() || !buf.starts_with(&SINGLE_OBJECT_MAGIC) {
227231
return Ok(None);
228232
}
229233
let full_len = prefix_len(hash_type);
@@ -247,8 +251,8 @@ impl Decoder {
247251
self.prepare_schema_switch(new_fp)?;
248252
// If there are already decoded rows, we must flush them first.
249253
// Forcing the batch to be full ensures `flush` is called next.
250-
if self.decoded_rows > 0 {
251-
self.decoded_rows = self.batch_size;
254+
if self.remaining_capacity < self.batch_size {
255+
self.remaining_capacity = 0;
252256
}
253257
}
254258
Ok(Some(full_len))
@@ -260,7 +264,7 @@ impl Decoder {
260264
} else {
261265
// No cached decoder, create a new one
262266
let store = self
263-
.schema_store
267+
.writer_schema_store
264268
.as_ref()
265269
.ok_or_else(|| ArrowError::ParseError("Schema store unavailable".into()))?;
266270
let writer_schema = store.lookup(&new_fingerprint).ok_or_else(|| {
@@ -282,23 +286,14 @@ impl Decoder {
282286
Ok(())
283287
}
284288

285-
#[inline]
286-
fn evict_cache(&mut self) {
287-
while self.cache.len() > self.max_cache_size {
288-
if let Some(lru_key) = self.cache.keys().next().cloned() {
289-
self.cache.shift_remove(&lru_key);
290-
}
291-
}
292-
}
293-
294289
/// Returns the number of rows that can be added to this decoder before it is full.
295290
pub fn capacity(&self) -> usize {
296-
self.batch_size.saturating_sub(self.decoded_rows)
291+
self.remaining_capacity
297292
}
298293

299294
/// Returns true if the decoder has reached its capacity for the current batch.
300295
pub fn batch_is_full(&self) -> bool {
301-
self.capacity() == 0
296+
self.remaining_capacity == 0
302297
}
303298
}
304299

@@ -312,7 +307,6 @@ pub struct ReaderBuilder {
312307
reader_schema: Option<AvroSchema<'static>>,
313308
writer_schema_store: Option<SchemaStore<'static>>,
314309
active_fingerprint: Option<Fingerprint>,
315-
static_store_mode: bool,
316310
decoder_cache_size: usize,
317311
}
318312

@@ -325,7 +319,6 @@ impl Default for ReaderBuilder {
325319
reader_schema: None,
326320
writer_schema_store: None,
327321
active_fingerprint: None,
328-
static_store_mode: false,
329322
decoder_cache_size: 20,
330323
}
331324
}
@@ -367,20 +360,18 @@ impl ReaderBuilder {
367360
active_decoder: RecordDecoder,
368361
active_fingerprint: Option<Fingerprint>,
369362
reader_schema: Option<AvroSchema<'static>>,
370-
schema_store: Option<SchemaStore<'static>>,
371-
static_store_mode: bool,
363+
writer_schema_store: Option<SchemaStore<'static>>,
372364
) -> Decoder {
373365
Decoder {
374366
batch_size: self.batch_size,
375-
decoded_rows: 0,
367+
remaining_capacity: self.batch_size,
376368
active_fingerprint,
377369
active_decoder,
378370
cache: IndexMap::new(),
379371
max_cache_size: self.decoder_cache_size,
380372
reader_schema,
381373
utf8_view: self.utf8_view,
382-
schema_store,
383-
static_store_mode,
374+
writer_schema_store,
384375
strict_mode: self.strict_mode,
385376
pending_schema: None,
386377
}
@@ -397,39 +388,36 @@ impl ReaderBuilder {
397388
})?;
398389
let record_decoder =
399390
self.make_record_decoder(&writer_schema, self.reader_schema.as_ref())?;
400-
Ok(self.make_decoder_with_parts(record_decoder, None, None, None, true))
391+
Ok(self.make_decoder_with_parts(record_decoder, None, None, None))
401392
}
402393
None => {
403394
let reader_schema = self.reader_schema.clone().ok_or_else(|| {
404395
ArrowError::ParseError("Reader schema required for raw Avro".into())
405396
})?;
406397
let (init_fingerprint, initial_decoder) =
407-
match (&self.writer_schema_store, self.active_fingerprint) {
398+
if let (Some(schema_store), Some(fingerprint)) =
399+
(&self.writer_schema_store, self.active_fingerprint)
400+
{
408401
// An initial fingerprint is provided, use it to look up the first schema.
409-
(Some(schema_store), Some(fingerprint)) => {
410-
let writer_schema =
411-
schema_store.lookup(&fingerprint).ok_or_else(|| {
412-
ArrowError::ParseError(
413-
"Active fingerprint not found in schema store".into(),
414-
)
415-
})?;
416-
let decoder =
417-
self.make_record_decoder(writer_schema, Some(&reader_schema))?;
418-
(Some(fingerprint), decoder)
419-
}
402+
let writer_schema = schema_store.lookup(&fingerprint).ok_or_else(|| {
403+
ArrowError::ParseError(
404+
"Active fingerprint not found in schema store".into(),
405+
)
406+
})?;
407+
let decoder =
408+
self.make_record_decoder(writer_schema, Some(&reader_schema))?;
409+
(Some(fingerprint), decoder)
410+
} else {
420411
// No initial fingerprint; the first record must contain one.
421-
// A temporary decoder is created from the reader schema.
422-
_ => {
423-
let decoder = self.make_record_decoder(&reader_schema, None)?;
424-
(None, decoder)
425-
}
412+
// A decoder is created from the reader schema only.
413+
let decoder = self.make_record_decoder(&reader_schema, None)?;
414+
(None, decoder)
426415
};
427416
Ok(self.make_decoder_with_parts(
428417
initial_decoder,
429418
init_fingerprint,
430419
Some(reader_schema),
431420
self.writer_schema_store.clone(),
432-
self.static_store_mode,
433421
))
434422
}
435423
}
@@ -493,18 +481,6 @@ impl ReaderBuilder {
493481
self
494482
}
495483

496-
/// If `true`, all schemas must be pre-registered in the `SchemaStore`.
497-
///
498-
/// When this mode is enabled, decoding will fail if a schema fingerprint is
499-
/// encountered that does not already exist in the store. This prevents the
500-
/// dynamic resolution of schemas and ensures that only known schemas are used.
501-
///
502-
/// Defaults to `false`.
503-
pub fn with_static_store_mode(mut self, enabled: bool) -> Self {
504-
self.static_store_mode = enabled;
505-
self
506-
}
507-
508484
/// Set the maximum number of decoders to cache.
509485
///
510486
/// When dealing with Avro files that contain multiple schemas, we may need to switch
@@ -521,20 +497,13 @@ impl ReaderBuilder {
521497
self.writer_schema_store.as_ref(),
522498
self.reader_schema.as_ref(),
523499
self.active_fingerprint.as_ref(),
524-
self.static_store_mode,
525500
) {
526-
(Some(_), None, _, _) => Err(ArrowError::ParseError(
501+
(Some(_), None, _) => Err(ArrowError::ParseError(
527502
"Reader schema must be set when writer schema store is provided".into(),
528503
)),
529-
(None, _, Some(_), _) => Err(ArrowError::ParseError(
504+
(None, _, Some(_)) => Err(ArrowError::ParseError(
530505
"Active fingerprint requires a writer schema store".into(),
531506
)),
532-
(None, _, _, true) => Err(ArrowError::ParseError(
533-
"static_store_mode=true requires a writer schema store".into(),
534-
)),
535-
(Some(_), _, None, true) => Err(ArrowError::ParseError(
536-
"static_store_mode=true requires an active fingerprint".into(),
537-
)),
538507
_ => Ok(()),
539508
}
540509
}
@@ -780,32 +749,6 @@ mod test {
780749
assert_eq!(store.fingerprint_algorithm(), FingerprintAlgorithm::Rabin);
781750
}
782751

783-
#[test]
784-
fn test_static_store_mode_ignores_subsequent_prefix() {
785-
let (store, fp_int, fp_long, schema_int, _schema_long) = make_two_schema_store();
786-
let mut decoder = ReaderBuilder::new()
787-
.with_batch_size(8)
788-
.with_reader_schema(schema_int.clone())
789-
.with_writer_schema_store(store)
790-
.with_active_fingerprint(fp_int)
791-
.with_static_store_mode(true)
792-
.build_decoder()
793-
.expect("build decoder");
794-
let prefix = make_prefix(fp_long);
795-
match decoder.decode(&prefix) {
796-
Err(ArrowError::ParseError(_)) => {
797-
assert!(
798-
decoder.pending_schema.is_none(),
799-
"no schema switch should be staged"
800-
);
801-
}
802-
Ok(n) => {
803-
panic!("decode unexpectedly succeeded (consumed {n} bytes) in static_store_mode")
804-
}
805-
Err(e) => panic!("unexpected error kind: {e}"),
806-
}
807-
}
808-
809752
#[test]
810753
fn test_unknown_fingerprint_is_error() {
811754
let (mut store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
@@ -833,6 +776,24 @@ mod test {
833776
);
834777
}
835778

779+
#[test]
780+
fn test_missing_initial_fingerprint_error() {
781+
let (store, _fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
782+
let mut decoder = ReaderBuilder::new()
783+
.with_batch_size(8)
784+
.with_reader_schema(schema_int.clone())
785+
.with_writer_schema_store(store)
786+
.build_decoder()
787+
.expect("build decoder");
788+
let buf = [0x02u8, 0x00u8];
789+
let err = decoder.decode(&buf).expect_err("decode should error");
790+
let msg = format!("{err}");
791+
assert!(
792+
msg.contains("Expected single‑object encoding fingerprint"),
793+
"unexpected error message: {msg}"
794+
);
795+
}
796+
836797
#[test]
837798
fn test_utf8view_support() {
838799
let schema_json = r#"{

0 commit comments

Comments
 (0)