forked from buildkite/python-pipenv-example
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdbt_cloud_ci copy.py.bak
135 lines (106 loc) · 3.58 KB
/
dbt_cloud_ci copy.py.bak
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# python/dbt_cloud.py
from urllib.request import Request, urlopen
import argparse
import json
import time
import os
parser = argparse.ArgumentParser()
parser.add_argument("-s", "--steps", type=str, nargs="+")
args = parser.parse_args()
# fmt: off
api_base = os.getenv("DBT_CLOUD_URL", "https://cloud.getdbt.com")
job_cause = os.getenv("DBT_CLOUD_JOB_CAUSE", "github_actions_pull_request")
git_branch = os.getenv("DBT_CLOUD_JOB_BRANCH", None)
schema_override = os.getenv("DBT_CLOUD_JOB_SCHEMA_OVERRIDE", None)
api_token = os.environ["DBT_CLOUD_API_TOKEN"]
account_id = os.environ["DBT_CLOUD_ACCOUNT_ID"]
project_id = os.environ["DBT_CLOUD_PROJECT_ID"]
job_id = os.environ["DBT_CLOUD_JOB_ID"]
job_config = f"""
Request configuration:
api_base: {api_base}
job_cause: {job_cause}
git_branch: {git_branch}
schema_override: {schema_override}
account_id: {account_id}
project_id: {project_id}
job_id: {job_id}
"""
# fmt: on
req_auth_header = {
"Authorization": f"Token {api_token}",
"Content-Type": "application/json",
}
req_job_url = f"{api_base}/api/v2/accounts/{account_id}/jobs/{job_id}/run/"
run_status_map = {
1: "Queued",
2: "Starting",
3: "Running",
10: "Success",
20: "Error",
30: "Cancelled",
}
def run_dbt_cloud_job(
url,
headers,
cause,
branch=None,
schema_override=None,
steps=None,
) -> int:
"""Trigger a dbt Cloud job and returns the job id."""
req_payload = {"cause": cause}
if branch:
req_payload["git_branch"] = branch.replace("refs/heads/", "")
if schema_override:
req_payload["schema_override"] = schema_override.replace("-", "_")
if steps:
req_payload["steps_override"] = steps
print(f"Triggering job:\n url: {url}\n payload: {req_payload}\n")
data = json.dumps(req_payload).encode()
request = Request(method="POST", data=data, headers=headers, url=url)
with urlopen(request) as req:
response = req.read().decode("utf-8")
run_job_resp = json.loads(response)
return run_job_resp["data"]["id"]
def get_run_status(url, headers) -> str:
"""Get the status of a running dbt Cloud job."""
request = Request(headers=headers, url=url)
with urlopen(request) as req:
response = req.read().decode("utf-8")
req_status_resp = json.loads(response)
run_status_code = req_status_resp["data"]["status"]
run_status = run_status_map[run_status_code]
return run_status
def main():
print(job_config)
job_steps = args.steps
run_id: int = None
try:
run_id = run_dbt_cloud_job(
url=req_job_url,
headers=req_auth_header,
cause=job_cause,
branch=git_branch,
schema_override=schema_override,
steps=job_steps,
)
except Exception as e:
print(f"ERROR! - Could not trigger dbt Cloud job:\n{e}")
raise
req_status_url = f"{api_base}/api/v2/accounts/{account_id}/runs/{run_id}/"
run_status_link = (
f"{api_base}/#/accounts/{account_id}/projects/{project_id}/runs/{run_id}/"
)
time.sleep(30)
while True:
run_status = get_run_status(req_status_url, req_auth_header)
print(f"Run status -> {run_status}")
if run_status in ["Error", "Cancelled"]:
raise Exception(f"Run failed or cancelled. See why at {run_status_link}")
if run_status == "Success":
print(f"Job completed successfully! See details at {run_status_link}")
return
time.sleep(10)
if __name__ == "__main__":
main()