Skip to content

Scope information schema to catalog (#4408) #4409

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ fn create_runtime_env() -> Result<RuntimeEnv> {
let object_store_provider = DatafusionCliObjectStoreProvider {};
let object_store_registry =
ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
let rn_config = RuntimeConfig::new()
.with_object_store_registry(Arc::new(object_store_registry));
let rn_config =
RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry));
RuntimeEnv::new(rn_config)
}

Expand Down
155 changes: 75 additions & 80 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ use datafusion_common::Result;
use crate::datasource::{MemTable, TableProvider};
use crate::logical_expr::TableType;

use super::{
catalog::{CatalogList, CatalogProvider},
schema::SchemaProvider,
};
use super::{catalog::CatalogProvider, schema::SchemaProvider};

use crate::config::ConfigOptions;

Expand All @@ -52,22 +49,22 @@ const DF_SETTINGS: &str = "df_settings";
/// Wraps another [`CatalogProvider`] and adds a "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
catalog_name: String,
}

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
catalog_name: String,
config_options: Weak<RwLock<ConfigOptions>>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
catalog_list,
config_options,
inner,
catalog_name,
}
}
}
Expand All @@ -87,13 +84,12 @@ impl CatalogProvider for CatalogWithInformationSchema {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
Weak::upgrade(&self.config_options).map(|config_options| {
Arc::new(InformationSchemaProvider {
catalog_list,
config_options,
}) as Arc<dyn SchemaProvider>
})
Weak::upgrade(&self.config_options).map(|config_options| {
Arc::new(InformationSchemaProvider {
catalog: Arc::clone(&self.inner),
catalog_name: self.catalog_name.clone(),
config_options,
}) as Arc<dyn SchemaProvider>
})
} else {
self.inner.schema(name)
Expand All @@ -117,7 +113,8 @@ impl CatalogProvider for CatalogWithInformationSchema {
/// providers, they will appear the next time the `information_schema`
/// table is queried.
struct InformationSchemaProvider {
catalog_list: Arc<dyn CatalogList>,
catalog: Arc<dyn CatalogProvider>,
catalog_name: String,
config_options: Arc<RwLock<ConfigOptions>>,
}

Expand All @@ -127,41 +124,47 @@ impl InformationSchemaProvider {
// create a mem table with the names of tables
let mut builder = InformationSchemaTablesBuilder::new();

for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
builder.add_table(
&catalog_name,
&schema_name,
&table_name,
table.table_type(),
);
}
for schema_name in self.catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = self.catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
builder.add_table(
&self.catalog_name,
&schema_name,
&table_name,
table.table_type(),
);
}
}

// Add a final list for the information schema tables themselves
builder.add_table(&catalog_name, INFORMATION_SCHEMA, TABLES, TableType::View);
builder.add_table(&catalog_name, INFORMATION_SCHEMA, VIEWS, TableType::View);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
COLUMNS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
DF_SETTINGS,
TableType::View,
);
}

// Add a final list for the information schema tables themselves
builder.add_table(
&self.catalog_name,
INFORMATION_SCHEMA,
TABLES,
TableType::View,
);
builder.add_table(
&self.catalog_name,
INFORMATION_SCHEMA,
VIEWS,
TableType::View,
);
builder.add_table(
&self.catalog_name,
INFORMATION_SCHEMA,
COLUMNS,
TableType::View,
);
builder.add_table(
&self.catalog_name,
INFORMATION_SCHEMA,
DF_SETTINGS,
TableType::View,
);

let mem_table: MemTable = builder.into();

Arc::new(mem_table)
Expand All @@ -170,21 +173,17 @@ impl InformationSchemaProvider {
fn make_views(&self) -> Arc<dyn TableProvider> {
let mut builder = InformationSchemaViewBuilder::new();

for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
builder.add_view(
&catalog_name,
&schema_name,
&table_name,
table.get_table_definition(),
)
}
for schema_name in self.catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = self.catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
builder.add_view(
&self.catalog_name,
&schema_name,
&table_name,
table.get_table_definition(),
)
}
}
}
Expand All @@ -197,25 +196,21 @@ impl InformationSchemaProvider {
fn make_columns(&self) -> Arc<dyn TableProvider> {
let mut builder = InformationSchemaColumnsBuilder::new();

for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
for (i, field) in table.schema().fields().iter().enumerate() {
builder.add_column(
&catalog_name,
&schema_name,
&table_name,
field.name(),
i,
field.is_nullable(),
field.data_type(),
)
}
for schema_name in self.catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
let schema = self.catalog.schema(&schema_name).unwrap();
for table_name in schema.table_names() {
let table = schema.table(&table_name).unwrap();
for (i, field) in table.schema().fields().iter().enumerate() {
builder.add_column(
&self.catalog_name,
&schema_name,
&table_name,
field.name(),
i,
field.is_nullable(),
field.data_type(),
)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ impl SessionContext {
let state = self.state.read();
let catalog = if information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
name.clone(),
Arc::downgrade(&state.config.config_options),
catalog,
))
Expand Down Expand Up @@ -1484,7 +1484,7 @@ impl SessionState {

let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&catalog_list),
config.default_catalog.clone(),
Arc::downgrade(&config.config_options),
Arc::new(default_catalog),
))
Expand Down