Skip to content

Commit dbb8905

Browse files
committed
Extract catalog API to separate crate
This moves `CatalogProvider`, `TableProvider`, `SchemaProvider` to a new `datafusion-catalog` crate. The circular dependency between core `SessionState` and implementations is broken up by introducing `CatalogSession` dyn trait. Implementations of `TableProvider` that reside under core current have access to `CatalogSession` by downcasting. This is supposed to be an intermediate step.
1 parent fab7e23 commit dbb8905

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+872
-556
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ exclude = ["datafusion-cli", "dev/depcheck"]
2020
members = [
2121
"datafusion/common",
2222
"datafusion/common-runtime",
23+
"datafusion/catalog",
2324
"datafusion/core",
2425
"datafusion/expr",
2526
"datafusion/execution",
@@ -88,6 +89,7 @@ chrono = { version = "0.4.34", default-features = false }
8889
ctor = "0.2.0"
8990
dashmap = "6.0.1"
9091
datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false }
92+
datafusion-catalog = { path = "datafusion/catalog", version = "40.0.0" }
9193
datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false }
9294
datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" }
9395
datafusion-execution = { path = "datafusion/execution", version = "40.0.0" }

datafusion-cli/Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-cli/src/catalog.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ use std::sync::{Arc, Weak};
2020

2121
use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};
2222

23-
use datafusion::catalog::schema::SchemaProvider;
24-
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
23+
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
2524
use datafusion::common::plan_datafusion_err;
2625
use datafusion::datasource::listing::{
2726
ListingTable, ListingTableConfig, ListingTableUrl,
@@ -237,7 +236,7 @@ fn substitute_tilde(cur: String) -> String {
237236
mod tests {
238237
use super::*;
239238

240-
use datafusion::catalog::schema::SchemaProvider;
239+
use datafusion::catalog::SchemaProvider;
241240
use datafusion::prelude::SessionContext;
242241

243242
fn setup_context() -> (SessionContext, Arc<dyn SchemaProvider>) {

datafusion-cli/src/functions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ use arrow::record_batch::RecordBatch;
2222
use arrow::util::pretty::pretty_format_batches;
2323
use async_trait::async_trait;
2424

25+
use datafusion::catalog::Session;
2526
use datafusion::common::{plan_err, Column};
2627
use datafusion::datasource::function::TableFunctionImpl;
2728
use datafusion::datasource::TableProvider;
2829
use datafusion::error::Result;
29-
use datafusion::execution::context::SessionState;
3030
use datafusion::logical_expr::Expr;
3131
use datafusion::physical_plan::memory::MemoryExec;
3232
use datafusion::physical_plan::ExecutionPlan;
@@ -234,7 +234,7 @@ impl TableProvider for ParquetMetadataTable {
234234

235235
async fn scan(
236236
&self,
237-
_state: &SessionState,
237+
_state: &dyn Session,
238238
projection: Option<&Vec<usize>>,
239239
_filters: &[Expr],
240240
_limit: Option<usize>,

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
1919
use arrow_schema::SchemaRef;
2020
use async_trait::async_trait;
2121
use bytes::Bytes;
22+
use datafusion::catalog::Session;
2223
use datafusion::datasource::listing::PartitionedFile;
2324
use datafusion::datasource::physical_plan::parquet::{
2425
ParquetAccessPlan, ParquetExecBuilder,
@@ -27,7 +28,6 @@ use datafusion::datasource::physical_plan::{
2728
parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
2829
};
2930
use datafusion::datasource::TableProvider;
30-
use datafusion::execution::context::SessionState;
3131
use datafusion::execution::object_store::ObjectStoreUrl;
3232
use datafusion::parquet::arrow::arrow_reader::{
3333
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
@@ -271,7 +271,7 @@ impl IndexTableProvider {
271271
/// to a single predicate like `a = 1 AND b = 2` suitable for execution
272272
fn filters_to_predicate(
273273
&self,
274-
state: &SessionState,
274+
state: &dyn Session,
275275
filters: &[Expr],
276276
) -> Result<Arc<dyn PhysicalExpr>> {
277277
let df_schema = DFSchema::try_from(self.schema())?;
@@ -463,7 +463,7 @@ impl TableProvider for IndexTableProvider {
463463

464464
async fn scan(
465465
&self,
466-
state: &SessionState,
466+
state: &dyn Session,
467467
projection: Option<&Vec<usize>>,
468468
filters: &[Expr],
469469
limit: Option<usize>,

datafusion-examples/examples/catalog.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,7 @@
1919
use async_trait::async_trait;
2020
use datafusion::{
2121
arrow::util::pretty,
22-
catalog::{
23-
schema::SchemaProvider,
24-
{CatalogProvider, CatalogProviderList},
25-
},
22+
catalog::{CatalogProvider, CatalogProviderList, SchemaProvider},
2623
datasource::{
2724
file_format::{csv::CsvFormat, FileFormat},
2825
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},

datafusion-examples/examples/custom_datasource.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2626
use datafusion::arrow::record_batch::RecordBatch;
2727
use datafusion::datasource::{provider_as_source, TableProvider, TableType};
2828
use datafusion::error::Result;
29-
use datafusion::execution::context::{SessionState, TaskContext};
29+
use datafusion::execution::context::TaskContext;
3030
use datafusion::physical_plan::memory::MemoryStream;
3131
use datafusion::physical_plan::{
3232
project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
@@ -37,6 +37,7 @@ use datafusion_expr::LogicalPlanBuilder;
3737
use datafusion_physical_expr::EquivalenceProperties;
3838

3939
use async_trait::async_trait;
40+
use datafusion::catalog::Session;
4041
use tokio::time::timeout;
4142

4243
/// This example demonstrates executing a simple query against a custom datasource
@@ -175,7 +176,7 @@ impl TableProvider for CustomDataSource {
175176

176177
async fn scan(
177178
&self,
178-
_state: &SessionState,
179+
_state: &dyn Session,
179180
projection: Option<&Vec<usize>>,
180181
// filters and limit can be used here to inject some push-down operations if needed
181182
_filters: &[Expr],

datafusion-examples/examples/parquet_index.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@ use arrow::datatypes::Int32Type;
2323
use arrow::util::pretty::pretty_format_batches;
2424
use arrow_schema::SchemaRef;
2525
use async_trait::async_trait;
26+
use datafusion::catalog::Session;
2627
use datafusion::datasource::listing::PartitionedFile;
2728
use datafusion::datasource::physical_plan::{
2829
parquet::StatisticsConverter,
2930
{FileScanConfig, ParquetExec},
3031
};
3132
use datafusion::datasource::TableProvider;
32-
use datafusion::execution::context::SessionState;
3333
use datafusion::execution::object_store::ObjectStoreUrl;
3434
use datafusion::parquet::arrow::{
3535
arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
@@ -222,7 +222,7 @@ impl TableProvider for IndexTableProvider {
222222

223223
async fn scan(
224224
&self,
225-
state: &SessionState,
225+
state: &dyn Session,
226226
projection: Option<&Vec<usize>>,
227227
filters: &[Expr],
228228
limit: Option<usize>,

datafusion-examples/examples/simple_udtf.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,11 @@ use arrow::csv::ReaderBuilder;
2020
use async_trait::async_trait;
2121
use datafusion::arrow::datatypes::SchemaRef;
2222
use datafusion::arrow::record_batch::RecordBatch;
23+
use datafusion::catalog::Session;
2324
use datafusion::datasource::function::TableFunctionImpl;
2425
use datafusion::datasource::TableProvider;
2526
use datafusion::error::Result;
26-
use datafusion::execution::context::{ExecutionProps, SessionState};
27+
use datafusion::execution::context::ExecutionProps;
2728
use datafusion::physical_plan::memory::MemoryExec;
2829
use datafusion::physical_plan::ExecutionPlan;
2930
use datafusion::prelude::SessionContext;
@@ -35,7 +36,6 @@ use std::fs::File;
3536
use std::io::Seek;
3637
use std::path::Path;
3738
use std::sync::Arc;
38-
3939
// To define your own table function, you only need to do the following 3 things:
4040
// 1. Implement your own [`TableProvider`]
4141
// 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
@@ -95,7 +95,7 @@ impl TableProvider for LocalCsvTable {
9595

9696
async fn scan(
9797
&self,
98-
_state: &SessionState,
98+
_state: &dyn Session,
9999
projection: Option<&Vec<usize>>,
100100
_filters: &[Expr],
101101
_limit: Option<usize>,

datafusion/catalog/Cargo.toml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "datafusion-catalog"
20+
authors.workspace = true
21+
edition.workspace = true
22+
homepage.workspace = true
23+
license.workspace = true
24+
readme.workspace = true
25+
repository.workspace = true
26+
rust-version.workspace = true
27+
version.workspace = true
28+
29+
[dependencies]
30+
arrow-schema = { workspace = true }
31+
async-trait = "0.1.41"
32+
datafusion-common = { workspace = true }
33+
datafusion-execution = { workspace = true }
34+
datafusion-expr = { workspace = true }
35+
datafusion-physical-plan = { workspace = true }
36+
37+
[lints]
38+
workspace = true

0 commit comments

Comments
 (0)