
Commit af02182

perf: Add memory profiling (#1702)
1 parent d2fb4f0 commit af02182

6 files changed (+115 −3 lines)

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 6 additions & 0 deletions
@@ -233,6 +233,12 @@ object CometConf extends ShimCometConf {
     defaultValue = true,
     notes = Some("stddev is slower than Spark's implementation"))
 
+  val COMET_MEMORY_PROFILING: ConfigEntry[Boolean] = conf("spark.comet.memory.profiling")
+    .doc("Enable logging of JVM and native memory statistics.")
+    .internal()
+    .booleanConf
+    .createWithDefault(false)
+
   val COMET_MEMORY_OVERHEAD: OptionalConfigEntry[Long] = conf("spark.comet.memoryOverhead")
     .doc(
       "The amount of additional memory to be allocated per executor process for Comet, in MiB, " +

native/Cargo.lock

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default.

native/core/Cargo.toml

Lines changed: 5 additions & 1 deletion
@@ -40,6 +40,7 @@ parquet = { workspace = true, default-features = false, features = ["experimenta
 futures = { workspace = true }
 mimalloc = { version = "*", default-features = false, optional = true }
 tikv-jemallocator = { version = "0.6.0", optional = true, features = ["disable_initial_exec_tls"] }
+tikv-jemalloc-ctl = { version = "0.6.0", optional = true, features = ["disable_initial_exec_tls", "stats"] }
 tokio = { version = "1", features = ["rt-multi-thread"] }
 async-trait = { workspace = true }
 log = "0.4"
@@ -70,6 +71,9 @@ url = { workspace = true }
 parking_lot = "0.12.3"
 datafusion-comet-objectstore-hdfs = { path = "../hdfs", optional = true, default-features = false, features = ["hdfs"] }
 
+[target.'cfg(target_os = "linux")'.dependencies]
+procfs = "0.17.0"
+
 [dev-dependencies]
 pprof = { version = "0.14.0", features = ["flamegraph"] }
 criterion = { version = "0.5.1", features = ["async_tokio"] }
@@ -82,7 +86,7 @@ datafusion-functions-nested = { version = "47.0.0" }
 [features]
 default = []
 hdfs = ["datafusion-comet-objectstore-hdfs"]
-jemalloc = ["tikv-jemallocator"]
+jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl"]
 
 # exclude optional packages from cargo machete verifications
 [package.metadata.cargo-machete]

native/core/src/execution/jni_api.rs

Lines changed: 43 additions & 0 deletions
@@ -68,6 +68,10 @@ use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
 use crate::execution::spark_plan::SparkPlan;
 use log::info;
 use once_cell::sync::Lazy;
+#[cfg(target_os = "linux")]
+use procfs::process::Process;
+#[cfg(feature = "jemalloc")]
+use tikv_jemalloc_ctl::{epoch, stats};
 
 static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
     let mut builder = tokio::runtime::Builder::new_multi_thread();
@@ -126,6 +130,8 @@ struct ExecutionContext {
     pub explain_native: bool,
     /// Memory pool config
     pub memory_pool_config: MemoryPoolConfig,
+    /// Whether to log memory usage on each call to execute_plan
+    pub memory_profiling_enabled: bool,
 }
 
 /// Accept serialized query plan and return the address of the native query plan.
@@ -151,6 +157,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     task_attempt_id: jlong,
     debug_native: jboolean,
     explain_native: jboolean,
+    memory_profiling_enabled: jboolean,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
         // Init JVM classes
@@ -231,6 +238,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         debug_native: debug_native == 1,
         explain_native: explain_native == 1,
         memory_pool_config,
+        memory_profiling_enabled: memory_profiling_enabled != JNI_FALSE,
     });
 
     Ok(Box::into_raw(exec_context) as i64)
@@ -359,6 +367,41 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     // Retrieve the query
     let exec_context = get_execution_context(exec_context);
 
+    // Memory profiling is only available on Linux
+    if exec_context.memory_profiling_enabled {
+        #[cfg(target_os = "linux")]
+        {
+            let pid = std::process::id();
+            let process = Process::new(pid as i32).unwrap();
+            let statm = process.statm().unwrap();
+            let page_size = procfs::page_size();
+            println!(
+                "NATIVE_MEMORY: {{ resident: {:.0} }}",
+                (statm.resident * page_size) as f64 / (1024.0 * 1024.0)
+            );
+
+            #[cfg(feature = "jemalloc")]
+            {
+                // Obtain a MIB for the `epoch`, `stats.allocated`, and
+                // `stats.resident` keys:
+                let e = epoch::mib().unwrap();
+                let allocated = stats::allocated::mib().unwrap();
+                let resident = stats::resident::mib().unwrap();
+
+                // Many statistics are cached and only updated
+                // when the epoch is advanced:
+                e.advance().unwrap();
+
+                // Read statistics using MIB key:
+                let allocated = allocated.read().unwrap() as f64 / (1024.0 * 1024.0);
+                let resident = resident.read().unwrap() as f64 / (1024.0 * 1024.0);
+                println!(
+                    "NATIVE_MEMORY_JEMALLOC: {{ allocated: {allocated:.0}, resident: {resident:.0} }}"
+                );
+            }
+        }
+    }
+
     let exec_context_id = exec_context.id;
 
     // Initialize the execution stream.

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 21 additions & 1 deletion
@@ -19,6 +19,8 @@
 
 package org.apache.comet
 
+import java.lang.management.ManagementFactory
+
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
@@ -57,6 +59,7 @@ class CometExecIterator(
     extends Iterator[ColumnarBatch]
     with Logging {
 
+  private val memoryProfilingEnabled = CometConf.COMET_MEMORY_PROFILING.get()
  private val nativeLib = new Native()
  private val nativeUtil = new NativeUtil()
  private val cometBatchIterators = inputs.map { iterator =>
@@ -92,7 +95,8 @@
       memoryLimitPerTask = getMemoryLimitPerTask(conf),
       taskAttemptId = TaskContext.get().taskAttemptId,
       debug = COMET_DEBUG_ENABLED.get(),
-      explain = COMET_EXPLAIN_NATIVE_ENABLED.get())
+      explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
+      memoryProfilingEnabled)
   }
 
   private var nextBatch: Option[ColumnarBatch] = None
@@ -130,6 +134,22 @@
   def getNextBatch(): Option[ColumnarBatch] = {
     assert(partitionIndex >= 0 && partitionIndex < numParts)
 
+    if (memoryProfilingEnabled) {
+      val memoryMXBean = ManagementFactory.getMemoryMXBean
+      val heap = memoryMXBean.getHeapMemoryUsage
+      val nonHeap = memoryMXBean.getNonHeapMemoryUsage
+
+      def mb(n: Long) = n / 1024 / 1024
+
+      // scalastyle:off println
+      println(
+        "JVM_MEMORY: { " +
+          s"heapUsed: ${mb(heap.getUsed)}, heapCommitted: ${mb(heap.getCommitted)}, " +
+          s"nonHeapUsed: ${mb(nonHeap.getUsed)}, nonHeapCommitted: ${mb(nonHeap.getCommitted)} " +
+          "}")
+      // scalastyle:on println
+    }
+
     nativeUtil.getNextBatch(
       numOutputCols,
       (arrayAddrs, schemaAddrs) => {
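With the flag on, each call to getNextBatch prints one JVM_MEMORY line, with all values converted to MiB. A self-contained sketch of the same MemoryMXBean reads, runnable outside Spark (the object name here is illustrative):

import java.lang.management.ManagementFactory

object JvmMemorySnapshot {
  // Convert bytes to whole MiB, as the iterator does.
  private def mb(n: Long): Long = n / 1024 / 1024

  def main(args: Array[String]): Unit = {
    val bean = ManagementFactory.getMemoryMXBean
    val heap = bean.getHeapMemoryUsage
    val nonHeap = bean.getNonHeapMemoryUsage
    // Same fields the iterator logs: used and committed sizes for heap and non-heap.
    println(
      s"JVM_MEMORY: { heapUsed: ${mb(heap.getUsed)}, heapCommitted: ${mb(heap.getCommitted)}, " +
        s"nonHeapUsed: ${mb(nonHeap.getUsed)}, nonHeapCommitted: ${mb(nonHeap.getCommitted)} }")
  }
}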

spark/src/main/scala/org/apache/comet/Native.scala

Lines changed: 2 additions & 1 deletion
@@ -66,7 +66,8 @@ class Native extends NativeBase {
       memoryLimitPerTask: Long,
       taskAttemptId: Long,
       debug: Boolean,
-      explain: Boolean): Long
+      explain: Boolean,
+      memoryProfilingEnabled: Boolean): Long
   // scalastyle:on
 
   /**
