-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path customer_landing_to_trusted.py
62 lines (55 loc) · 2.02 KB
/
customer_landing_to_trusted.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    """Run a Spark SQL query over one or more DynamicFrames.

    Each entry of *mapping* is registered as a temporary view named after its
    key, so *query* can refer to the frames by those aliases.

    Args:
        glueContext: GlueContext whose SparkSession executes the query and
            which is used to wrap the result back into a DynamicFrame.
        query: Spark SQL text to execute.
        mapping: dict of view alias -> DynamicFrame to expose to the query.
        transformation_ctx: Glue transformation context name for the result.

    Returns:
        The query result converted back into a DynamicFrame.
    """
    # Use the session owned by the GlueContext we were given instead of the
    # module-level ``spark`` global: the helper then depends only on its
    # arguments, not on a name defined later in the script.
    session = glueContext.spark_session
    for alias, frame in mapping.items():
        # createOrReplaceTempView silently replaces any existing view with the
        # same alias in this session.
        frame.toDF().createOrReplaceTempView(alias)
    result = session.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
# --- Job bootstrap -----------------------------------------------------------
# Resolve the JOB_NAME argument from this script's command line (sys.argv).
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
# Create the Spark context first; the GlueContext is built on top of it, so
# the order of these two statements matters.
sc = SparkContext()
glueContext = GlueContext(sc)
# Module-level handle to the SparkSession exposed by the GlueContext.
spark = glueContext.spark_session
# Initialise the Glue job run under JOB_NAME; the script ends with
# job.commit() for this same Job object.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node Landing Customer Zone
# Source node: load the raw customer JSON objects found under the landing
# prefix in S3 (descending into sub-prefixes because "recurse" is enabled,
# with multi-line JSON parsing turned off) into a DynamicFrame.
LandingCustomerZone_node1698089543781 = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": ["s3://whiterose-lake-house/customer/landing/"],
        "recurse": True,
    },
    format="json",
    format_options={"multiline": False},
    transformation_ctx="LandingCustomerZone_node1698089543781",
)
# Script generated for node SQL Query
# Filter node: expose the landing frame as the view "myDataSource" and keep
# only rows whose sharewithresearchasofdate column is non-null (presumably
# the customers who agreed to share data for research — confirm with the
# data owner).
SqlQuery1809 = """
select * from myDataSource
where sharewithresearchasofdate is not null;
"""
SQLQuery_node1699004829590 = sparkSqlQuery(
    glueContext=glueContext,
    transformation_ctx="SQLQuery_node1699004829590",
    mapping={"myDataSource": LandingCustomerZone_node1698089543781},
    query=SqlQuery1809,
)
# Script generated for node Trusted Customer Zone
# Sink node: write the filtered frame as JSON to the trusted prefix in S3 and
# register/refresh the Glue Data Catalog table whiterose.customer_trusted
# (enableUpdateCatalog=True, updateBehavior="LOG", no partition keys).
TrustedCustomerZone_node1698089945465 = glueContext.getSink(
    connection_type="s3",
    path="s3://whiterose-lake-house/customer/trusted/",
    updateBehavior="LOG",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="TrustedCustomerZone_node1698089945465",
)
# Catalog target and output format must be configured before writeFrame.
TrustedCustomerZone_node1698089945465.setCatalogInfo(
    catalogTableName="customer_trusted",
    catalogDatabase="whiterose",
)
TrustedCustomerZone_node1698089945465.setFormat("json")
TrustedCustomerZone_node1698089945465.writeFrame(SQLQuery_node1699004829590)
# Close out the job run that job.init() started.
job.commit()