feat(hive-sync): batch and parallelize JDBC partition operations#18986
Draft
nsivabalan wants to merge 3 commits into
Hive sync partition operations on HMS today serialize through a single IMetaStoreClient and ship entire partition lists in a single Thrift call for TOUCH/UPDATE. For large tables (~2k partitions) this is ~5-9x slower than parallel implementations (see apache#18331). The biggest contributors are (1) one giant alter_partitions call for UPDATE/TOUCH, and (2) per-partition Thrift round-trips for DROP, all sequential.

This change introduces an opt-in IMetaStoreClientPool gated behind hoodie.datasource.hive_sync.batching.enabled (default false). When on, HMSDDLExecutor splits ADD / UPDATE / TOUCH / DROP into batches of hoodie.datasource.hive_sync.batch_num (existing config, default 1000) and fans them out across a pool of RetryingMetaStoreClient instances sized by hoodie.datasource.hive_sync.batching.threads (default 4).

Design invariant: only partition-row operations go through the pool. Table-row operations (createTable, alter_table, last-commit-time-synced, writer-version, table-comments) stay on the existing session client, so there is no lost-update risk on table parameters. The sync flow remains serial-parallel-serial (phase 1: table setup, phase 2: parallel partition fan-out, phase 3: table finalization).

Sequential fallback is preserved when the flag is off or when HIVE_SYNC_USE_SPARK_CATALOG is on (incompatible with the pool's direct RetryingMetaStoreClient.getProxy path).

Tests:
- TestIMetaStoreClientPool covers borrow/return, concurrent borrows, close idempotency.
- TestHiveSyncTool.testHMSSyncWithBatchingEnabled exercises end-to-end sync against the embedded HMS with batching on.

Related: apache#18331

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
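To illustrate the batch-then-fan-out idea described in this commit message, here is a minimal sketch in plain Java. It is not the HMSDDLExecutor code: PartitionBatchFanOut, toBatches, and runInParallel are hypothetical names, and a generic Consumer stands in for the actual Thrift call made by a pooled client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical helper (not Hudi code): split a partition list into batches and run them in parallel. */
final class PartitionBatchFanOut {

  /** Splits items into consecutive batches of at most batchSize elements. */
  static <T> List<List<T>> toBatches(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += batchSize) {
      int end = Math.min(start + batchSize, items.size());
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }

  /** Runs batchOp once per batch on a fixed pool of `threads` workers and waits for all of them. */
  static <T> void runInParallel(List<T> items, int batchSize, int threads, Consumer<List<T>> batchOp) {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (List<T> batch : toBatches(items, batchSize)) {
        futures.add(executor.submit(() -> batchOp.accept(batch)));
      }
      for (Future<?> future : futures) {
        try {
          future.get(); // surfaces the first failed batch
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting for batches", e);
        } catch (ExecutionException e) {
          throw new IllegalStateException("partition batch failed", e.getCause());
        }
      }
    } finally {
      executor.shutdown();
    }
  }
}
```

In this sketch, batchSize plays the role of hoodie.datasource.hive_sync.batch_num and threads the role of hoodie.datasource.hive_sync.batching.threads; table-level work would stay outside runInParallel, matching the serial-parallel-serial flow.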
Follow-up to apache#18983 (HMS parallelism). Applies the equivalent treatment to the HiveQL sync mode (hoodie.datasource.hive_sync.mode=hiveql).

HiveQL had two issues that this change addresses:
1. Batching gaps in QueryBasedDDLExecutor.constructPartitionAlterStatements: TOUCH concatenated every partition into one giant ALTER TABLE ... TOUCH PARTITION (...) PARTITION (...) ... statement; SET_LOCATION (UPDATE) emitted one statement per partition. ADD was already batched.
2. Sequential SQL execution in HiveQueryDDLExecutor.updateHiveSQLs: even when batches existed, they ran in a single for-loop on one Hive Driver.

This change introduces HiveDriverPool, an eager pool of single-thread executors each owning a Hive Driver bound to a shared SessionState. Gated behind the existing hoodie.datasource.hive_sync.batching.enabled flag (default off) and sized by hoodie.datasource.hive_sync.batching.threads (default 4); no new configs.

Design notes:
- Hive's Driver and SessionState are thread-bound. SessionState.start() attaches to the calling thread's ThreadLocal. The pool gives each slot its own dedicated worker thread so the Driver stays valid for that thread's lifetime. Bootstrap, dispatch, and close all run on the bound thread.
- SessionState is shared across workers (lazily constructed once); each worker calls SessionState.start(sharedState) on its own thread to attach. Constructing one SessionState per worker triggered race conditions in Hive's resource-directory machinery on macOS.
- TOUCH is now batched by HIVE_BATCH_SYNC_PARTITION_NUM. SET_LOCATION remains one statement per partition (Hive SQL doesn't support multi-partition SET LOCATION) but is now fanned out across workers.
- Hive 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and silently uses the connection's current database, so the leading USE database statement is load-bearing. The pool peels it off and runs it on every worker via runOnEachWorker() before fanning the rest out.

Tests:
- TestHiveDriverPool: bootstrap, dispatch round-robin, error propagation, concurrent-borrow bounding, close idempotency.
- TestHiveSyncTool.testHiveQLSyncWithBatchingEnabled: end-to-end with batching.enabled=true, threads=3, batch_num=3 against embedded HMS.
- TestHiveSyncTool.testHiveQLTouchPartitionsWithBatching: exercises the batched TOUCH path specifically.
- Full hudi-hive-sync suite: 305 passed, 0 failures, 0 errors.

Related: apache#18331

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
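The thread-binding design note can be sketched without any Hive classes. The example below is an assumption-laden illustration, not HiveDriverPool itself: ThreadBoundWorkerPool and runAll are hypothetical names, the signature given to runOnEachWorker is a guess, and a Consumer stands in for running a statement on the worker's own Driver. What it shows is the shape of the idea: one single-thread executor per slot, so every task for a slot runs on the same thread, plus a broadcast step and round-robin dispatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical stand-in for a thread-bound worker pool; it does not use Hive's Driver or SessionState. */
final class ThreadBoundWorkerPool implements AutoCloseable {
  private final List<ExecutorService> workers = new ArrayList<>();
  private int next = 0;
  private boolean closed = false;

  ThreadBoundWorkerPool(int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    for (int i = 0; i < size; i++) {
      // One dedicated thread per slot: thread-local state set up by one task is visible to later tasks.
      workers.add(Executors.newSingleThreadExecutor());
    }
  }

  /** Runs the same statement once on every worker thread (for example a leading USE statement). */
  synchronized void runOnEachWorker(String statement, Consumer<String> runner) {
    List<Future<?>> futures = new ArrayList<>();
    for (ExecutorService worker : workers) {
      futures.add(worker.submit(() -> runner.accept(statement)));
    }
    awaitAll(futures);
  }

  /** Dispatches statements round-robin across workers and waits for all of them. */
  synchronized void runAll(List<String> statements, Consumer<String> runner) {
    List<Future<?>> futures = new ArrayList<>();
    for (String statement : statements) {
      ExecutorService worker = workers.get(next);
      next = (next + 1) % workers.size();
      futures.add(worker.submit(() -> runner.accept(statement)));
    }
    awaitAll(futures);
  }

  private static void awaitAll(List<Future<?>> futures) {
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for workers", e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("statement failed on worker", e.getCause());
      }
    }
  }

  /** Idempotent: a second call is a no-op. */
  @Override
  public synchronized void close() {
    if (closed) {
      return;
    }
    closed = true;
    for (ExecutorService worker : workers) {
      worker.shutdown();
    }
  }
}
```

In a real pool the first task submitted to each worker would presumably attach the shared session to that thread; here that bootstrap step is left to the caller's runner.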
Follow-up to apache#18983 (HMS pool) and apache#18984 (HiveQL Driver pool). Applies the equivalent treatment to JDBC sync mode.

The JDBC executor's partition phase ran every SQL statement sequentially on a single shared java.sql.Connection. ADD and DROP were already batched in the parent QueryBasedDDLExecutor (and in JDBCExecutor.constructDropPartitions); TOUCH became batched in apache#18984. The only batching gap that remains is SET_LOCATION (UPDATE), which Hive SQL fundamentally cannot batch (no multi-partition SET LOCATION syntax). So the win for JDBC is pure parallel execution.

This change introduces JDBCConnectionPool, a simple borrow/return pool of N java.sql.Connection instances. JDBC Connection is not thread-safe but is cheap to construct (one TCP socket to HiveServer2), so the design mirrors IMetaStoreClientPool more than HiveDriverPool: no thread-binding needed. Gated behind the existing hoodie.datasource.hive_sync.batching.enabled flag (default false) and sized by hoodie.datasource.hive_sync.batching.threads (default 4); no new configs.

Design notes:
- JDBCExecutor keeps its session Connection for table-row work (createDatabase, createTable, schema evolution, updateTableComments). JDBCBasedMetadataOperator continues to use that session Connection, unchanged.
- The pool is purely additive for partition fan-out. Pool clients only run partition-row statements.
- Hive 2.x's ALTER PARTITION SET LOCATION ignores db.table qualifiers and silently uses the connection's current database. The pool's runOnEachConnection() broadcasts the leading USE database statement to every connection before partition fan-out, same pattern as HiveDriverPool.runOnEachWorker() from apache#18984. This is run lazily on first dispatch, not at pool construction (the database may not exist yet at construction time).

Tests:
- TestJDBCConnectionPool: borrow/return, concurrent-borrow bounding, close idempotency, exhaustion blocking, size validation.
- TestHiveSyncTool.testJDBCSyncWithBatchingEnabled: end-to-end JDBC sync with batching.enabled=true, threads=3, batch_num=3 against the embedded HiveServer2.
- Full hudi-hive-sync suite: 314 passed, 0 failures.

Related: apache#18331

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
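A borrow/return pool of the kind this commit message describes might look roughly like the sketch below. It is generic rather than tied to java.sql.Connection so that it runs without a HiveServer2, and every name in it (BorrowReturnPool, withResource, idleCount) is hypothetical; the actual JDBCConnectionPool API may differ. It covers the behaviours the tests list: size validation, blocking on exhaustion, and idempotent close.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical generic borrow/return pool; a sketch, not Hudi's JDBCConnectionPool. */
final class BorrowReturnPool<T> implements AutoCloseable {
  private final BlockingQueue<T> idle;
  private final Consumer<T> closer;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  BorrowReturnPool(int size, Supplier<T> factory, Consumer<T> closer) {
    if (size <= 0) {
      throw new IllegalArgumentException("pool size must be positive");
    }
    this.idle = new ArrayBlockingQueue<>(size);
    this.closer = closer;
    for (int i = 0; i < size; i++) {
      idle.add(factory.get()); // eager construction of all pooled resources
    }
  }

  /** Borrows a resource (blocking while all are in use), applies work, and always returns it. */
  <R> R withResource(Function<T, R> work) {
    if (closed.get()) {
      throw new IllegalStateException("pool is closed");
    }
    T resource;
    try {
      resource = idle.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a pooled resource", e);
    }
    try {
      return work.apply(resource);
    } finally {
      idle.add(resource); // never exceeds capacity: one slot was freed by take()
    }
  }

  /** Number of resources currently available to borrow. */
  int idleCount() {
    return idle.size();
  }

  /** Idempotent; assumes no borrows are in flight when called. */
  @Override
  public void close() {
    if (!closed.compareAndSet(false, true)) {
      return;
    }
    T resource;
    while ((resource = idle.poll()) != null) {
      closer.accept(resource);
    }
  }
}
```

Because no resource is tied to a particular thread, any worker can borrow any connection, which is the property that lets this design stay simpler than the thread-bound Driver pool.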
Codecov Report
@@ Coverage Diff @@
## master #18986 +/- ##
============================================
- Coverage 68.26% 68.25% -0.02%
- Complexity 29513 29564 +51
============================================
Files 2542 2545 +3
Lines 142627 142987 +360
Branches 17788 17850 +62
============================================
+ Hits 97369 97593 +224
- Misses 37253 37369 +116
- Partials 8005 8025 +20
Summary
- Adds JDBCConnectionPool for JDBC partition sync (issue #18331)
- Simple borrow/return pool of N java.sql.Connection instances
- Opt-in via hoodie.datasource.hive_sync.batching.enabled=true

Note: stacked on #18984 (which is stacked on #18983). The diff currently includes both upstream commits. Once #18983 and #18984 merge, this PR will rebase cleanly onto master and the diff will shrink to just the JDBC change (~581 added / 3 removed). Reviewing the top commit f2b079109192 in isolation gives the JDBC-only delta.

Design constraint
JDBC Connection is not thread-safe but is cheap to construct (one TCP socket to HiveServer2). So the design mirrors #18983's IMetaStoreClientPool more than #18984's HiveDriverPool: simple borrow/return with no thread-binding. Each pooled connection is its own HiveServer2 session.

What's actually new
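The batching gaps for JDBC were small:
- ADD: already batched in QueryBasedDDLExecutor.constructAddPartitions (parent class).
- DROP: already batched in JDBCExecutor.constructDropPartitions.
- TOUCH: batched as of #18984's constructPartitionAlterStatements change.
- SET_LOCATION (UPDATE): cannot be batched (no multi-partition SET LOCATION syntax).

So the win for JDBC is pure parallel execution of the existing batches and statements, not new batching.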
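As a rough picture of what fanning out existing statements could involve, the sketch below splits a SQL list into its leading USE statements (which, per the commit message above, are broadcast to every pooled connection) and assigns the remaining statements round-robin to connection slots. StatementPlanner, countLeadingUse, and roundRobin are hypothetical names; this is not the JDBCExecutor.runSQLs implementation, and the simple prefix check is an assumption about how a USE statement is recognized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical planner (not Hudi code): separate leading USE statements, then spread the rest. */
final class StatementPlanner {

  /** Counts how many statements at the head of the list start with USE (case-insensitive). */
  static int countLeadingUse(List<String> sqls) {
    int count = 0;
    while (count < sqls.size()
        && sqls.get(count).trim().toUpperCase(Locale.ROOT).startsWith("USE ")) {
      count++;
    }
    return count;
  }

  /** Assigns statements to `slots` lists round-robin, keeping relative order within each slot. */
  static List<List<String>> roundRobin(List<String> sqls, int slots) {
    if (slots <= 0) {
      throw new IllegalArgumentException("slots must be positive");
    }
    List<List<String>> plan = new ArrayList<>();
    for (int i = 0; i < slots; i++) {
      plan.add(new ArrayList<>());
    }
    for (int i = 0; i < sqls.size(); i++) {
      plan.get(i % slots).add(sqls.get(i));
    }
    return plan;
  }
}
```

With a list such as one USE statement followed by three ALTER statements and two slots, the USE statement would be sent to both connections, and the ALTER statements would land as two on the first slot and one on the second.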
Hive 2.x quirk handling
Same Hive 2.x USE database quirk we encountered in #18984: ALTER PARTITION SET LOCATION ignores db.tbl qualifiers and silently uses the connection's current database. Each pooled connection is its own HiveServer2 session, so each one needs its own USE database before any partition ALTER. The pool exposes runOnEachConnection(List<String>) for this; JDBCExecutor.runSQLs peels off any leading USE statements from the SQL list and broadcasts them to every pooled connection, then fans the rest round-robin.

This is done lazily on first dispatch (not at pool construction) because the database may not yet exist when the pool is built: HoodieHiveSyncClient constructs the pool before HiveSyncTool.syncHoodieTable() runs createDatabase. (I learned this the hard way during testing.)

Design invariant (same as #18983 / #18984)
- The session Connection on JDBCExecutor continues to handle table-row work: createDatabase, createTable, updateTableComments, schema evolution.
- JDBCBasedMetadataOperator (used as the Thrift-incompatibility fallback) continues to use the same session Connection, unchanged.

Configs
No new configs; reuses everything from #18983:

| Config | Default |
| --- | --- |
| hoodie.datasource.hive_sync.batching.enabled | false |
| hoodie.datasource.hive_sync.batching.threads | 4 |
| hoodie.datasource.hive_sync.batch_num | 1000 |

Test plan
- mvn compile on hudi-sync/hudi-hive-sync: clean, 0 Checkstyle violations, 0 RAT issues
- mvn test on hudi-sync/hudi-hive-sync: 314 tests, 0 failures, 0 errors
- TestJDBCConnectionPool (new, 8 tests): borrow/return, concurrent-borrow bounding, idempotent close, exhaustion blocking, size validation, executor lifecycle
- TestHiveSyncTool#testJDBCSyncWithBatchingEnabled (new): end-to-end JDBC sync with batching on against the embedded HiveServer2 (10 + 4 partitions, batch_num=3, threads=3)

Stack so far
After all three merge, the perf gap reported in #18331 should be closed for all sync modes.
Related: #18331
🤖 Generated with Claude Code