perf(spark): restore Catalyst fast-paths for SimpleKeyGen/Nonpartitioned bulk insert#18990
perf(spark): restore Catalyst fast-paths for SimpleKeyGen/Nonpartitioned bulk insert#18990nsivabalan wants to merge 2 commits into
Conversation
…ned bulk insert The post-apache#5470 HoodieDatasetBulkInsertHelper rewrite routed every keygen through df.queryExecution.toRdd.mapPartitions(...) which materialises rows and pays a per-row keygen reflection cost. Restore tiered dispatch so the two common keygen classes stay as Catalyst column projections: - Tier 1 (NonpartitionedKeyGenerator, single record-key field): col(rk).cast(String) + lit("") -- no UDF, no toRdd round-trip. - Tier 2 (SimpleKeyGenerator, single record-key + single partition-path, default URL/slash flags): col(rk).cast(String) plus a partition-path expression mirroring PartitionPathFormatterBase#combine including the handleEmpty -> __HIVE_DEFAULT_PARTITION__ substitution, with hive-style `<field>=` prefixing emitted as `concat(lit, ...)`. urlEncode and the 1.2.0+ slashSeparatedDatePartitioning flag drop to Tier 3. - Tier 3 (everything else): anonymous functions.udf(...) -- registered per call, not on the SparkSession, so nothing leaks across writes. - Auto-keygen keeps the existing RDD path (needs TaskContext partitionId and a stateful row counter -- no clean Catalyst expression). The Tier 3 UDF goes through BuiltinKeyGenerator.getRecordKey(Row) / getPartitionPath(Row), i.e. the canonical Avro-aligned formatters, so all three formatter flags (hive-style, URL encode, slash-separated dates) are honoured for the keygens that fall through. Test coverage in TestHoodieDatasetBulkInsertHelper: - testKeyGenParityAgainstAvroGroundTruth (11 parameterised cases) -- one per tier-relevant keygen + formatter-flag combo, compares the helper's record-key / partition-path output to BuiltinKeyGenerator's Avro path. - testFastPathCastsNonStringRecordKey -- Tier 1/2 cast a non-string record-key column (ts: long) to the expected string form. - testFastPathAvoidsUdf -- Tier 1/2 analyzed plans contain no ScalaUDF (i.e. they actually benefit from Catalyst codegen). - testTier2EmptyPartitionValueSubstitutedWithHiveDefault -- empty partition values resolve to __HIVE_DEFAULT_PARTITION__ under both default and hive-style flags. - testUdfPathRespectsDriverSessionTimezone -- the Tier 3 UDF path picks up the driver's spark.sql.session.timeZone (guards against executor JVM default leakage on TimestampBasedKeyGenerator). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #18990 +/- ##
============================================
- Coverage 68.77% 67.94% -0.83%
- Complexity 29073 29249 +176
============================================
Files 2517 2542 +25
Lines 139807 142597 +2790
Branches 17210 17811 +601
============================================
+ Hits 96148 96890 +742
- Misses 35885 37716 +1831
- Partials 7774 7991 +217
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR restores Catalyst fast-paths for the two most common keygens (Nonpartitioned + Simple) and keeps everything else on an anonymous-UDF Tier 3, with a parity test against the Avro ground truth covering the supported flag combos. A couple of edge cases worth double-checking in the inline comments — mostly around the Tier 2 dispatch when partition columns aren't strings, and the class-name comparison missing the *AvroKeyGenerator variants. Please take a look, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of small naming and assertion-style nits below.
|
|
||
| val tier2 = keyGeneratorClassName == classOf[SimpleKeyGenerator].getName && | ||
| recordKeyFields.size == 1 && partitionPathFields.size == 1 && | ||
| !urlEncode && !slashSep |
There was a problem hiding this comment.
🤖 The Tier 2 gate checks the keygen class and formatter flags but not the partition column's type. For a Timestamp partition column, col(field).cast(StringType) formats differently than the canonical Timestamp.toString() that combinePartitionPath uses (e.g. Spark's cast drops trailing zeros and uses session timezone, Timestamp.toString keeps fractional seconds and uses JVM default tz). Same concern applies to Tier 1 / the recordKeyCol cast. The parity test only exercises a String partition column so this divergence wouldn't be caught — could you confirm the type-mismatch case is intentional (matches the pre-#5470 fast path) and maybe add a StringType guard or test coverage for a Timestamp partition?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| val slashSep = typedProps.getBoolean(KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.key, | ||
| KeyGeneratorOptions.SLASH_SEPARATED_DATE_PARTITIONING.defaultValue.toBoolean) | ||
|
|
||
| val tier1 = keyGeneratorClassName == classOf[NonpartitionedKeyGenerator].getName && |
There was a problem hiding this comment.
🤖 The class-name comparison uses the user-supplied keyGeneratorClassName directly, so users who configured the Avro variant (SimpleAvroKeyGenerator / NonpartitionedAvroKeyGenerator) won't hit Tier 1/2 even though the factory converts them to the Spark variant at load time. Could compare against HoodieSparkKeyGeneratorFactory.convertToSparkKeyGenerator(keyGeneratorClassName) here. Not a correctness issue — Tier 3 still works for those — just a missed optimization for that config path.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| // Tier 1/2 do not reproduce the canonical keygen's null/empty record-key validation: | ||
| // a null record-key value passes through as SQL NULL instead of throwing HoodieKeyException. | ||
| // This matches the behaviour of the pre-existing fast paths in this helper prior to the | ||
| // RDD-based rewrite. Tier 3 retains the strict canonical behaviour. |
There was a problem hiding this comment.
🤖 The comment notes that Tier 1/2 pass NULL record-keys through instead of throwing, but the divergence also covers empty-string record keys — SimpleKeyGenerator.getRecordKey(Row) calls requireNonNullNonEmptyKey which throws on both null and empty. Worth mentioning explicitly so future readers don't assume empty strings still validate.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| Dataset<Row> result = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(dataset, config, | ||
| new HoodieTableConfig(), new NonSortPartitionerWithRows(), "000000001"); | ||
| String plan = result.queryExecution().analyzed().toString(); | ||
| assertTrue(!plan.toLowerCase().contains("scalaudf"), |
There was a problem hiding this comment.
🤖 nit: assertTrue(!condition) reads a bit awkwardly here — assertFalse(plan.toLowerCase().contains("scalaudf"), ...) is more idiomatic and produces a slightly clearer failure message.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Describe the issue this Pull Request addresses
Closes #18989.
Summary and Changelog
HoodieDatasetBulkInsertHelper.prepareForBulkInsertwas routing every key-generator throughdf.queryExecution.toRdd.mapPartitions(...), forcing an RDD round-trip and per-row reflection-based keygen invocation even for the common keygens where the record-key and partition-path values can be sourced directly from input columns.This patch restores tiered dispatch:
NonpartitionedKeyGenerator(single record-key field): emitscol(rk).cast(String)+lit("")as Catalyst columns. No UDF, no toRdd round-trip.SimpleKeyGenerator(single record-key + single partition-path field, URL-encoding off, slash-separated dates off): emitscol(rk).cast(String)and a partition-path expression mirroringPartitionPathFormatterBase#combine, including thehandleEmpty -> __HIVE_DEFAULT_PARTITION__substitution and hive-style<field>=prefixing.ComplexKeyGenerator,TimestampBasedKeyGenerator,CustomKeyGenerator,SimpleKeyGeneratorwith URL-encode or slash-separated dates): anonymousfunctions.udf(...)over a struct of input columns calling the canonicalBuiltinKeyGenerator.getRecordKey(Row)/getPartitionPath(Row). The UDFs are not registered against theSparkSession, so nothing leaks across writes.TaskContext.partitionIdand a stateful per-task counter, which can't be expressed cleanly as a driver-side closure.The Tier 3 UDF goes through the
Row-overload keygen API which uses the canonicalStringformatter, so all three partition-formatter flags (hive-style, URL encode, slash-separated dates) remain honored for the keygens that fall through. The Tier 2 fast-path encodes only the default and hive-style flag subset (URL encoding has no efficient pure-Catalyst equivalent; the 1.2.0+ slash-separated branch exercises a separate code path we'd rather not encode twice).New tests in
TestHoodieDatasetBulkInsertHelper:testKeyGenParityAgainstAvroGroundTruth(parameterized, 11 cases) — every supported keygen class plus theSimpleKeyGenflag combos (default / hive / slash / hive+slash / URL / hive+URL / Complex single+multi / TimestampBased / Custom). Each case asserts the helper's record-key and partition-path output matchesBuiltinKeyGenerator's Avro path byte-for-byte.testFastPathCastsNonStringRecordKey— Tier 1/2 must materialize the string form of a non-string record-key column (usests: long).testFastPathAvoidsUdf— Tier 1/2 analyzed logical plans must not contain aScalaUDFnode (i.e. they actually benefit from Catalyst codegen).testTier2EmptyPartitionValueSubstitutedWithHiveDefault— empty partition values resolve to__HIVE_DEFAULT_PARTITION__under both default and hive-style flags.testUdfPathRespectsDriverSessionTimezone— Tier 3 UDF picks up the driver'sspark.sql.session.timeZone(guards against executor JVM default leakage onTimestampBasedKeyGenerator).Impact
Performance: restores per-row Catalyst codegen for bulk inserts that use
NonpartitionedKeyGeneratororSimpleKeyGenerator(with default or hive-style partitioning) — the most common configurations in practice. No behaviour change for the keygens that fall through to Tier 3; their output is byte-identical to the prior RDD path (and to the Avro ground truth, which the parity test now enforces).No public API change. No config change. No storage format change.
Risk Level
Low. The change is contained to
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(Scala helper, no public API surface) and the parity test exhaustively checks every keygen + formatter combination against the canonical Avro keygen output. The Tier 3 fallback is the existing RDD-replaced UDF path, so any keygen the fast paths don't claim continues to use the same canonical formatter.Documentation Update
None.
Contributor's checklist
🤖 Generated with Claude Code