Skip to content

Commit fbc6de2

Browse files
authored
support EMR on EKS (#2)
1 parent 091bea8 commit fbc6de2

File tree

4 files changed

+116
-4
lines changed

4 files changed

+116
-4
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ mvn clean install -DskipTests
2626
```
2727
This will create the *target/spark-streaming-sql-s3-connector-<version>.jar* file, which contains the connector code and its dependencies. The jar file will also be installed to the local maven repository.
2828

29+
The jar file can also be downloaded from https://awslabs-code-us-east-1.s3.amazonaws.com/spark-streaming-sql-s3-connector/spark-streaming-sql-s3-connector-0.0.1.jar. Change the jar file name in the URL to match the version you need.
30+
2931
Current version is compatible with Spark 3.2 and above.
3032

3133
## How to test

pom.xml

+11-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
<groupId>com.amazonaws</groupId>
2222
<artifactId>spark-streaming-sql-s3-connector</artifactId>
23-
<version>0.0.1</version>
23+
<version>0.0.2</version>
2424
<packaging>jar</packaging>
2525
<name>Spark Structure Streaming S3 Connector</name>
2626

@@ -56,6 +56,11 @@
5656
<artifactId>sqs</artifactId>
5757
<version>${aws.java.sdk2.version}</version>
5858
</dependency>
59+
<dependency>
60+
<groupId>software.amazon.awssdk</groupId>
61+
<artifactId>sts</artifactId>
62+
<version>${aws.java.sdk2.version}</version>
63+
</dependency>
5964
<dependency>
6065
<groupId>software.amazon.awssdk</groupId>
6166
<artifactId>netty-nio-client</artifactId>
@@ -158,7 +163,6 @@
158163
<artifactId>commons-logging</artifactId>
159164
</exclusion>
160165
</exclusions>
161-
<scope>test</scope>
162166
</dependency>
163167
</dependencies>
164168
<build>
@@ -201,6 +205,11 @@
201205
<include>software.amazon.awssdk:apache-client:*</include>
202206
<include>software.amazon.awssdk:arns:*</include>
203207
<include>software.amazon.awssdk:auth:*</include>
208+
<include>software.amazon.awssdk:sts:*</include>
209+
<include>software.amazon.awssdk:netty-nio-client:*</include>
210+
<include>software.amazon.awssdk:http-auth-spi:*</include>
211+
<include>software.amazon.awssdk:http-auth-aws:*</include>
212+
<include>software.amazon.awssdk:http-auth:*</include>
204213
<include>software.amazon.awssdk:aws-core:*</include>
205214
<include>software.amazon.awssdk:aws-query-protocol:*</include>
206215
<include>software.amazon.awssdk:aws-xml-protocol:*</include>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.amazonaws.spark.sql.streaming.connector
18+
19+
import java.io.Closeable
20+
21+
import scala.annotation.tailrec
22+
import scala.util.{Failure, Success, Try}
23+
24+
import software.amazon.awssdk.auth.credentials.{AwsCredentials, AwsCredentialsProvider, DefaultCredentialsProvider}
25+
26+
/**
27+
* Serializable interface providing a method executors can call to obtain an
28+
* AWSCredentialsProvider instance for authenticating to AWS services.
29+
*/
30+
/**
 * Serializable interface providing a method executors can call to obtain an
 * [[AwsCredentialsProvider]] instance for authenticating to AWS services.
 *
 * Implementations own the underlying provider and should release its
 * resources in [[close]]; the default implementation is a no-op.
 */
sealed trait ConnectorAwsCredentialsProvider extends Serializable with Closeable {
  /** The AWS SDK credentials provider to authenticate with. */
  def provider: AwsCredentialsProvider

  /** Releases any resources held by the provider. No-op by default. */
  override def close(): Unit = {}
}
34+
35+
/**
 * An [[AwsCredentialsProvider]] that delegates to the SDK's
 * DefaultCredentialsProvider and retries credential resolution on failure.
 * This tolerates transient resolution errors (e.g. a credentials endpoint
 * that is not yet reachable at container startup).
 */
case class RetryableDefaultCredentialsProvider() extends AwsCredentialsProvider with Closeable {

  // asyncCredentialUpdateEnabled refreshes credentials in a background
  // thread so resolveCredentials rarely blocks on an expired session.
  private val provider = DefaultCredentialsProvider.builder()
    .asyncCredentialUpdateEnabled(true)
    .build()

  // Up to MAX_ATTEMPT retries after the first failure, sleeping SLEEP_TIME
  // milliseconds between attempts (worst case ~10s before giving up).
  private val MAX_ATTEMPT = 10
  private val SLEEP_TIME = 1000

  /**
   * Resolves credentials from the default provider chain, retrying
   * transient failures.
   *
   * @return the resolved [[AwsCredentials]]
   * @throws Throwable the last resolution failure once retries are exhausted
   */
  override def resolveCredentials(): AwsCredentials = {
    @tailrec
    def getCredentialsWithRetry(retries: Int): AwsCredentials = {
      Try(provider.resolveCredentials()) match {
        case Success(credentials) =>
          credentials
        case Failure(_) if retries > 0 =>
          Thread.sleep(SLEEP_TIME)
          getCredentialsWithRetry(retries - 1) // Recursive call to retry
        case Failure(exception) =>
          // Retries exhausted: surface the last error to the caller.
          throw exception
      }
    }

    getCredentialsWithRetry(MAX_ATTEMPT)
  }

  /** Shuts down the delegate and its background refresh thread. */
  override def close(): Unit = {
    provider.close()
  }
}
68+
69+
/**
 * Default [[ConnectorAwsCredentialsProvider]] that lazily creates a single
 * [[RetryableDefaultCredentialsProvider]] on first use.
 *
 * Creation and close are synchronized: without the lock, two threads racing
 * through [[provider]] could each build a provider, leaking the one whose
 * background-refresh resources are never closed.
 */
case class ConnectorDefaultCredentialsProvider() extends ConnectorAwsCredentialsProvider {

  // Lazily-initialized so no SDK resources are allocated until first use;
  // guarded by this object's monitor.
  private var providerOpt: Option[RetryableDefaultCredentialsProvider] = None

  /** Returns the shared provider, creating it on first call. */
  override def provider: AwsCredentialsProvider = this.synchronized {
    if (providerOpt.isEmpty) {
      providerOpt = Some(RetryableDefaultCredentialsProvider())
    }
    providerOpt.get
  }

  /** Closes the underlying provider if it was ever created. */
  override def close(): Unit = this.synchronized {
    providerOpt.foreach(_.close())
  }
}
83+
84+
85+
/** Builder for [[ConnectorAwsCredentialsProvider]] instances. */
class Builder {
  /** Builds the default (retryable, lazily-initialized) credentials provider. */
  def build(): ConnectorAwsCredentialsProvider = ConnectorDefaultCredentialsProvider()
}
90+
91+
/** Factory entry point, mirroring the AWS SDK builder convention. */
object ConnectorAwsCredentialsProvider {
  /** @return a fresh [[Builder]] */
  def builder: Builder = new Builder
}
94+

src/main/scala/com/amazonaws/spark/sql/streaming/connector/client/AsyncSqsClientBuilder.scala

+9-2
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
*/
1717
package com.amazonaws.spark.sql.streaming.connector.client
1818

19-
import com.amazonaws.spark.sql.streaming.connector.{FileMetadata, S3ConnectorSourceOptions}
20-
import com.amazonaws.spark.sql.streaming.connector.Utils.DEFAULT_CONNECTION_ACQUIRE_TIMEOUT
2119
import java.time.Duration
2220
import java.util.function.Consumer
21+
2322
import scala.language.implicitConversions
23+
24+
import com.amazonaws.spark.sql.streaming.connector.{ConnectorAwsCredentialsProvider, FileMetadata, S3ConnectorSourceOptions}
25+
import com.amazonaws.spark.sql.streaming.connector.Utils.DEFAULT_CONNECTION_ACQUIRE_TIMEOUT
26+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
2427
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
2528
import software.amazon.awssdk.core.retry.RetryPolicy
2629
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy
@@ -34,6 +37,9 @@ class AsyncSqsClientBuilder[T] extends AsyncClientBuilder[T] {
3437
var options: S3ConnectorSourceOptions = _
3538
var consumer: (FileMetadata[T]) => Unit = _
3639

40+
private val credentialsProvider: AwsCredentialsProvider =
41+
ConnectorAwsCredentialsProvider.builder.build().provider
42+
3743
implicit def toConsumer[A](function: A => Unit): Consumer[A] = new Consumer[A]() {
3844
override def accept(arg: A): Unit = function.apply(arg)
3945
}
@@ -77,6 +83,7 @@ class AsyncSqsClientBuilder[T] extends AsyncClientBuilder[T] {
7783
)
7884
.region(Region.of(options.queueRegion))
7985
.overrideConfiguration(clientOverrideConfiguration)
86+
.credentialsProvider(credentialsProvider)
8087
.build
8188

8289
}

0 commit comments

Comments
 (0)