Skip to content

Commit 1578443

Browse files
Add Error handling Pipeline Execution (#1466)
Added Better Error Handling and additional test case to test out different behaviours of the `pipeline.py`
1 parent e0c4d1a commit 1578443

File tree

6 files changed

+71
-3
lines changed

6 files changed

+71
-3
lines changed

src/databricks/labs/remorph/assessments/pipeline.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,14 @@ def _execute_sql_step(self, step: Step):
4646

4747
# Execute the query using the database manager
4848
logging.info(f"Executing query: {query}")
49-
result = self.executor.execute_query(query)
49+
try:
50+
result = self.executor.execute_query(query)
5051

51-
# Save the result to duckdb
52-
self._save_to_db(result, step.name, str(step.mode))
52+
# Save the result to duckdb
53+
self._save_to_db(result, step.name, str(step.mode))
54+
except Exception as e:
55+
logging.error(f"SQL execution failed: {str(e)}")
56+
raise RuntimeError(f"SQL execution failed: {str(e)}") from e
5357

5458
def _execute_python_step(self, step: Step):
5559
logging.debug(f"Executing Python script: {step.extract_source}")

tests/integration/assessments/test_pipeline.py

+32
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,44 @@ def pipeline_config():
2222
return config
2323

2424

25+
@pytest.fixture(scope="module")
26+
def sql_failure_config():
27+
prefix = Path(__file__).parent
28+
config_path = f"{prefix}/../../resources/assessments/pipeline_config_sql_failure.yml"
29+
config = PipelineClass.load_config_from_yaml(config_path)
30+
for step in config.steps:
31+
step.extract_source = f"{prefix}/../../{step.extract_source}"
32+
return config
33+
34+
35+
@pytest.fixture(scope="module")
36+
def python_failure_config():
37+
prefix = Path(__file__).parent
38+
config_path = f"{prefix}/../../resources/assessments/pipeline_config_python_failure.yml"
39+
config = PipelineClass.load_config_from_yaml(config_path)
40+
for step in config.steps:
41+
step.extract_source = f"{prefix}/../../{step.extract_source}"
42+
return config
43+
44+
2545
def test_run_pipeline(extractor, pipeline_config, get_logger):
2646
pipeline = PipelineClass(config=pipeline_config, executor=extractor)
2747
pipeline.execute()
2848
assert verify_output(get_logger, pipeline_config.extract_folder)
2949

3050

51+
def test_run_sql_failure_pipeline(extractor, sql_failure_config, get_logger):
52+
pipeline = PipelineClass(config=sql_failure_config, executor=extractor)
53+
with pytest.raises(RuntimeError, match="SQL execution failed"):
54+
pipeline.execute()
55+
56+
57+
def test_run_python_failure_pipeline(extractor, python_failure_config, get_logger):
58+
pipeline = PipelineClass(config=python_failure_config, executor=extractor)
59+
with pytest.raises(RuntimeError, match="Script execution failed"):
60+
pipeline.execute()
61+
62+
3163
def verify_output(get_logger, path):
3264
conn = duckdb.connect(str(Path(path)) + "/" + DB_NAME)
3365

Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
SELECT * FROM non_existent_table;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import sys
2+
import json
3+
4+
5+
def main():
6+
print(json.dumps({"status": "error", "message": "This script is designed to fail"}), file=sys.stderr)
7+
sys.exit(1)
8+
9+
10+
if __name__ == '__main__':
11+
main()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
name: "Python Failure Pipeline"
2+
version: "1.0"
3+
extract_folder: "tests/resources/assessments"
4+
steps:
5+
- name: invalid_python_step
6+
type: python
7+
flag: active
8+
extract_source: resources/assessments/invalid_script.py
9+
mode: overwrite
10+
frequency: daily
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
name: "SQL Failure Pipeline"
2+
version: "1.0"
3+
extract_folder: /tmp/extracts/
4+
steps:
5+
- name: invalid_sql_step
6+
type: sql
7+
flag: active
8+
extract_source: resources/assessments/invalid_query.sql
9+
mode: overwrite
10+
frequency: daily

0 commit comments

Comments
 (0)