
Commit 8ad5111

Extract catalog API to separate crate
This moves `CatalogProvider`, `TableProvider`, and `SchemaProvider` to a new `datafusion-catalog` crate. The circular dependency between the core `SessionState` and the trait implementations is broken by introducing a `CatalogSession` dyn trait. Implementations of `TableProvider` that reside under core currently regain access to the full session by downcasting `CatalogSession`. This is intended as an intermediate step.
1 parent de0765a commit 8ad5111
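
A sketch of the downcast escape hatch described in the commit message (hedged: the exact `CatalogSession` surface is not shown in this diff; an `as_any` accessor in the style of DataFusion's other erased traits is assumed):

use datafusion::catalog_api::CatalogSession;
use datafusion::execution::context::SessionState;

// Hypothetical helper: a core-resident TableProvider recovers the concrete
// SessionState from the erased session when it needs core-only APIs.
// Assumes `CatalogSession` exposes `fn as_any(&self) -> &dyn Any`.
fn as_session_state(session: &dyn CatalogSession) -> Option<&SessionState> {
    session.as_any().downcast_ref::<SessionState>()
}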


43 files changed: +810 -509 lines

Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@ exclude = ["datafusion-cli", "dev/depcheck"]
 members = [
     "datafusion/common",
     "datafusion/common-runtime",
+    "datafusion/catalog",
     "datafusion/core",
     "datafusion/expr",
     "datafusion/execution",
@@ -87,6 +88,7 @@ chrono = { version = "0.4.34", default-features = false }
 ctor = "0.2.0"
 dashmap = "6.0.1"
 datafusion = { path = "datafusion/core", version = "40.0.0", default-features = false }
+datafusion-catalog = { path = "datafusion/catalog", version = "40.0.0" }
 datafusion-common = { path = "datafusion/common", version = "40.0.0", default-features = false }
 datafusion-common-runtime = { path = "datafusion/common-runtime", version = "40.0.0" }
 datafusion-execution = { path = "datafusion/execution", version = "40.0.0" }

datafusion-cli/Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default.

datafusion-cli/src/catalog.rs

Lines changed: 2 additions & 2 deletions
@@ -20,8 +20,8 @@ use std::sync::{Arc, Weak};
 
 use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};
 
-use datafusion::catalog::schema::SchemaProvider;
-use datafusion::catalog::{CatalogProvider, CatalogProviderList};
+use datafusion::catalog::CatalogProviderList;
+use datafusion::catalog_api::{CatalogProvider, SchemaProvider};
 use datafusion::common::plan_datafusion_err;
 use datafusion::datasource::listing::{
     ListingTable, ListingTableConfig, ListingTableUrl,

datafusion-cli/src/functions.rs

Lines changed: 2 additions & 2 deletions
@@ -22,11 +22,11 @@ use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
 use async_trait::async_trait;
 
+use datafusion::catalog_api::CatalogSession;
 use datafusion::common::{plan_err, Column};
 use datafusion::datasource::function::TableFunctionImpl;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
-use datafusion::execution::context::SessionState;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
@@ -234,7 +234,7 @@ impl TableProvider for ParquetMetadataTable {
 
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn CatalogSession,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,
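
The same one-line signature change repeats across every `scan` in this commit. For downstream crates the migration is mechanical; here is a sketch of a provider compiled against the new signature (`EmptyTable` is illustrative and not part of this commit):

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog_api::CatalogSession;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::ExecutionPlan;

/// Hypothetical provider used only to show the post-commit signature.
struct EmptyTable {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for EmptyTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn CatalogSession, // previously `&SessionState`
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // No partitions: the provider yields an empty, correctly-typed plan.
        Ok(Arc::new(MemoryExec::try_new(
            &[],
            self.schema(),
            projection.cloned(),
        )?))
    }
}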

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 3 additions & 3 deletions
@@ -19,6 +19,7 @@ use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use bytes::Bytes;
+use datafusion::catalog_api::CatalogSession;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::parquet::{
     ParquetAccessPlan, ParquetExecBuilder,
@@ -27,7 +28,6 @@ use datafusion::datasource::physical_plan::{
     parquet::ParquetFileReaderFactory, FileMeta, FileScanConfig,
 };
 use datafusion::datasource::TableProvider;
-use datafusion::execution::context::SessionState;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::arrow_reader::{
     ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
@@ -271,7 +271,7 @@ impl IndexTableProvider {
     /// to a single predicate like `a = 1 AND b = 2` suitable for execution
     fn filters_to_predicate(
         &self,
-        state: &SessionState,
+        state: &dyn CatalogSession,
         filters: &[Expr],
     ) -> Result<Arc<dyn PhysicalExpr>> {
         let df_schema = DFSchema::try_from(self.schema())?;
@@ -463,7 +463,7 @@ impl TableProvider for IndexTableProvider {
 
     async fn scan(
         &self,
-        state: &SessionState,
+        state: &dyn CatalogSession,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,

datafusion-examples/examples/catalog.rs

Lines changed: 2 additions & 4 deletions
@@ -19,10 +19,8 @@
 use async_trait::async_trait;
 use datafusion::{
     arrow::util::pretty,
-    catalog::{
-        schema::SchemaProvider,
-        {CatalogProvider, CatalogProviderList},
-    },
+    catalog::CatalogProviderList,
+    catalog_api::{CatalogProvider, SchemaProvider},
     datasource::{
         file_format::{csv::CsvFormat, FileFormat},
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},

datafusion-examples/examples/custom_datasource.rs

Lines changed: 3 additions & 2 deletions
@@ -26,7 +26,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::datasource::{provider_as_source, TableProvider, TableType};
 use datafusion::error::Result;
-use datafusion::execution::context::{SessionState, TaskContext};
+use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::{
     project_schema, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan,
@@ -37,6 +37,7 @@ use datafusion_expr::LogicalPlanBuilder;
 use datafusion_physical_expr::EquivalenceProperties;
 
 use async_trait::async_trait;
+use datafusion::catalog_api::CatalogSession;
 use tokio::time::timeout;
 
 /// This example demonstrates executing a simple query against a custom datasource
@@ -175,7 +176,7 @@ impl TableProvider for CustomDataSource {
 
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn CatalogSession,
         projection: Option<&Vec<usize>>,
         // filters and limit can be used here to inject some push-down operations if needed
         _filters: &[Expr],

datafusion-examples/examples/parquet_index.rs

Lines changed: 2 additions & 2 deletions
@@ -23,13 +23,13 @@ use arrow::datatypes::Int32Type;
 use arrow::util::pretty::pretty_format_batches;
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
+use datafusion::catalog_api::CatalogSession;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{
     parquet::StatisticsConverter,
     {FileScanConfig, ParquetExec},
 };
 use datafusion::datasource::TableProvider;
-use datafusion::execution::context::SessionState;
 use datafusion::execution::object_store::ObjectStoreUrl;
 use datafusion::parquet::arrow::{
     arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter,
@@ -222,7 +222,7 @@ impl TableProvider for IndexTableProvider {
 
     async fn scan(
         &self,
-        state: &SessionState,
+        state: &dyn CatalogSession,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,

datafusion-examples/examples/simple_udtf.rs

Lines changed: 3 additions & 3 deletions
@@ -20,10 +20,11 @@ use arrow::csv::ReaderBuilder;
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
 use datafusion::arrow::record_batch::RecordBatch;
+use datafusion::catalog_api::CatalogSession;
 use datafusion::datasource::function::TableFunctionImpl;
 use datafusion::datasource::TableProvider;
 use datafusion::error::Result;
-use datafusion::execution::context::{ExecutionProps, SessionState};
+use datafusion::execution::context::ExecutionProps;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
@@ -35,7 +36,6 @@ use std::fs::File;
 use std::io::Seek;
 use std::path::Path;
 use std::sync::Arc;
-
 // To define your own table function, you only need to do the following 3 things:
 // 1. Implement your own [`TableProvider`]
 // 2. Implement your own [`TableFunctionImpl`] and return your [`TableProvider`]
@@ -95,7 +95,7 @@ impl TableProvider for LocalCsvTable {
 
     async fn scan(
         &self,
-        _state: &SessionState,
+        _state: &dyn CatalogSession,
         projection: Option<&Vec<usize>>,
         _filters: &[Expr],
         _limit: Option<usize>,

datafusion/catalog/Cargo.toml

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-catalog"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+readme.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+[dependencies]
+arrow-schema = { workspace = true }
+async-trait = "0.1.41"
+datafusion-expr = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+
+[lints]
+workspace = true

datafusion/catalog/src/catalog.rs

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+pub use crate::schema::SchemaProvider;
+use datafusion_common::not_impl_err;
+use datafusion_common::Result;
+
+/// Represents a catalog, comprising a number of named schemas.
+///
+/// # Catalog Overview
+///
+/// To plan and execute queries, DataFusion needs a "Catalog" that provides
+/// metadata such as which schemas and tables exist, their columns and data
+/// types, and how to access the data.
+///
+/// The Catalog API consists of:
+/// * [`CatalogProviderList`]: a collection of `CatalogProvider`s
+/// * [`CatalogProvider`]: a collection of `SchemaProvider`s (sometimes called a "database" in other systems)
+/// * [`SchemaProvider`]: a collection of `TableProvider`s (often called a "schema" in other systems)
+/// * [`TableProvider`]: individual tables
+///
+/// # Implementing Catalogs
+///
+/// To implement a catalog, you implement at least one of the [`CatalogProviderList`],
+/// [`CatalogProvider`] and [`SchemaProvider`] traits and register them
+/// appropriately with the [`SessionContext`].
+///
+/// [`SessionContext`]: crate::execution::context::SessionContext
+///
+/// DataFusion comes with a simple in-memory catalog implementation,
+/// [`MemoryCatalogProvider`], that is used by default and has no persistence.
+/// DataFusion does not include more complex Catalog implementations because
+/// catalog management is a key design choice for most data systems, and thus
+/// it is unlikely that any general-purpose catalog implementation will work
+/// well across many use cases.
+///
+/// # Implementing "Remote" catalogs
+///
+/// Sometimes catalog information is stored remotely and requires a network call
+/// to retrieve. For example, the [Delta Lake] table format stores table
+/// metadata in files on S3 that must first be downloaded to discover what
+/// schemas and tables exist.
+///
+/// [Delta Lake]: https://delta.io/
+///
+/// The [`CatalogProvider`] can support this use case, but it takes some care.
+/// The planning APIs in DataFusion are not `async` and thus network IO cannot
+/// be performed "lazily" / "on demand" during query planning. The rationale for
+/// this design is that using remote procedure calls for all catalog accesses
+/// required for query planning would likely result in multiple network calls
+/// per plan, resulting in very poor planning performance.
+///
+/// To implement [`CatalogProvider`] and [`SchemaProvider`] for remote catalogs,
+/// you need to provide an in-memory snapshot of the required metadata. Most
+/// systems typically either already have this information cached locally or can
+/// batch access to the remote catalog to retrieve multiple schemas and tables
+/// in a single network call.
+///
+/// Note that [`SchemaProvider::table`] is an `async` function in order to
+/// simplify implementing simple [`SchemaProvider`]s. For many table formats it
+/// is easy to list all available tables, but additional non-trivial access is
+/// required to read table details (e.g. statistics).
+///
+/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
+/// the query to [find all table references], perform the required remote
+/// catalog access in parallel, and then plan the query using that snapshot.
+///
+/// [find all table references]: resolve_table_references
+///
+/// # Example Catalog Implementations
+///
+/// Here are some examples of how to implement custom catalogs:
+///
+/// * [`datafusion-cli`]: [`DynamicFileCatalogProvider`] catalog provider
+///   that treats files and directories on a filesystem as tables.
+///
+/// * [`catalog.rs`]: a simple directory-based catalog.
+///
+/// * [delta-rs]: [`UnityCatalogProvider`] implementation that can
+///   read from Delta Lake tables
+///
+/// [`datafusion-cli`]: https://datafusion.apache.org/user-guide/cli/index.html
+/// [`DynamicFileCatalogProvider`]: https://github.com/apache/datafusion/blob/31b9b48b08592b7d293f46e75707aad7dadd7cbc/datafusion-cli/src/catalog.rs#L75
+/// [`catalog.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/catalog.rs
+/// [delta-rs]: https://github.com/delta-io/delta-rs
+/// [`UnityCatalogProvider`]: https://github.com/delta-io/delta-rs/blob/951436ecec476ce65b5ed3b58b50fb0846ca7b91/crates/deltalake-core/src/data_catalog/unity/datafusion.rs#L111-L123
+///
+/// [`TableProvider`]: crate::datasource::TableProvider
+
+pub trait CatalogProvider: Sync + Send {
+    /// Returns the catalog provider as [`Any`]
+    /// so that it can be downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Retrieves the list of available schema names in this catalog.
+    fn schema_names(&self) -> Vec<String>;
+
+    /// Retrieves a specific schema from the catalog by name, provided it exists.
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
+
+    /// Adds a new schema to this catalog.
+    ///
+    /// If a schema of the same name existed before, it is replaced in
+    /// the catalog and returned.
+    ///
+    /// By default returns a "Not Implemented" error
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        // use variables to avoid unused variable warnings
+        let _ = name;
+        let _ = schema;
+        not_impl_err!("Registering new schemas is not supported")
+    }
+
+    /// Removes a schema from this catalog. Implementations of this method should return
+    /// errors if the schema exists but cannot be dropped. For example, in DataFusion's
+    /// default in-memory catalog, [`MemoryCatalogProvider`], a non-empty schema
+    /// will only be successfully dropped when `cascade` is true.
+    /// This is equivalent to how DROP SCHEMA works in PostgreSQL.
+    ///
+    /// Implementations of this method should return None if a schema with `name`
+    /// does not exist.
+    ///
+    /// By default returns a "Not Implemented" error
+    fn deregister_schema(
+        &self,
        _name: &str,
+        _cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        not_impl_err!("Deregistering schemas is not supported")
+    }
+}
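
To make the extracted trait concrete, a minimal in-memory implementation might look like the following sketch (assumes the new crate re-exports `CatalogProvider` and `SchemaProvider` at its root, as the `pub use` above suggests; `SimpleCatalog` and its `DashMap` backing are illustrative, not part of this commit):

use std::any::Any;
use std::sync::Arc;

use dashmap::DashMap;
use datafusion_catalog::{CatalogProvider, SchemaProvider};
use datafusion_common::Result;

/// Illustrative catalog backed by a concurrent map, in the spirit of
/// DataFusion's default MemoryCatalogProvider.
#[derive(Default)]
struct SimpleCatalog {
    schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}

impl CatalogProvider for SimpleCatalog {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.schemas.iter().map(|s| s.key().clone()).collect()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.get(name).map(|s| Arc::clone(s.value()))
    }

    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // Replaces and returns any previously registered schema of this name,
        // matching the `register_schema` contract documented above.
        Ok(self.schemas.insert(name.to_string(), schema))
    }
}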
