Skip to content

Commit 732db47

Browse files
committed
Scope information schema to catalog (apache#4408)
1 parent a31b44e commit 732db47

File tree

3 files changed

+79
-84
lines changed

3 files changed

+79
-84
lines changed

datafusion-cli/src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ fn create_runtime_env() -> Result<RuntimeEnv> {
146146
let object_store_provider = DatafusionCliObjectStoreProvider {};
147147
let object_store_registry =
148148
ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
149-
let rn_config = RuntimeConfig::new()
150-
.with_object_store_registry(Arc::new(object_store_registry));
149+
let rn_config =
150+
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
151151
RuntimeEnv::new(rn_config)
152152
}
153153

datafusion/core/src/catalog/information_schema.rs

Lines changed: 75 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ use datafusion_common::Result;
3636
use crate::datasource::{MemTable, TableProvider};
3737
use crate::logical_expr::TableType;
3838

39-
use super::{
40-
catalog::{CatalogList, CatalogProvider},
41-
schema::SchemaProvider,
42-
};
39+
use super::{catalog::CatalogProvider, schema::SchemaProvider};
4340

4441
use crate::config::ConfigOptions;
4542

@@ -52,22 +49,22 @@ const DF_SETTINGS: &str = "df_settings";
5249
/// Wraps another [`CatalogProvider`] and adds a "information_schema"
5350
/// schema that can introspect on tables in the catalog_list
5451
pub(crate) struct CatalogWithInformationSchema {
55-
catalog_list: Weak<dyn CatalogList>,
5652
config_options: Weak<RwLock<ConfigOptions>>,
5753
/// wrapped provider
5854
inner: Arc<dyn CatalogProvider>,
55+
catalog_name: String,
5956
}
6057

6158
impl CatalogWithInformationSchema {
6259
pub(crate) fn new(
63-
catalog_list: Weak<dyn CatalogList>,
60+
catalog_name: String,
6461
config_options: Weak<RwLock<ConfigOptions>>,
6562
inner: Arc<dyn CatalogProvider>,
6663
) -> Self {
6764
Self {
68-
catalog_list,
6965
config_options,
7066
inner,
67+
catalog_name,
7168
}
7269
}
7370
}
@@ -87,13 +84,12 @@ impl CatalogProvider for CatalogWithInformationSchema {
8784

8885
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
8986
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
90-
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
91-
Weak::upgrade(&self.config_options).map(|config_options| {
92-
Arc::new(InformationSchemaProvider {
93-
catalog_list,
94-
config_options,
95-
}) as Arc<dyn SchemaProvider>
96-
})
87+
Weak::upgrade(&self.config_options).map(|config_options| {
88+
Arc::new(InformationSchemaProvider {
89+
catalog: Arc::clone(&self.inner),
90+
catalog_name: self.catalog_name.clone(),
91+
config_options,
92+
}) as Arc<dyn SchemaProvider>
9793
})
9894
} else {
9995
self.inner.schema(name)
@@ -117,7 +113,8 @@ impl CatalogProvider for CatalogWithInformationSchema {
117113
/// providers, they will appear the next time the `information_schema`
118114
/// table is queried.
119115
struct InformationSchemaProvider {
120-
catalog_list: Arc<dyn CatalogList>,
116+
catalog: Arc<dyn CatalogProvider>,
117+
catalog_name: String,
121118
config_options: Arc<RwLock<ConfigOptions>>,
122119
}
123120

@@ -127,41 +124,47 @@ impl InformationSchemaProvider {
127124
// create a mem table with the names of tables
128125
let mut builder = InformationSchemaTablesBuilder::new();
129126

130-
for catalog_name in self.catalog_list.catalog_names() {
131-
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
132-
133-
for schema_name in catalog.schema_names() {
134-
if schema_name != INFORMATION_SCHEMA {
135-
let schema = catalog.schema(&schema_name).unwrap();
136-
for table_name in schema.table_names() {
137-
let table = schema.table(&table_name).unwrap();
138-
builder.add_table(
139-
&catalog_name,
140-
&schema_name,
141-
&table_name,
142-
table.table_type(),
143-
);
144-
}
127+
for schema_name in self.catalog.schema_names() {
128+
if schema_name != INFORMATION_SCHEMA {
129+
let schema = self.catalog.schema(&schema_name).unwrap();
130+
for table_name in schema.table_names() {
131+
let table = schema.table(&table_name).unwrap();
132+
builder.add_table(
133+
&self.catalog_name,
134+
&schema_name,
135+
&table_name,
136+
table.table_type(),
137+
);
145138
}
146139
}
147-
148-
// Add a final list for the information schema tables themselves
149-
builder.add_table(&catalog_name, INFORMATION_SCHEMA, TABLES, TableType::View);
150-
builder.add_table(&catalog_name, INFORMATION_SCHEMA, VIEWS, TableType::View);
151-
builder.add_table(
152-
&catalog_name,
153-
INFORMATION_SCHEMA,
154-
COLUMNS,
155-
TableType::View,
156-
);
157-
builder.add_table(
158-
&catalog_name,
159-
INFORMATION_SCHEMA,
160-
DF_SETTINGS,
161-
TableType::View,
162-
);
163140
}
164141

142+
// Add a final list for the information schema tables themselves
143+
builder.add_table(
144+
&self.catalog_name,
145+
INFORMATION_SCHEMA,
146+
TABLES,
147+
TableType::View,
148+
);
149+
builder.add_table(
150+
&self.catalog_name,
151+
INFORMATION_SCHEMA,
152+
VIEWS,
153+
TableType::View,
154+
);
155+
builder.add_table(
156+
&self.catalog_name,
157+
INFORMATION_SCHEMA,
158+
COLUMNS,
159+
TableType::View,
160+
);
161+
builder.add_table(
162+
&self.catalog_name,
163+
INFORMATION_SCHEMA,
164+
DF_SETTINGS,
165+
TableType::View,
166+
);
167+
165168
let mem_table: MemTable = builder.into();
166169

167170
Arc::new(mem_table)
@@ -170,21 +173,17 @@ impl InformationSchemaProvider {
170173
fn make_views(&self) -> Arc<dyn TableProvider> {
171174
let mut builder = InformationSchemaViewBuilder::new();
172175

173-
for catalog_name in self.catalog_list.catalog_names() {
174-
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
175-
176-
for schema_name in catalog.schema_names() {
177-
if schema_name != INFORMATION_SCHEMA {
178-
let schema = catalog.schema(&schema_name).unwrap();
179-
for table_name in schema.table_names() {
180-
let table = schema.table(&table_name).unwrap();
181-
builder.add_view(
182-
&catalog_name,
183-
&schema_name,
184-
&table_name,
185-
table.get_table_definition(),
186-
)
187-
}
176+
for schema_name in self.catalog.schema_names() {
177+
if schema_name != INFORMATION_SCHEMA {
178+
let schema = self.catalog.schema(&schema_name).unwrap();
179+
for table_name in schema.table_names() {
180+
let table = schema.table(&table_name).unwrap();
181+
builder.add_view(
182+
&self.catalog_name,
183+
&schema_name,
184+
&table_name,
185+
table.get_table_definition(),
186+
)
188187
}
189188
}
190189
}
@@ -197,25 +196,21 @@ impl InformationSchemaProvider {
197196
fn make_columns(&self) -> Arc<dyn TableProvider> {
198197
let mut builder = InformationSchemaColumnsBuilder::new();
199198

200-
for catalog_name in self.catalog_list.catalog_names() {
201-
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
202-
203-
for schema_name in catalog.schema_names() {
204-
if schema_name != INFORMATION_SCHEMA {
205-
let schema = catalog.schema(&schema_name).unwrap();
206-
for table_name in schema.table_names() {
207-
let table = schema.table(&table_name).unwrap();
208-
for (i, field) in table.schema().fields().iter().enumerate() {
209-
builder.add_column(
210-
&catalog_name,
211-
&schema_name,
212-
&table_name,
213-
field.name(),
214-
i,
215-
field.is_nullable(),
216-
field.data_type(),
217-
)
218-
}
199+
for schema_name in self.catalog.schema_names() {
200+
if schema_name != INFORMATION_SCHEMA {
201+
let schema = self.catalog.schema(&schema_name).unwrap();
202+
for table_name in schema.table_names() {
203+
let table = schema.table(&table_name).unwrap();
204+
for (i, field) in table.schema().fields().iter().enumerate() {
205+
builder.add_column(
206+
&self.catalog_name,
207+
&schema_name,
208+
&table_name,
209+
field.name(),
210+
i,
211+
field.is_nullable(),
212+
field.data_type(),
213+
)
219214
}
220215
}
221216
}

datafusion/core/src/execution/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -865,7 +865,7 @@ impl SessionContext {
865865
let state = self.state.read();
866866
let catalog = if information_schema {
867867
Arc::new(CatalogWithInformationSchema::new(
868-
Arc::downgrade(&state.catalog_list),
868+
name.clone(),
869869
Arc::downgrade(&state.config.config_options),
870870
catalog,
871871
))
@@ -1484,7 +1484,7 @@ impl SessionState {
14841484

14851485
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
14861486
Arc::new(CatalogWithInformationSchema::new(
1487-
Arc::downgrade(&catalog_list),
1487+
config.default_catalog.clone(),
14881488
Arc::downgrade(&config.config_options),
14891489
Arc::new(default_catalog),
14901490
))

0 commit comments

Comments
 (0)