Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Iceberg V2: support aws instance profile auth #50876

Merged
merged 9 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import io.airbyte.cdk.output.ExceptionHandler
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

private val logger = KotlinLogging.logger {}

@Singleton
@Requires(property = Operation.PROPERTY, value = "check")
@Requires(env = ["destination"])
Expand All @@ -40,6 +43,7 @@ class CheckOperation<T : ConfigurationSpecification, C : DestinationConfiguratio
)
outputConsumer.accept(successMessage)
} catch (t: Throwable) {
logger.warn(t) { "Caught throwable during CHECK" }
val (traceMessage, statusMessage) = exceptionHandler.handleCheckFailure(t)
outputConsumer.accept(traceMessage)
outputConsumer.accept(statusMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.11
dockerImageTag: 0.2.12
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package io.airbyte.integrations.destination.s3_data_lake
import io.airbyte.cdk.load.file.s3.S3ClientFactory
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sts.StsClient
Expand Down Expand Up @@ -40,9 +41,13 @@ class GlueCredentialsProvider private constructor(private val delegate: AwsCrede
val provider =
when (mode) {
AWS_CREDENTIALS_MODE_STATIC_CREDS -> {
StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey, secretKey)
)
if (accessKey != null && secretKey != null) {
StaticCredentialsProvider.create(
AwsBasicCredentials.create(accessKey, secretKey)
)
} else {
DefaultCredentialsProvider.create()
}
}
AWS_CREDENTIALS_MODE_ASSUME_ROLE -> {
StsAssumeRoleCredentialsProvider.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,27 +354,31 @@ class S3DataLakeUtil(
private fun buildKeyBasedClientProperties(
config: S3DataLakeConfiguration
): Map<String, String> {
val awsAccessKeyId =
requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) {
"AWS Access Key ID is required for key-based authentication"
}
val awsSecretAccessKey =
requireNotNull(config.awsAccessKeyConfiguration.secretAccessKey) {
"AWS Secret Access Key is required for key-based authentication"
}
val clientCredentialsProviderPrefix = "${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}."

return mapOf(
S3FileIOProperties.ACCESS_KEY_ID to awsAccessKeyId,
S3FileIOProperties.SECRET_ACCESS_KEY to awsSecretAccessKey,
AwsClientProperties.CLIENT_REGION to config.s3BucketConfiguration.s3BucketRegion.region,
AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to
GlueCredentialsProvider::class.java.name,
"${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}.$AWS_CREDENTIALS_MODE" to
AWS_CREDENTIALS_MODE_STATIC_CREDS,
"${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}" to awsAccessKeyId,
"${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}" to awsSecretAccessKey
)
val properties =
mutableMapOf(
AwsClientProperties.CLIENT_REGION to
config.s3BucketConfiguration.s3BucketRegion.region,
AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to
GlueCredentialsProvider::class.java.name,
"${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}.$AWS_CREDENTIALS_MODE" to
AWS_CREDENTIALS_MODE_STATIC_CREDS,
)

// If we don't have explicit S3 creds, fall back to the default creds provider chain.
// For example, this should allow us to use AWS instance profiles.
val awsAccessKeyId = config.awsAccessKeyConfiguration.accessKeyId
val awsSecretAccessKey = config.awsAccessKeyConfiguration.secretAccessKey
if (awsAccessKeyId != null && awsSecretAccessKey != null) {
properties[S3FileIOProperties.ACCESS_KEY_ID] = awsAccessKeyId
properties[S3FileIOProperties.SECRET_ACCESS_KEY] = awsSecretAccessKey
properties["${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}"] = awsAccessKeyId
properties["${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}"] =
awsSecretAccessKey
}

return properties
}

fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema {
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ for more information.

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:---------------------------------------------|
| 0.2.12 | 2025-01-10 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.11 | 2025-01-10 | [\#50971](https://github.com/airbytehq/airbyte/pull/50971) | Internal refactor in AWS auth flow |
| 0.2.10 | 2025-01-09 | [\#50400](https://github.com/airbytehq/airbyte/pull/50400) | Add S3DataLakeTypesComparator |
| 0.2.9 | 2025-01-09 | [\#51022](https://github.com/airbytehq/airbyte/pull/51022) | Rename all classes and files from Iceberg V2 |
Expand Down
Loading