
initial iceberg table sink design #32460


Open · wants to merge 8 commits into main

Conversation

martykulma (Contributor)

An initial design for adding an Iceberg table sink to Materialize.

Motivation

Much interest in this topic there is.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@bkirwi (Contributor) left a comment:

Exciting times!

- Support multiple object store implementations
  - S3, GCS, ABS
- Support control over output format
  - File Type: Parquet, Avro, ORC
Contributor:

This feels like a lot of things to implement before this counts as successful! Is there any way to thin down this list further?

Contributor (Author):

Oh yes - it absolutely is! The design doc just calls for success criteria and an MVP, nothing about future work, which a lot of these are. I'll create a Possible Futures section for some of these.

Contributor (Author):

moved to out of scope

```
*Appending data to iceberg requires uploading metadata and data files, and reading data requires accessing metadata to know which data files need to be retrieved. It is inefficient to write very small updates, and very resource intensive for readers to perform the reads when there are many small updates. To give users control over the dimensions of the appended data, Materialize will allow users to optionally specify a minimum size and a maximum period for the sink. Materialize will append data to the iceberg table if either:*
- *the size of the append is above the minimum size*
- *the maximum period of time has passed since the last append*
```
Contributor:

In my past life integrating this sort of thing, it was typically most useful to have a fixed period, i.e. uploading every hour on the hour. This was because it made it easier to schedule downstream batch stuff... if the downstream cron kicks off every hour on the hour as well, aligning the periods minimizes delays and wasted work.

I can imagine it being annoying to support both styles, though. Do we have a sense of how our early users might hope to integrate this?

Contributor (Author):

I wrote this up based on competitor analysis (PG WAL also happens to work this way). I don't have info on early users yet, but I'll be speaking with a potential consumer of this today. Will bring it up!

Contributor (Author):

removed size as a constraint and refined this as just COMMIT INTERVAL, which for right now is wall clock time, but I'm getting some additional feedback.


To manage transactions, the coordinator will track the size of the current iceberg append as well as the last successful append time. Once an append has reached either the minimum size or the maximum period, if set, writes are flushed and the append is committed to iceberg. Materialize will respect the MZ timestamp. All updates that happen within the same tick will be committed together, even if that exceeds the time.

If neither minimum size nor maximum period is set, Materialize will perform appends according to the MZ timestamp.
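
To make the quoted policy concrete, here is a minimal sketch of the flush decision. All type and field names are illustrative, not actual Materialize code, and `tick_complete` stands in for "all updates for the current MZ timestamp have arrived" (see the frontier clarification below):

```rust
use std::time::{Duration, Instant};

// Hypothetical per-sink state and options; names are illustrative only.
struct AppendState {
    pending_bytes: u64,      // size of the not-yet-committed append
    last_append_at: Instant, // time of the last successful append
}

struct SinkOptions {
    min_append_bytes: Option<u64>,
    max_append_interval: Option<Duration>,
}

/// Should the coordinator flush and commit the pending append?
/// `tick_complete` ensures a single MZ timestamp is never split across commits.
fn should_commit(state: &AppendState, opts: &SinkOptions, tick_complete: bool) -> bool {
    if !tick_complete {
        return false;
    }
    match (opts.min_append_bytes, opts.max_append_interval) {
        // Neither option set: append according to the MZ timestamp.
        (None, None) => true,
        (min, max) => {
            min.is_some_and(|m| state.pending_bytes >= m)
                || max.is_some_and(|p| state.last_append_at.elapsed() >= p)
        }
    }
}
```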
Contributor:

Can you expand on "according to the MZ timestamp"? Is that every time the frontier advances, or a separate append for every timestamp in the data, or?

Contributor (Author):

yes, I was intending to have this be when the frontier advances! Will update.

Contributor (Author):

done

Appending data includes insert, update, and delete row operations. To perform deletes, Materialize will generate [equality delete files](https://iceberg.apache.org/spec/#equality-delete-files) that match the fields of the primary key. Inserts are performed via data files. Updates contain delete files and data files in the same commit. Materialize will enforce a 512MB limit (which I borrowed from S3Tables) on the size of the parquet files. Appends that are larger than 512MB will be composed of multiple files.
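
A rough sketch of the multi-file splitting described above, using placeholder types rather than a real Parquet writer (the 512MB figure is the limit quoted in the doc):

```rust
const MAX_DATA_FILE_BYTES: u64 = 512 * 1024 * 1024; // limit quoted above

// Placeholder types; a real implementation would stream Parquet row groups.
struct EncodedRow {
    bytes: Vec<u8>,
}

struct DataFilePlan {
    rows: Vec<EncodedRow>,
    size_bytes: u64,
}

/// Pack encoded rows into data files, starting a new file whenever the next
/// row would push the current file past the per-file size limit.
fn plan_data_files(rows: Vec<EncodedRow>) -> Vec<DataFilePlan> {
    let mut files = Vec::new();
    let mut current = DataFilePlan { rows: Vec::new(), size_bytes: 0 };
    for row in rows {
        let row_size = row.bytes.len() as u64;
        if current.size_bytes > 0 && current.size_bytes + row_size > MAX_DATA_FILE_BYTES {
            files.push(std::mem::replace(
                &mut current,
                DataFilePlan { rows: Vec::new(), size_bytes: 0 },
            ));
        }
        current.size_bytes += row_size;
        current.rows.push(row);
    }
    if !current.rows.is_empty() {
        files.push(current);
    }
    files
}
```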


Appends to the iceberg table will utilize `Fast Append`, which avoids rewrites of manifest files (see [here](https://iceberg.apache.org/spec/#snapshots)). As part of the write, Materialize will store information in the snapshot [Summary properties](https://iceberg.apache.org/spec/#optional-snapshot-summary-fields). The properties field is a `HashMap<String, String>`, in which Materialize will store the timestamp and sink version number. To determine the latest append performed by Materialize, retrieve the most recent snapshots and examine their properties.
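
A small sketch of the summary-property bookkeeping, assuming illustrative key names (`mz_timestamp`, `mz_sink_version`) and a placeholder snapshot type rather than a real catalog API:

```rust
use std::collections::HashMap;

// Placeholder for a snapshot's summary; not an iceberg-rust type.
struct SnapshotSummary {
    properties: HashMap<String, String>,
}

/// Properties Materialize would attach to a Fast Append snapshot.
fn mz_snapshot_properties(mz_timestamp: u64, sink_version: u64) -> HashMap<String, String> {
    HashMap::from([
        ("mz_timestamp".to_string(), mz_timestamp.to_string()),
        ("mz_sink_version".to_string(), sink_version.to_string()),
    ])
}

/// Walk snapshots newest-first and return the most recent MZ append timestamp,
/// skipping snapshots written by other clients (e.g. compaction), which will
/// not carry the mz_ keys.
fn latest_mz_append(snapshots_newest_first: &[SnapshotSummary]) -> Option<u64> {
    snapshots_newest_first
        .iter()
        .find_map(|s| s.properties.get("mz_timestamp")?.parse().ok())
}
```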
Contributor:

Definitely nicer than trying to encode the metadata in the ID!

Contributor:

Is there a risk of external maintenance destroying this metadata? (eg. if the latest snapshot we've written is rewritten.)

Contributor (Author):

It can be lost if the snapshot expiration is very aggressive and MZ doesn't write anything to that iceberg table over a long enough period (e.g. setting the replication factor to 0 for the cluster for a while).

The snapshot and data files are immutable once written, so a compaction would not affect us. My understanding from reading is that every client creates the snapshot summary metadata separately, so a compaction is expected to yield a snapshot with no mz_ keys.

If we end up in a situation with no MZ snapshots, the user will have to recreate the sink. This is expected to be documented; I'll add a blurb to the design.


### Iceberg Table Maintenance

Iceberg, being a table specification, provides [guidance on iceberg table maintenance](https://iceberg.apache.org/docs/latest/maintenance/) tasks. It is up to the engine (Spark, Flink, etc.) to provide the implementation. S3Tables provides this functionality, whereas using a REST catalog alone would not. Implementing this functionality in Materialize would require a long-running, asynchronous service. This could run on a replica, but would also require some method of fencing to ensure that multiple instances of this service are not actively trying to perform maintenance tasks on the same table. Because this service would need to read, compact, and write back data to the iceberg table, it would compete with dataflows in the same replica, making capacity planning more complex and affecting the performance characteristics of the sink.
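
For illustration only, one way the fencing mentioned above could look, assuming the catalog offers an atomic compare-and-swap on table metadata (all names here are hypothetical, not a committed design):

```rust
// Hypothetical catalog operations; not a real Materialize or iceberg-rust API.
struct MaintenanceLease {
    epoch: u64,
}

trait CatalogFencing {
    /// Atomically bump the maintenance epoch iff it still equals `expected`;
    /// returns a lease on success, or None if another instance won the race.
    fn try_acquire_lease(&self, table: &str, expected: u64) -> Option<MaintenanceLease>;

    /// Commit maintenance results only if the lease's epoch is still current,
    /// so a stale instance's commit is rejected.
    fn commit_if_current(&self, table: &str, lease: &MaintenanceLease) -> bool;
}

fn run_maintenance(catalog: &dyn CatalogFencing, table: &str, observed_epoch: u64) {
    let Some(lease) = catalog.try_acquire_lease(table, observed_epoch) else {
        return; // another instance holds the lease; skip this round
    };
    // ... compaction / snapshot expiration would happen here ...
    if !catalog.commit_if_current(table, &lease) {
        // Fenced out mid-run: discard the work and retry later.
    }
}
```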
Contributor:

If we wanted to have materialize implement this maintenance work, it seems reasonable to me to have the sink itself complete it, since it is already a long running asynchronous service that's connected to the catalog, and could order writes to avoid conflicts...

(Would still take additional resources, though. If we think we can punt this work to someone else to start, that seems very convenient!)
