
Commit 1ed00e1

Merge branch 'main' into feature/array_remove

2 parents 866423b + 3f0d442

36 files changed: +627, -295 lines

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 12 additions & 20 deletions
@@ -272,18 +272,19 @@ object CometConf extends ShimCometConf {
       .booleanConf
       .createWithDefault(false)

-  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
-        "Compression can be disabled by setting spark.shuffle.compress=false.")
-    .stringConf
-    .checkValues(Set("zstd"))
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc(
+        "The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
+          "snappy are supported. Compression can be disabled by setting " +
+          "spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4", "snappy"))
+      .createWithDefault("lz4")

-  val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
-    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.level")
-      .doc("The compression level to use when compression shuffle files.")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_ZSTD_LEVEL: ConfigEntry[Int] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.zstd.level")
+      .doc("The compression level to use when compressing shuffle files with zstd.")
       .intConf
       .createWithDefault(1)

@@ -452,15 +453,6 @@ object CometConf extends ShimCometConf {
       .intConf
       .createWithDefault(8192)

-  val COMET_EXEC_MEMORY_FRACTION: ConfigEntry[Double] = conf("spark.comet.exec.memoryFraction")
-    .doc(
-      "The fraction of memory from Comet memory overhead that the native memory " +
-        "manager can use for execution. The purpose of this config is to set aside memory for " +
-        "untracked data structures, as well as imprecise size estimation during memory " +
-        "acquisition.")
-    .doubleConf
-    .createWithDefault(0.7)
-
   val COMET_PARQUET_ENABLE_DIRECT_BUFFER: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.enable.directBuffer")
       .doc("Whether to use Java direct byte buffer when reading Parquet.")
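The new codec entry only accepts values in `{zstd, lz4, snappy}` and falls back to `lz4` when unset. The `.checkValues`/`.createWithDefault` behavior can be sketched as follows; `resolve_codec` is a hypothetical stand-in for illustration, not Comet's actual `ConfigEntry` machinery:

```rust
use std::collections::HashSet;

/// Sketch of checkValues(Set("zstd", "lz4", "snappy")) + createWithDefault("lz4"):
/// returns the configured codec when it is in the allowed set, the default when
/// no value was configured, and an error for an unsupported codec.
fn resolve_codec(configured: Option<&str>) -> Result<String, String> {
    let allowed: HashSet<&str> = ["zstd", "lz4", "snappy"].into_iter().collect();
    match configured {
        None => Ok("lz4".to_string()), // createWithDefault("lz4")
        Some(c) if allowed.contains(c) => Ok(c.to_string()),
        Some(c) => Err(format!("invalid codec: {c}")),
    }
}
```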

docs/source/user-guide/configs.md

Lines changed: 2 additions & 3 deletions
@@ -47,12 +47,11 @@ Comet provides the following configuration settings.
 | spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
 | spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
 | spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
-| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. | 0.7 |
 | spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. Available memory pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global' and 'fair_spill_global', By default, this config is 'greedy_task_shared'. | greedy_task_shared |
 | spark.comet.exec.project.enabled | Whether to enable project by default. | true |
 | spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
-| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. Compression can be disabled by setting spark.shuffle.compress=false. | zstd |
-| spark.comet.exec.shuffle.compression.level | The compression level to use when compression shuffle files. | 1 |
+| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
+| spark.comet.exec.shuffle.compression.zstd.level | The compression level to use when compressing shuffle files with zstd. | 1 |
 | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
 | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
 | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |

docs/source/user-guide/tuning.md

Lines changed: 51 additions & 6 deletions
@@ -23,11 +23,52 @@ Comet provides some tuning options to help you get the best performance from you

 ## Memory Tuning

-Comet shares an off-heap memory pool between Spark and Comet. This requires setting `spark.memory.offHeap.enabled=true`.
-If this setting is not enabled, Comet will not accelerate queries and will fall back to Spark.
+### Unified Memory Management with Off-Heap Memory
+
+The recommended way to share memory between Spark and Comet is to set `spark.memory.offHeap.enabled=true`. This allows
+Comet to share an off-heap memory pool with Spark. The size of the pool is specified by `spark.memory.offHeap.size`. For more details about Spark's off-heap memory mode, please refer to the Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
+
+### Dedicated Comet Memory Pools
+
+Spark uses on-heap memory mode by default, i.e., the `spark.memory.offHeap.enabled` setting is not enabled. When Spark is in on-heap memory mode, Comet will use its own dedicated memory pools that
+are not shared with Spark. This requires additional configuration settings to specify the size and type of
+memory pool to use.
+
+The size of the pool can be set explicitly with `spark.comet.memoryOverhead`. If this setting is not specified, then
+the memory overhead will be calculated by multiplying the executor memory by `spark.comet.memory.overhead.factor`
+(defaults to `0.2`).
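The sizing fallback described in the added documentation above can be sketched as a small function. This is an illustration of the documented rule only; the function name and signature are hypothetical:

```rust
/// Sketch of the dedicated-pool sizing rule: use the explicit
/// spark.comet.memoryOverhead value when set, otherwise executor memory
/// multiplied by spark.comet.memory.overhead.factor (default 0.2).
fn comet_memory_overhead_bytes(
    explicit_overhead: Option<u64>,
    executor_memory: u64,
    overhead_factor: f64, // spark.comet.memory.overhead.factor, defaults to 0.2
) -> u64 {
    match explicit_overhead {
        Some(bytes) => bytes,
        None => (executor_memory as f64 * overhead_factor) as u64,
    }
}
```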
+
+The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.
+
+The valid pool types are:
+
+- `greedy`
+- `greedy_global`
+- `greedy_task_shared`
+- `fair_spill`
+- `fair_spill_global`
+- `fair_spill_task_shared`
+
+Pool types ending with `_global` use a single global memory pool shared by all tasks on the same executor.
+
+Pool types ending with `_task_shared` share a single memory pool across all attempts for a single task.
+
+The other pool types create a dedicated pool per native query plan, using a fraction of the available pool size based on the number of cores
+and the cores per task.
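The exact per-plan split is not spelled out in this diff, so the formula below is an assumption: one plausible reading of "a fraction of the available pool size based on the number of cores and cores per task" is an even division across the number of concurrent tasks an executor can run:

```rust
/// Illustrative sketch only; the real formula is not shown in this commit.
/// Divides the available pool evenly across the executor's concurrent task
/// slots (num_cores / cores_per_task).
fn per_plan_pool_size(available: u64, num_cores: u64, cores_per_task: u64) -> u64 {
    let concurrent_tasks = (num_cores / cores_per_task).max(1);
    available / concurrent_tasks
}
```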
58+
59+
The `greedy*` pool types use DataFusion's [GreedyMemoryPool], which implements a greedy first-come first-serve limit. This
60+
pool works well for queries that do not need to spill or have a single spillable operator.
61+
62+
The `fair_spill*` pool types use DataFusion's [FairSpillPool], which prevents spillable reservations from using more
63+
than an even fraction of the available memory sans any unspillable reservations
64+
(i.e. `(pool_size - unspillable_memory) / num_spillable_reservations)`). This pool works best when you know beforehand
65+
the query has multiple spillable operators that will likely all need to spill. Sometimes it will cause spills even
66+
when there was sufficient memory (reserved for other operators) to avoid doing so. Unspillable memory is allocated in
67+
a first-come, first-serve fashion
68+
69+
[GreedyMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.GreedyMemoryPool.html
70+
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
2871

29-
Each executor will have a single memory pool which will be shared by all native plans being executed within that
30-
process, and by Spark itself. The size of the pool is specified by `spark.memory.offHeap.size`.
3172

3273
### Determining How Much Memory to Allocate
3374

@@ -106,15 +147,19 @@ then any shuffle operations that cannot be supported in this mode will fall back

 ### Shuffle Compression

 By default, Spark compresses shuffle files using LZ4 compression. Comet overrides this behavior with ZSTD compression.
-Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in
+Compression can be disabled by setting `spark.shuffle.compress=false`, which may result in faster shuffle times in
 certain environments, such as single-node setups with fast NVMe drives, at the expense of increased disk space usage.

 ## Explain Plan
+
 ### Extended Explain
+
 With Spark 4.0.0 and newer, Comet can provide extended explain plan information in the Spark UI. Currently this lists
 reasons why Comet may not have been enabled for specific operations.
 To enable this, in the Spark configuration, set the following:
+
 ```shell
 -c spark.sql.extendedExplainProviders=org.apache.comet.ExtendedExplainInfo
 ```
-This will add a section to the detailed plan displayed in the Spark SQL UI page.
+
+This will add a section to the detailed plan displayed in the Spark SQL UI page.
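Returning to the memory-pool documentation added earlier in this file, the FairSpillPool bound it quotes, `(pool_size - unspillable_memory) / num_spillable_reservations`, can be sketched directly. This is a restatement of the documented formula, not DataFusion's implementation:

```rust
/// Fair share available to each spillable reservation under a FairSpillPool,
/// per the formula quoted in the tuning guide: the pool minus unspillable
/// memory, split evenly across the spillable reservations.
fn fair_spill_share(pool_size: u64, unspillable: u64, num_spillable: u64) -> u64 {
    assert!(num_spillable > 0, "needs at least one spillable reservation");
    // saturating_sub: unspillable memory may already exceed the pool
    pool_size.saturating_sub(unspillable) / num_spillable
}
```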

native/Cargo.lock

Lines changed: 25 additions & 23 deletions
Some generated files are not rendered by default.

native/core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ serde = { version = "1", features = ["derive"] }
5252
lazy_static = "1.4.0"
5353
prost = "0.12.1"
5454
jni = "0.21"
55+
snap = "1.1"
56+
# we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation
57+
lz4_flex = { version = "0.11.3", default-features = false }
5558
zstd = "0.11"
5659
rand = { workspace = true}
5760
num = { workspace = true }

native/core/benches/row_columnar.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use arrow::datatypes::DataType as ArrowDataType;
1919
use comet::execution::shuffle::row::{
2020
process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
2121
};
22+
use comet::execution::shuffle::CompressionCodec;
2223
use criterion::{criterion_group, criterion_main, Criterion};
2324
use tempfile::Builder;
2425

@@ -77,6 +78,7 @@ fn benchmark(c: &mut Criterion) {
7778
false,
7879
0,
7980
None,
81+
&CompressionCodec::Zstd(1),
8082
)
8183
.unwrap();
8284
});
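The benchmark now passes `&CompressionCodec::Zstd(1)` into `process_sorted_row_partition`. The `CompressionCodec` type itself is not shown in this diff; a plausible stdlib-only reconstruction of its shape is below (the variant names beyond `Zstd` and the `from_config` helper are assumptions, informed by the codec set in `CometConf` and the `zstd`, `lz4_flex`, and `snap` dependencies added in `Cargo.toml`):

```rust
/// Hypothetical sketch of the codec selector used by the shuffle writer; the
/// real type lives in comet::execution::shuffle and dispatches to the zstd,
/// lz4_flex, and snap crates.
#[derive(Debug, Clone, PartialEq)]
enum CompressionCodec {
    /// zstd with a compression level; the benchmark above uses Zstd(1)
    Zstd(i32),
    Lz4,
    Snappy,
}

impl CompressionCodec {
    /// Map a spark.comet.exec.shuffle.compression.codec value to a codec,
    /// applying the level from ...shuffle.compression.zstd.level for zstd.
    fn from_config(name: &str, zstd_level: i32) -> Option<CompressionCodec> {
        match name {
            "zstd" => Some(CompressionCodec::Zstd(zstd_level)),
            "lz4" => Some(CompressionCodec::Lz4),
            "snappy" => Some(CompressionCodec::Snappy),
            _ => None,
        }
    }
}
```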
