Skip to content

Commit 3ab2af9

Browse files
committed
Propagate credentials for distributed execution
1 parent 7975400 commit 3ab2af9

File tree

3 files changed

+55
-2
lines changed

3 files changed

+55
-2
lines changed

crates/core/src/delta_datafusion/mod.rs

+33-1
Original file line number · Diff line number · Diff line change
@@ -78,7 +78,7 @@ use crate::delta_datafusion::expr::parse_predicate_expression;
7878
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
7979
use crate::errors::{DeltaResult, DeltaTableError};
8080
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
81-
use crate::logstore::LogStoreRef;
81+
use crate::logstore::{logstore_for, object_store_url, LogStoreRef};
8282
use crate::table::builder::ensure_table_uri;
8383
use crate::table::state::DeltaTableState;
8484
use crate::table::Constraint;
@@ -342,6 +342,7 @@ pub struct DeltaScanConfigBuilder {
342342
enable_parquet_pushdown: bool,
343343
/// Schema to scan table with
344344
schema: Option<SchemaRef>,
345+
options: HashMap<String, String>,
345346
}
346347

347348
impl Default for DeltaScanConfigBuilder {
@@ -352,6 +353,7 @@ impl Default for DeltaScanConfigBuilder {
352353
wrap_partition_values: None,
353354
enable_parquet_pushdown: true,
354355
schema: None,
356+
options: HashMap::new(),
355357
}
356358
}
357359
}
@@ -396,6 +398,11 @@ impl DeltaScanConfigBuilder {
396398
self
397399
}
398400

401+
pub fn with_options(mut self, options: HashMap<String, String>) -> Self {
402+
self.options = options;
403+
self
404+
}
405+
399406
/// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
400407
pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
401408
let file_column_name = if self.include_file_column {
@@ -438,6 +445,7 @@ impl DeltaScanConfigBuilder {
438445
wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
439446
enable_parquet_pushdown: self.enable_parquet_pushdown,
440447
schema: self.schema.clone(),
448+
options: self.options.clone(),
441449
})
442450
}
443451
}
@@ -453,6 +461,8 @@ pub struct DeltaScanConfig {
453461
pub enable_parquet_pushdown: bool,
454462
/// Schema to read as
455463
pub schema: Option<SchemaRef>,
464+
465+
pub options: HashMap<String, String>,
456466
}
457467

458468
pub(crate) struct DeltaScanBuilder<'a> {
@@ -730,11 +740,14 @@ impl TableProvider for DeltaTable {
730740
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
731741
register_store(self.log_store(), session.runtime_env().clone());
732742
let filter_expr = conjunction(filters.iter().cloned());
743+
let mut config = DeltaScanConfig::default();
744+
config.options = self.config.options.clone();
733745

734746
let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
735747
.with_projection(projection)
736748
.with_limit(limit)
737749
.with_filter(filter_expr)
750+
.with_scan_config(config)
738751
.build()
739752
.await?;
740753

@@ -753,12 +766,15 @@ impl TableProvider for DeltaTable {
753766

754767
register_store(self.log_store(), session_state.runtime_env().clone());
755768
let filter_expr = conjunction(filters.iter().cloned());
769+
let mut config = DeltaScanConfig::default();
770+
config.options = self.config.options.clone();
756771

757772
let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session_state)
758773
.with_projection(projection)
759774
.with_projection_deep(projection_deep)
760775
.with_limit(limit)
761776
.with_filter(filter_expr)
777+
.with_scan_config(config)
762778
.build()
763779
.await?;
764780

@@ -970,6 +986,22 @@ impl ExecutionPlan for DeltaScan {
970986
partition: usize,
971987
context: Arc<TaskContext>,
972988
) -> DataFusionResult<SendableRecordBatchStream> {
989+
let source_uri = Url::parse(self.table_uri.as_str())
990+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
991+
let url_key = object_store_url(&source_uri);
992+
let runtime = context.runtime_env();
993+
if runtime.object_store(url_key).is_err() {
994+
let source_store = logstore_for(source_uri, self.config.options.clone(), None)
995+
.map_err(|e| DataFusionError::External(Box::new(e)))?;
996+
let object_store_url = source_store.object_store_url();
997+
998+
// check if delta store is already registered
999+
if runtime.object_store(object_store_url.clone()).is_err() {
1000+
runtime
1001+
.register_object_store(object_store_url.as_ref(), source_store.object_store());
1002+
}
1003+
}
1004+
// Now that everything is set up, execute the inner Delta
9731005
self.parquet_scan.execute(partition, context)
9741006
}
9751007

crates/core/src/table/builder.rs

+7-1
Original file line number · Diff line number · Diff line change
@@ -82,6 +82,8 @@ pub struct DeltaTableConfig {
8282
#[serde(skip_serializing, skip_deserializing)]
8383
/// When a runtime handler is provided, all IO tasks are spawn in that handle
8484
pub io_runtime: Option<IORuntime>,
85+
86+
pub options: HashMap<String, String>
8587
}
8688

8789
impl Default for DeltaTableConfig {
@@ -92,6 +94,7 @@ impl Default for DeltaTableConfig {
9294
log_buffer_size: num_cpus::get() * 4,
9395
log_batch_size: 1024,
9496
io_runtime: None,
97+
options: HashMap::new(),
9598
}
9699
}
97100
}
@@ -314,7 +317,10 @@ impl DeltaTableBuilder {
314317
/// This will not load the log, i.e. the table is not initialized. To get an initialized
315318
/// table use the `load` function
316319
pub fn build(self) -> DeltaResult<DeltaTable> {
317-
Ok(DeltaTable::new(self.build_storage()?, self.table_config))
320+
let log_store = self.build_storage()?;
321+
let mut config = self.table_config;
322+
config.options = self.storage_options.unwrap_or_default();
323+
Ok(DeltaTable::new(log_store, config))
318324
}
319325

320326
/// Build the [`DeltaTable`] and load its state

crates/deltalake/src/lib.rs

+15
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::Once;
12
/**
23
* The deltalake crate is currently just a meta-package shim for deltalake-core
34
*/
@@ -11,3 +12,17 @@ pub use deltalake_azure as azure;
1112
pub use deltalake_gcp as gcp;
1213
#[cfg(feature = "hdfs")]
1314
pub use deltalake_hdfs as hdfs;
15+
16+
/// Register the object-store handlers for every enabled cloud backend.
///
/// Idempotent and thread-safe: a process-wide `Once` guard guarantees the
/// feature-gated `register_handlers` calls run at most one time, no matter
/// how often or from how many threads this is invoked.
pub fn ensure_initialized() {
    static HANDLERS_REGISTERED: Once = Once::new();
    HANDLERS_REGISTERED.call_once(|| {
        #[cfg(any(feature = "s3", feature = "s3-native-tls"))]
        aws::register_handlers(None);
        #[cfg(feature = "azure")]
        azure::register_handlers(None);
        #[cfg(feature = "gcs")]
        gcp::register_handlers(None);
        #[cfg(feature = "hdfs")]
        hdfs::register_handlers(None);
    });
}

0 commit comments

Comments (0)