Skip to content

Commit 5da7ab3

Browse files
authored
Minor: move SessionStateDefaults into its own module (#11566)
* Minor: move `SessionStateDefaults` into its own module
* Fix no default features
1 parent 827d0e3 commit 5da7ab3

File tree

5 files changed

+211
-182
lines changed

5 files changed

+211
-182
lines changed

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1038,8 +1038,8 @@ mod tests {
10381038
use crate::datasource::file_format::avro::AvroFormat;
10391039
use crate::datasource::file_format::csv::CsvFormat;
10401040
use crate::datasource::file_format::json::JsonFormat;
1041-
use crate::datasource::file_format::parquet::ParquetFormat;
10421041
#[cfg(feature = "parquet")]
1042+
use crate::datasource::file_format::parquet::ParquetFormat;
10431043
use crate::datasource::{provider_as_source, MemTable};
10441044
use crate::execution::options::ArrowReadOptions;
10451045
use crate::physical_plan::collect;

datafusion/core/src/datasource/schema_adapter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ mod tests {
246246
use crate::datasource::schema_adapter::{
247247
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
248248
};
249+
#[cfg(feature = "parquet")]
249250
use parquet::arrow::ArrowWriter;
250251
use tempfile::TempDir;
251252

datafusion/core/src/execution/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919
2020
pub mod context;
2121
pub mod session_state;
22+
mod session_state_defaults;
23+
24+
pub use session_state_defaults::SessionStateDefaults;
2225

2326
// backwards compatibility
2427
pub use crate::datasource::file_format::options;

datafusion/core/src/execution/session_state.rs

Lines changed: 4 additions & 181 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,17 @@
1818
//! [`SessionState`]: information required to run queries in a session
1919
2020
use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_SCHEMA};
21-
use crate::catalog::listing_schema::ListingSchemaProvider;
22-
use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
23-
use crate::catalog::{
24-
CatalogProvider, CatalogProviderList, MemoryCatalogProvider,
25-
MemoryCatalogProviderList,
26-
};
21+
use crate::catalog::schema::SchemaProvider;
22+
use crate::catalog::{CatalogProviderList, MemoryCatalogProviderList};
2723
use crate::datasource::cte_worktable::CteWorkTable;
28-
use crate::datasource::file_format::arrow::ArrowFormatFactory;
29-
use crate::datasource::file_format::avro::AvroFormatFactory;
30-
use crate::datasource::file_format::csv::CsvFormatFactory;
31-
use crate::datasource::file_format::json::JsonFormatFactory;
32-
#[cfg(feature = "parquet")]
33-
use crate::datasource::file_format::parquet::ParquetFormatFactory;
3424
use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
3525
use crate::datasource::function::{TableFunction, TableFunctionImpl};
36-
use crate::datasource::provider::{DefaultTableFactory, TableProviderFactory};
26+
use crate::datasource::provider::TableProviderFactory;
3727
use crate::datasource::provider_as_source;
3828
use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, QueryPlanner};
39-
#[cfg(feature = "array_expressions")]
40-
use crate::functions_array;
29+
use crate::execution::SessionStateDefaults;
4130
use crate::physical_optimizer::optimizer::PhysicalOptimizer;
4231
use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
43-
use crate::{functions, functions_aggregate};
4432
use arrow_schema::{DataType, SchemaRef};
4533
use async_trait::async_trait;
4634
use chrono::{DateTime, Utc};
@@ -54,7 +42,6 @@ use datafusion_common::{
5442
ResolvedTableReference, TableReference,
5543
};
5644
use datafusion_execution::config::SessionConfig;
57-
use datafusion_execution::object_store::ObjectStoreUrl;
5845
use datafusion_execution::runtime_env::RuntimeEnv;
5946
use datafusion_execution::TaskContext;
6047
use datafusion_expr::execution_props::ExecutionProps;
@@ -85,7 +72,6 @@ use std::collections::hash_map::Entry;
8572
use std::collections::{HashMap, HashSet};
8673
use std::fmt::Debug;
8774
use std::sync::Arc;
88-
use url::Url;
8975
use uuid::Uuid;
9076

9177
/// Execution context for registering data sources and executing queries.
@@ -1420,169 +1406,6 @@ impl From<SessionState> for SessionStateBuilder {
14201406
}
14211407
}
14221408

1423-
/// Defaults that are used as part of creating a SessionState such as table providers,
1424-
/// file formats, registering of builtin functions, etc.
1425-
pub struct SessionStateDefaults {}
1426-
1427-
impl SessionStateDefaults {
1428-
/// returns a map of the default [`TableProviderFactory`]s
1429-
pub fn default_table_factories() -> HashMap<String, Arc<dyn TableProviderFactory>> {
1430-
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
1431-
HashMap::new();
1432-
#[cfg(feature = "parquet")]
1433-
table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
1434-
table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
1435-
table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
1436-
table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
1437-
table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
1438-
table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
1439-
1440-
table_factories
1441-
}
1442-
1443-
/// returns the default MemoryCatalogProvider
1444-
pub fn default_catalog(
1445-
config: &SessionConfig,
1446-
table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
1447-
runtime: &Arc<RuntimeEnv>,
1448-
) -> MemoryCatalogProvider {
1449-
let default_catalog = MemoryCatalogProvider::new();
1450-
1451-
default_catalog
1452-
.register_schema(
1453-
&config.options().catalog.default_schema,
1454-
Arc::new(MemorySchemaProvider::new()),
1455-
)
1456-
.expect("memory catalog provider can register schema");
1457-
1458-
Self::register_default_schema(config, table_factories, runtime, &default_catalog);
1459-
1460-
default_catalog
1461-
}
1462-
1463-
/// returns the list of default [`ExprPlanner`]s
1464-
pub fn default_expr_planners() -> Vec<Arc<dyn ExprPlanner>> {
1465-
let expr_planners: Vec<Arc<dyn ExprPlanner>> = vec![
1466-
Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
1467-
// register crate of array expressions (if enabled)
1468-
#[cfg(feature = "array_expressions")]
1469-
Arc::new(functions_array::planner::ArrayFunctionPlanner),
1470-
#[cfg(feature = "array_expressions")]
1471-
Arc::new(functions_array::planner::FieldAccessPlanner),
1472-
#[cfg(any(
1473-
feature = "datetime_expressions",
1474-
feature = "unicode_expressions"
1475-
))]
1476-
Arc::new(functions::planner::UserDefinedFunctionPlanner),
1477-
];
1478-
1479-
expr_planners
1480-
}
1481-
1482-
/// returns the list of default [`ScalarUDF`]s
1483-
pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
1484-
let mut functions: Vec<Arc<ScalarUDF>> = functions::all_default_functions();
1485-
#[cfg(feature = "array_expressions")]
1486-
functions.append(&mut functions_array::all_default_array_functions());
1487-
1488-
functions
1489-
}
1490-
1491-
/// returns the list of default [`AggregateUDF`]s
1492-
pub fn default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
1493-
functions_aggregate::all_default_aggregate_functions()
1494-
}
1495-
1496-
/// returns the list of default [`FileFormatFactory`]s
1497-
pub fn default_file_formats() -> Vec<Arc<dyn FileFormatFactory>> {
1498-
let file_formats: Vec<Arc<dyn FileFormatFactory>> = vec![
1499-
#[cfg(feature = "parquet")]
1500-
Arc::new(ParquetFormatFactory::new()),
1501-
Arc::new(JsonFormatFactory::new()),
1502-
Arc::new(CsvFormatFactory::new()),
1503-
Arc::new(ArrowFormatFactory::new()),
1504-
Arc::new(AvroFormatFactory::new()),
1505-
];
1506-
1507-
file_formats
1508-
}
1509-
1510-
/// registers all builtin functions - scalar, array and aggregate
1511-
pub fn register_builtin_functions(state: &mut SessionState) {
1512-
Self::register_scalar_functions(state);
1513-
Self::register_array_functions(state);
1514-
Self::register_aggregate_functions(state);
1515-
}
1516-
1517-
/// registers all the builtin scalar functions
1518-
pub fn register_scalar_functions(state: &mut SessionState) {
1519-
functions::register_all(state).expect("can not register built in functions");
1520-
}
1521-
1522-
/// registers all the builtin array functions
1523-
pub fn register_array_functions(state: &mut SessionState) {
1524-
// register crate of array expressions (if enabled)
1525-
#[cfg(feature = "array_expressions")]
1526-
functions_array::register_all(state).expect("can not register array expressions");
1527-
}
1528-
1529-
/// registers all the builtin aggregate functions
1530-
pub fn register_aggregate_functions(state: &mut SessionState) {
1531-
functions_aggregate::register_all(state)
1532-
.expect("can not register aggregate functions");
1533-
}
1534-
1535-
/// registers the default schema
1536-
pub fn register_default_schema(
1537-
config: &SessionConfig,
1538-
table_factories: &HashMap<String, Arc<dyn TableProviderFactory>>,
1539-
runtime: &Arc<RuntimeEnv>,
1540-
default_catalog: &MemoryCatalogProvider,
1541-
) {
1542-
let url = config.options().catalog.location.as_ref();
1543-
let format = config.options().catalog.format.as_ref();
1544-
let (url, format) = match (url, format) {
1545-
(Some(url), Some(format)) => (url, format),
1546-
_ => return,
1547-
};
1548-
let url = url.to_string();
1549-
let format = format.to_string();
1550-
1551-
let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
1552-
let authority = match url.host_str() {
1553-
Some(host) => format!("{}://{}", url.scheme(), host),
1554-
None => format!("{}://", url.scheme()),
1555-
};
1556-
let path = &url.as_str()[authority.len()..];
1557-
let path = object_store::path::Path::parse(path).expect("Can't parse path");
1558-
let store = ObjectStoreUrl::parse(authority.as_str())
1559-
.expect("Invalid default catalog url");
1560-
let store = match runtime.object_store(store) {
1561-
Ok(store) => store,
1562-
_ => return,
1563-
};
1564-
let factory = match table_factories.get(format.as_str()) {
1565-
Some(factory) => factory,
1566-
_ => return,
1567-
};
1568-
let schema =
1569-
ListingSchemaProvider::new(authority, path, factory.clone(), store, format);
1570-
let _ = default_catalog
1571-
.register_schema("default", Arc::new(schema))
1572-
.expect("Failed to register default schema");
1573-
}
1574-
1575-
/// registers the default [`FileFormatFactory`]s
1576-
pub fn register_default_file_formats(state: &mut SessionState) {
1577-
let formats = SessionStateDefaults::default_file_formats();
1578-
for format in formats {
1579-
if let Err(e) = state.register_file_format(format, false) {
1580-
log::info!("Unable to register default file format: {e}")
1581-
};
1582-
}
1583-
}
1584-
}
1585-
15861409
/// Adapter that implements the [`ContextProvider`] trait for a [`SessionState`]
15871410
///
15881411
/// This is used so the SQL planner can access the state of the session without

0 commit comments

Comments
 (0)