Skip to content

[lake/tiering] Support reporting watermark to Paimon snapshot#3420

Open
Shawn-Hx wants to merge 1 commit into
apache:mainfrom
Shawn-Hx:FLUSS-3419
Open

[lake/tiering] Support reporting watermark to Paimon snapshot#3420
Shawn-Hx wants to merge 1 commit into
apache:mainfrom
Shawn-Hx:FLUSS-3419

Conversation

@Shawn-Hx

@Shawn-Hx Shawn-Hx commented Jun 2, 2026

Copy link
Copy Markdown

Purpose

Linked issue: close #3419

This PR supports reporting watermarks to Paimon snapshots during lake tiering.

Brief change log

  • Introduced watermark extraction for Fluss rows in the lake tiering pipeline.
  • Added watermark propagation through lake write results and tiering commit.
  • Aggregated watermarks across tiered buckets before committing to the lake.
  • Reported the aggregated watermark to Paimon snapshots when committing.
  • Updated serializers and tests to support nullable and negative watermark values.
  • Added coverage for watermark extraction, commit aggregation, and Paimon snapshot watermark reporting.

Tests

Add tests in:

  • PaimonWriteResultSerializerTest
  • PaimonTieringTest
  • TieringCommitOperatorTest
  • SimpleWatermarkExtractorTest

API and Format

This change extends the lake tiering API to carry watermark information through write results and committers.

Documentation

No user-facing documentation needs to be added.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds end-to-end watermark propagation to the Fluss lake tiering pipeline and reports the aggregated watermark into committed Paimon snapshots, enabling event-time progress tracking on the lake side.

Changes:

  • Introduces WatermarkExtractor and a LakeWriteResult contract to carry per-split watermarks through tiering writers and into committers.
  • Adds SimpleWatermarkExtractor (parsing Flink-style watermark table properties) and wires it into TieringSplitReader/WriterInitContext, with commit-side aggregation.
  • Updates Paimon tiering components (writer, committer, serializer) and tests to persist/report nullable (and negative) watermark values.

Reviewed changes

Copilot reviewed 37 out of 37 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java Updates test plugin write result type to implement LakeWriteResult.
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializerTest.java Adds serializer tests for watermark field + v1 backward compatibility.
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringTest.java Adds integration coverage for watermark extraction + reporting to Paimon snapshot.
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResultSerializer.java Bumps serializer version and encodes nullable watermark alongside commit message.
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonWriteResult.java Implements LakeWriteResult and carries watermark in write result.
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeWriter.java Extracts/aggregates per-writer max watermark while writing records.
fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java Passes aggregated watermark into committable creation.
fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java Updates test init context to satisfy new watermark extractor method.
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceWriteResult.java Implements LakeWriteResult (default watermark = null).
fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java Updates committer API to accept watermark parameter.
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java Updates test init context to satisfy new watermark extractor method.
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java Implements LakeWriteResult (default watermark = null).
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java Updates committer API to accept watermark parameter.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java Updates test write result to implement LakeWriteResult.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java Updates test committer API to accept watermark parameter.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingWriteResult.java Implements LakeWriteResult for test results and adds optional watermark.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TestingLakeTieringFactory.java Captures watermark passed into committer for assertions.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractorTest.java Adds unit tests for watermark parsing/extraction behavior.
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorTest.java Adds commit-aggregation tests covering watermark behavior.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/watermark/SimpleWatermarkExtractor.java New extractor implementation that parses simple watermark definitions.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java Extends writer init context to carry WatermarkExtractor.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java Creates watermark extractor per table and passes it to lake writers; bounds type to LakeWriteResult.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceReader.java Bounds WriteResult to LakeWriteResult.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSourceFetcherManager.java Bounds WriteResult to LakeWriteResult.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java Bounds WriteResult to LakeWriteResult (including builder).
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultTypeInfo.java Bounds WriteResult to LakeWriteResult.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java Version bump + embeds inner writeResult serializer version for state compatibility.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResultEmitter.java Bounds WriteResult to LakeWriteResult.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TableBucketWriteResult.java Bounds WriteResult to LakeWriteResult.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperatorFactory.java Bounds WriteResult to LakeWriteResult.
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java Aggregates watermark across buckets and passes it into lake committer.
fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java Adds watermarkExtractor() to writer init context.
fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriteResult.java New interface to expose optional per-write-result watermark.
fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeWriter.java Bounds WriteResult to LakeWriteResult.
fluss-common/src/main/java/org/apache/fluss/lake/writer/LakeTieringFactory.java Bounds WriteResult to LakeWriteResult.
fluss-common/src/main/java/org/apache/fluss/lake/watermark/WatermarkExtractor.java New interface for extracting watermarks from rows.
fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitter.java Adds watermark overload for toCommittable and updates generics bound.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +116 to +120
LOG.warn(
"Watermark rowtime column '{}' not found in row type for {}, "
+ "computed column is not supported for watermark extraction.",
tableInfo.getTablePath(),
rowtimeColumn);
Comment on lines +47 to +67
/**
* Converts a list of write results to a committable object with watermark.
*
* @param writeResults the list of write results
* @param watermark watermark to be committed
* @return the committable object
* @throws IOException if an I/O error occurs
*/
CommittableT toCommittable(List<WriteResult> writeResults, @Nullable Long watermark)
throws IOException;

/**
* Converts a list of write results to a committable object.
*
* @param writeResults the list of write results
* @return the committable object
* @throws IOException if an I/O error occurs
*/
CommittableT toCommittable(List<WriteResult> writeResults) throws IOException;
default CommittableT toCommittable(List<WriteResult> writeResults) throws IOException {
return toCommittable(writeResults, null);
}
Comment on lines +251 to +254
if (nonEmptyResults.size() < committableWriteResults.size()) {
// Empty results means some splits has not been processed, possibly caused by force
// completion. Do not update watermark here.
if (watermark != null) {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[tiering] Support reporting watermark to Paimon snapshot

2 participants