Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Iceberg V2: support aws instance profile auth #50876

Merged
merged 9 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ import io.airbyte.cdk.output.ExceptionHandler
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

private val logger = KotlinLogging.logger {}

@Singleton
@Requires(property = Operation.PROPERTY, value = "check")
@Requires(env = ["destination"])
Expand All @@ -40,6 +43,7 @@ class CheckOperation<T : ConfigurationSpecification, C : DestinationConfiguratio
)
outputConsumer.accept(successMessage)
} catch (t: Throwable) {
logger.warn(t) { "Caught throwable during CHECK" }
edgao marked this conversation as resolved.
Show resolved Hide resolved
val (traceMessage, statusMessage) = exceptionHandler.handleCheckFailure(t)
outputConsumer.accept(traceMessage)
outputConsumer.accept(statusMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ data:
alias: airbyte-connector-testing-secret-store
connectorType: destination
definitionId: 716ca874-520b-4902-9f80-9fad66754b89
dockerImageTag: 0.2.7
dockerImageTag: 0.2.8
dockerRepository: airbyte/destination-s3-data-lake
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-data-lake
githubIssueLabel: destination-s3-data-lake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ package io.airbyte.integrations.destination.iceberg.v2
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentials
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider

const val ACCESS_KEY_ID = "access-key-id"
const val SECRET_ACCESS_KEY = "secret-access-key"

class GlueCredentialsProvider private constructor(private val credentials: AwsBasicCredentials) :
class GlueCredentialsProvider private constructor(private val credentials: AwsCredentials) :
AwsCredentialsProvider {
override fun resolveCredentials(): AwsCredentials {
return this.credentials
Expand All @@ -20,13 +21,15 @@ class GlueCredentialsProvider private constructor(private val credentials: AwsBa
companion object {
@JvmStatic
fun create(properties: Map<String, String>): GlueCredentialsProvider {
val accessKey =
properties[ACCESS_KEY_ID]
?: throw IllegalArgumentException("Missing property: access-key-id")
val secretKey =
properties[SECRET_ACCESS_KEY]
?: throw IllegalArgumentException("Missing property: secret-access-key")
return GlueCredentialsProvider(AwsBasicCredentials.create(accessKey, secretKey))
val accessKey = properties[ACCESS_KEY_ID]
val secretKey = properties[SECRET_ACCESS_KEY]
val creds =
if (accessKey != null && secretKey != null) {
AwsBasicCredentials.create(accessKey, secretKey)
} else {
DefaultCredentialsProvider.create().resolveCredentials()
}
return GlueCredentialsProvider(creds)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,25 +340,29 @@ class IcebergUtil(
}

private fun buildKeyBasedClientProperties(config: IcebergV2Configuration): Map<String, String> {
val awsAccessKeyId =
requireNotNull(config.awsAccessKeyConfiguration.accessKeyId) {
"AWS Access Key ID is required for key-based authentication"
}
val awsSecretAccessKey =
requireNotNull(config.awsAccessKeyConfiguration.secretAccessKey) {
"AWS Secret Access Key is required for key-based authentication"
}
val clientCredentialsProviderPrefix = "${AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER}."

return mapOf(
S3FileIOProperties.ACCESS_KEY_ID to awsAccessKeyId,
S3FileIOProperties.SECRET_ACCESS_KEY to awsSecretAccessKey,
AwsClientProperties.CLIENT_REGION to config.s3BucketConfiguration.s3BucketRegion.region,
AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to
GlueCredentialsProvider::class.java.name,
"${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}" to awsAccessKeyId,
"${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}" to awsSecretAccessKey
)
val properties =
mutableMapOf(
AwsClientProperties.CLIENT_REGION to
config.s3BucketConfiguration.s3BucketRegion.region,
AwsClientProperties.CLIENT_CREDENTIALS_PROVIDER to
GlueCredentialsProvider::class.java.name,
)

// If we don't have explicit S3 creds, fall back to the default creds provider chain.
// For example, this should allow us to use AWS instance profiles.
val awsAccessKeyId = config.awsAccessKeyConfiguration.accessKeyId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine but now that I see this behavior, I think you are right, we should use that pattern for the Assume role path. I don't think we should make the change in this PR though

val awsSecretAccessKey = config.awsAccessKeyConfiguration.secretAccessKey
if (awsAccessKeyId != null && awsSecretAccessKey != null) {
properties[S3FileIOProperties.ACCESS_KEY_ID] = awsAccessKeyId
properties[S3FileIOProperties.SECRET_ACCESS_KEY] = awsSecretAccessKey
properties["${clientCredentialsProviderPrefix}${ACCESS_KEY_ID}"] = awsAccessKeyId
properties["${clientCredentialsProviderPrefix}${SECRET_ACCESS_KEY}"] =
awsSecretAccessKey
}

return properties
}

fun toIcebergSchema(stream: DestinationStream, pipeline: MapperPipeline): Schema {
Expand Down
9 changes: 5 additions & 4 deletions docs/integrations/destinations/s3-data-lake.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ for more information.
<details>
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------|
| 0.2.7 | 2025-01-00 | [\#50957](https://github.com/airbytehq/airbyte/pull/50991) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------|
| 0.2.8 | 2025-01-09 | [\#50876](https://github.com/airbytehq/airbyte/pull/50876) | Add support for AWS instance profile auth |
| 0.2.7 | 2025-01-08 | [\#50957](https://github.com/airbytehq/airbyte/pull/50957) | Add support for GLUE RBAC (Assume role) |
| 0.2.6 | 2025-01-08 | [\#50991](https://github.com/airbytehq/airbyte/pull/50991) | Initial public release. |

</details>
Loading