Skip to content

Commit ac74cd3

Browse files
authored
Minor: Add `RuntimeEnvBuilder::build_arc() (#12213)
1 parent 77e0e3b commit ac74cd3

File tree

14 files changed

+75
-102
lines changed

14 files changed

+75
-102
lines changed

datafusion/core/src/execution/context/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,13 +219,13 @@ where
219219
/// // configure a memory limit of 1GB with 20% slop
220220
/// let runtime_env = RuntimeEnvBuilder::new()
221221
/// .with_memory_limit(1024 * 1024 * 1024, 0.80)
222-
/// .build()
222+
/// .build_arc()
223223
/// .unwrap();
224224
///
225225
/// // Create a SessionState using the config and runtime_env
226226
/// let state = SessionStateBuilder::new()
227227
/// .with_config(config)
228-
/// .with_runtime_env(Arc::new(runtime_env))
228+
/// .with_runtime_env(runtime_env)
229229
/// // include support for built in functions and configurations
230230
/// .with_default_features()
231231
/// .build();
@@ -1758,7 +1758,7 @@ mod tests {
17581758
let path = path.join("tests/tpch-csv");
17591759
let url = format!("file://{}", path.display());
17601760

1761-
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
1761+
let runtime = RuntimeEnvBuilder::new().build_arc()?;
17621762
let cfg = SessionConfig::new()
17631763
.set_str("datafusion.catalog.location", url.as_str())
17641764
.set_str("datafusion.catalog.format", "CSV")

datafusion/core/tests/fuzz_cases/sort_fuzz.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,10 @@ impl SortTest {
136136
.sort_spill_reservation_bytes,
137137
);
138138

139-
let runtime = Arc::new(
140-
RuntimeEnvBuilder::new()
141-
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
142-
.build()
143-
.unwrap(),
144-
);
139+
let runtime = RuntimeEnvBuilder::new()
140+
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
141+
.build_arc()
142+
.unwrap();
145143
SessionContext::new_with_config_rt(session_config, runtime)
146144
} else {
147145
SessionContext::new_with_config(session_config)

datafusion/core/tests/memory_limit/mod.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -509,21 +509,20 @@ impl TestCase {
509509

510510
let table = scenario.table();
511511

512-
let rt_config = RuntimeEnvBuilder::new()
512+
let mut builder = RuntimeEnvBuilder::new()
513513
// disk manager setting controls the spilling
514514
.with_disk_manager(disk_manager_config)
515515
.with_memory_limit(memory_limit, MEMORY_FRACTION);
516516

517-
let runtime = if let Some(pool) = memory_pool {
518-
rt_config.with_memory_pool(pool).build().unwrap()
519-
} else {
520-
rt_config.build().unwrap()
517+
if let Some(pool) = memory_pool {
518+
builder = builder.with_memory_pool(pool);
521519
};
520+
let runtime = builder.build_arc().unwrap();
522521

523522
// Configure execution
524523
let builder = SessionStateBuilder::new()
525524
.with_config(config)
526-
.with_runtime_env(Arc::new(runtime))
525+
.with_runtime_env(runtime)
527526
.with_default_features();
528527
let builder = match scenario.rules() {
529528
Some(rules) => builder.with_physical_optimizer_rules(rules),

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,11 @@ fn get_cache_runtime_state() -> (
197197
.with_files_statistics_cache(Some(file_static_cache.clone()))
198198
.with_list_files_cache(Some(list_file_cache.clone()));
199199

200-
let rt = Arc::new(
201-
RuntimeEnvBuilder::new()
202-
.with_cache_manager(cache_config)
203-
.build()
204-
.expect("could not build runtime environment"),
205-
);
200+
let rt = RuntimeEnvBuilder::new()
201+
.with_cache_manager(cache_config)
202+
.build_arc()
203+
.expect("could not build runtime environment");
204+
206205
let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();
207206

208207
(file_static_cache, list_file_cache, state)

datafusion/execution/src/runtime_env.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,9 @@ impl RuntimeEnvBuilder {
246246
object_store_registry: self.object_store_registry,
247247
})
248248
}
249+
250+
/// Convenience method to create a new `Arc<RuntimeEnv>`
251+
pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
252+
self.build().map(Arc::new)
253+
}
249254
}

datafusion/execution/src/task.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub struct TaskContext {
5858
impl Default for TaskContext {
5959
fn default() -> Self {
6060
let runtime = RuntimeEnvBuilder::new()
61-
.build()
61+
.build_arc()
6262
.expect("default runtime created successfully");
6363

6464
// Create a default task context, mostly useful for testing
@@ -69,7 +69,7 @@ impl Default for TaskContext {
6969
scalar_functions: HashMap::new(),
7070
aggregate_functions: HashMap::new(),
7171
window_functions: HashMap::new(),
72-
runtime: Arc::new(runtime),
72+
runtime,
7373
}
7474
}
7575
}

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,12 +1331,10 @@ mod tests {
13311331

13321332
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
13331333
let session_config = SessionConfig::new().with_batch_size(batch_size);
1334-
let runtime = Arc::new(
1335-
RuntimeEnvBuilder::default()
1336-
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
1337-
.build()
1338-
.unwrap(),
1339-
);
1334+
let runtime = RuntimeEnvBuilder::default()
1335+
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
1336+
.build_arc()
1337+
.unwrap();
13401338
let task_ctx = TaskContext::default()
13411339
.with_session_config(session_config)
13421340
.with_runtime(runtime);
@@ -1815,11 +1813,9 @@ mod tests {
18151813
let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
18161814
let input_schema = input.schema();
18171815

1818-
let runtime = Arc::new(
1819-
RuntimeEnvBuilder::default()
1820-
.with_memory_limit(1, 1.0)
1821-
.build()?,
1822-
);
1816+
let runtime = RuntimeEnvBuilder::default()
1817+
.with_memory_limit(1, 1.0)
1818+
.build_arc()?;
18231819
let task_ctx = TaskContext::default().with_runtime(runtime);
18241820
let task_ctx = Arc::new(task_ctx);
18251821

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -673,11 +673,9 @@ mod tests {
673673

674674
#[tokio::test]
675675
async fn test_overallocation() -> Result<()> {
676-
let runtime = Arc::new(
677-
RuntimeEnvBuilder::new()
678-
.with_memory_limit(100, 1.0)
679-
.build()?,
680-
);
676+
let runtime = RuntimeEnvBuilder::new()
677+
.with_memory_limit(100, 1.0)
678+
.build_arc()?;
681679
let task_ctx = TaskContext::default().with_runtime(runtime);
682680
let task_ctx = Arc::new(task_ctx);
683681

datafusion/physical-plan/src/joins/hash_join.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3798,11 +3798,9 @@ mod tests {
37983798
];
37993799

38003800
for join_type in join_types {
3801-
let runtime = Arc::new(
3802-
RuntimeEnvBuilder::new()
3803-
.with_memory_limit(100, 1.0)
3804-
.build()?,
3805-
);
3801+
let runtime = RuntimeEnvBuilder::new()
3802+
.with_memory_limit(100, 1.0)
3803+
.build_arc()?;
38063804
let task_ctx = TaskContext::default().with_runtime(runtime);
38073805
let task_ctx = Arc::new(task_ctx);
38083806

@@ -3874,11 +3872,9 @@ mod tests {
38743872
];
38753873

38763874
for join_type in join_types {
3877-
let runtime = Arc::new(
3878-
RuntimeEnvBuilder::new()
3879-
.with_memory_limit(100, 1.0)
3880-
.build()?,
3881-
);
3875+
let runtime = RuntimeEnvBuilder::new()
3876+
.with_memory_limit(100, 1.0)
3877+
.build_arc()?;
38823878
let session_config = SessionConfig::default().with_batch_size(50);
38833879
let task_ctx = TaskContext::default()
38843880
.with_session_config(session_config)

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,11 +1019,9 @@ mod tests {
10191019
];
10201020

10211021
for join_type in join_types {
1022-
let runtime = Arc::new(
1023-
RuntimeEnvBuilder::new()
1024-
.with_memory_limit(100, 1.0)
1025-
.build()?,
1026-
);
1022+
let runtime = RuntimeEnvBuilder::new()
1023+
.with_memory_limit(100, 1.0)
1024+
.build_arc()?;
10271025
let task_ctx = TaskContext::default().with_runtime(runtime);
10281026
let task_ctx = Arc::new(task_ctx);
10291027

datafusion/physical-plan/src/joins/sort_merge_join.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2900,12 +2900,10 @@ mod tests {
29002900
];
29012901

29022902
// Disable DiskManager to prevent spilling
2903-
let runtime = Arc::new(
2904-
RuntimeEnvBuilder::new()
2905-
.with_memory_limit(100, 1.0)
2906-
.with_disk_manager(DiskManagerConfig::Disabled)
2907-
.build()?,
2908-
);
2903+
let runtime = RuntimeEnvBuilder::new()
2904+
.with_memory_limit(100, 1.0)
2905+
.with_disk_manager(DiskManagerConfig::Disabled)
2906+
.build_arc()?;
29092907
let session_config = SessionConfig::default().with_batch_size(50);
29102908

29112909
for join_type in join_types {
@@ -2987,12 +2985,10 @@ mod tests {
29872985
];
29882986

29892987
// Disable DiskManager to prevent spilling
2990-
let runtime = Arc::new(
2991-
RuntimeEnvBuilder::new()
2992-
.with_memory_limit(100, 1.0)
2993-
.with_disk_manager(DiskManagerConfig::Disabled)
2994-
.build()?,
2995-
);
2988+
let runtime = RuntimeEnvBuilder::new()
2989+
.with_memory_limit(100, 1.0)
2990+
.with_disk_manager(DiskManagerConfig::Disabled)
2991+
.build_arc()?;
29962992
let session_config = SessionConfig::default().with_batch_size(50);
29972993

29982994
for join_type in join_types {
@@ -3052,12 +3048,10 @@ mod tests {
30523048
];
30533049

30543050
// Enable DiskManager to allow spilling
3055-
let runtime = Arc::new(
3056-
RuntimeEnvBuilder::new()
3057-
.with_memory_limit(100, 1.0)
3058-
.with_disk_manager(DiskManagerConfig::NewOs)
3059-
.build()?,
3060-
);
3051+
let runtime = RuntimeEnvBuilder::new()
3052+
.with_memory_limit(100, 1.0)
3053+
.with_disk_manager(DiskManagerConfig::NewOs)
3054+
.build_arc()?;
30613055

30623056
for batch_size in [1, 50] {
30633057
let session_config = SessionConfig::default().with_batch_size(batch_size);
@@ -3162,12 +3156,10 @@ mod tests {
31623156
];
31633157

31643158
// Enable DiskManager to allow spilling
3165-
let runtime = Arc::new(
3166-
RuntimeEnvBuilder::new()
3167-
.with_memory_limit(500, 1.0)
3168-
.with_disk_manager(DiskManagerConfig::NewOs)
3169-
.build()?,
3170-
);
3159+
let runtime = RuntimeEnvBuilder::new()
3160+
.with_memory_limit(500, 1.0)
3161+
.with_disk_manager(DiskManagerConfig::NewOs)
3162+
.build_arc()?;
31713163

31723164
for batch_size in [1, 50] {
31733165
let session_config = SessionConfig::default().with_batch_size(batch_size);

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1506,11 +1506,9 @@ mod tests {
15061506
let partitioning = Partitioning::RoundRobinBatch(4);
15071507

15081508
// setup up context
1509-
let runtime = Arc::new(
1510-
RuntimeEnvBuilder::default()
1511-
.with_memory_limit(1, 1.0)
1512-
.build()?,
1513-
);
1509+
let runtime = RuntimeEnvBuilder::default()
1510+
.with_memory_limit(1, 1.0)
1511+
.build_arc()?;
15141512

15151513
let task_ctx = TaskContext::default().with_runtime(runtime);
15161514
let task_ctx = Arc::new(task_ctx);

datafusion/physical-plan/src/sorts/sort.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1148,11 +1148,9 @@ mod tests {
11481148
.options()
11491149
.execution
11501150
.sort_spill_reservation_bytes;
1151-
let runtime = Arc::new(
1152-
RuntimeEnvBuilder::new()
1153-
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
1154-
.build()?,
1155-
);
1151+
let runtime = RuntimeEnvBuilder::new()
1152+
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
1153+
.build_arc()?;
11561154
let task_ctx = Arc::new(
11571155
TaskContext::default()
11581156
.with_session_config(session_config)
@@ -1226,14 +1224,12 @@ mod tests {
12261224
.execution
12271225
.sort_spill_reservation_bytes;
12281226

1229-
let runtime = Arc::new(
1230-
RuntimeEnvBuilder::new()
1231-
.with_memory_limit(
1232-
sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1233-
1.0,
1234-
)
1235-
.build()?,
1236-
);
1227+
let runtime = RuntimeEnvBuilder::new()
1228+
.with_memory_limit(
1229+
sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1230+
1.0,
1231+
)
1232+
.build_arc()?;
12371233
let task_ctx = Arc::new(
12381234
TaskContext::default()
12391235
.with_runtime(runtime)

datafusion/wasmtest/src/lib.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,10 @@ mod test {
9898
let sql = "SELECT 2 + 2;";
9999

100100
// Execute SQL (using datafusion)
101-
let rt = Arc::new(
102-
RuntimeEnvBuilder::new()
103-
.with_disk_manager(DiskManagerConfig::Disabled)
104-
.build()
105-
.unwrap(),
106-
);
101+
let rt = RuntimeEnvBuilder::new()
102+
.with_disk_manager(DiskManagerConfig::Disabled)
103+
.build_arc()
104+
.unwrap();
107105
let session_config = SessionConfig::new().with_target_partitions(1);
108106
let session_context =
109107
Arc::new(SessionContext::new_with_config_rt(session_config, rt));

0 commit comments

Comments
 (0)