feat(hive-sync): parallelize DROP partitions in HiveQL sync mode#19033
feat(hive-sync): parallelize DROP partitions in HiveQL sync mode#19033nsivabalan wants to merge 5 commits into
Conversation
Hive sync partition operations on HMS today serialize through a single IMetaStoreClient and ship entire partition lists in a single Thrift call for TOUCH/UPDATE. For large tables (~2k partitions) this is ~5-9x slower than parallel implementations (see apache#18331). The biggest contributors are (1) one giant alter_partitions call for UPDATE/TOUCH, and (2) per- partition Thrift round-trips for DROP, all sequential. This change introduces an opt-in IMetaStoreClientPool gated behind hoodie.datasource.hive_sync.batching.enabled (default false). When on, HMSDDLExecutor splits ADD / UPDATE / TOUCH / DROP into batches of hoodie.datasource.hive_sync.batch_num (existing config, default 1000) and fans them out across a pool of RetryingMetaStoreClient instances sized by hoodie.datasource.hive_sync.batching.threads (default 4). Design invariant: only partition-row operations go through the pool. Table-row operations (createTable, alter_table, last-commit-time-synced, writer-version, table-comments) stay on the existing session client, so there is no lost-update risk on table parameters. The sync flow remains serial-parallel-serial (phase 1: table setup, phase 2: parallel partition fan-out, phase 3: table finalization). Sequential fallback is preserved when the flag is off or when HIVE_SYNC_USE_SPARK_CATALOG is on (incompatible with the pool's direct RetryingMetaStoreClient.getProxy path). Tests: TestIMetaStoreClientPool covers borrow/return, concurrent borrows, close idempotency. TestHiveSyncTool.testHMSSyncWithBatchingEnabled exercises end-to-end sync against the embedded HMS with batching on. Related: apache#18331 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to apache#18983 (HMS parallelism). Applies the equivalent treatment to the HiveQL sync mode (hoodie.datasource.hive_sync.mode=hiveql). HiveQL had two issues that this change addresses: 1. Batching gaps in QueryBasedDDLExecutor.constructPartitionAlterStatements: TOUCH concatenated every partition into one giant ALTER TABLE ... TOUCH PARTITION (...) PARTITION (...) ... statement; SET_LOCATION (UPDATE) emitted one statement per partition. ADD was already batched. 2. Sequential SQL execution in HiveQueryDDLExecutor.updateHiveSQLs: even when batches existed, they ran in a single for-loop on one Hive Driver. This change introduces HiveDriverPool, an eager pool of single-thread executors each owning a Hive Driver bound to a shared SessionState. Gated behind the existing hoodie.datasource.hive_sync.batching.enabled flag (default off) and sized by hoodie.datasource.hive_sync.batching.threads (default 4) — no new configs. Design notes: - Hive's Driver and SessionState are thread-bound. SessionState.start() attaches to the calling thread's ThreadLocal. The pool gives each slot its own dedicated worker thread so the Driver stays valid for that thread's lifetime. Bootstrap, dispatch, and close all run on the bound thread. - SessionState is shared across workers (lazily constructed once), because each worker calls SessionState.start(sharedState) on its own thread to attach. Constructing one SessionState per worker triggered race conditions in Hive's resource-directory machinery on macOS. - TOUCH is now batched by HIVE_BATCH_SYNC_PARTITION_NUM. SET_LOCATION remains one statement per partition (Hive SQL doesn't support multi-partition SET LOCATION) but is now fanned out across workers. - Hive 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and silently uses the connection's current database, so the leading USE database statement is load-bearing. The pool peels it off and runs it on every worker via runOnEachWorker() before fanning the rest out. Tests: - TestHiveDriverPool: bootstrap, dispatch round-robin, error propagation, concurrent-borrow bounding, close idempotency. - TestHiveSyncTool.testHiveQLSyncWithBatchingEnabled: end-to-end with batching.enabled=true, threads=3, batch_num=3 against embedded HMS. - TestHiveSyncTool.testHiveQLTouchPartitionsWithBatching: exercises the batched TOUCH path specifically. - Full hudi-hive-sync suite: 305 passed, 0 failures, 0 errors. Related: apache#18331 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…omments - Change HiveDriverPool.awaitAll(...) to return void. The List<CommandProcessorResponse> it previously returned was always empty and no caller consumed it. Drops the unused CommandProcessorResponse import. - Lift the empty-input short-circuit to the top of HiveQueryDDLExecutor.runSQLs so the no-op case skips both the pool and the session Driver branches cleanly. - Document isUseStatement's strict 4-char prefix expectation so future callers don't feed it externally produced (potentially padded) SQL. No behavior change. Full hudi-hive-sync suite: 305 tests, 0 failures, 0 errors. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
PR review follow-ups for apache#18984: - HiveQueryDDLExecutor.driverPool -> Option<HiveDriverPool> (PR comment). Constructor arg, instance field, and HoodieHiveSyncClient call sites updated. Eliminates a stale 'Optional. When non-null' inline doc. - DefaultDriverFactory: stop redundantly calling setCurrentDatabase on every newDriver(). Database is a pool-wide property that never changes across workers, so set it once when the shared SessionState is first constructed (PR comment). - HiveDriverPool.awaitAll: on first failure, cancel remaining (not yet started) pending futures so workers don't keep running pointless work after a fatal error. Cancel uses mayInterruptIfRunning=false so any in-flight statement is allowed to run to completion (keeps Driver state consistent). Suppressed errors continue to be logged at WARN. Adds handling for CancellationException so the cancel-walk doesn't itself raise a spurious HoodieHiveSyncException. - HiveDriverPool bootstrap: bound each Future.get() at 60s (BOOTSTRAP_TIMEOUT_SECONDS). Prior code blocked forever if Hive init hung — now we surface a HoodieException with a timeout cause. - Logging: stop logging full SQL text per-statement in runAll/awaitAll (batched TOUCH/ADD can be many KB; N workers multiply log volume). Replaced with a single per-call summary line. Same treatment applied to HiveQueryDDLExecutor.updateHiveSQLs (sequential path). - New unit test: runOnEachWorkerRunsSetupOnEveryWorker — asserts every worker sees the leading USE before any fan-out partition statement. - New unit test: awaitAllCancelsPendingFuturesOnFirstError — uses a size-1 pool to guarantee the 2nd/3rd statements are still pending behind the failing 1st, then asserts they are cancelled. - New end-to-end test: testHiveQLSetLocationWithBatching — drives updatePartitionsToTable through the SET_LOCATION fan-out path with batching on; asserts partition count and per-partition relative paths survive parallel ALTER PARTITION SET LOCATION. Out of scope (documented as follow-up): DROP partition parallelization in HIVEQL mode. DROP goes through IMetaStoreClient.dropPartition (Thrift, not Hive Driver), so it would need IMetaStoreClientPool wired into the HiveQL path — a separable change from the HiveDriverPool work this PR delivers. Tests: full hudi-hive-sync suite passes — 308 tests, 0 failures, 0 errors (was 305 before this commit). New tests: - TestHiveDriverPool: 9 tests (was 7) - TestHiveSyncTool: testHiveQLSetLocationWithBatching added Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Follow-up to apache#18984 (HiveQL partition batching). The HiveQL executor's dropPartitionsToTable goes through IMetaStoreClient.dropPartition (Thrift), not Hive Driver — so it can't reuse HiveDriverPool. This change wires the existing IMetaStoreClientPool (from apache#18983) into HiveQueryDDLExecutor and uses it to fan drop batches across the pool's worker threads. Behavior: - batching.enabled=false (default): unchanged. dropPartitionsToTable iterates the partition list sequentially on the session metastore client, exactly as before. - batching.enabled=true: partitions are split into batches of HIVE_BATCH_SYNC_PARTITION_NUM, batches fan out across the pool's workers (one IMetaStoreClient per worker), and first-error semantics match the existing HMS-mode implementation (first failure thrown, subsequent failures logged at WARN). HoodieHiveSyncClient now builds partitionClientPool in both HMS and HIVEQL branches (and the legacy default-mode branch). The pool is closed in HoodieHiveSyncClient.close() before Hive.closeCurrent(), unchanged from apache#18983. Tests: - New end-to-end test: testHiveQLDropPartitionsWithBatching — creates 8 partitions, drops 4 through the parallel pool path with threads=3 and batch_num=2 (so we get multiple drop batches racing), asserts the remaining partition set matches. - Full hudi-hive-sync suite: 309 tests, 0 failures, 0 errors (was 308). Configs: no new configs. Reuses hoodie.datasource.hive_sync.batching.* from apache#18983 / apache#18984. Related: apache#18331 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
hudi-agent
left a comment
There was a problem hiding this comment.
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends the parallel batching infrastructure to HiveQL DROP partitions by wiring the existing IMetaStoreClientPool into HiveQueryDDLExecutor. The structure is cleanly gated behind the existing opt-in flag, and the test coverage for both the new pool plumbing and the end-to-end DROP path is solid. A couple of correctness concerns around resource cleanup and SessionState sharing are worth a closer look in the inline comments. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here. A couple of nits on a field that exists purely for documentation and a duplicated futures-error loop.
| @@ -597,6 +642,17 @@ public void deleteLastReplicatedTimeStamp(String tableName) { | |||
| public void close() { | |||
| try { | |||
| ddlExecutor.close(); | |||
There was a problem hiding this comment.
🤖 If ddlExecutor.close() throws, control jumps to the outer catch (Exception e) { log.error(...) } (which doesn't rethrow), so partitionClientPool.close() never runs and the Thrift sockets + worker threads leak silently. Could you wrap the pool cleanup in a try-finally so it runs regardless of whether ddlExecutor.close() succeeds? E.g. try { ddlExecutor.close(); } finally { if (partitionClientPool != null) { ... } }.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| // SessionState is shared across workers; build it once on the first call (with | ||
| // currentDatabase already set) and attach it to each worker's thread-local on | ||
| // subsequent calls. The database is a pool-wide property and never changes | ||
| // across workers, so setting it once at construction time is sufficient. |
There was a problem hiding this comment.
🤖 The shared SessionState pattern here makes me a bit nervous — every worker thread calls SessionState.start(sharedSessionState) and then runs concurrent Driver.run() against the same instance. Hive's SessionState has plenty of mutable fields the Driver writes during query execution (e.g. currentDatabase via USE, hiveOperation, cmd, lineageState, tablesAlreadyMarkedAsRead). Are we confident none of these get written concurrently by parallel ALTER PARTITION / TOUCH statements? @yihua could you weigh in on whether sharing SessionState across worker threads is safe in practice, or whether each worker should construct its own (paying the SessionState init cost in exchange for isolation)?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| for (List<String> batch : batches) { | ||
| futures.add(partitionClientPool.executor().submit(() -> | ||
| partitionClientPool.run(poolClient -> { | ||
| action.apply(poolClient, batch); |
There was a problem hiding this comment.
🤖 When a Future fails, f.get() throws ExecutionException wrapping the real cause. firstError then stores the ExecutionException, which gets rethrown and caught by the outer catch (Exception e) in addPartitionsToTable/dropPartitionsToTable/registerAlterPartitionEvent, which wraps it again in HoodieHiveSyncException. The result is HoodieHiveSyncException -> ExecutionException -> RealCause — one more wrapping layer than the sequential path produced. HiveDriverPool.awaitAll already has an unwrap() helper; could we do the same here for consistency?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| // Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit | ||
| // or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for | ||
| // reference only — close() is delegated through ddlExecutor.close(). | ||
| private Option<HiveDriverPool> partitionDriverPool = Option.empty(); |
There was a problem hiding this comment.
🤖 nit: the comment says this field is 'kept for reference only — close() is delegated through ddlExecutor.close()', which means it's never read after being passed into HiveQueryDDLExecutor. Could you make it a local variable in the constructor instead? A field that isn't read after construction will make future readers wonder what lifecycle role it plays.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
| * semantics, so each worker still iterates its chunk one partition at a time — the | ||
| * win is fanning chunks across independent Thrift clients. | ||
| */ | ||
| private void runDropBatches(String tableName, List<List<String>> batches) throws Exception { |
There was a problem hiding this comment.
🤖 nit: the parallel-path error-collection loop inside runDropBatches (Exception firstError = null; for (Future<Void> f : futures) …) is nearly identical to the one in HMSDDLExecutor.runBatches. Since HiveDriverPool already has a well-factored awaitAll(List<Future<?>>) for exactly this pattern, it might be worth adding a similar awaitAll(List<Future<Void>>) to IMetaStoreClientPool so both callers can delegate to it instead.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #19033 +/- ##
============================================
- Coverage 68.26% 67.65% -0.62%
- Complexity 29513 29865 +352
============================================
Files 2542 2564 +22
Lines 142627 145490 +2863
Branches 17788 18371 +583
============================================
+ Hits 97369 98428 +1059
- Misses 37253 38828 +1575
- Partials 8005 8234 +229
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Summary
dropPartitionsToTablein the HiveQL sync mode (hoodie.datasource.hive_sync.mode=hiveql).7022e7d49341in isolation gives the DROP-only delta (~119 added / 16 removed).What this fixes
HiveQL DROP goes through
IMetaStoreClient.dropPartition(Thrift), not Hive Driver — so it can't reuse theHiveDriverPoolintroduced in #18984. Today it loops sequentially against the session metastore client. This PR wires the existingIMetaStoreClientPoolfrom #18983 intoHiveQueryDDLExecutorand uses it to fan drop batches across the pool's workers.Behavior:
batching.enabled=false(default): unchanged.dropPartitionsToTableiterates the partition list sequentially on the session metastore client, exactly as before.batching.enabled=true: partitions split into batches ofHIVE_BATCH_SYNC_PARTITION_NUM, batches fan out across the pool's workers (one independentIMetaStoreClientper worker), first-error semantics match the HMS-mode implementation (first failure thrown, subsequent suppressed at WARN).Configs
No new configs. Reuses everything from #18983 / #18984:
hoodie.datasource.hive_sync.batching.enabledfalsehoodie.datasource.hive_sync.batching.threads4hoodie.datasource.hive_sync.batch_num1000Test plan
mvn compileonhudi-sync/hudi-hive-sync— clean, 0 Checkstyle violations, 0 RAT issuesmvn testonhudi-sync/hudi-hive-sync— 309 tests, 0 failures, 0 errors (was 308 on the parent branch)TestHiveSyncTool#testHiveQLDropPartitionsWithBatching(new) — creates 8 partitions, drops 4 through the parallel pool path withthreads=3andbatch_num=2(multiple drop batches dispatched in parallel), asserts the remaining set matches.Files touched (top commit only)
HiveQueryDDLExecutor.java— new constructor acceptingOption<IMetaStoreClientPool>;dropPartitionsToTablenow batches and fans out when the pool is present, falls back to sequential single-client path otherwise.HoodieHiveSyncClient.java— buildIMetaStoreClientPoolin HIVEQL branches as well (HMS branch already did so).TestHiveSyncTool.java— new end-to-end test.Out of scope
constructDropPartitionsalready batches byHIVE_BATCH_SYNC_PARTITION_NUMbut runs sequentially; parallelizing it needs a JDBCConnectionpool, tracked separately.Related: #18331
🤖 Generated with Claude Code