Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expand ListingSchemaProvider to support register and deregister table #3150

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use dashmap::DashMap;
Expand All @@ -13,9 +13,9 @@ use futures::TryStreamExt;
use object_store::ObjectStore;

use crate::errors::DeltaResult;
use crate::open_table_with_storage_options;
use crate::storage::*;
use crate::table::builder::ensure_table_uri;
use crate::DeltaTableBuilder;

const DELTA_LOG_FOLDER: &str = "_delta_log";

Expand All @@ -36,7 +36,7 @@ pub struct ListingSchemaProvider {
/// Underlying object store
store: Arc<dyn ObjectStore>,
/// A map of table names to a fully quilfied storage location
tables: DashMap<String, String>,
tables: Mutex<DashMap<String, Arc<dyn TableProvider>>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to wrap the DashMap in a Mutex? I thought that DashMap handles mutability / locking internally and can just be put in an Arc if it needs to be shared across threads?

Then again, I might be remembering that wrong.

/// Options used to create underlying object stores
storage_options: StorageOptions,
}
Expand All @@ -54,7 +54,7 @@ impl ListingSchemaProvider {
Ok(Self {
authority: uri.to_string(),
store,
tables: DashMap::new(),
tables: Mutex::new(DashMap::new()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the necessity of the lock? Dashmap shouldn't need a locking mechanism, it's a direct replacement for things such as this.

storage_options,
})
}
Expand All @@ -73,6 +73,7 @@ impl ListingSchemaProvider {
parent = p;
}
}

for table in tables.into_iter() {
let table_name = normalize_table_name(table)?;
let table_path = table
Expand All @@ -81,7 +82,14 @@ impl ListingSchemaProvider {
.to_string();
if !self.table_exist(&table_name) {
let table_url = format!("{}/{}", self.authority, table_path);
self.tables.insert(table_name.to_string(), table_url);
let Ok(delta_table) = DeltaTableBuilder::from_uri(table_url)
.with_storage_options(self.storage_options.0.clone())
.load()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to load every table on listing. You only need a list of tables, loading them gives you much more than that. Additionally, this might be a performance issue or cause a lot of unnecessary API calls if you have a very large catalog and never read 99% of the tables in it.

.await
else {
continue;
};
let _ = self.register_table(table_name, Arc::new(delta_table));
}
}
Ok(())
Expand All @@ -108,39 +116,54 @@ impl SchemaProvider for ListingSchemaProvider {
}

fn table_names(&self) -> Vec<String> {
self.tables.iter().map(|t| t.key().clone()).collect()
self.tables
.lock()
.expect("Can't lock tables")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a reason to keep the locking mechanism I think we should handle these errors instead of an expect panic when we can't acquire a lock.

.iter()
.map(|t| t.key().clone())
.collect()
}

async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
let Some(provider) = self
.tables
.lock()
.expect("Can't lock tables")
.get(name)
.map(|t| t.clone())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would .cloned() suffice here instead?

else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
Ok(Some(provider))
}

fn register_table(
&self,
_name: String,
_table: Arc<dyn TableProvider>,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support registering tables".to_owned(),
))
self.tables
.lock()
.expect("Can't lock tables")
.insert(name, table.clone());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will replace the table provider if it currently exists, are you sure this is what you want?

Ok(Some(table))
}

fn deregister_table(
&self,
_name: &str,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
Err(DataFusionError::Execution(
"schema provider does not support deregistering tables".to_owned(),
))
if let Some(table) = self.tables.lock().expect("Can't lock tables").remove(name) {
return Ok(Some(table.1));
}
Ok(None)
}

fn table_exist(&self, name: &str) -> bool {
self.tables.contains_key(name)
self.tables
.lock()
.expect("Can't lock tables")
.contains_key(name)
}
}

Expand Down
Loading