feat:(DNM) add a lsm-tree based FG reader#18987
Conversation
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #18987 +/- ##
============================================
- Coverage 68.24% 67.29% -0.96%
+ Complexity 29478 29100 -378
============================================
Files 2542 2545 +3
Lines 142541 142967 +426
Branches 17798 17877 +79
============================================
- Hits 97281 96203 -1078
- Misses 37254 38657 +1403
- Partials 8006 8107 +101
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for working on this! The PR adds a dedicated LSM file group reader and updates log-file parsing to recognize native parquet log files. The k-way merge structure and pattern handling look reasonable overall. A few items worth a closer look in the inline comments — most importantly, a regression in getFileIdFromLogPath for archive files, and the delete-log ordering_val field that the schema defines but the read path doesn't consume. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A few naming and simplification suggestions below — the main one being the magic regex group indices scattered across multiple methods; the others are minor dead-parameter and redundant-lookup nits in the loser tree.
| return logFileMatcher.get().group(1); | ||
| } | ||
| return FSUtils.getFileId(fileName); | ||
| } |
There was a problem hiding this comment.
🤖 Switching to matchLogFile here looks like a regression for archive files — the original LOG_FILE_PATTERN.matcher(...) matched both log|archive, but matchLogFile only returns matches when group(3).equals("log"). The existing testArchiveLogFileName test asserts getFileIdFromLogPath(archive_path) returns the file id, which would now throw InvalidHoodiePathException. Could you preserve the archive-supporting behavior, or use LOG_FILE_PATTERN directly here?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| } | ||
| if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || !requiredSchemaAndRenamedColumns.getRight().isEmpty()) { | ||
| UnaryOperator<T> projector = readerContext.getRecordContext() | ||
| .projectRecord(fileRequiredSchema, readerSchema, requiredSchemaAndRenamedColumns.getRight()); |
There was a problem hiding this comment.
🤖 The DELETE_LOG_SCHEMA defines ordering_val (nullable bytes), but here we always pass OrderingValues.getDefault() rather than reading it from the record. The existing pattern (BufferedRecords.fromDeleteRecord → RecordContext.getOrderingValue(DeleteRecord)) preserves the actual ordering value when it isn't commit-time. If the write side writes a real ordering_val for any event-time-ordered table, those deletes would lose to inserts they should override. Could you confirm what the writer puts in this field and read it accordingly?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| readerContext.getRecordContext().seal(record), | ||
| readerSchema, | ||
| readerContext.getRecordContext(), | ||
| orderingFieldNames, |
There was a problem hiding this comment.
🤖 The State enum values, ReaderState.state, and ReaderState.firstSameKeyIndex look like they're written by popWinner / markLoser / setChampionState but never read by the merge loop or anywhere else. If they're intended for a future optimization (e.g., skipping the inner merge loop when WINNER_WITH_NEW_KEY), worth either wiring that up or removing them to keep the loser-tree implementation easier to follow.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| this.metaClient = hoodieTableMetaClient; | ||
| this.storage = storage; | ||
|
|
||
| readerContext.setHasLogFiles(this.inputSplit.hasLogFiles()); |
There was a problem hiding this comment.
🤖 If a caller invokes multiple iterator getters (e.g. getClosableIterator() then getClosableHoodieRecordIterator()) on the same reader, this line overwrites lsmRecordIterator without closing the previous one — that earlier iterator (and any open parquet readers / spill files it holds) leaks until JVM exit. Could you guard with a null/closed check, or document that only one iterator may be obtained per reader instance?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| Path spillDirectory = Paths.get(spillBasePath); | ||
| Files.createDirectories(spillDirectory); | ||
| this.spillFile = Files.createTempFile(spillDirectory, SPILL_FILE_PREFIX, SPILL_FILE_SUFFIX).toFile(); | ||
| this.spillFile.deleteOnExit(); |
There was a problem hiding this comment.
🤖 If spill(...) throws (or Files.createTempFile does), and sourceIterator.close() in the finally also throws, the original exception is replaced and lost. Worth catching/suppressing the close exception so the underlying IOException surfaces — debugging "failed to spill" is much easier with the real cause.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| public static final String PATH_SEPARATOR = "/"; | ||
| public static final Pattern LOG_FILE_PATTERN = | ||
| Pattern.compile("^\\.([^._]+)_([^.]*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(\\.cdc)?)?$"); | ||
| public static final Pattern NATIVE_LOG_FILE_PATTERN = |
There was a problem hiding this comment.
🤖 The pattern uses [^_]+ for the file id, but Hudi file ids elsewhere in BASE_FILE_PATTERN allow [a-zA-Z0-9-]+ and LOG_FILE_PATTERN allows [^._]+. Are we sure native log file ids will never contain . or other special characters? Tightening this to [a-zA-Z0-9-]+ (or [^._]+ for consistency with the existing log pattern) would avoid accidental matches against unrelated .parquet paths.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| return Integer.compare(left.mergeOrder, right.mergeOrder); | ||
| } | ||
|
|
||
| private void markLoser(int loserIndex, int winnerIndex, int compareResult) { |
There was a problem hiding this comment.
🤖 nit: compareResult is accepted here but never referenced in the method body — could you drop it from the signature? The sameKey check re-derives the relationship directly from the keys, so the parameter is dead weight that makes callers look like they're communicating something meaningful to markLoser.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
|
|
||
| private void markLoser(int loserIndex, int winnerIndex, int compareResult) { | ||
| ReaderState<T> loser = leaves.get(loserIndex); | ||
| boolean sameKey = leaves.get(loserIndex).current.getRecordKey().equals(leaves.get(winnerIndex).current.getRecordKey()); |
There was a problem hiding this comment.
🤖 nit: loser already holds leaves.get(loserIndex), so leaves.get(loserIndex).current.getRecordKey() on this line could just be loser.current.getRecordKey() — a bit easier to read.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Describe the issue this Pull Request addresses
RFC-103 introduces an LSM tree file-group layout where base and log files are sorted by record key and merged with a streaming k-way merge. The reader side needs a dedicated implementation for that layout without changing the existing
HoodieFileGroupReaderpath.The design also uses native parquet log files instead of Avro log files with embedded parquet data blocks. Native data logs use
<fileId>_<writeToken>_<instant>_<version>.parquet, and native delete logs use<fileId>_<writeToken>_<instant>_<version>.delete.parquet, so common file-name parsing and file-system view classification need to recognize those files correctly.Summary and Changelog
Adds a separate LSM file-group reader for native parquet log files and updates common log-file parsing to recognize RFC-style native parquet data/delete logs.
Commit 1: feat:(DNM) add a lsm-tree based FG reader (
f0b63593dedd)HoodieLsmFileGroupReaderas a separate reader entry point instead of modifyingHoodieFileGroupReader.LsmFileGroupRecordIteratorto perform streaming sorted k-way merge over one active record per base/log file.BufferedRecordMergersemantics.HoodieReaderContextand added reader-side handling for native delete parquet logs with the fixed delete schema.FSUtilsandHoodieLogFile, including data log and.delete.parquetdelete log names.AbstractTableFileSystemViewso native parquet log files are classified as log files and excluded from base-file discovery.TestHoodieLogFilecoverage for native parquet data/delete log parsing and helper extraction.Impact
This adds a new reader implementation for LSM file groups without changing the existing
HoodieFileGroupReaderbehavior. It affects common file-name parsing and file-system view classification for native parquet log files, enabling readers to distinguish native log v2 files from regular parquet base files.No writer path, table config default, or existing Avro log reader behavior is changed. The main compatibility impact is that RFC-style native parquet log files are now recognized as Hudi log files by common utilities.
Risk Level
medium
The change touches common file parsing and file-system view classification, which are core read-path utilities. The new LSM reader also implements merge ordering semantics that must stay consistent with existing file-group merge behavior. Risk is mitigated by keeping the LSM reader separate from
HoodieFileGroupReader, preserving existing merge APIs, and validating with:mvn -pl hudi-common -DskipTests compilemvn -pl hudi-common -DskipITs -Dtest=TestHoodieLogFile testDocumentation Update
none
This PR adds reader implementation and native log-file recognition but does not introduce a new user-facing config, default behavior change, or public documentation surface in this repo. The behavior follows the RFC-103 design.
Contributor's checklist