fix(spark): runtime-merge full file groups for MOR incremental queries (#18943)#19005
fix(spark): runtime-merge full file groups for MOR incremental queries (#18943)#19005ad1happy2go wants to merge 2 commits into
Conversation
apache#18943) Incremental queries on a MOR table could return incorrect rows because the read resolved each changed record only from the files written within the incremental window, without a correct runtime merge against the rest of the file group: - EVENT_TIME_ORDERING (no partial updates): a record written in the window may carry a lower ordering value than the version already in the base/earlier log, so the existing version should win. A window-only view cannot determine the winner and could surface the losing write. - Partial updates: a window log block holds only the changed columns, so the unchanged columns must be filled in from the base file; otherwise the row comes back partial/garbled. SQL MERGE INTO on MOR writes partial blocks by default (hoodie.spark.sql.merge.into.partial.updates), so this hits the common case. Snapshot and read-optimized queries were already correct; the bug was isolated to the incremental file listing plus commit-time filter pushdown. Fix: for each file group touched in the incremental window, read its base file plus its log files (the full file slice) and run the standard file-group reader merge, then filter the merged output to the incremental window by commit time. Unconditional - no new config and no write-path change. - MergeOnReadIncrementalRelationV2: build the incremental file system view from the (modified) partition listing (metadata-table-aware) so each slice carries its base file, scoped back to the file groups actually touched in the window; bound the view timeline to the window end and close the view after use. - HoodieFileGroupReaderBasedFileFormat: for incremental merging file groups, do not push the commit-time span filter into the file reads; apply it on the merged output instead. Also set an InstantRange bounded to the window end on the reader context so base records and log blocks committed after the window are not merged in (a record updated again after the window must be returned with its value as of the window end, not its latest value). - HoodieReaderContext: add setInstantRange so the read path can bound the merge inputs to the query window. Scope: only the V2 relation listing changed (table version 8+); the format-level gate covers both V1 and V2 reads. Tests in TestPartialUpdateForMergeInto cover partial-update incremental (commit/event-time ordering, avro/parquet, partitioned), non-partial event-time ordering, window bounds, insert+update in one window, multiple partial updates to one key, exclusion of later commits, post-compaction merge, and COW non-regression. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
cb0b1e9 to
6bfaf46
Compare
…queries The incremental file listing introduced in apache#18943 builds a fresh, metadata-aware file-system view scoped to the touched file groups. This fixed runtime-merge correctness but silently dropped the fail-early contract verified by TestIncrementalReadWithFullTableScan: when the full-table-scan fallback is disabled and files referenced by the window have been removed (e.g. by cleaning), the query used to throw a read-time FileNotFoundException; with the fresh listing it instead returned an empty/partial result because that listing no longer sees the missing files. Restore the prior behavior: extract the missing-files check into hasMissingAffectedFiles (reused by fullTableScan), and when the fallback is off but files are missing, list slices directly from the recorded affected files (the pre-apache#18943 path) so the scan surfaces the file-not-found error pointing at hoodie.datasource.read.incr.fallback.fulltablescan.enable. The merge-correct listing is still used whenever the window's files are present, so the partial-update incremental tests are unaffected. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
6bfaf46 to
e54305f
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #19005 +/- ##
============================================
- Coverage 68.25% 67.62% -0.63%
- Complexity 29509 29799 +290
============================================
Files 2542 2562 +20
Lines 142632 145152 +2520
Branches 17789 18346 +557
============================================
+ Hits 97354 98165 +811
- Misses 37271 38764 +1493
- Partials 8007 8223 +216
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR fixes a real correctness gap in the MOR incremental read path — partial updates and event-time-ordering windows now go through a proper runtime merge against the full file slice, with the commit-time window applied post-merge and an InstantRange bound to keep later log blocks out. The approach (full slice listing → bounded InstantRange → post-merge commit-time predicate) is consistent with the rest of the file group reader, and the test matrix is genuinely thorough (commit-time/event-time ordering, partitioned tables, window bounds, insert+update mix, multi-update same key, later-commit exclusion, post-compaction merge, COW non-regression). A couple of small things worth a second look in the inline comments — mostly perf and a question about V1 scope. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and duplication nits below; the overall structure and documentation are clear and well-motivated.
| } else { | ||
| Some(new JPredicate[InternalRow] { | ||
| override def test(row: InternalRow): Boolean = | ||
| !row.isNullAt(idx) && allowedCommitTimes.contains(row.getUTF8String(idx).toString) |
There was a problem hiding this comment.
🤖 This row.getUTF8String(idx).toString allocates a Java String for every merged row in the file group. Would it be worth pre-converting allowedCommitTimes to Set[UTF8String] once and comparing UTF8String directly to skip the per-row allocation? Could matter on wide incremental windows.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| } finally { | ||
| fsView.close() | ||
| } | ||
| } |
There was a problem hiding this comment.
🤖 Just to double-check the scope: the PR description says only the V2 listing changed, but HoodieFileGroupReaderBasedFileFormat is the read path for both V1 and V2. V1's listing still builds the view from affectedFilesInCommits only (no pre-window base file), so the runtime merge sees only log records and the partial-update / EVENT_TIME_ORDERING bugs remain on V1. Is that intentional (V1 = table-version < 8, deprecated path), or should V1 listing get the same treatment in a follow-up?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
There was a problem hiding this comment.
Confirmed V2-only and intentional: the commit message scopes the listing fix to table version 8+. V1 (MergeOnReadIncrementalRelationV1.scala:114-115 and :136) still builds the window-only HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits), so a MOR incremental read on a table-version < 8 table retains the partial-update / event-time gap. If table-version < 8 incremental MOR is still supported, this is worth a tracked follow-up rather than leaving it implicit.
|
|
||
| fallbackToFullTableScan && (startInstantArchived | ||
| || affectedFilesInCommits.asScala.exists(fileStatus => !metaClient.getStorage.exists(fileStatus.getPath))) | ||
| fallbackToFullTableScan && (startInstantArchived || hasMissingAffectedFiles) |
There was a problem hiding this comment.
🤖 With fallbackToFullTableScan=false, hasMissingAffectedFiles is now also evaluated in collectFileSplits/listFileSplits to choose between the new and legacy paths, which triggers an exists() per affected file at plan time even when before it was short-circuited. Could be slow for wide windows on object stores (one head request per file). Lazy val avoids repeat eval, so probably fine, but worth confirming this matches the intent.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| engineContext, metaClient, fileIndex.metadataConfig, timeline.findInstantsBeforeOrEquals(latestCommit)) | ||
| try { | ||
| partitionPaths.flatMap { relativePartitionPath => | ||
| fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala |
There was a problem hiding this comment.
🤖 nit: fs conventionally means FileSystem/storage throughout Hudi — using it for a FileSlice lambda here could trip up a reader. Could you rename it to slice or fileSlice?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| None | ||
| } else { | ||
| readSchema.getFieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD).flatMap { idx => | ||
| val allowedCommitTimes: Set[String] = requiredFilters.collect { |
There was a problem hiding this comment.
🤖 nit: the case In(attr, values) if attr == HoodieRecord.COMMIT_TIME_METADATA_FIELD => values.filter(_ != null).map(_.toString) pattern is extracted verbatim in both incrementalWindowEnd (line 397) and here. Have you considered pulling it into a small private helper like commitTimesFromFilters(filters: Seq[Filter]): Set[String] to keep the two callers in sync if the filter shape ever changes?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
apache#18943) (apache#19005) * fix(spark): runtime-merge full file groups for MOR incremental queries (apache#18943) * fix(spark): preserve fail-early on missing files for MOR incremental queries * fix(spark): null-safe InstantRange nullable boundary so the window-end-only bound used by the runtime merge does not NPE Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
| * so that base/log files written by later commits are not visible. Without this bound a record | ||
| * updated again after the window would be merged with those later log files and its merged commit | ||
| * time would fall outside the window, dropping the in-window change from the result. | ||
| */ |
There was a problem hiding this comment.
This affectedFileGroupIds filter is the only thing scoping the metadata-aware partition view back to the touched file groups, but no test covers a modified partition that holds an untouched sibling file group. testPartialUpdateIncrementalQueryPartitioned writes one updated record per partition, so with MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT=0 each partition is a single file group and this filter is a no-op - removing it would fail no test. Suggest a case that lands multiple file groups in one partition and updates only one, asserting the untouched sibling is excluded. Output stays correct via the post-merge commit-time filter either way, so this guards the scoping/perf path.
| * of silently returning an empty/partial result from a fresh listing that no longer sees those | ||
| * files. | ||
| */ | ||
| private def legacyAffectedFileSlices(partitionPaths: Seq[String], latestCommit: String): Seq[FileSlice] = { |
There was a problem hiding this comment.
legacyAffectedFileSlices and collectIncrementalFileSlices share the same partitionPaths.flatMap { p => view.getLatestMergedFileSlicesBeforeOrOn(p, latestCommit).iterator().asScala [.filter(...)] } body wrapped in try/finally view.close(), differing only in how the view is built and whether the file-group filter applies. Consider a private helper taking the view plus a FileSlice => Boolean predicate that owns the try/finally close, so the two call sites cannot drift on the close() contract.
Describe the issue this Pull Request addresses
Closes #18943.
Incremental queries on a MOR table can return incorrect rows. Two manifestations:
Snapshot and read-optimized queries are correct; only the incremental path is affected. Because SQL
MERGE INTOon MOR writes partial log blocks by default (hoodie.spark.sql.merge.into.partial.updates), this hits the common case.Summary and Changelog
For each file group touched in the incremental window, the read now loads the full file slice (base file + log files) and runs the standard file-group reader merge, then filters the merged output to the window by commit time. The fix is unconditional — no new config and no write-path change.
MergeOnReadIncrementalRelationV2: build the incremental file-system view from the (modified) partition listing (metadata-table-aware) so each slice carries its base file, scoped back to the file groups actually touched in the window; bound the view timeline to the window end and close the view after use.HoodieFileGroupReaderBasedFileFormat: for incremental merging file groups, do not push the commit-time span filter into the file reads — apply it on the merged output instead. Also set anInstantRangebounded to the window end on the reader context, so base records and log blocks committed after the window are not merged in (a record updated again after the window must be returned with its value as of the window end, not its latest value).HoodieReaderContext: addsetInstantRangeso the read path can bound the merge inputs to the query window.TestPartialUpdateForMergeInto.Scope: only the V2 relation listing changed (table version 8+); the format-level gate covers both V1 and V2 reads. No code was copied.
Impact
Incremental queries on MOR tables now return correct, fully-merged rows. Performance: the incremental file listing now builds a metadata-table-aware file-system view over the modified partitions (instead of a window-files-only view) and scopes it to the touched file groups — this can enumerate more file groups on wide partitions, but only the touched groups are read and merged.
Risk Level
medium — this changes the MOR incremental read path. Mitigated by:
TestPartialUpdateForMergeInto: partial-update incremental (commit/event-time ordering, avro/parquet, partitioned), non-partial event-time ordering, window bounds, insert+update in one window, multiple partial updates to one key, exclusion of commits after the window end, post-compaction merge, and a COW non-regression.Documentation Update
none — bug fix, no new config or user-facing API change.
Contributor's checklist