Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ async fn purify_create_sink(
// we _could_ use them to build a complete Iceberg client (both catalog and storage).
// TODO(kynan): Actually use those sink-specific creds here instead of ignoring them.
let _catalog = connection
.connect(storage_configuration, InTask::No)
.connect(storage_configuration, InTask::No, None)
.await
.map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
}
Expand Down
73 changes: 48 additions & 25 deletions src/storage-types/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,15 +718,28 @@ impl<C: ConnectionAccess> IcebergCatalogConnection<C> {
}

impl IcebergCatalogConnection<InlinedConnection> {
/// Connects to the catalog.
///
/// `credentials_override`, when set, supplies the AWS credentials provider
/// for an S3 Tables catalog instead of building one from the connection. The
/// Iceberg sink passes a prefetched, cached provider here so catalog requests
/// never resolve credentials on their own path. It is ignored for the REST
/// catalog, which is not AWS-authenticated.
pub async fn connect(
&self,
storage_configuration: &StorageConfiguration,
in_task: InTask,
credentials_override: Option<SharedCredentialsProvider>,
) -> Result<Arc<dyn Catalog>, anyhow::Error> {
match self.catalog {
IcebergCatalogImpl::S3TablesRest(ref s3tables) => {
self.connect_s3tables(s3tables, storage_configuration, in_task)
.await
self.connect_s3tables(
s3tables,
storage_configuration,
in_task,
credentials_override,
)
.await
}
IcebergCatalogImpl::Rest(ref rest) => {
self.connect_rest(rest, storage_configuration, in_task)
Expand Down Expand Up @@ -761,28 +774,10 @@ impl IcebergCatalogConnection<InlinedConnection> {
s3tables: &S3TablesRestIcebergCatalog,
storage_configuration: &StorageConfiguration,
in_task: InTask,
credentials_override: Option<SharedCredentialsProvider>,
) -> Result<Arc<dyn Catalog>, anyhow::Error> {
let secret_reader = &storage_configuration.connection_context.secrets_reader;
let aws_ref = &s3tables.aws_connection;
let aws_config = aws_ref
.connection
.load_sdk_config(
&storage_configuration.connection_context,
aws_ref.connection_id,
in_task,
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
)
.await
.with_context(|| {
format!(
"failed to load AWS SDK config for S3 Tables Iceberg catalog \
(connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
aws_ref.connection_id,
aws_ref.connection.auth_method(),
self.uri,
s3tables.warehouse
)
})?;

let aws_region = aws_ref
.connection
Expand Down Expand Up @@ -819,9 +814,37 @@ impl IcebergCatalogConnection<InlinedConnection> {
// Sign REST catalog requests with the Materialize AWS credential chain
// via a custom `RequestAuthenticator`. For AssumeRole auth, also feed
// the chain to OpenDAL's S3 loader so data-file IO uses the same creds.
let credentials_provider = aws_config
.credentials_provider()
.ok_or_else(|| anyhow!("aws_config missing credentials provider"))?;
//
// Prefer the caller-supplied prefetched provider so catalog requests
// never resolve credentials on their own path. Otherwise build one from
// the connection.
let credentials_provider = match credentials_override {
Some(provider) => provider,
None => {
let aws_config = aws_ref
.connection
.load_sdk_config(
&storage_configuration.connection_context,
aws_ref.connection_id,
in_task,
ENFORCE_EXTERNAL_ADDRESSES.get(storage_configuration.config_set()),
)
.await
.with_context(|| {
format!(
"failed to load AWS SDK config for S3 Tables Iceberg catalog \
(connection id: {}, auth method: {}, catalog uri: {}, warehouse: {})",
aws_ref.connection_id,
aws_ref.connection.auth_method(),
self.uri,
s3tables.warehouse
)
})?;
aws_config
.credentials_provider()
.ok_or_else(|| anyhow!("aws_config missing credentials provider"))?
}
};

let authenticator = Arc::new(Sigv4Authenticator {
provider: credentials_provider.clone(),
Expand Down Expand Up @@ -953,7 +976,7 @@ impl IcebergCatalogConnection<InlinedConnection> {
storage_configuration: &StorageConfiguration,
) -> Result<(), ConnectionValidationError> {
let catalog = self
.connect(storage_configuration, InTask::No)
.connect(storage_configuration, InTask::No, None)
.await
.map_err(|e| {
ConnectionValidationError::Other(anyhow!("failed to connect to catalog: {e}"))
Expand Down
Loading
Loading