Skip to content

Make RuntimeEnvBuilder rather than RuntimeConfig #12157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 28, 2024
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ mod tests {
use datafusion_common::cast::as_string_array;
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit};

use crate::execution::session_state::SessionStateBuilder;
Expand Down Expand Up @@ -863,7 +863,7 @@ mod tests {
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let mut cfg = SessionConfig::new();
cfg.options_mut().catalog.has_header = true;
let session_state = SessionStateBuilder::new()
Expand Down
13 changes: 6 additions & 7 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,15 @@ where
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::execution::SessionStateBuilder;
/// # use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
/// # use datafusion_execution::runtime_env::RuntimeEnvBuilder;
/// // Configure a 4k batch size
/// let config = SessionConfig::new() .with_batch_size(4 * 1024);
///
/// // configure a memory limit of 1GB with 20% slop
/// let runtime_env = RuntimeEnv::new(
/// RuntimeConfig::new()
/// let runtime_env = RuntimeEnvBuilder::new()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

/// .with_memory_limit(1024 * 1024 * 1024, 0.80)
/// ).unwrap();
/// .build()
/// .unwrap();
///
/// // Create a SessionState using the config and runtime_env
/// let state = SessionStateBuilder::new()
Expand Down Expand Up @@ -1623,7 +1623,7 @@ mod tests {
use super::{super::options::CsvReadOptions, *};
use crate::assert_batches_eq;
use crate::execution::memory_pool::MemoryConsumer;
use crate::execution::runtime_env::RuntimeConfig;
use crate::execution::runtime_env::RuntimeEnvBuilder;
use crate::test;
use crate::test_util::{plan_and_collect, populate_csv_partitions};

Expand Down Expand Up @@ -1758,8 +1758,7 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());

let rt_cfg = RuntimeConfig::new();
let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
let runtime = Arc::new(RuntimeEnvBuilder::new().build()?);
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
.set_str("datafusion.catalog.format", "CSV")
Expand Down
12 changes: 7 additions & 5 deletions datafusion/core/tests/fuzz_cases/sort_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::{
compute::SortOptions,
record_batch::RecordBatch,
};
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
Expand Down Expand Up @@ -136,10 +136,12 @@ impl SortTest {
.sort_spill_reservation_bytes,
);

let runtime_env = RuntimeConfig::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build();
let runtime = Arc::new(runtime_env.unwrap());
let runtime = Arc::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a follow on maybe we could even make a build_arc() type method to make this even nicer looking

Following the model of https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/parquet/struct.ParquetExecBuilder.html#method.build_arc

RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)))
.build()
.unwrap(),
);
SessionContext::new_with_config_rt(session_config, runtime)
} else {
SessionContext::new_with_config(session_config)
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use tokio::fs::File;
use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_optimizer::join_selection::JoinSelection;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
Expand Down Expand Up @@ -509,17 +509,17 @@ impl TestCase {

let table = scenario.table();

let mut rt_config = RuntimeConfig::new()
let rt_config = RuntimeEnvBuilder::new()
// disk manager setting controls the spilling
.with_disk_manager(disk_manager_config)
.with_memory_limit(memory_limit, MEMORY_FRACTION);

if let Some(pool) = memory_pool {
rt_config = rt_config.with_memory_pool(pool);
let runtime = if let Some(pool) = memory_pool {
rt_config.with_memory_pool(pool).build().unwrap()
} else {
rt_config.build().unwrap()
};

let runtime = RuntimeEnv::new(rt_config).unwrap();

// Configure execution
let builder = SessionStateBuilder::new()
.with_config(config)
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_execution::cache::cache_unit::{
DefaultFileStatisticsCache, DefaultListFilesCache,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

use datafusion::execution::session_state::SessionStateBuilder;
use tempfile::tempdir;
Expand Down Expand Up @@ -198,7 +198,10 @@ fn get_cache_runtime_state() -> (
.with_list_files_cache(Some(list_file_cache.clone()));

let rt = Arc::new(
RuntimeEnv::new(RuntimeConfig::new().with_cache_manager(cache_config)).unwrap(),
RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()
.expect("could not build runtime environment"),
);
let state = SessionContext::new_with_config_rt(SessionConfig::default(), rt).state();

Expand Down
27 changes: 20 additions & 7 deletions datafusion/execution/src/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use url::Url;
/// Execution runtime environment that manages system resources such
/// as memory, disk, cache and storage.
///
/// A [`RuntimeEnv`] is created from a [`RuntimeConfig`] and has the
/// A [`RuntimeEnv`] is created from a [`RuntimeEnvBuilder`] and has the
/// following resource management functionality:
///
/// * [`MemoryPool`]: Manage memory
Expand Down Expand Up @@ -147,13 +147,17 @@ impl RuntimeEnv {

impl Default for RuntimeEnv {
fn default() -> Self {
RuntimeEnv::new(RuntimeConfig::new()).unwrap()
RuntimeEnvBuilder::new().build().unwrap()
}
}

/// Please see: <https://github.com/apache/datafusion/issues/12156>
/// This a type alias for backwards compatibility.
pub type RuntimeConfig = RuntimeEnvBuilder;

#[derive(Clone)]
/// Execution runtime configuration
pub struct RuntimeConfig {
pub struct RuntimeEnvBuilder {
/// DiskManager to manage temporary disk file usage
pub disk_manager: DiskManagerConfig,
/// [`MemoryPool`] from which to allocate memory
Expand All @@ -166,13 +170,13 @@ pub struct RuntimeConfig {
pub object_store_registry: Arc<dyn ObjectStoreRegistry>,
}

impl Default for RuntimeConfig {
impl Default for RuntimeEnvBuilder {
fn default() -> Self {
Self::new()
}
}

impl RuntimeConfig {
impl RuntimeEnvBuilder {
/// New with default values
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -229,8 +233,17 @@ impl RuntimeConfig {
self.with_disk_manager(DiskManagerConfig::new_specified(vec![path.into()]))
}

/// Build a `RuntimeEnv` object from the configuration
/// Build a RuntimeEnv
pub fn build(self) -> Result<RuntimeEnv> {
RuntimeEnv::new(self)
let memory_pool = self
.memory_pool
.unwrap_or_else(|| Arc::new(UnboundedMemoryPool::default()));

Ok(RuntimeEnv {
memory_pool,
disk_manager: DiskManager::try_new(self.disk_manager)?,
cache_manager: CacheManager::try_new(&self.cache_manager)?,
object_store_registry: self.object_store_registry,
})
}
}
5 changes: 3 additions & 2 deletions datafusion/execution/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
config::SessionConfig,
memory_pool::MemoryPool,
registry::FunctionRegistry,
runtime_env::{RuntimeConfig, RuntimeEnv},
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
};
use datafusion_common::{plan_datafusion_err, DataFusionError, Result};
use datafusion_expr::planner::ExprPlanner;
Expand Down Expand Up @@ -57,7 +57,8 @@ pub struct TaskContext {

impl Default for TaskContext {
fn default() -> Self {
let runtime = RuntimeEnv::new(RuntimeConfig::new())
let runtime = RuntimeEnvBuilder::new()
.build()
.expect("default runtime created successfully");

// Create a default task context, mostly useful for testing
Expand Down
15 changes: 8 additions & 7 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ mod tests {
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::memory_pool::FairSpillPool;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_functions_aggregate::array_agg::array_agg_udaf;
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::count::count_udaf;
Expand Down Expand Up @@ -1320,11 +1320,10 @@ mod tests {
fn new_spill_ctx(batch_size: usize, max_memory: usize) -> Arc<TaskContext> {
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime = Arc::new(
RuntimeEnv::new(
RuntimeConfig::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory))),
)
.unwrap(),
RuntimeEnvBuilder::default()
.with_memory_pool(Arc::new(FairSpillPool::new(max_memory)))
.build()
.unwrap(),
);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
Expand Down Expand Up @@ -1806,7 +1805,9 @@ mod tests {
let input_schema = input.schema();

let runtime = Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
RuntimeEnvBuilder::default()
.with_memory_limit(1, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);
Expand Down
9 changes: 6 additions & 3 deletions datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ mod tests {
use crate::test::build_table_scan_i32;

use datafusion_common::{assert_batches_sorted_eq, assert_contains};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;

async fn join_collect(
left: Arc<dyn ExecutionPlan>,
Expand Down Expand Up @@ -673,8 +673,11 @@ mod tests {

#[tokio::test]
async fn test_overallocation() -> Result<()> {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
16 changes: 11 additions & 5 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1572,7 +1572,7 @@ mod tests {
ScalarValue,
};
use datafusion_execution::config::SessionConfig;
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};

Expand Down Expand Up @@ -3798,8 +3798,11 @@ mod tests {
];

for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down Expand Up @@ -3871,8 +3874,11 @@ mod tests {
];

for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let session_config = SessionConfig::default().with_batch_size(50);
let task_ctx = TaskContext::default()
.with_session_config(session_config)
Expand Down
9 changes: 6 additions & 3 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ mod tests {

use arrow::datatypes::{DataType, Field};
use datafusion_common::{assert_batches_sorted_eq, assert_contains, ScalarValue};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{Partitioning, PhysicalExpr};
Expand Down Expand Up @@ -1019,8 +1019,11 @@ mod tests {
];

for join_type in join_types {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let runtime = Arc::new(
RuntimeEnvBuilder::new()
.with_memory_limit(100, 1.0)
.build()?,
);
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

Expand Down
Loading