Skip to content

Commit c077ef5

Browse files
logan-keedealamb
andauthored
move information_schema to datafusion-catalog (#14364)
* move information_schema to datafusion-catalog * fix: formatting * fix: doctests import * Remove unecessary datafuson-catalog dependency * remove some more unecessary dependencies * Update datafusion-cli/Carglo.ock * fix: doctest dependency --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 29e9a1c commit c077ef5

File tree

14 files changed

+280
-280
lines changed

14 files changed

+280
-280
lines changed

datafusion-cli/Cargo.lock

Lines changed: 5 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/catalog/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,19 @@ rust-version.workspace = true
2828
version.workspace = true
2929

3030
[dependencies]
31-
arrow-schema = { workspace = true }
31+
arrow = { workspace = true }
3232
async-trait = { workspace = true }
3333
dashmap = { workspace = true }
3434
datafusion-common = { workspace = true }
3535
datafusion-execution = { workspace = true }
3636
datafusion-expr = { workspace = true }
3737
datafusion-physical-plan = { workspace = true }
38+
datafusion-sql = { workspace = true }
39+
futures = { workspace = true }
3840
itertools = { workspace = true }
41+
log = { workspace = true }
3942
parking_lot = { workspace = true }
43+
sqlparser = { workspace = true }
4044

4145
[dev-dependencies]
4246
tokio = { workspace = true }

datafusion/catalog/src/async.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -430,7 +430,7 @@ mod tests {
430430
},
431431
};
432432

433-
use arrow_schema::SchemaRef;
433+
use arrow::datatypes::SchemaRef;
434434
use async_trait::async_trait;
435435
use datafusion_common::{error::Result, Statistics, TableReference};
436436
use datafusion_execution::config::SessionConfig;

datafusion/core/src/catalog_common/information_schema.rs renamed to datafusion/catalog/src/information_schema.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,31 +19,29 @@
1919
//!
2020
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
2121
22-
use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider};
23-
use crate::datasource::streaming::StreamingTable;
24-
use crate::execution::context::TaskContext;
25-
use crate::logical_expr::{TableType, Volatility};
26-
use crate::physical_plan::stream::RecordBatchStreamAdapter;
27-
use crate::physical_plan::SendableRecordBatchStream;
28-
use crate::{
29-
config::{ConfigEntry, ConfigOptions},
30-
physical_plan::streaming::PartitionStream,
31-
};
22+
use crate::streaming::StreamingTable;
23+
use crate::{CatalogProviderList, SchemaProvider, TableProvider};
24+
use arrow::array::builder::{BooleanBuilder, UInt8Builder};
3225
use arrow::{
3326
array::{StringBuilder, UInt64Builder},
3427
datatypes::{DataType, Field, Schema, SchemaRef},
3528
record_batch::RecordBatch,
3629
};
37-
use arrow_array::builder::{BooleanBuilder, UInt8Builder};
3830
use async_trait::async_trait;
31+
use datafusion_common::config::{ConfigEntry, ConfigOptions};
3932
use datafusion_common::error::Result;
4033
use datafusion_common::DataFusionError;
34+
use datafusion_execution::TaskContext;
4135
use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, TypeSignature, WindowUDF};
36+
use datafusion_expr::{TableType, Volatility};
37+
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
38+
use datafusion_physical_plan::streaming::PartitionStream;
39+
use datafusion_physical_plan::SendableRecordBatchStream;
4240
use std::collections::{HashMap, HashSet};
4341
use std::fmt::Debug;
4442
use std::{any::Any, sync::Arc};
4543

46-
pub(crate) const INFORMATION_SCHEMA: &str = "information_schema";
44+
pub const INFORMATION_SCHEMA: &str = "information_schema";
4745
pub(crate) const TABLES: &str = "tables";
4846
pub(crate) const VIEWS: &str = "views";
4947
pub(crate) const COLUMNS: &str = "columns";

datafusion/catalog/src/lib.rs

Lines changed: 242 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,264 @@
1818
//! Interfaces and default implementations of catalogs and schemas.
1919
//!
2020
//! Implementations
21+
//! * Information schema: [`information_schema`]
2122
//! * Simple memory based catalog: [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]
2223
2324
pub mod memory;
25+
pub use datafusion_sql::{ResolvedTableReference, TableReference};
2426
pub use memory::{
2527
MemoryCatalogProvider, MemoryCatalogProviderList, MemorySchemaProvider,
2628
};
29+
use std::collections::BTreeSet;
30+
use std::ops::ControlFlow;
2731

2832
mod r#async;
2933
mod catalog;
3034
mod dynamic_file;
35+
pub mod information_schema;
3136
mod schema;
3237
mod session;
3338
mod table;
34-
3539
pub use catalog::*;
3640
pub use dynamic_file::catalog::*;
3741
pub use r#async::*;
3842
pub use schema::*;
3943
pub use session::*;
4044
pub use table::*;
45+
pub mod streaming;
46+
47+
/// Collects all tables and views referenced in the SQL statement. CTEs are collected separately.
48+
/// This can be used to determine which tables need to be in the catalog for a query to be planned.
49+
///
50+
/// # Returns
51+
///
52+
/// A `(table_refs, ctes)` tuple, the first element contains table and view references and the second
53+
/// element contains any CTE aliases that were defined and possibly referenced.
54+
///
55+
/// ## Example
56+
///
57+
/// ```
58+
/// # use datafusion_sql::parser::DFParser;
59+
/// # use datafusion_catalog::resolve_table_references;
60+
/// let query = "SELECT a FROM foo where x IN (SELECT y FROM bar)";
61+
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
62+
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
63+
/// assert_eq!(table_refs.len(), 2);
64+
/// assert_eq!(table_refs[0].to_string(), "bar");
65+
/// assert_eq!(table_refs[1].to_string(), "foo");
66+
/// assert_eq!(ctes.len(), 0);
67+
/// ```
68+
///
69+
/// ## Example with CTEs
70+
///
71+
/// ```
72+
/// # use datafusion_sql::parser::DFParser;
73+
/// # use datafusion_catalog::resolve_table_references;
74+
/// let query = "with my_cte as (values (1), (2)) SELECT * from my_cte;";
75+
/// let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
76+
/// let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
77+
/// assert_eq!(table_refs.len(), 0);
78+
/// assert_eq!(ctes.len(), 1);
79+
/// assert_eq!(ctes[0].to_string(), "my_cte");
80+
/// ```
81+
pub fn resolve_table_references(
82+
statement: &datafusion_sql::parser::Statement,
83+
enable_ident_normalization: bool,
84+
) -> datafusion_common::Result<(Vec<TableReference>, Vec<TableReference>)> {
85+
use datafusion_sql::parser::{
86+
CopyToSource, CopyToStatement, Statement as DFStatement,
87+
};
88+
use datafusion_sql::planner::object_name_to_table_reference;
89+
use information_schema::INFORMATION_SCHEMA;
90+
use information_schema::INFORMATION_SCHEMA_TABLES;
91+
use sqlparser::ast::*;
92+
93+
struct RelationVisitor {
94+
relations: BTreeSet<ObjectName>,
95+
all_ctes: BTreeSet<ObjectName>,
96+
ctes_in_scope: Vec<ObjectName>,
97+
}
98+
99+
impl RelationVisitor {
100+
/// Record the reference to `relation`, if it's not a CTE reference.
101+
fn insert_relation(&mut self, relation: &ObjectName) {
102+
if !self.relations.contains(relation)
103+
&& !self.ctes_in_scope.contains(relation)
104+
{
105+
self.relations.insert(relation.clone());
106+
}
107+
}
108+
}
109+
110+
impl Visitor for RelationVisitor {
111+
type Break = ();
112+
113+
fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> {
114+
self.insert_relation(relation);
115+
ControlFlow::Continue(())
116+
}
117+
118+
fn pre_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
119+
if let Some(with) = &q.with {
120+
for cte in &with.cte_tables {
121+
// The non-recursive CTE name is not in scope when evaluating the CTE itself, so this is valid:
122+
// `WITH t AS (SELECT * FROM t) SELECT * FROM t`
123+
// Where the first `t` refers to a predefined table. So we are careful here
124+
// to visit the CTE first, before putting it in scope.
125+
if !with.recursive {
126+
// This is a bit hackish as the CTE will be visited again as part of visiting `q`,
127+
// but thankfully `insert_relation` is idempotent.
128+
cte.visit(self);
129+
}
130+
self.ctes_in_scope
131+
.push(ObjectName(vec![cte.alias.name.clone()]));
132+
}
133+
}
134+
ControlFlow::Continue(())
135+
}
136+
137+
fn post_visit_query(&mut self, q: &Query) -> ControlFlow<Self::Break> {
138+
if let Some(with) = &q.with {
139+
for _ in &with.cte_tables {
140+
// Unwrap: We just pushed these in `pre_visit_query`
141+
self.all_ctes.insert(self.ctes_in_scope.pop().unwrap());
142+
}
143+
}
144+
ControlFlow::Continue(())
145+
}
146+
147+
fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> {
148+
if let Statement::ShowCreate {
149+
obj_type: ShowCreateObject::Table | ShowCreateObject::View,
150+
obj_name,
151+
} = statement
152+
{
153+
self.insert_relation(obj_name)
154+
}
155+
156+
// SHOW statements will later be rewritten into a SELECT from the information_schema
157+
let requires_information_schema = matches!(
158+
statement,
159+
Statement::ShowFunctions { .. }
160+
| Statement::ShowVariable { .. }
161+
| Statement::ShowStatus { .. }
162+
| Statement::ShowVariables { .. }
163+
| Statement::ShowCreate { .. }
164+
| Statement::ShowColumns { .. }
165+
| Statement::ShowTables { .. }
166+
| Statement::ShowCollation { .. }
167+
);
168+
if requires_information_schema {
169+
for s in INFORMATION_SCHEMA_TABLES {
170+
self.relations.insert(ObjectName(vec![
171+
Ident::new(INFORMATION_SCHEMA),
172+
Ident::new(*s),
173+
]));
174+
}
175+
}
176+
ControlFlow::Continue(())
177+
}
178+
}
179+
180+
let mut visitor = RelationVisitor {
181+
relations: BTreeSet::new(),
182+
all_ctes: BTreeSet::new(),
183+
ctes_in_scope: vec![],
184+
};
185+
186+
fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor) {
187+
match statement {
188+
DFStatement::Statement(s) => {
189+
let _ = s.as_ref().visit(visitor);
190+
}
191+
DFStatement::CreateExternalTable(table) => {
192+
visitor.relations.insert(table.name.clone());
193+
}
194+
DFStatement::CopyTo(CopyToStatement { source, .. }) => match source {
195+
CopyToSource::Relation(table_name) => {
196+
visitor.insert_relation(table_name);
197+
}
198+
CopyToSource::Query(query) => {
199+
query.visit(visitor);
200+
}
201+
},
202+
DFStatement::Explain(explain) => visit_statement(&explain.statement, visitor),
203+
}
204+
}
205+
206+
visit_statement(statement, &mut visitor);
207+
208+
let table_refs = visitor
209+
.relations
210+
.into_iter()
211+
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
212+
.collect::<datafusion_common::Result<_>>()?;
213+
let ctes = visitor
214+
.all_ctes
215+
.into_iter()
216+
.map(|x| object_name_to_table_reference(x, enable_ident_normalization))
217+
.collect::<datafusion_common::Result<_>>()?;
218+
Ok((table_refs, ctes))
219+
}
220+
221+
#[cfg(test)]
222+
mod tests {
223+
use super::*;
224+
225+
#[test]
226+
fn resolve_table_references_shadowed_cte() {
227+
use datafusion_sql::parser::DFParser;
228+
229+
// An interesting edge case where the `t` name is used both as an ordinary table reference
230+
// and as a CTE reference.
231+
let query = "WITH t AS (SELECT * FROM t) SELECT * FROM t";
232+
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
233+
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
234+
assert_eq!(table_refs.len(), 1);
235+
assert_eq!(ctes.len(), 1);
236+
assert_eq!(ctes[0].to_string(), "t");
237+
assert_eq!(table_refs[0].to_string(), "t");
238+
239+
// UNION is a special case where the CTE is not in scope for the second branch.
240+
let query = "(with t as (select 1) select * from t) union (select * from t)";
241+
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
242+
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
243+
assert_eq!(table_refs.len(), 1);
244+
assert_eq!(ctes.len(), 1);
245+
assert_eq!(ctes[0].to_string(), "t");
246+
assert_eq!(table_refs[0].to_string(), "t");
247+
248+
// Nested CTEs are also handled.
249+
// Here the first `u` is a CTE, but the second `u` is a table reference.
250+
// While `t` is always a CTE.
251+
let query = "(with t as (with u as (select 1) select * from u) select * from u cross join t)";
252+
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
253+
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
254+
assert_eq!(table_refs.len(), 1);
255+
assert_eq!(ctes.len(), 2);
256+
assert_eq!(ctes[0].to_string(), "t");
257+
assert_eq!(ctes[1].to_string(), "u");
258+
assert_eq!(table_refs[0].to_string(), "u");
259+
}
260+
261+
#[test]
262+
fn resolve_table_references_recursive_cte() {
263+
use datafusion_sql::parser::DFParser;
264+
265+
let query = "
266+
WITH RECURSIVE nodes AS (
267+
SELECT 1 as id
268+
UNION ALL
269+
SELECT id + 1 as id
270+
FROM nodes
271+
WHERE id < 10
272+
)
273+
SELECT * FROM nodes
274+
";
275+
let statement = DFParser::parse_sql(query).unwrap().pop_back().unwrap();
276+
let (table_refs, ctes) = resolve_table_references(&statement, true).unwrap();
277+
assert_eq!(table_refs.len(), 0);
278+
assert_eq!(ctes.len(), 1);
279+
assert_eq!(ctes[0].to_string(), "nodes");
280+
}
281+
}

datafusion/core/src/datasource/streaming.rs renamed to datafusion/catalog/src/streaming.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ use std::sync::Arc;
2323
use arrow::datatypes::SchemaRef;
2424
use async_trait::async_trait;
2525

26-
use crate::datasource::TableProvider;
27-
use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec};
28-
use crate::physical_plan::ExecutionPlan;
29-
use datafusion_catalog::Session;
26+
use crate::Session;
27+
use crate::TableProvider;
3028
use datafusion_common::{plan_err, Result};
3129
use datafusion_expr::{Expr, TableType};
30+
use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
31+
use datafusion_physical_plan::ExecutionPlan;
3232
use log::debug;
3333

3434
/// A [`TableProvider`] that streams a set of [`PartitionStream`]

datafusion/catalog/src/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use std::fmt::Debug;
2121
use std::sync::Arc;
2222

2323
use crate::session::Session;
24-
use arrow_schema::SchemaRef;
24+
use arrow::datatypes::SchemaRef;
2525
use async_trait::async_trait;
2626
use datafusion_common::Result;
2727
use datafusion_common::{not_impl_err, Constraints, Statistics};
@@ -202,7 +202,7 @@ pub trait TableProvider: Debug + Sync + Send {
202202
/// ```rust
203203
/// # use std::any::Any;
204204
/// # use std::sync::Arc;
205-
/// # use arrow_schema::SchemaRef;
205+
/// # use arrow::datatypes::SchemaRef;
206206
/// # use async_trait::async_trait;
207207
/// # use datafusion_catalog::{TableProvider, Session};
208208
/// # use datafusion_common::Result;

0 commit comments

Comments
 (0)