Skip to content

Commit 5163e15

Browse files
devanbenzDevanalamb
authored
Make RuntimeEnvBuilder rather than RuntimeConfig (#12157)
* feat/12156: Make RuntimeEnvBuilder rather than RuntimeConfig Signed-off-by: Devan <[email protected]> * feat/12156: Make RuntimeEnvBuilder rather than RuntimeConfig Signed-off-by: Devan <[email protected]> * doc link Signed-off-by: Devan <[email protected]> * update to use builder for rt env Signed-off-by: Devan <[email protected]> * update to use builder Signed-off-by: Devan <[email protected]> * clippy Signed-off-by: Devan <[email protected]> * touch Signed-off-by: Devan <[email protected]> * fmt Signed-off-by: Devan <[email protected]> * revert some formatting that occurred Signed-off-by: Devan <[email protected]> * revert some formatting that occurred Signed-off-by: Devan <[email protected]> * use builder Signed-off-by: Devan <[email protected]> * fmt Signed-off-by: Devan <[email protected]> * Update datafusion/execution/src/runtime_env.rs Co-authored-by: Andrew Lamb <[email protected]> --------- Signed-off-by: Devan <[email protected]> Co-authored-by: Devan <[email protected]> Co-authored-by: Andrew Lamb <[email protected]>
1 parent 7d8bb0b commit 5163e15

File tree

15 files changed

+129
-83
lines changed

15 files changed

+129
-83
lines changed

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -680,7 +680,7 @@ mod tests {
680680
use datafusion_common::cast::as_string_array;
681681
use datafusion_common::internal_err;
682682
use datafusion_common::stats::Precision;
683-
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
683+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
684684
use datafusion_expr::{col, lit};
685685

686686
use crate::execution::session_state::SessionStateBuilder;
@@ -863,7 +863,7 @@ mod tests {
863863
async fn query_compress_data(
864864
file_compression_type: FileCompressionType,
865865
) -> Result<()> {
866-
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
866+
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
867867
let mut cfg = SessionConfig::new();
868868
cfg.options_mut().catalog.has_header = true;
869869
let session_state = SessionStateBuilder::new()

datafusion/core/src/execution/context/mod.rs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,15 +212,15 @@ where
212212
/// # use std::sync::Arc;
213213
/// # use datafusion::prelude::*;
214214
/// # use datafusion::execution::SessionStateBuilder;
215-
/// # use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
215+
/// # use datafusion_execution::runtime_env::RuntimeEnvBuilder;
216216
/// // Configure a 4k batch size
217217
/// let config = SessionConfig::new() .with_batch_size(4 * 1024);
218218
///
219219
/// // configure a memory limit of 1GB with 20% slop
220-
/// let runtime_env = RuntimeEnv::new(
221-
/// RuntimeConfig::new()
220+
/// let runtime_env = RuntimeEnvBuilder::new()
222221
/// .with_memory_limit(1024 * 1024 * 1024, 0.80)
223-
/// ).unwrap();
222+
/// .build()
223+
/// .unwrap();
224224
///
225225
/// // Create a SessionState using the config and runtime_env
226226
/// let state = SessionStateBuilder::new()
@@ -1623,7 +1623,7 @@ mod tests {
16231623
use super::{super::options::CsvReadOptions, *};
16241624
use crate::assert_batches_eq;
16251625
use crate::execution::memory_pool::MemoryConsumer;
1626-
use crate::execution::runtime_env::RuntimeConfig;
1626+
use crate::execution::runtime_env::RuntimeEnvBuilder;
16271627
use crate::test;
16281628
use crate::test_util::{plan_and_collect, populate_csv_partitions};
16291629

@@ -1758,8 +1758,7 @@ mod tests {
17581758
let path = path.join("tests/tpch-csv");
17591759
let url = format!("file://{}", path.display());
17601760

1761-
let rt_cfg = RuntimeConfig::new();
1762-
let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
1761+
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
17631762
let cfg = SessionConfig::new()
17641763
.set_str("datafusion.catalog.location", url.as_str())
17651764
.set_str("datafusion.catalog.format", "CSV")

datafusion/core/tests/fuzz_cases/sort_fuzz.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use arrow::{
2222
compute::SortOptions,
2323
record_batch::RecordBatch,
2424
};
25-
use datafusion::execution::runtime_env::RuntimeConfig;
25+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
2626
use datafusion::physical_plan::expressions::PhysicalSortExpr;
2727
use datafusion::physical_plan::memory::MemoryExec;
2828
use datafusion::physical_plan::sorts::sort::SortExec;
@@ -136,10 +136,12 @@ impl SortTest {
136136
.sort_spill_reservation_bytes,
137137
);
138138

139-
let runtime_env = RuntimeConfig::new()
140-
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
141-
.build();
142-
let runtime = Arc::new(runtime_env.unwrap());
139+
let runtime = Arc::new(
140+
RuntimeEnvBuilder::new()
141+
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
142+
.build()
143+
.unwrap(),
144+
);
143145
SessionContext::new_with_config_rt(session_config, runtime)
144146
} else {
145147
SessionContext::new_with_config(session_config)

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use tokio::fs::File;
4040
use datafusion::datasource::streaming::StreamingTable;
4141
use datafusion::datasource::{MemTable, TableProvider};
4242
use datafusion::execution::disk_manager::DiskManagerConfig;
43-
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
43+
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
4444
use datafusion::execution::session_state::SessionStateBuilder;
4545
use datafusion::physical_optimizer::join_selection::JoinSelection;
4646
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
@@ -509,17 +509,17 @@ impl TestCase {
509509

510510
let table = scenario.table();
511511

512-
let mut rt_config = RuntimeConfig::new()
512+
let rt_config = RuntimeEnvBuilder::new()
513513
// disk manager setting controls the spilling
514514
.with_disk_manager(disk_manager_config)
515515
.with_memory_limit(memory_limit, MEMORY_FRACTION);
516516

517-
if let Some(pool) = memory_pool {
518-
rt_config = rt_config.with_memory_pool(pool);
517+
let runtime = if let Some(pool) = memory_pool {
518+
rt_config.with_memory_pool(pool).build().unwrap()
519+
} else {
520+
rt_config.build().unwrap()
519521
};
520522

521-
let runtime = RuntimeEnv::new(rt_config).unwrap();
522-
523523
// Configure execution
524524
let builder = SessionStateBuilder::new()
525525
.with_config(config)

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use datafusion_execution::cache::cache_unit::{
3333
DefaultFileStatisticsCache, DefaultListFilesCache,
3434
};
3535
use datafusion_execution::config::SessionConfig;
36-
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
36+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
3737

3838
use datafusion::execution::session_state::SessionStateBuilder;
3939
use tempfile::tempdir;
@@ -198,7 +198,10 @@ fn get_cache_runtime_state() -> (
198198
.with_list_files_cache(Some(list_file_cache.clone()));
199199

200200
let rt = Arc::new(
201-
RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config)).unwrap(),
201+
RuntimeEnvBuilder::new()
202+
.with_cache_manager(cache_config)
203+
.build()
204+
.expect("could not build runtime environment"),
202205
);
203206
let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();
204207

datafusion/execution/src/runtime_env.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use url::Url;
4141
/// Execution runtime environment that manages system resources such
4242
/// as memory, disk, cache and storage.
4343
///
44-
/// A [`RuntimeEnv`] is created from a [`RuntimeConfig`] and has the
44+
/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
4545
/// following resource management functionality:
4646
///
4747
/// * [`MemoryPool`]: Manage memory
@@ -147,13 +147,17 @@ impl RuntimeEnv {
147147

148148
impl Default for RuntimeEnv {
149149
fn default() -> Self {
150-
RuntimeEnv::new(RuntimeConfig::new()).unwrap()
150+
RuntimeEnvBuilder::new().build().unwrap()
151151
}
152152
}
153153

154+
/// Please see: <https://github.com/apache/datafusion/issues/12156>
155+
/// This a type alias for backwards compatibility.
156+
pub type RuntimeConfig = RuntimeEnvBuilder;
157+
154158
#[derive(Clone)]
155159
/// Execution runtime configuration
156-
pub struct RuntimeConfig {
160+
pub struct RuntimeEnvBuilder {
157161
/// DiskManager to manage temporary disk file usage
158162
pub disk_manager: DiskManagerConfig,
159163
/// [`MemoryPool`] from which to allocate memory
@@ -166,13 +170,13 @@ pub struct RuntimeConfig {
166170
pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
167171
}
168172

169-
impl Default for RuntimeConfig {
173+
impl Default for RuntimeEnvBuilder {
170174
fn default() -> Self {
171175
Self::new()
172176
}
173177
}
174178

175-
impl RuntimeConfig {
179+
impl RuntimeEnvBuilder {
176180
/// New with default values
177181
pub fn new() -> Self {
178182
Self {
@@ -229,8 +233,17 @@ impl RuntimeConfig {
229233
self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]))
230234
}
231235

232-
/// Build a `RuntimeEnv` object from the configuration
236+
/// Build a RuntimeEnv
233237
pub fn build(self) -> Result<RuntimeEnv> {
234-
RuntimeEnv::new(self)
238+
let memory_pool = self
239+
.memory_pool
240+
.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));
241+
242+
Ok(RuntimeEnv {
243+
memory_pool,
244+
disk_manager: DiskManager::try_new(self.disk_manager)?,
245+
cache_manager: CacheManager::try_new(&self.cache_manager)?,
246+
object_store_registry: self.object_store_registry,
247+
})
235248
}
236249
}

datafusion/execution/src/task.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
config::SessionConfig,
2525
memory_pool::MemoryPool,
2626
registry::FunctionRegistry,
27-
runtime_env::{RuntimeConfig, RuntimeEnv},
27+
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
2828
};
2929
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
3030
use datafusion_expr::planner::ExprPlanner;
@@ -57,7 +57,8 @@ pub struct TaskContext {
5757

5858
impl Default for TaskContext {
5959
fn default() -> Self {
60-
let runtime = RuntimeEnv::new(RuntimeConfig::new())
60+
let runtime = RuntimeEnvBuilder::new()
61+
.build()
6162
.expect("default runtime created successfully");
6263

6364
// Create a default task context, mostly useful for testing

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1212,7 +1212,7 @@ mod tests {
12121212
};
12131213
use datafusion_execution::config::SessionConfig;
12141214
use datafusion_execution::memory_pool::FairSpillPool;
1215-
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
1215+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
12161216
use datafusion_functions_aggregate::array_agg::array_agg_udaf;
12171217
use datafusion_functions_aggregate::average::avg_udaf;
12181218
use datafusion_functions_aggregate::count::count_udaf;
@@ -1324,11 +1324,10 @@ mod tests {
13241324
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
13251325
let session_config = SessionConfig::new().with_batch_size(batch_size);
13261326
let runtime = Arc::new(
1327-
RuntimeEnv::new(
1328-
RuntimeConfig::default()
1329-
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory))),
1330-
)
1331-
.unwrap(),
1327+
RuntimeEnvBuilder::default()
1328+
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
1329+
.build()
1330+
.unwrap(),
13321331
);
13331332
let task_ctx = TaskContext::default()
13341333
.with_session_config(session_config)
@@ -1809,7 +1808,9 @@ mod tests {
18091808
let input_schema = input.schema();
18101809

18111810
let runtime = Arc::new(
1812-
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
1811+
RuntimeEnvBuilder::default()
1812+
.with_memory_limit(1, 1.0)
1813+
.build()?,
18131814
);
18141815
let task_ctx = TaskContext::default().with_runtime(runtime);
18151816
let task_ctx = Arc::new(task_ctx);

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ mod tests {
488488
use crate::test::build_table_scan_i32;
489489

490490
use datafusion_common::{assert_batches_sorted_eq, assert_contains};
491-
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
491+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
492492

493493
async fn join_collect(
494494
left: Arc<dyn ExecutionPlan>,
@@ -673,8 +673,11 @@ mod tests {
673673

674674
#[tokio::test]
675675
async fn test_overallocation() -> Result<()> {
676-
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
677-
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
676+
let runtime = Arc::new(
677+
RuntimeEnvBuilder::new()
678+
.with_memory_limit(100, 1.0)
679+
.build()?,
680+
);
678681
let task_ctx = TaskContext::default().with_runtime(runtime);
679682
let task_ctx = Arc::new(task_ctx);
680683

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1572,7 +1572,7 @@ mod tests {
15721572
ScalarValue,
15731573
};
15741574
use datafusion_execution::config::SessionConfig;
1575-
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
1575+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
15761576
use datafusion_expr::Operator;
15771577
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
15781578

@@ -3798,8 +3798,11 @@ mod tests {
37983798
];
37993799

38003800
for join_type in join_types {
3801-
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
3802-
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
3801+
let runtime = Arc::new(
3802+
RuntimeEnvBuilder::new()
3803+
.with_memory_limit(100, 1.0)
3804+
.build()?,
3805+
);
38033806
let task_ctx = TaskContext::default().with_runtime(runtime);
38043807
let task_ctx = Arc::new(task_ctx);
38053808

@@ -3871,8 +3874,11 @@ mod tests {
38713874
];
38723875

38733876
for join_type in join_types {
3874-
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
3875-
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
3877+
let runtime = Arc::new(
3878+
RuntimeEnvBuilder::new()
3879+
.with_memory_limit(100, 1.0)
3880+
.build()?,
3881+
);
38763882
let session_config = SessionConfig::default().with_batch_size(50);
38773883
let task_ctx = TaskContext::default()
38783884
.with_session_config(session_config)

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ mod tests {
644644

645645
use arrow::datatypes::{DataType, Field};
646646
use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
647-
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
647+
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
648648
use datafusion_expr::Operator;
649649
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
650650
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
@@ -1019,8 +1019,11 @@ mod tests {
10191019
];
10201020

10211021
for join_type in join_types {
1022-
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
1023-
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
1022+
let runtime = Arc::new(
1023+
RuntimeEnvBuilder::new()
1024+
.with_memory_limit(100, 1.0)
1025+
.build()?,
1026+
);
10241027
let task_ctx = TaskContext::default().with_runtime(runtime);
10251028
let task_ctx = Arc::new(task_ctx);
10261029

0 commit comments

Comments
 (0)