Commit 23dfb03

perf: Use a global tokio runtime (#1614)
1 parent 1687151 commit 23dfb03

File tree

9 files changed: +40 -67 lines changed


common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 1 addition & 3 deletions
```diff
@@ -257,9 +257,7 @@ public static native long initRecordBatchReader(
       byte[] requiredSchema,
       byte[] dataSchema,
       String sessionTimezone,
-      int batchSize,
-      int workerThreads,
-      int blockingThreads);
+      int batchSize);
 
   // arrow native version of read batch
   /**
```
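As a reminder of the JNI convention in play: the Java `native` declaration above binds to an exported Rust symbol whose name encodes the package, class, and method. A stripped-down, hypothetical sketch of such a binding (the real method takes many more parameters and returns a handle to native reader state), assuming the `jni` crate (0.21):

```rust
// Hypothetical, simplified counterpart to the Java declaration above.
// The exported symbol name must spell out package.class.method with
// underscores: Java_org_apache_comet_parquet_Native_initRecordBatchReader.
use jni::objects::JClass;
use jni::sys::{jint, jlong};
use jni::JNIEnv;

#[no_mangle]
pub extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
    _env: JNIEnv<'_>,
    _class: JClass<'_>,
    batch_size: jint, // the real method takes many more parameters
) -> jlong {
    // The real implementation constructs a reader and returns an opaque
    // handle; echoing the batch size keeps this sketch self-contained.
    batch_size as jlong
}
```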

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 1 addition & 13 deletions
```diff
@@ -357,16 +357,6 @@ public void init() throws URISyntaxException, IOException {
         conf.getInt(
             CometConf.COMET_BATCH_SIZE().key(),
             (Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
-    int workerThreads =
-        conf.getInt(
-            CometConf.COMET_WORKER_THREADS().key(),
-            (Integer) CometConf.COMET_WORKER_THREADS().defaultValue().get());
-    ;
-    int blockingThreads =
-        conf.getInt(
-            CometConf.COMET_BLOCKING_THREADS().key(),
-            (Integer) CometConf.COMET_BLOCKING_THREADS().defaultValue().get());
-    ;
     this.handle =
         Native.initRecordBatchReader(
             filePath,
@@ -377,9 +367,7 @@ public void init() throws URISyntaxException, IOException {
             serializedRequestedArrowSchema,
             serializedDataArrowSchema,
             timeZoneId,
-            batchSize,
-            workerThreads,
-            blockingThreads);
+            batchSize);
     isInitialized = true;
   }
 
```

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 16 deletions
```diff
@@ -457,22 +457,6 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
-  val COMET_WORKER_THREADS: ConfigEntry[Int] =
-    conf("spark.comet.workerThreads")
-      .internal()
-      .doc("The number of worker threads used for Comet native execution. " +
-        "By default, this config is 4.")
-      .intConf
-      .createWithDefault(4)
-
-  val COMET_BLOCKING_THREADS: ConfigEntry[Int] =
-    conf("spark.comet.blockingThreads")
-      .internal()
-      .doc("The number of blocking threads used for Comet native execution. " +
-        "By default, this config is 10.")
-      .intConf
-      .createWithDefault(10)
-
   val COMET_BATCH_SIZE: ConfigEntry[Int] = conf("spark.comet.batchSize")
     .doc("The columnar batch size, i.e., the maximum number of rows that a batch can contain.")
     .intConf
```

docs/source/user-guide/tuning.md

Lines changed: 9 additions & 0 deletions
```diff
@@ -21,6 +21,15 @@ under the License.
 
 Comet provides some tuning options to help you get the best performance from your queries.
 
+## Configuring Tokio Runtime
+
+Comet uses a global tokio runtime per executor process, with tokio's defaults of one worker thread per core and a
+maximum of 512 blocking threads. These values can be overridden using the environment variables `COMET_WORKER_THREADS`
+and `COMET_MAX_BLOCKING_THREADS`.
+
+DataFusion currently has a known issue when merging spill files in sort operators, where the process can deadlock if
+there are more spill files than `COMET_MAX_BLOCKING_THREADS` ([tracking issue](https://github.com/apache/datafusion/issues/15323)).
+
 ## Memory Tuning
 
 It is necessary to specify how much memory Comet can use in addition to memory already allocated to Spark. In some
```
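To make the documented deadlock risk concrete, here is a small hypothetical sketch (not Comet code) showing that tokio never runs more `spawn_blocking` tasks at once than `max_blocking_threads`; tasks beyond the cap queue, so if running tasks wait on queued ones, as can happen when a sort merges many spill files, nothing makes progress. Assumes `tokio = { version = "1", features = ["full"] }`:

```rust
// Demonstrates the blocking-thread cap: with max_blocking_threads(2),
// at most two of the eight spawn_blocking tasks ever run concurrently.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .max_blocking_threads(2) // stand-in for a small COMET_MAX_BLOCKING_THREADS
        .enable_all()
        .build()
        .expect("Failed to create Tokio runtime");

    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    runtime.block_on(async {
        let tasks: Vec<_> = (0..8)
            .map(|_| {
                let (running, peak) = (Arc::clone(&running), Arc::clone(&peak));
                tokio::task::spawn_blocking(move || {
                    let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    std::thread::sleep(Duration::from_millis(50)); // simulated blocking I/O
                    running.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();
        for task in tasks {
            task.await.unwrap();
        }
    });

    // Never exceeds the cap; a task that *required* a third concurrent
    // blocking task in order to finish would wait forever.
    assert!(peak.load(Ordering::SeqCst) <= 2);
    println!("peak concurrent blocking tasks: {}", peak.load(Ordering::SeqCst));
}
```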

native/core/src/execution/jni_api.rs

Lines changed: 24 additions & 13 deletions
```diff
@@ -71,6 +71,29 @@ use crate::execution::spark_plan::SparkPlan;
 use log::info;
 use once_cell::sync::{Lazy, OnceCell};
 
+static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
+    let mut builder = tokio::runtime::Builder::new_multi_thread();
+    if let Some(n) = parse_usize_env_var("COMET_WORKER_THREADS") {
+        builder.worker_threads(n);
+    }
+    if let Some(n) = parse_usize_env_var("COMET_MAX_BLOCKING_THREADS") {
+        builder.max_blocking_threads(n);
+    }
+    builder
+        .enable_all()
+        .build()
+        .expect("Failed to create Tokio runtime")
+});
+
+fn parse_usize_env_var(name: &str) -> Option<usize> {
+    std::env::var_os(name).and_then(|n| n.to_str().and_then(|s| s.parse::<usize>().ok()))
+}
+
+/// Function to get a handle to the global Tokio runtime
+pub fn get_runtime() -> &'static Runtime {
+    &TOKIO_RUNTIME
+}
+
 /// Comet native execution context. Kept alive across JNI calls.
 struct ExecutionContext {
     /// The id of the execution context.
@@ -89,8 +112,6 @@ struct ExecutionContext {
     pub input_sources: Vec<Arc<GlobalRef>>,
     /// The record batch stream to pull results from
     pub stream: Option<SendableRecordBatchStream>,
-    /// The Tokio runtime used for async.
-    pub runtime: Runtime,
     /// Native metrics
     pub metrics: Arc<GlobalRef>,
     // The interval in milliseconds to update metrics
@@ -177,8 +198,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     task_attempt_id: jlong,
     debug_native: jboolean,
     explain_native: jboolean,
-    worker_threads: jint,
-    blocking_threads: jint,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| {
         // Init JVM classes
@@ -192,13 +211,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         // Deserialize query plan
         let spark_plan = serde::deserialize_op(bytes.as_slice())?;
 
-        // Use multi-threaded tokio runtime to prevent blocking spawned tasks if any
-        let runtime = tokio::runtime::Builder::new_multi_thread()
-            .worker_threads(worker_threads as usize)
-            .max_blocking_threads(blocking_threads as usize)
-            .enable_all()
-            .build()?;
-
         let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);
 
         // Get the global references of input sources
@@ -258,7 +270,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
             scans: vec![],
             input_sources,
             stream: None,
-            runtime,
             metrics,
             metrics_update_interval,
             metrics_last_update_time: Instant::now(),
@@ -559,7 +570,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
         loop {
             // Polling the stream.
             let next_item = exec_context.stream.as_mut().unwrap().next();
-            let poll_output = exec_context.runtime.block_on(async { poll!(next_item) });
+            let poll_output = get_runtime().block_on(async { poll!(next_item) });
 
             // update metrics at interval
             if let Some(interval) = exec_context.metrics_update_interval {
```
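The new pattern is small enough to demonstrate standalone. A minimal self-contained sketch of the global runtime introduced above, assuming `once_cell = "1"` and `tokio = { version = "1", features = ["full"] }` (the `main` function is illustrative only):

```rust
// One lazily-built runtime per process, shared by every caller of get_runtime().
use once_cell::sync::Lazy;
use tokio::runtime::Runtime;

static TOKIO_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    // Tokio defaults apply unless the env vars override them:
    // one worker thread per core, at most 512 blocking threads.
    if let Some(n) = parse_usize_env_var("COMET_WORKER_THREADS") {
        builder.worker_threads(n);
    }
    if let Some(n) = parse_usize_env_var("COMET_MAX_BLOCKING_THREADS") {
        builder.max_blocking_threads(n);
    }
    builder
        .enable_all()
        .build()
        .expect("Failed to create Tokio runtime")
});

fn parse_usize_env_var(name: &str) -> Option<usize> {
    std::env::var_os(name).and_then(|v| v.to_str().and_then(|s| s.parse::<usize>().ok()))
}

pub fn get_runtime() -> &'static Runtime {
    &TOKIO_RUNTIME
}

fn main() {
    // Both calls share one runtime; the Lazy initializer runs exactly once.
    let a = get_runtime().block_on(async { 1 + 1 });
    let b = get_runtime().block_on(async { a * 21 });
    println!("{b}"); // 42
}
```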

native/core/src/parquet/mod.rs

Lines changed: 2 additions & 11 deletions
```diff
@@ -44,6 +44,7 @@ use jni::{
 };
 
 use self::util::jni::TypePromotionInfo;
+use crate::execution::jni_api::get_runtime;
 use crate::execution::operators::ExecutionError;
 use crate::execution::planner::PhysicalPlanner;
 use crate::execution::serde;
@@ -606,7 +607,6 @@ enum ParquetReaderState {
 }
 /// Parquet read context maintained across multiple JNI calls.
 struct BatchContext {
-    runtime: tokio::runtime::Runtime,
     batch_stream: Option<SendableRecordBatchStream>,
     current_batch: Option<RecordBatch>,
     reader_state: ParquetReaderState,
@@ -652,8 +652,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
     data_schema: jbyteArray,
     session_timezone: jstring,
     batch_size: jint,
-    worker_threads: jint,
-    blocking_threads: jint,
 ) -> jlong {
     try_unwrap_or_throw(&e, |mut env| unsafe {
         let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
@@ -666,12 +664,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
             .unwrap()
             .into();
 
-        let runtime = tokio::runtime::Builder::new_multi_thread()
-            .worker_threads(worker_threads as usize)
-            .max_blocking_threads(blocking_threads as usize)
-            .enable_all()
-            .build()?;
-
         let (object_store_url, object_store_path) =
             prepare_object_store(session_ctx.runtime_env(), path.clone())?;
 
@@ -718,7 +710,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
         let batch_stream = Some(scan.execute(partition_index, session_ctx.task_ctx())?);
 
         let ctx = BatchContext {
-            runtime,
             batch_stream,
             current_batch: None,
             reader_state: ParquetReaderState::Init,
@@ -738,7 +729,7 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_readNextRecordBatch(
         let context = get_batch_context(handle)?;
         let mut rows_read: i32 = 0;
         let batch_stream = context.batch_stream.as_mut().unwrap();
-        let runtime = &context.runtime;
+        let runtime = get_runtime();
 
         loop {
             let next_item = batch_stream.next();
```
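Since JNI entry points are synchronous, the read loop drives the async `batch_stream` one item at a time by blocking on the shared runtime. A hypothetical sketch of that loop shape with stand-in types (a locally built runtime stands in for `get_runtime()`, and a plain iterator stream for `SendableRecordBatchStream`), assuming `futures = "0.3"` and `tokio = { version = "1", features = ["full"] }`:

```rust
// Drain an async stream from synchronous code, one block_on per item.
use futures::StreamExt;

fn main() {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .expect("Failed to create Tokio runtime");

    // Stand-in for a SendableRecordBatchStream: a stream of batch row counts.
    let mut batch_stream = futures::stream::iter(vec![8192usize, 8192, 1024]);

    // Mirrors the shape of readNextRecordBatch: each call blocks until the
    // stream yields the next batch (or None at end of stream).
    while let Some(rows) = runtime.block_on(batch_stream.next()) {
        println!("read batch with {rows} rows");
    }
}
```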

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 2 additions & 4 deletions
```diff
@@ -25,7 +25,7 @@ import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.comet.CometMetricNode
 import org.apache.spark.sql.vectorized._
 
-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_METRICS_UPDATE_INTERVAL, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_METRICS_UPDATE_INTERVAL}
 import org.apache.comet.vector.NativeUtil
 
 /**
@@ -92,9 +92,7 @@ class CometExecIterator(
       memoryLimitPerTask = getMemoryLimitPerTask(conf),
       taskAttemptId = TaskContext.get().taskAttemptId,
       debug = COMET_DEBUG_ENABLED.get(),
-      explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
-      workerThreads = COMET_WORKER_THREADS.get(),
-      blockingThreads = COMET_BLOCKING_THREADS.get())
+      explain = COMET_EXPLAIN_NATIVE_ENABLED.get())
   }
 
   private var nextBatch: Option[ColumnarBatch] = None
```

spark/src/main/scala/org/apache/comet/Native.scala

Lines changed: 1 addition & 3 deletions
```diff
@@ -66,9 +66,7 @@ class Native extends NativeBase {
       memoryLimitPerTask: Long,
       taskAttemptId: Long,
       debug: Boolean,
-      explain: Boolean,
-      workerThreads: Int,
-      blockingThreads: Int): Long
+      explain: Boolean): Long
   // scalastyle:on
 
   /**
```

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 0 additions & 4 deletions
```diff
@@ -233,10 +233,6 @@ object CometParquetFileFormat extends Logging with ShimSQLConf {
       CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key,
       CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.get())
     hadoopConf.setInt(CometConf.COMET_BATCH_SIZE.key, CometConf.COMET_BATCH_SIZE.get())
-    hadoopConf.setInt(CometConf.COMET_WORKER_THREADS.key, CometConf.COMET_WORKER_THREADS.get())
-    hadoopConf.setInt(
-      CometConf.COMET_BLOCKING_THREADS.key,
-      CometConf.COMET_BLOCKING_THREADS.get())
   }
 
   def getDatetimeRebaseSpec(
```
