-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSpark_scala_call_procedure.scala
60 lines (50 loc) · 2.24 KB
/
Spark_scala_call_procedure.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import com.amazonaws.services.glue.ChoiceOption
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.MappingSpec
import com.amazonaws.services.glue.ResolveSpec
import com.amazonaws.services.glue.errors.CallSite
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import org.apache.spark.SparkContext
import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import com.amazonaws.services.glue.log.GlueLogger
import com.amazonaws.AmazonWebServiceRequest
import com.amazonaws.services.secretsmanager._
import com.amazonaws.services.secretsmanager.model._
/**
  * AWS Glue job that connects to a PostgreSQL database and invokes the
  * stored procedure `pssetl.test()`.
  *
  * Connection details (`host`, `port`, `dbname`, `username`, `password`) are
  * read from the AWS Secrets Manager secret whose id is held in `secretName`.
  */
object GlueApp {

  /**
    * Job entry point.
    *
    * Initialises the Glue job, fetches the database secret, opens a JDBC
    * connection, executes `call pssetl.test()` and commits the Glue job.
    *
    * If the procedure call fails, the error is logged and rethrown so that
    * `Job.commit()` is NOT reached; connection failures already propagated
    * in the same way. The statement and the connection are closed in every case.
    *
    * @param sysArgs raw job arguments; must contain `JOB_NAME`
    */
  def main(sysArgs: Array[String]): Unit = {
    import java.sql.{DriverManager, ResultSet}
    import java.util.Properties
    import scala.util.control.NonFatal

    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    val logger = new GlueLogger

    // Fetch the database connection details from AWS Secrets Manager.
    val client = AWSSecretsManagerClientBuilder.standard().build()
    val secretName = "secretName"
    val getSecretValueRequest = new GetSecretValueRequest().withSecretId(secretName)
    val getSecretValueResult = client.getSecretValue(getSecretValueRequest)
    val secret = JsonOptions(getSecretValueResult.getSecretString()).toMap

    val jdbcDriver: String = "org.postgresql.Driver"
    val jdbcUrl: String =
      s"jdbc:postgresql://${secret("host")}:${secret("port")}/${secret("dbname")}"

    // Load the driver class explicitly so it registers itself with DriverManager.
    Class.forName(jdbcDriver)

    // Pass the credentials as connection properties instead of URL query
    // parameters: reserved characters in the password ('&', '=', '%', ...)
    // would otherwise corrupt the URL, and the URL stays free of secrets.
    val connectionProps = new Properties()
    connectionProps.setProperty("user", secret("username").toString)
    connectionProps.setProperty("password", secret("password").toString)

    val conn = DriverManager.getConnection(jdbcUrl, connectionProps)
    try {
      val stm = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      try {
        // Statement.execute returns true when the first result is a ResultSet,
        // false when it is an update count or there is no result.
        val hasResultSet = stm.execute("call pssetl.test()")
        println(hasResultSet)
      } finally {
        stm.close()
      }
    } catch {
      case NonFatal(ex) =>
        // Surface the failure instead of swallowing it, so the job is not
        // committed after a failed procedure call.
        logger.error(s"call pssetl.test() failed: $ex")
        throw ex
    } finally {
      conn.close()
    }

    Job.commit()
    println("DONE.")
  }
}