Skip to content

Implement arrow-avro SchemaStore and Fingerprinting To Enable Schema Resolution #8006

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

jecsand838
Copy link
Contributor

Which issue does this PR close?

Rationale for this change

Apache Avro’s single object encoding prefixes every record with the marker 0xC3 0x01 followed by a Rabin schema fingerprint so that readers can identify the correct writer schema without carrying the full definition in each message.
While the current arrow‑avro implementation can read container files, it cannot ingest these framed messages or handle streams where the writer schema changes over time.

The Avro specification recommends computing a 64‑bit CRC‑64‑AVRO (Rabin) hashed fingerprint of the parsed canonical form of a schema to look up the Schema from a local schema store or registry.

This PR introduces SchemaStore and fingerprinting to enable:

  • Zero‑copy schema identification for decoding streaming Avro messages published in single‑object format (i.e. Kafka, Pulsar, etc) into Arrow.
  • Dynamic schema evolution by laying the foundation to resolve writer reader schema differences on the fly.
    NOTE: Schema Resolution support in Codec and RecordDecoder coming the next PR.

What changes are included in this PR?

Area Highlights
schema.rs New Fingerprint, SchemaStore, and SINGLE_OBJECT_MAGIC; canonical‑form generator; Rabin fingerprint calculator; compare_schemas helper.
reader/mod.rs Decoder now detects the C3 01 prefix, extracts the fingerprint, looks up the writer schema in a SchemaStore, and switches to an LRU cached RecordDecoder without interrupting streaming; supports static_store_mode to skip the 2 byte peek for high‑throughput fixed‑schema pipelines.
ReaderBuilder New builder configuration methods: .with_writer_schema_store, .with_active_fingerprint, .with_static_store_mode, .with_reader_schema, .with_max_decoder_cache_size, with rigorous validation to prevent misconfiguration.
codec.rs Added AvroFieldBuilder::with_reader_schema and a stubbed AvroField::resolve_from_writer_and_reader entry point for full writer/reader schema resolution.
Unit tests New tests covering fingerprint generation, store registration/lookup, schema switching, unknown‑fingerprint errors, and interaction with UTF8‑view decoding.
Docs & Examples Extensive inline docs with examples on all new public methods / structs.

Are these changes tested?

Yes. New tests cover:

  1. Fingerprinting against the canonical examples from the Avro spec
  2. SchemaStore behavior deduplication, duplicate registration, and lookup.
  3. Decoder fast‑path with static_store_mode=true, ensuring the prefix is treated as payload, the 2 byte peek is skipped, and no schema switch is attempted.

Are there any user-facing changes?

N/A

Follow-Up PRs

  1. Implement Schema Resolution Functionality in Codec and RecordDecoder
  2. Improve arrow-avro errors + add more benchmarks & examples to prepare for public release

@github-actions github-actions bot added the arrow Changes to the arrow crate label Jul 26, 2025
@jecsand838 jecsand838 force-pushed the avro-reader-schema-store branch 4 times, most recently from b549452 to a4a4df8 Compare July 26, 2025 03:27
@jecsand838
Copy link
Contributor Author

@alamb @scovich I apologize in advance for how large this one got! A substantial portion of the updates are detailed doc comments, examples, and tests. Functionally I don't think this PR is as large as it seems. However, let me know if this needs to be broken up and I'd be happy to do so.

@jecsand838 jecsand838 force-pushed the avro-reader-schema-store branch from a4a4df8 to ca39cba Compare July 26, 2025 04:06
… and made the schema module public. Integrated new `SchemaStore` to the `Decoder` in `reader/mod.rs`. Stubbed out `AvroField::resolve_from_writer_and_reader` in `codec.rs`. Added new tests to cover changes
@jecsand838 jecsand838 force-pushed the avro-reader-schema-store branch from ca39cba to 4890faa Compare July 26, 2025 04:42
@alamb
Copy link
Contributor

alamb commented Jul 26, 2025

@veronica-m-ef I wonder if you might have some time to help review this PR, as you previously contributed to this code?

@alamb
Copy link
Contributor

alamb commented Jul 26, 2025

Perhaps @svencowart you might also be interested and able to help review this PR?

Copy link
Contributor

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is definitely a big piece of work, but I don't know how to split up the functionality of this PR -- except some of the cosmetic changes, code movement, and variable renames should ideally be eliminated or moved to a different PR for clarity.

Comment on lines +149 to +150
writer: &'a Schema<'a>,
reader: &'a Schema<'a>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: I guess this is a low-level avro schema instance, not the arrow schema Schema?
At least, I don't remember arrow Schema objects having a lifetime parameter?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah -- SchemaRef is from arrow-schema, but Schema is crate-local.

/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
#[inline]
pub fn generate_canonical_form(schema: &Schema) -> String {
serde_json::to_string(&parse_canonical_json(schema)).unwrap()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unwrap because the to_string call can never fail for some reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cleaned this up in my last commit. Ty for catching this.

Comment on lines 315 to 317
let canonical = generate_canonical_form(schema);
match hash_type {
FingerprintAlgorithm::Rabin => Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Converting something to a string just so we can hash it seems really expensive... but if I understand correctly, the avro spec mandates it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct unfortunately. The fingerprints are supposed to be of a Schema in canonical form

Luckily, there shouldn't be a scenario where we need to parse a schema and fingerprint it while decoding.

/// The hashing algorithm used for generating fingerprints.
fingerprint_algorithm: FingerprintAlgorithm,
/// A map from a schema's fingerprint to the schema itself.
schemas: HashMap<Fingerprint, Schema<'a>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This map probing seems vulnerable to hash collisions, because we probe only by hash?
(as opposed to passing the schema, probing by hash, and then confirming against the schema)?

From the spec:

fingerprints are not meant to provide any security guarantees, even the longer SHA-256-based ones. Most Avro applications should be surrounded by security measures that prevent attackers from writing random data and otherwise interfering with the consumers of schemas. We recommend that these surrounding mechanisms be used to prevent collision and pre-image attacks (i.e., “forgery”) on schema fingerprints, rather than relying on the security properties of the fingerprints themselves.

Granted, the chances of a collision should be vanishingly small for a reasonable number of schemas and a uniformly distributed 64-bit hash, so maybe we don't care?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning to add some improvements to this logic when I got back in here for the extra hash types. However I went ahead and added a check to the register function. It was pretty trivial and was worth it.

///
/// An `Option` containing a clone of the `Schema` if found, otherwise `None`.
pub fn lookup(&self, fp: &Fingerprint) -> Option<Schema<'a>> {
self.schemas.get(fp).cloned()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an expensive clone (for a big schema)... should we return a reference to the schema instead, and let the caller clone it if they wish?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a great suggestion, ty for making it. This was included in my last commit as well.

}

fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these methods deleted? Or moved? Or just github is giving a messy diff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods got deleted. I was able to confirm that only the Reader would ever expect a Header and was able to change build and build_decoder to this:

    /// Create a [`Reader`] from this builder and a `BufRead`
    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
        self.validate()?;
        let header = read_header(&mut reader)?;
        let decoder = self.make_decoder(Some(&header))?;
        Ok(Reader {
            reader,
            header,
            decoder,
            block_decoder: BlockDecoder::default(),
            block_data: Vec::new(),
            block_cursor: 0,
            finished: false,
        })
    }

    /// Create a [`Decoder`] from this builder.
    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
        self.validate()?;
        self.make_decoder(None)
    }

///
/// When enabled, string data from Avro files will be loaded into
/// Arrow's StringViewArray instead of the standard StringArray.
pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be considerable code movement in this part of the file... makes it hard to see what meaningfully changed. Is there a way to clean up the diff?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% I'm working on that now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried to clean up the diff with my latest pushes. Let me know if that's better and easier to follow.

Comment on lines 630 to 634
// No initial fingerprint; the first record must contain one.
// A temporary decoder is created from the reader schema.
_ => {
let dec = self.make_record_decoder(&reader_schema, None)?;
(None, dec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks error-prone... but I guess there's no way to avoid it if the spec allows this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more of the default state that we'd need to cover. What we could do is if the schema_store is set without an active_fingerprint, then throw an explicit error in the Decoder that is more clear than ArrowError::ParseError(format!("Unknown fingerprint: {new_fingerprint:?}")).

I'll clean that up in the morning, this is a good call out!

Copy link
Contributor Author

@jecsand838 jecsand838 Jul 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this check + early failure to the beginning of Decoder::decoder to help clean this up a bit:

        if self.active_fingerprint.is_none()
            && self.writer_schema_store.is_some()
            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
        {
            return Err(ArrowError::ParseError(
                "Expected single‑object encoding fingerprint prefix for first message \
                     (writer_schema_store is set but active_fingerprint is None)"
                    .into(),
            ));
        }

Let me know what you think.

Comment on lines 631 to 632
// A temporary decoder is created from the reader schema.
_ => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it valid to ignore the case where we have Some(fp) but no schema store? That seems like an error?

Copy link
Contributor Author

@jecsand838 jecsand838 Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% It's an error, I'm just doing that check at the start, in the validate method:

            (None, _, Some(_), _) => Err(ArrowError::ParseError(
                "Active fingerprint requires a writer schema store".into(),
            )),

Comment on lines 637 to 643
Ok(Decoder {
batch_size: self.batch_size,
decoded_rows: 0,
active_fp: init_fp,
active_decoder: initial_decoder,
cache: HashMap::new(),
lru: VecDeque::new(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two constructor calls seem to have a lot of redundancy. Would it be worthwhile to factor out the args that actually differ, and create the decoder only once, outside the match?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was a good call out. I included this abstraction in my latest commit.

@jecsand838 jecsand838 force-pushed the avro-reader-schema-store branch from 9dde02c to da7b1b9 Compare July 28, 2025 04:41
Copy link
Contributor

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heading out the door for a couple days, but this refresh looks way better at a glance.

Will hopefully get a more thorough pass on Wed

Comment on lines 286 to 288
while self.cache.len() > self.max_cache_size {
if let Some(lru_key) = self.cache.keys().next().cloned() {
self.cache.shift_remove(&lru_key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will pay quadratic work for a cache with a lot of extra entries. Hopefully that's a rare case tho?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually... looking at the code, there is only one call site for this method, and there will be at most one extra entry to remove. We should probably just bake that in at the call site, instead of splitting the logic up like this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed this change up in my latest commit. That was a good catch.

jecsand838 and others added 2 commits July 28, 2025 12:05
@jecsand838 jecsand838 force-pushed the avro-reader-schema-store branch 4 times, most recently from 0608fd1 to 98ae29a Compare July 29, 2025 04:00
@jecsand838 jecsand838 force-pushed the avro-reader-schema-store branch from 98ae29a to 25c3899 Compare July 29, 2025 04:06
@jecsand838
Copy link
Contributor Author

Heading out the door for a couple days, but this refresh looks way better at a glance.

Will hopefully get a more thorough pass on Wed

@scovich Really appreciate the solid review on a bigger PR like this. I got those changes pushed up and the code is definitely looking much better.

@jecsand838 jecsand838 requested a review from scovich July 29, 2025 04:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
arrow Changes to the arrow crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants