Skip to content
Draft
34 changes: 34 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,28 @@ object CometConf extends ShimCometConf {
/** Enables Comet's native `localTableScan` operator; disabled by default. */
val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("localTableScan", defaultValue = false)

/**
 * Number of partitions (buckets) used by Grace Hash Join.
 * Must be strictly positive; defaults to 16.
 */
val COMET_EXEC_GRACE_HASH_JOIN_NUM_PARTITIONS: ConfigEntry[Int] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.numPartitions")
    .category(CATEGORY_EXEC)
    .doc("The number of partitions (buckets) to use for Grace Hash Join. A higher number " +
      "reduces the size of each partition but increases overhead.")
    .intConf
    .checkValue(_ > 0, "The number of partitions must be positive.")
    .createWithDefault(16)

/**
 * Per-task memory budget (bytes) below which Grace Hash Join takes the
 * single-HashJoinExec fast path instead of partitioning/spilling.
 * 0 disables the fast path; defaults to 64 MB.
 */
val COMET_EXEC_GRACE_HASH_JOIN_FAST_PATH_THRESHOLD: ConfigEntry[Long] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.graceHashJoin.fastPathThreshold")
    .category(CATEGORY_EXEC)
    .doc(
      "Per-task memory budget in bytes for Grace Hash Join fast-path hash tables. " +
        "When a build side fits in memory and is smaller than this threshold, " +
        "the join executes as a single HashJoinExec without partitioning or spilling. " +
        "Set to 0 to disable the fast path. Larger values risk OOM because HashJoinExec " +
        "creates non-spillable hash tables.")
    .longConf
    .checkValue(_ >= 0, "The fast path threshold must be non-negative.")
    .createWithDefault(64L * 1024 * 1024) // 64 MB

val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")
.category(CATEGORY_EXEC)
Expand Down Expand Up @@ -381,6 +403,18 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

/**
 * Build-side size cap (bytes) for the SortMergeJoin-to-ShuffledHashJoin rewrite.
 * -1 disables the size check entirely; defaults to 100 MB.
 *
 * Unlike the sibling configs above, the original entry performed no input
 * validation, even though the documentation only defines -1 and non-negative
 * values; values below -1 were silently accepted with undefined meaning.
 * A checkValue is added for consistency with the other numeric configs.
 */
val COMET_REPLACE_SMJ_MAX_BUILD_SIZE: ConfigEntry[Long] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin.maxBuildSize")
    .category(CATEGORY_EXEC)
    .doc(
      "Maximum estimated size in bytes of the build side for replacing SortMergeJoin " +
        "with ShuffledHashJoin. When the build side's logical plan statistics exceed this " +
        "threshold, the SortMergeJoin is kept because sort-merge join's streaming merge " +
        "on pre-sorted data outperforms hash join's per-task hash table construction " +
        "for large build sides. Set to -1 to disable this check and always replace.")
    .longConf
    .checkValue(v => v >= -1, "The max build size must be -1 (disabled) or non-negative.")
    .createWithDefault(100L * 1024 * 1024) // 100 MB

val COMET_EXEC_SHUFFLE_WITH_HASH_PARTITIONING_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.native.shuffle.partitioning.hash.enabled")
.category(CATEGORY_SHUFFLE)
Expand Down
293 changes: 293 additions & 0 deletions docs/source/contributor-guide/grace-hash-join-design.md

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ edition = "2021"
rust-version = "1.88"

[workspace.dependencies]
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow = { version = "57.3.0", features = ["prettyprint", "ffi", "chrono-tz", "ipc_compression"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "57.3.0", default-features = false, features = ["experimental"] }
Expand Down
1 change: 1 addition & 0 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ include = [
publish = false

[dependencies]
ahash = "0.8"
arrow = { workspace = true }
parquet = { workspace = true, default-features = false, features = ["experimental", "arrow"] }
futures = { workspace = true }
Expand Down
6 changes: 5 additions & 1 deletion native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ struct ExecutionContext {
pub memory_pool_config: MemoryPoolConfig,
/// Whether to log memory usage on each call to execute_plan
pub tracing_enabled: bool,
/// Spark configuration map for comet-specific settings
pub spark_conf: HashMap<String, String>,
}

/// Accept serialized query plan and return the address of the native query plan.
Expand Down Expand Up @@ -322,6 +324,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
explain_native,
memory_pool_config,
tracing_enabled,
spark_conf: spark_config,
});

Ok(Box::into_raw(exec_context) as i64)
Expand Down Expand Up @@ -535,7 +538,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
let start = Instant::now();
let planner =
PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx), partition)
.with_exec_id(exec_context_id);
.with_exec_id(exec_context_id)
.with_spark_conf(exec_context.spark_conf.clone());
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
Expand Down
Loading
Loading