Skip to content

Minor: Add `RuntimeEnvBuilder::build_arc() #12213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,13 @@ where
/// // configure a memory limit of 1GB with 20% slop
/// let runtime_env = RuntimeEnvBuilder::new()
/// .with_memory_limit(1024 * 1024 * 1024, 0.80)
/// .build()
/// .build_arc()
/// .unwrap();
///
/// // Create a SessionState using the config and runtime_env
/// let state = SessionStateBuilder::new()
/// .with_config(config)
/// .with_runtime_env(Arc::new(runtime_env))
/// .with_runtime_env(runtime_env)
/// // include support for built in functions and configurations
/// .with_default_features()
/// .build();
Expand Down Expand Up @@ -1758,7 +1758,7 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());

let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let runtime = RuntimeEnvBuilder::new().build_arc()?;
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,10 @@ impl SortTest {
.sort_spill_reservation_bytes,
);

let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build()
.unwrap(),
);
let runtime = RuntimeEnvBuilder::new()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pretty good example of how the new API makes the code simpler

.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build_arc()
.unwrap();
SessionContext::new_with_config_rt(session_config, runtime)
} else {
SessionContext::new_with_config(session_config)
Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,21 +509,20 @@ impl TestCase {

let table = scenario.table();

let rt_config = RuntimeEnvBuilder::new()
let mut builder = RuntimeEnvBuilder::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

let runtime = if let Some(pool) = memory_pool {
rt_config.with_memory_pool(pool).build().unwrap()
} else {
rt_config.build().unwrap()
if let Some(pool) = memory_pool {
builder = builder.with_memory_pool(pool);
};
let runtime = builder.build_arc().unwrap();

// Configure execution
let builder = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(Arc::new(runtime))
.with_runtime_env(runtime)
.with_default_features();
let builder = match scenario.rules() {
Some(rules) => builder.with_physical_optimizer_rules(rules),
Expand Down
11 changes: 5 additions & 6 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,11 @@ fn get_cache_runtime_state() -> (
.with_files_statistics_cache(Some(file_static_cache.clone()))
.with_list_files_cache(Some(list_file_cache.clone()));

let rt = Arc::new(
RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()
.expect("could not build runtime environment"),
);
let rt = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build_arc()
.expect("could not build runtime environment");

let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();

(file_static_cache, list_file_cache, state)
Expand Down
5 changes: 5 additions & 0 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,9 @@ impl RuntimeEnvBuilder {
object_store_registry: self.object_store_registry,
})
}

/// Convenience method to create a new `Arc<RuntimeEnv>`
pub fn build_arc(self) -> Result<Arc<RuntimeEnv>> {
self.build().map(Arc::new)
}
}
4 changes: 2 additions & 2 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct TaskContext {
impl Default for TaskContext {
fn default() -> Self {
let runtime = RuntimeEnvBuilder::new()
.build()
.build_arc()
.expect("default runtime created successfully");

// Create a default task context, mostly useful for testing
Expand All @@ -69,7 +69,7 @@ impl Default for TaskContext {
scalar_functions: HashMap::new(),
aggregate_functions: HashMap::new(),
window_functions: HashMap::new(),
runtime: Arc::new(runtime),
runtime,
}
}
}
Expand Down
18 changes: 7 additions & 11 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1331,12 +1331,10 @@ mod tests {

fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime = Arc::new(
RuntimeEnvBuilder::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build()
.unwrap(),
);
let runtime = RuntimeEnvBuilder::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build_arc()
.unwrap();
let task_ctx = TaskContext::default()
.with_session_config(session_config)
.with_runtime(runtime);
Expand Down Expand Up @@ -1815,11 +1813,9 @@ mod tests {
let input: Arc<dyn ExecutionPlan> = Arc::new(TestYieldingExec::new(true));
let input_schema = input.schema();

let runtime = Arc::new(
RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -673,11 +673,9 @@ mod tests {

#[tokio::test]
async fn test_overallocation() -> Result<()> {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
16 changes: 6 additions & 10 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3798,11 +3798,9 @@ mod tests {
];

for join_type in join_types {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down Expand Up @@ -3874,11 +3872,9 @@ mod tests {
];

for join_type in join_types {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1019,11 +1019,9 @@ mod tests {
];

for join_type in join_types {
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build_arc()?;
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
40 changes: 16 additions & 24 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2900,12 +2900,10 @@ mod tests {
];

// Disable DiskManager to prevent spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);

for join_type in join_types {
Expand Down Expand Up @@ -2987,12 +2985,10 @@ mod tests {
];

// Disable DiskManager to prevent spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::Disabled)
.build_arc()?;
let session_config = SessionConfig::default().with_batch_size(50);

for join_type in join_types {
Expand Down Expand Up @@ -3052,12 +3048,10 @@ mod tests {
];

// Enable DiskManager to allow spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()?;

for batch_size in [1, 50] {
let session_config = SessionConfig::default().with_batch_size(batch_size);
Expand Down Expand Up @@ -3162,12 +3156,10 @@ mod tests {
];

// Enable DiskManager to allow spilling
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(500, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(500, 1.0)
.with_disk_manager(DiskManagerConfig::NewOs)
.build_arc()?;

for batch_size in [1, 50] {
let session_config = SessionConfig::default().with_batch_size(batch_size);
Expand Down
8 changes: 3 additions & 5 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1506,11 +1506,9 @@ mod tests {
let partitioning = Partitioning::RoundRobinBatch(4);

// setup up context
let runtime = Arc::new(
RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build_arc()?;

let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
Expand Down
22 changes: 9 additions & 13 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,11 +1148,9 @@ mod tests {
.options()
.execution
.sort_spill_reservation_bytes;
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(sort_spill_reservation_bytes + 12288, 1.0)
.build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_session_config(session_config)
Expand Down Expand Up @@ -1226,14 +1224,12 @@ mod tests {
.execution
.sort_spill_reservation_bytes;

let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(
sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1.0,
)
.build()?,
);
let runtime = RuntimeEnvBuilder::new()
.with_memory_limit(
sort_spill_reservation_bytes + avg_batch_size * (partitions - 1),
1.0,
)
.build_arc()?;
let task_ctx = Arc::new(
TaskContext::default()
.with_runtime(runtime)
Expand Down
10 changes: 4 additions & 6 deletions datafusion/wasmtest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ mod test {
let sql = "SELECT 2 + 2;";

// Execute SQL (using datafusion)
let rt = Arc::new(
RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::Disabled)
.build()
.unwrap(),
);
let rt = RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::Disabled)
.build_arc()
.unwrap();
let session_config = SessionConfig::new().with_target_partitions(1);
let session_context =
Arc::new(SessionContext::new_with_config_rt(session_config, rt));
Expand Down