
Commit aa30abe

Author: Skylar Pape

Attempts to submit job with dataproc python sdk
Parent: e5a1847

File tree

9 files changed (+185, -52)


devops/pyspark-requirements.txt

Lines changed: 0 additions & 2 deletions
This file was deleted.
Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+click == 8.2.1
+pyspark == 3.5.6
+# Potential pyspark dependency conflict: https://github.com/GoogleCloudPlatform/dataproc-templates/blob/main/python/requirements.txt#L1
+google-dataproc-templates >= 2.0.0b0
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+google-cloud-dataproc >= 5.23.0
+google-cloud-storage >= 3.5.0
+# For consistency's sake, this is the same version installed on cluster
+# and the same version defined in cluster-requirements.txt
+click == 8.2.1

src/pyspark/job.py

Lines changed: 31 additions & 40 deletions
@@ -1,54 +1,45 @@
+from typing import Optional
 import click
-import os
 from pyspark.sql import SparkSession
-
-from src_config import SrcConfig
-from src_db_type import SrcDbType
-
-SOURCE_DB_DRIVERS: dict[SrcDbType, SrcConfig] = {
-    SrcDbType.POSTGRESQL: SrcConfig(
-        driver="org.postgresql.Driver",
-        secret_id="wtr-read-replica",
-    ),
-}
+from utilities.args import jdbc_to_gbq_options

 @click.command()
-@click.option('--src-db-type', required=True, type=click.Choice(SrcDbType, case_sensitive=False), help='Type of source database (e.g. mysql, postgres)')
-@click.option('--src-protocol-action', default=None, help='Action specified in JDBC protocol for source database, if any')
-@click.option('--src-host', required=True, help='Host of source database')
-@click.option('--src-port', required=True, help='Port for source database, if any')
-@click.option('--src-db-name', required=True, help='Database name of source database')
-@click.option('--src-params', default=None, help='Connection parameters for source database, if any')
-@click.option('--src-user', required=True, help='User to connect to source database with')
+@jdbc_to_gbq_options
 def main(
-    src_db_type: SrcDbType,
-    src_protocol_action: str,
-    src_host: str,
-    src_port: int,
-    src_db_name: str,
-    src_params: str,
-    src_user: str,
+    input_url_secret: str,
+    input_driver: str,
+    input_table: Optional[str] = None,
+    input_partition_column: Optional[str] = None,
+    input_lower_bound: Optional[str] = None,
+    input_upper_bound: Optional[str] = None,
+    input_fetch_size: Optional[str] = None,
+    input_session_init_statement: Optional[str] = None,
+    num_partitions: Optional[int] = None,
+    output_mode: Optional[str] = None,
 ):
     """
-    PySpark job that tests DB replication to GBQ via JDBC
-    """
-    src_jdbc_action = f":{src_protocol_action}" if src_protocol_action else ""
-    src_jdbc_params = f"?{src_params}" if src_params else ""
-    src_jdbc_url = f"jdbc:{src_db_type.value}{src_jdbc_action}://{src_host}:{src_port}/{src_db_name}{src_jdbc_params}"
-
-    src_config = SOURCE_DB_DRIVERS[src_db_type]
-    src_properties = {
-        "user": src_user,
-        "password": os.environ['WTR_READ_REPLICA_PASSWORD'],
-        "driver": src_config.driver,
-    }
+    PySpark job that replicates a JDBC-connected database to GBQ.

+    Simple wrapper around this template:
+    https://github.com/GoogleCloudPlatform/dataproc-templates/tree/main/python/dataproc_templates/jdbc#arguments-2
+    """
     spark = SparkSession.builder \
-        .appName("DbReplicationTest") \
-        .master("spark://localhost:46411") \
+        .appName("JdbcToGbq") \
         .getOrCreate()

-    data = ["Hello", "World", "PySpark", "Job"]
+    data = [
+        input_url_secret,
+        input_driver,
+        input_table,
+        input_partition_column,
+        input_lower_bound,
+        input_upper_bound,
+        input_fetch_size,
+        input_session_init_statement,
+        num_partitions,
+        output_mode
+    ]
+    print(data)
     rdd = spark.sparkContext.parallelize(data)
     print(f"Number of elements in RDD: {rdd.count()}")

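The job body above only echoes its arguments through a throwaway RDD; the real work is delegated to the JDBCTOBIGQUERY template linked in the docstring. As a rough, hypothetical sketch (not part of this commit), several of the new options correspond directly to Spark's built-in JDBC reader options, assuming the secret has already been resolved to a full JDBC url:

# Hypothetical sketch only -- illustrates what the CLI options configure,
# not code from this commit.
from pyspark.sql import SparkSession

def read_jdbc_sketch(jdbc_url, driver, table, partition_column,
                     lower_bound, upper_bound, num_partitions, fetch_size):
    spark = SparkSession.builder.appName("JdbcToGbq").getOrCreate()
    return (
        spark.read.format("jdbc")
        .option("url", jdbc_url)                      # resolved from --input-url-secret
        .option("driver", driver)                     # --input-driver
        .option("dbtable", table)                     # --input-table
        .option("partitionColumn", partition_column)  # --input-partition-column
        .option("lowerBound", lower_bound)            # --input-lower-bound
        .option("upperBound", upper_bound)            # --input-upper-bound
        .option("numPartitions", num_partitions)      # --num-partitions
        .option("fetchsize", fetch_size)              # --input-fetch-size
        .load()
    )
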
src/pyspark/src_config.py

Lines changed: 0 additions & 6 deletions
This file was deleted.

src/pyspark/src_db_type.py

Lines changed: 0 additions & 4 deletions
This file was deleted.

src/pyspark/submit_job.py

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+from typing import Optional
+import click
+import re
+from google.cloud import dataproc_v1
+from google.cloud import storage
+from utilities.args import jdbc_to_gbq_options
+
+# TODO: Make this configurable in CI
+GCP_PROJECT = "tmc-data-transfer"
+DATAPROC_CLUSTER_NAME = "data-transfer-cluster"
+DATAPROC_CLUSTER_REGION = "us-central1"
+GCS_PARENT_FOLDER = "gs://dataproc-staging-us-central1-386874222317-aaiovycl/pyspark"
+
+@click.command()
+@jdbc_to_gbq_options
+def main(**kwargs):
+    """
+    Submits a PySpark job to Dataproc to replicate a JDBC-connected database to GBQ.
+
+    Docs & Code Referenced:
+    - https://docs.cloud.google.com/dataproc/docs/samples/dataproc-submit-job
+    - https://github.com/googleapis/google-cloud-python/blob/2feb74032fd9c5cc7eaf6072ab03e9e8397bd434/packages/google-cloud-dataproc/google/cloud/dataproc_v1/types/jobs.py#L305
+    """
+    # TODO: Make this configurable in CI
+    job_client = dataproc_v1.JobControllerClient(
+        client_options={
+            "api_endpoint": f"{DATAPROC_CLUSTER_REGION}-dataproc.googleapis.com:443",
+        }
+    )
+
+    gcs_utilities_folder = f"{GCS_PARENT_FOLDER}/utilities"
+    args = []
+    for key, value in kwargs.items():
+        if value is not None:
+            args.append(f"--{key.replace('_', '-')}")
+            args.append(value)
+    job_config = {
+        "placement": {
+            "cluster_name": DATAPROC_CLUSTER_NAME,
+        },
+        "pyspark_job": {
+            "main_python_file_uri": f"{GCS_PARENT_FOLDER}/cluster/job.py",
+            "python_file_uris": [
+                f"{gcs_utilities_folder}/args.py",
+                f"{gcs_utilities_folder}/driver.py"
+            ],
+            "args": args,
+        },
+    }
+
+    operation = job_client.submit_job_as_operation(
+        request={
+            "project_id": GCP_PROJECT,
+            "region": DATAPROC_CLUSTER_REGION,
+            "job": job_config
+        }
+    )
+    response = operation.result()
+
+    # Dataproc job output is saved to the Cloud Storage bucket
+    # allocated to the job. Use regex to obtain the bucket and blob info.
+    matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)
+    output = (
+        storage.Client()
+        .get_bucket(matches.group(1))
+        .blob(f"{matches.group(2)}.000000000")
+        .download_as_bytes()
+        .decode("utf-8")
+    )
+    print(f"Job finished successfully: {output}\r\n")
+
+if __name__ == '__main__':
+    main()
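
A quick illustration of the kwargs-to-args loop in main(): parsed click options are re-serialized into the argv handed to the Dataproc job, and options left unset are dropped. The secret and table names below are placeholders, not values from this commit:

# Self-contained illustration of the conversion used in main() above.
# The input values are placeholders for illustration only.
def kwargs_to_args(kwargs: dict) -> list:
    args = []
    for key, value in kwargs.items():
        if value is not None:
            args.append(f"--{key.replace('_', '-')}")  # snake_case -> --kebab-case flag
            args.append(value)
    return args

print(kwargs_to_args({
    "input_url_secret": "wtr-read-replica",  # placeholder secret name
    "input_driver": "POSTGRESQL",
    "input_table": "public.users",           # placeholder table
    "input_partition_column": None,          # unset options are omitted
}))
# ['--input-url-secret', 'wtr-read-replica', '--input-driver', 'POSTGRESQL',
#  '--input-table', 'public.users']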

src/pyspark/utilities/args.py

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
+import click
+import functools
+
+from utilities.driver import Driver
+
+def jdbc_to_gbq_options(f: callable) -> callable:
+    """
+    Click args for the JDBC to GBQ PySpark template:
+    https://github.com/GoogleCloudPlatform/dataproc-templates/tree/main/python/dataproc_templates/jdbc#arguments-2
+    """
+    @click.option(
+        '--input-url-secret',
+        required=True,
+        help='Name of the Google secret whose value is a JDBC url that includes a password to connect to the input database',
+    )
+    @click.option(
+        '--input-driver',
+        required=True,
+        type=click.Choice(Driver),
+        help='Enum value for JDBC input driver name',
+    )
+    @click.option(
+        '--input-table',
+        required=True,
+        help='JDBC input table name',
+    )
+    @click.option(
+        '--input-partition-column',
+        required=False,
+        help='JDBC input table partition column name',
+    )
+    @click.option(
+        '--input-lower-bound',
+        required=False,
+        help='JDBC input table partition column lower bound which is used to decide the partition stride',
+    )
+    @click.option(
+        '--input-upper-bound',
+        required=False,
+        help='JDBC input table partition column upper bound which is used to decide the partition stride',
+    )
+    @click.option(
+        '--input-fetch-size',
+        required=False,
+        help='Determines how many rows to fetch per round trip',
+    )
+    @click.option(
+        '--input-session-init-statement',
+        required=False,
+        help='Custom SQL statement to execute in each reader database session',
+    )
+    @click.option(
+        '--num-partitions',
+        required=False,
+        help='The maximum number of partitions that can be used for parallelism in table reading and writing. Same value will be used for both input and output jdbc connection. Default set to 10',
+    )
+    @click.option(
+        '--output-mode',
+        required=False,
+        help='Output write mode (one of: append,overwrite,ignore,errorifexists) (Defaults to append)',
+    )
+    @functools.wraps(f)
+    def wrapper(**kwargs):
+        return f(**kwargs)
+    return wrapper
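
A hypothetical smoke test for the shared decorator (not part of this commit), using click's CliRunner and assuming click 8.2's enum-aware Choice resolves --input-driver by member name:

# Hypothetical test sketch; option values are placeholders.
import click
from click.testing import CliRunner
from utilities.args import jdbc_to_gbq_options

@click.command()
@jdbc_to_gbq_options
def echo_options(**kwargs):
    # Echo only the options that were actually supplied.
    click.echo(sorted(k for k, v in kwargs.items() if v is not None))

result = CliRunner().invoke(echo_options, [
    "--input-url-secret", "some-secret",  # placeholder
    "--input-driver", "POSTGRESQL",       # resolved to Driver.POSTGRESQL
    "--input-table", "public.users",      # placeholder
])
print(result.output)  # ['input_driver', 'input_table', 'input_url_secret']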

src/pyspark/utilities/driver.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+from enum import Enum
+
+class Driver(Enum):
+    MYSQL = "com.mysql.cj.jdbc.Driver"
+    POSTGRESQL = "org.postgresql.Driver"
+    SQL_SERVER = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    ORACLE = "oracle.jdbc.driver.OracleDriver"
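
The member names are what the CLI accepts, while the values are the JDBC driver class names the template needs; a two-line illustration of that lookup:

# Plain enum behaviour the --input-driver option relies on (illustration only).
from utilities.driver import Driver

assert Driver["POSTGRESQL"] is Driver.POSTGRESQL           # lookup by member name
assert Driver.POSTGRESQL.value == "org.postgresql.Driver"  # JDBC class name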
