[lake/hudi] Introduce Hudi source split planner #3456

Open
fhan688 wants to merge 3 commits into apache:main from fhan688:Introduce-Hudi-source-split-planner
fhan688 commented Jun 8, 2026

Purpose

Linked issue: #3276

This PR introduces the first planner-only Hudi lake source implementation for Fluss.

Currently, the Hudi lake storage module supports basic Hudi catalog integration, but
HudiLakeStorage#createLakeSource does not return a usable lake source. As a result, Fluss cannot
plan readable lake splits for data already stored in Hudi.

This change adds Hudi source split planning based on a completed Hudi instant. Record reading, limit
pushdown, and Hudi tiering writer support remain explicitly unsupported and can be implemented in
follow-up PRs.

Brief change log

  • Add HudiLakeSource, HudiSplit, HudiSplitPlanner, and HudiSplitSerializer.
  • Wire HudiLakeStorage#createLakeSource to return a Hudi lake source.
  • Add HudiTableInfo to resolve Hudi catalog table metadata, meta client, completed timeline,
    filesystem view, table type, partition fields, and bucket-aware metadata.
  • Plan Hudi splits from the requested snapshot instant after validating that the instant exists in
    the completed Hudi timeline.
  • Support split planning for:
    • COW tables through latest base files before or on the requested instant.
    • MOR tables through latest merged file slices before or on the requested instant.
  • Persist Fluss bucket and partition metadata into Hudi table properties for planner-side recovery.
  • Return bucket -1 for bucket-unaware Fluss log tables.
  • Add and adjust UT coverage for:
    • Hudi lake source planner/serializer wiring.
    • Explicit unsupported limit and record reader behavior.
    • Hudi split serialization version handling.
    • Hudi split planner planning and missing instant behavior.
    • Fluss bucket and partition metadata persisted in Hudi table properties.
    • Partition value extraction from Hudi partition paths.

Tests

  • mvn -pl fluss-lake/fluss-lake-hudi -am -DskipITs -Dcheckstyle.skip=true -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dtest=HudiLakeSourceTest,HudiSplitSerializerTest,HudiTableInfoTest,HudiConversionsTest,HudiSplitPlannerTest test
  • mvn -pl fluss-lake/fluss-lake-hudi -am -DskipITs -Dcheckstyle.skip=true -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false test

The Hudi module test run passed with 54 tests and 0 failures.

API and Format

This PR does not change public Fluss APIs or Fluss storage format.

It adds internal Hudi table properties to preserve Fluss metadata for Hudi source split planning:

  • fluss.bucket.keys
  • fluss.bucket-aware
  • fluss.partition.keys

Documentation

No user-facing documentation is added in this PR because this is a planner-only foundation. Hudi
record reading and Hudi tiering writer support are still not exposed as completed user workflows.

Copilot AI left a comment

Pull request overview

This PR adds an initial (planner-only) Hudi lake source implementation to Fluss, enabling split planning against a completed Hudi instant so that Fluss can plan readable lake splits for data stored in Hudi.

Changes:

  • Introduce Hudi lake source components for split planning (HudiLakeSource, HudiSplit, HudiSplitPlanner, HudiSplitSerializer) and wire them into HudiLakeStorage#createLakeSource.
  • Add HudiTableInfo to resolve Hudi catalog/table metadata, timeline, filesystem view, partition fields, and bucket-awareness metadata.
  • Add unit tests covering planner wiring, serialization version handling, split planning success/failure cases, and Fluss metadata persistence into Hudi table properties.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/HudiLakeStorage.java | Returns a real Hudi lake source instead of throwing unsupported. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiLakeSource.java | New planner-only Hudi LakeSource implementation (filters/limit/reader unsupported). |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplit.java | New LakeSplit implementation for Hudi file slices + bucket/partition metadata. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitPlanner.java | Plans splits for COW/MOR tables at a requested completed instant. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/source/HudiSplitSerializer.java | Serializer for HudiSplit with explicit version handling. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiConversions.java | Persists Fluss bucket/partition metadata into Hudi table properties. |
| fluss-lake/fluss-lake-hudi/src/main/java/org/apache/fluss/lake/hudi/utils/HudiTableInfo.java | New resolver for Hudi catalog/table options, meta client, timeline/view, partitions, bucket-awareness. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiLakeSourceTest.java | Tests planner/serializer wiring and explicit unsupported behaviors. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiSplitPlannerTest.java | Tests split planning for a completed instant and failure when instant missing. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/source/HudiSplitSerializerTest.java | Tests round-trip serialization and unknown version rejection. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/utils/HudiConversionsTest.java | Tests Fluss metadata persistence into Hudi table properties. |
| fluss-lake/fluss-lake-hudi/src/test/java/org/apache/fluss/lake/hudi/utils/HudiTableInfoTest.java | Tests partition value extraction from Hudi partition paths. |

luoyuxia commented Jun 9, 2026

@XuQianJin-Stars Could you please help review this one?

fhan688 closed this Jun 9, 2026
fhan688 reopened this Jun 9, 2026

List<String> partitionPaths =
FSUtils.getAllPartitionPaths(
hudiTableInfo.getEngineContext(), hudiTableInfo.getMetaClient(), false);
if (partitionPaths.isEmpty()) {

FSUtils.getAllPartitionPaths(...) returns an empty list for a partitioned table that has no data written yet. With this fallback we then ask getLatestMergedFileSlicesBeforeOrOn(""), which is meaningless on a partitioned layout. In addition, HudiTableInfo#partitionValues("") returns emptyList() for that case, so the resulting HudiSplit.partition() has a length different from partitionFields.size() and downstream consumers that rely on partition().size() == partitionColumns.size() will silently misalign.

Suggestion: Only fall back to "" when the table is non-partitioned (partitionFields.isEmpty()). For a partitioned table with no discovered partitions, return an empty split list and log at debug.
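
The suggested fallback could be sketched roughly as below. This is a minimal, self-contained illustration rather than the PR's code: `PartitionFallback` and `resolvePartitionPaths` are hypothetical names, and the real planner would obtain `discoveredPaths` from FSUtils.getAllPartitionPaths(...) and `partitionFields` from HudiTableInfo.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical helper illustrating the suggested fallback; not part of the PR. */
final class PartitionFallback {

    private PartitionFallback() {}

    /**
     * Decides which partition paths to plan.
     *
     * @param discoveredPaths partition paths found in the table (may be empty)
     * @param partitionFields partition column names (empty for a non-partitioned table)
     */
    static List<String> resolvePartitionPaths(
            List<String> discoveredPaths, List<String> partitionFields) {
        if (!discoveredPaths.isEmpty()) {
            return discoveredPaths;
        }
        if (partitionFields.isEmpty()) {
            // Non-partitioned table: all files live directly under the table root.
            return Collections.singletonList("");
        }
        // Partitioned table with no data written yet: there is nothing to plan.
        return Collections.emptyList();
    }
}
```

With this shape, the caller can log at debug and return an empty split list whenever the result is empty, so no split is ever built with a partition value list shorter than `partitionFields`.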

}

@Override
public byte[] serialize(HudiSplit hudiSplit) throws IOException {

  1. InstantiationUtils.deserializeObject returns Object with no type check — a crafted byte stream can deserialize to arbitrary classes on the classpath. This violates the project security rule "Deserialization: always not trust, use safe load method."
  2. Hudi internal types like FileSlice / HoodieBaseFile do not guarantee a stable serialVersionUID across versions, so this is also a forward-compatibility risk.

Minimum fix: cast and validate the result, throwing IOException on type mismatch.

Preferred fix: follow PaimonSplitSerializer's field-by-field serialization — serialize fileSlice via Hudi's own Avro/JSON serializer (or its individual fields), and write bucket / partition explicitly. Don't bind our wire format to Hudi's internal Java serialization layout.
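
As a rough illustration of the field-by-field approach, the sketch below writes each field explicitly and rejects unknown versions and malformed input. It is not PaimonSplitSerializer's code and not the PR's: `SimpleSplit` is a hypothetical, simplified stand-in for HudiSplit that carries only a base file path, a bucket, and partition values, and the field order is an assumption.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for HudiSplit (the real class carries a Hudi FileSlice). */
final class SimpleSplit {
    final String baseFilePath;
    final int bucket;
    final List<String> partition;

    SimpleSplit(String baseFilePath, int bucket, List<String> partition) {
        this.baseFilePath = baseFilePath;
        this.bucket = bucket;
        this.partition = partition;
    }
}

/** Writes every field explicitly, so the wire format does not depend on Java serialization. */
final class SimpleSplitSerializer {
    static final int VERSION = 1;

    private SimpleSplitSerializer() {}

    static byte[] serialize(SimpleSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(split.baseFilePath);
            out.writeInt(split.bucket);
            out.writeInt(split.partition.size());
            for (String value : split.partition) {
                out.writeUTF(value);
            }
        }
        return bytes.toByteArray();
    }

    static SimpleSplit deserialize(int version, byte[] data) throws IOException {
        if (version != VERSION) {
            throw new IOException("Unsupported split version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String baseFilePath = in.readUTF();
            int bucket = in.readInt();
            int size = in.readInt();
            if (size < 0) {
                throw new IOException("Negative partition value count: " + size);
            }
            // Grow the list while reading instead of trusting the declared size.
            List<String> partition = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                partition.add(in.readUTF());
            }
            if (in.available() > 0) {
                throw new IOException("Unexpected trailing bytes in split");
            }
            return new SimpleSplit(baseFilePath, bucket, partition);
        }
    }
}
```

Because only primitives and strings are read, a crafted byte stream can at worst produce an IOException; it cannot instantiate arbitrary classes from the classpath.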


@Override
public void close() {
fileSystemView.close();

close() should be idempotent and must release all resources even if one of them throws. Currently:

  1. If fileSystemView.close() throws, closeCatalog(hudiCatalog) is skipped → catalog leak.
  2. Calling close() twice will throw on the second invocation.
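
One possible shape for an idempotent close() that still releases every resource is sketched below. `CompositeResource` is a hypothetical class used only for illustration; in the PR the two resources would be the filesystem view and the Hudi catalog.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical holder of two resources; illustrates an idempotent, leak-free close(). */
final class CompositeResource implements AutoCloseable {
    private final AutoCloseable first;
    private final AutoCloseable second;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    CompositeResource(AutoCloseable first, AutoCloseable second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public void close() throws Exception {
        // Only the first caller proceeds; later calls are no-ops.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        Exception failure = null;
        for (AutoCloseable resource : new AutoCloseable[] {first, second}) {
            try {
                resource.close();
            } catch (Exception e) {
                // Remember the first failure and keep closing the remaining resources.
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```

The first failure is rethrown after all resources have been attempted, and any later failures are attached to it as suppressed exceptions.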

tablePath,
snapshotTime);
return splits;
} catch (IOException e) {

The first catch is redundant, and the second one converts every RuntimeException (e.g. IllegalStateException, HoodieException) into an IOException, hiding the original type and making operational diagnosis harder.
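
A minimal sketch of the alternative, assuming the method already declares `throws IOException`: with no catch blocks at all, an IOException propagates as-is and a runtime exception keeps its original type. `PlanExceptionHandling` and `SplitSource` are hypothetical names for illustration only.

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical sketch: let exceptions propagate with their original type. */
final class PlanExceptionHandling {

    /** Stand-in for whatever produces the splits; may fail with a checked or unchecked exception. */
    interface SplitSource {
        List<String> list() throws IOException;
    }

    private PlanExceptionHandling() {}

    static List<String> plan(SplitSource source) throws IOException {
        // No try/catch: IOException already matches the signature, and
        // IllegalStateException and similar runtime exceptions reach the caller unchanged.
        return source.list();
    }
}
```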


@Override
public List<HudiSplit> plan() throws IOException {
String snapshotTime = String.valueOf(snapshotId);

Hudi instant time is a string (canonical format is a 17-digit timestamp like 20260608010101000). Round-tripping it through a long and then String.valueOf will drop any leading zeros. Today, Hudi writes don't normally produce leading-zero instants, but repaired or imported tables can. Note also that LakeSource.PlannerContext#snapshotId() returning long is itself a lossy carrier for Hudi.

  • Document the constraint that Hudi snapshotIds must not have leading zeros.
  • Or use String.format("%017d", snapshotId) to enforce the 17-digit instant format.
  • Add a "leading-zero instant" case to HudiSplitPlannerTest.
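
The zero-padding option could look roughly like this. The sketch assumes every instant in the table uses the 17-digit format described above; `InstantTimes` and `toInstantTime` are hypothetical names, not part of the PR.

```java
import java.util.Locale;

/** Hypothetical helper: converts a numeric snapshot id back to a 17-digit instant string. */
final class InstantTimes {
    private static final int INSTANT_LENGTH = 17;

    private InstantTimes() {}

    static String toInstantTime(long snapshotId) {
        if (snapshotId < 0) {
            throw new IllegalArgumentException("Negative snapshot id: " + snapshotId);
        }
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
        String instant = String.format(Locale.ROOT, "%017d", snapshotId);
        if (instant.length() != INSTANT_LENGTH) {
            throw new IllegalArgumentException(
                    "Snapshot id does not fit a 17-digit instant: " + snapshotId);
        }
        return instant;
    }
}
```

For example, `toInstantTime(1260608010101000L)` restores the leading zero that `String.valueOf` would drop, while a value that already has 17 digits passes through unchanged.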

basePath,
Collections.unmodifiableList(new ArrayList<>(partitionFields)),
bucketAware);
} catch (Exception e) {

If HoodieTableFileSystemView has already been constructed and a subsequent step (e.g. splitCommaSeparated(...) / resolveBucketAware(...)) throws, the FSView is leaked.

Suggestion: hold fsView in a local and IOUtils.closeQuietly it in the catch as well.
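
The pattern can be sketched as follows, using only plain Java. `TableInfoSketch`, `viewFactory`, and `bucketAwareResolver` are hypothetical stand-ins for HudiTableInfo, the HoodieTableFileSystemView construction, and resolveBucketAware(...); the local `closeQuietly` replaces whichever IOUtils helper the project uses.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

/** Hypothetical sketch: release an already-built resource when a later construction step fails. */
final class TableInfoSketch {
    final AutoCloseable view;
    final boolean bucketAware;

    private TableInfoSketch(AutoCloseable view, boolean bucketAware) {
        this.view = view;
        this.bucketAware = bucketAware;
    }

    static TableInfoSketch create(
            Supplier<AutoCloseable> viewFactory, BooleanSupplier bucketAwareResolver) {
        AutoCloseable view = null;
        try {
            view = viewFactory.get();
            // Any step after the view exists may throw; the catch below cleans up.
            boolean bucketAware = bucketAwareResolver.getAsBoolean();
            return new TableInfoSketch(view, bucketAware);
        } catch (RuntimeException e) {
            closeQuietly(view, e);
            throw e;
        }
    }

    private static void closeQuietly(AutoCloseable closeable, Throwable primary) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception closeFailure) {
            // Keep the original failure as the primary cause.
            primary.addSuppressed(closeFailure);
        }
    }
}
```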

@Override
public LakeTieringFactory<?, ?> createLakeTieringFactory() {
throw new UnsupportedOperationException(
"HudiLakeStorage is currently a scaffold and does not support creating a "

createLakeSource is implemented in this PR, so calling the storage "a scaffold" is no longer accurate. Please rephrase along the lines of:

"Hudi lake tiering writer is not implemented yet (tracking <issue/PR link>)."

so users can locate the tracking issue.

for (String partitionPath : partitionPaths) {
splits.addAll(planPartition(hudiTableInfo, snapshotTime, partitionPath));
}
LOG.debug(

Only logging on the success path makes "why is my query empty?" investigations harder. Please log at info/warn when splits.isEmpty() (especially after a successful containsInstant check), since that usually signals a filter or partition mismatch.

return Collections.emptyList();
}

String[] pathSegments = partitionPath.split("/");

The current tests cover "unknown field", "duplicate field" and "mixed style", but not the case where the number of segments equals partitionFields.size() yet the field names don't match (e.g. dt=20260608/dt=20260609). The implementation correctly throws via the null check, but please add a unit test for this edge case.
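
To make the requested case concrete, here is a simplified model of the lookup-by-field behavior described above. It handles only key=value segments and is not the PR's HudiTableInfo#partitionValues; `PartitionPathSketch` is a hypothetical name, and the exception type is an assumption.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of partition value extraction (key=value segments only). */
final class PartitionPathSketch {

    private PartitionPathSketch() {}

    static List<String> partitionValues(String partitionPath, List<String> partitionFields) {
        String[] segments = partitionPath.split("/");
        if (segments.length != partitionFields.size()) {
            throw new IllegalArgumentException(
                    "Expected " + partitionFields.size() + " segments in " + partitionPath);
        }
        Map<String, String> valuesByField = new HashMap<>();
        for (String segment : segments) {
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value segment: " + segment);
            }
            valuesByField.put(segment.substring(0, eq), segment.substring(eq + 1));
        }
        List<String> values = new ArrayList<>();
        for (String field : partitionFields) {
            String value = valuesByField.get(field);
            // Segment count can match while a field is still missing; the null check catches it.
            if (value == null) {
                throw new IllegalArgumentException(
                        "Partition path " + partitionPath + " has no value for field " + field);
            }
            values.add(value);
        }
        return values;
    }
}
```

A unit test for the edge case would call this with fields `[dt, hr]` and path `dt=20260608/dt=20260609` and assert that it throws, since the segment count matches but `hr` never receives a value.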

recordKeyField); // use primary key as index key
}

// buket keys column

Trivial fix: buket → bucket.
