Skip to content

Commit

Permalink
feat(python): add capability to read unity catalog (uc://) uris
Browse files Browse the repository at this point in the history
This adds capability to read directly from uc:// uris using the
local catalog-unity crate. This also exposes the UC temporary
credentials in storage_options of the `DeltaTable` instance so
polars or similar readers can use it.

Signed-off-by: Omkar P <[email protected]>
  • Loading branch information
omkar-foss committed Feb 11, 2025
1 parent cf5f38a commit 2fdb6e2
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 7 deletions.
145 changes: 140 additions & 5 deletions crates/catalog-unity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ compile_error!(
for this crate to function properly."
);

use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
use reqwest::Url;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;

use crate::credential::{
AzureCliCredential, ClientSecretOAuthProvider, CredentialProvider, WorkspaceOAuthProvider,
Expand All @@ -19,11 +23,13 @@ use crate::models::{
};

use deltalake_core::data_catalog::DataCatalogResult;
use deltalake_core::{DataCatalog, DataCatalogError};
use deltalake_core::{DataCatalog, DataCatalogError, DeltaResult, DeltaTableBuilder, Path};

use crate::client::retry::*;
use deltalake_core::storage::str_is_truthy;

use deltalake_core::storage::{
factories, str_is_truthy, IORuntime, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse,
StorageOptions,
};
pub mod client;
pub mod credential;
#[cfg(feature = "datafusion")]
Expand Down Expand Up @@ -201,6 +207,11 @@ pub enum UnityCatalogConfigKey {
/// - `azure_use_azure_cli`
/// - `use_azure_cli`
UseAzureCli,

/// Allow http url (e.g. http://localhost:8080/api/2.1/...)
/// Supported keys:
/// - `unity_allow_http_url`
AllowHttpUrl,
}

impl FromStr for UnityCatalogConfigKey {
Expand Down Expand Up @@ -246,6 +257,7 @@ impl FromStr for UnityCatalogConfigKey {
| "unity_workspace_url"
| "databricks_workspace_url"
| "databricks_host" => Ok(UnityCatalogConfigKey::WorkspaceUrl),
"allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
_ => Err(DataCatalogError::UnknownConfigKey {
catalog: "unity",
key: s.to_string(),
Expand All @@ -259,6 +271,7 @@ impl AsRef<str> for UnityCatalogConfigKey {
fn as_ref(&self) -> &str {
match self {
UnityCatalogConfigKey::AccessToken => "unity_access_token",
UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
UnityCatalogConfigKey::ClientId => "unity_client_id",
Expand Down Expand Up @@ -311,6 +324,9 @@ pub struct UnityCatalogBuilder {
/// When set to true, azure cli has to be used for acquiring access token
use_azure_cli: bool,

/// When set to true, http will be allowed in the catalog url
allow_http_url: bool,

/// Retry config
retry_config: RetryConfig,

Expand All @@ -333,6 +349,9 @@ impl UnityCatalogBuilder {
) -> DataCatalogResult<Self> {
match UnityCatalogConfigKey::from_str(key.as_ref())? {
UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
UnityCatalogConfigKey::AllowHttpUrl => {
self.allow_http_url = str_is_truthy(&value.into())
}
UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
Expand Down Expand Up @@ -431,6 +450,45 @@ impl UnityCatalogBuilder {
self
}

/// Returns the storage location and temporary token to be used with the
/// Unity Catalog table.
pub async fn get_uc_location_and_token(
table_uri: &str,
) -> Result<(String, HashMap<String, String>), UnityCatalogError> {
let uri_parts: Vec<&str> = table_uri[5..].split('.').collect();
if uri_parts.len() != 3 {
panic!("Invalid Unity Catalog URI: {}", table_uri);
}

let catalog_id = uri_parts[0];
let database_name = uri_parts[1];
let table_name = uri_parts[2];

let unity_catalog = match UnityCatalogBuilder::from_env().build() {
Ok(uc) => uc,
Err(_e) => panic!("Unable to build Unity Catalog."),
};
let storage_location = match unity_catalog
.get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
.await
{
Ok(s) => s,
Err(_e) => panic!("Unable to find the table's storage location."),
};
let temp_creds_res = unity_catalog
.get_temp_table_credentials(catalog_id, database_name, table_name)
.await?;
let credentials = match temp_creds_res {
TableTempCredentialsResponse::Success(temp_creds) => {
temp_creds.get_credentials().unwrap()
}
TableTempCredentialsResponse::Error(_error) => {
panic!("Unable to get temporary credentials from Unity Catalog.")
}
};
Ok((storage_location, credentials))
}

fn get_credential_provider(&self) -> Option<CredentialProvider> {
if let Some(token) = self.bearer_token.as_ref() {
return Some(CredentialProvider::BearerToken(token.clone()));
Expand Down Expand Up @@ -488,7 +546,12 @@ impl UnityCatalogBuilder {
.trim_end_matches('/')
.to_string();

let client = self.client_options.client()?;
let client_options = if self.allow_http_url {
self.client_options.with_allow_http(true)
} else {
self.client_options
};
let client = client_options.client()?;

Ok(UnityCatalog {
client,
Expand Down Expand Up @@ -649,7 +712,7 @@ impl UnityCatalog {
self.catalog_url(),
catalog_id.as_ref(),
database_name.as_ref(),
table_name.as_ref()
table_name.as_ref(),
))
.header(AUTHORIZATION, token)
.send()
Expand Down Expand Up @@ -692,6 +755,67 @@ impl UnityCatalog {
}
}

#[derive(Clone, Default, Debug)]
pub struct UnityCatalogFactory {}

impl RetryConfigParse for UnityCatalogFactory {}

impl ObjectStoreFactory for UnityCatalogFactory {
fn parse_url_opts(
&self,
table_uri: &Url,
options: &StorageOptions,
) -> DeltaResult<(ObjectStoreRef, Path)> {
use futures::executor::block_on;

let result = block_on(UnityCatalogBuilder::get_uc_location_and_token(
table_uri.as_str(),
));

let (table_path, temp_creds) = match result {
Ok(tup) => tup,
Err(_err) => panic!("Unable to get UC location and token."),
};

let mut storage_options = options.0.clone();

if !temp_creds.is_empty() {
storage_options.extend(temp_creds);
}

let mut builder =
DeltaTableBuilder::from_uri(&table_path).with_io_runtime(IORuntime::default());
if !storage_options.is_empty() {
builder = builder.with_storage_options(storage_options.clone());
}

let prefix = Path::parse(table_uri.path())?;
let store = builder.build()?.object_store();

Ok((store, prefix))
}
}

impl LogStoreFactory for UnityCatalogFactory {
fn with_options(
&self,
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
Ok(default_logstore(store, location, options))
}
}

/// Register an [ObjectStoreFactory] for common UnityCatalogFactory [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
let factory = Arc::new(UnityCatalogFactory::default());
let scheme = "uc";
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(), factory.clone());
logstores().insert(url.clone(), factory.clone());
}

#[async_trait::async_trait]
impl DataCatalog for UnityCatalog {
type Error = UnityCatalogError;
Expand Down Expand Up @@ -731,6 +855,7 @@ mod tests {
use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
use crate::models::*;
use crate::UnityCatalogBuilder;
use deltalake_core::DataCatalog;
use httpmock::prelude::*;

#[tokio::test]
Expand Down Expand Up @@ -788,5 +913,15 @@ mod tests {
get_table_response.unwrap(),
GetTableResponse::Success(_)
));

let storage_location = client
.get_table_storage_location(
Some("catalog_name".to_string()),
"schema_name",
"table_name",
)
.await
.unwrap();
assert!(storage_location.eq_ignore_ascii_case("string"));
}
}
5 changes: 3 additions & 2 deletions crates/core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,9 @@ fn resolve_uri_type(table_uri: impl AsRef<str>) -> DeltaResult<UriType> {
Ok(UriType::LocalPath(PathBuf::from(table_uri)))
} else {
Err(DeltaTableError::InvalidTableLocation(format!(
"Unknown scheme: {}",
scheme
"Unknown scheme: {}. Known schemes: {}",
scheme,
known_schemes.join(",")
)))
}
} else {
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub use deltalake_core::*;
pub use deltalake_aws as aws;
#[cfg(feature = "azure")]
pub use deltalake_azure as azure;
#[cfg(feature = "unity-experimental")]
pub use deltalake_catalog_unity as unity_catalog;
#[cfg(feature = "gcs")]
pub use deltalake_gcp as gcp;
#[cfg(feature = "hdfs")]
Expand Down
1 change: 1 addition & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2407,6 +2407,7 @@ fn _internal(m: &Bound<'_, PyModule>) -> PyResult<()> {
deltalake::hdfs::register_handlers(None);
deltalake_mount::register_handlers(None);
deltalake::lakefs::register_handlers(None);
deltalake::unity_catalog::register_handlers(None);

let py = m.py();
m.add("DeltaError", py.get_type_bound::<DeltaError>())?;
Expand Down

0 comments on commit 2fdb6e2

Please sign in to comment.