
Commit e0c4d1a

Enhance profiler config to execute Python scripts (#1465)
Added support for executing Python scripts in the profiler config.
1 parent 94df54f commit e0c4d1a
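
In short: the profiler pipeline now dispatches on each step's type field. Steps of type sql keep the existing behavior of reading a query file and saving the result to DuckDB, while steps of type python run a standalone script in a subprocess, passing it the DuckDB path and the credentials file and expecting a JSON status object on stdout. The Step field extract_query is renamed to extract_source to cover both kinds of source file.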

6 files changed: +141 -11 lines changed

pyproject.toml

+6 -0

@@ -62,6 +62,7 @@ dependencies = [
     "databricks-labs-pylint~=0.4.0",
     "mypy~=1.10.0",
     "numpy==1.26.4",
+    "pandas==1.4.1",
 ]

 [project.entry-points.databricks]
@@ -94,6 +95,8 @@ cache_dir = ".venv/pytest-cache"
 asyncio_mode = "auto"
 asyncio_default_fixture_loop_scope="function"

+[tool.mypy]
+exclude = ["tests/resources/.*"]

 [tool.black]
 target-version = ["py310"]
@@ -182,6 +185,9 @@ fail-under = 10.0
 # file locks
 ignore-patterns = ["^\\.#"]

+# Ignore files under tests/resources
+ignore-paths = ["tests/resources"]
+
 # List of module names for which member attributes should not be checked (useful
 # for modules/projects where namespaces are manipulated during runtime and thus
 # existing member attributes cannot be deduced by static analysis). It supports

src/databricks/labs/remorph/assessments/pipeline.py

+42 -2

@@ -1,8 +1,13 @@
 from pathlib import Path
+
+import json
 import logging
+import subprocess
 import yaml
 import duckdb

+from databricks.labs.remorph.connections.credential_manager import cred_file
+
 from databricks.labs.remorph.assessments.profiler_config import PipelineConfig, Step
 from databricks.labs.remorph.connections.database_manager import DatabaseManager

@@ -27,8 +32,16 @@ def execute(self):
         logging.info("Pipeline execution completed")

     def _execute_step(self, step: Step):
-        logging.debug(f"Reading query from file: {step.extract_query}")
-        with open(step.extract_query, 'r', encoding='utf-8') as file:
+        if step.type == "sql":
+            self._execute_sql_step(step)
+        elif step.type == "python":
+            self._execute_python_step(step)
+        else:
+            logging.error(f"Unsupported step type: {step.type}")
+
+    def _execute_sql_step(self, step: Step):
+        logging.debug(f"Reading query from file: {step.extract_source}")
+        with open(step.extract_source, 'r', encoding='utf-8') as file:
             query = file.read()

         # Execute the query using the database manager
@@ -38,6 +51,33 @@ def _execute_step(self, step: Step):
         # Save the result to duckdb
         self._save_to_db(result, step.name, str(step.mode))

+    def _execute_python_step(self, step: Step):
+        logging.debug(f"Executing Python script: {step.extract_source}")
+        db_path = str(self.db_path_prefix / DB_NAME)
+        credential_config = str(cred_file("remorph"))
+
+        try:
+            result = subprocess.run(
+                ["python", step.extract_source, "--db-path", db_path, "--credential-config-path", credential_config],
+                check=True,
+                capture_output=True,
+                text=True,
+            )
+
+            try:
+                output = json.loads(result.stdout)
+                if output["status"] == "success":
+                    logging.info(f"Python script completed: {output['message']}")
+                else:
+                    raise RuntimeError(f"Script reported error: {output['message']}")
+            except json.JSONDecodeError:
+                logging.info(f"Python script output: {result.stdout}")
+
+        except subprocess.CalledProcessError as e:
+            error_msg = e.stderr
+            logging.error(f"Python script failed: {error_msg}")
+            raise RuntimeError(f"Script execution failed: {error_msg}") from e
+
     def _save_to_db(self, result, step_name: str, mode: str, batch_size: int = 1000):
         self._create_dir(self.db_path_prefix)
         conn = duckdb.connect(str(self.db_path_prefix) + '/' + DB_NAME)
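
The subprocess call above implies a small contract for step scripts: accept --db-path and --credential-config-path arguments, write into the given DuckDB file, and print a JSON object with status and message keys on stdout (errors go to stderr with a non-zero exit code). A minimal sketch of a conforming script follows; the table name my_table is a hypothetical example, not part of this commit:

import argparse
import json
import sys

import duckdb


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--db-path", required=True)
    parser.add_argument("--credential-config-path", required=True)
    args = parser.parse_args()

    try:
        conn = duckdb.connect(args.db_path)
        # my_table is an illustrative target table, not part of the commit.
        conn.execute("CREATE OR REPLACE TABLE my_table AS SELECT 1 AS id")
        conn.close()
        # Success goes to stdout as JSON, matching what _execute_python_step parses.
        print(json.dumps({"status": "success", "message": "Data loaded successfully"}))
    except Exception as e:
        # Errors go to stderr; the non-zero exit triggers CalledProcessError upstream.
        print(json.dumps({"status": "error", "message": str(e)}), file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()

Note that stdout which is not valid JSON is logged verbatim rather than failing the step, per the JSONDecodeError fallback above.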

src/databricks/labs/remorph/assessments/profiler_config.py

+1 -1

@@ -5,7 +5,7 @@
 class Step:
     name: str
     type: str | None
-    extract_query: str
+    extract_source: str
     mode: str | None
     frequency: str | None
     flag: str | None
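
Assuming Step is a dataclass (the bare field annotations suggest it, though the decorator is outside this diff), a python step after the rename would be built roughly like this sketch; the values mirror the YAML config later in this commit:

from databricks.labs.remorph.assessments.profiler_config import Step

step = Step(
    name="random_data",
    type="python",
    extract_source="resources/assessments/db_extract.py",  # formerly extract_query
    mode="overwrite",
    frequency="daily",
    flag="active",
)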

tests/integration/assessments/test_pipeline.py

+2 -2

@@ -18,7 +18,7 @@ def pipeline_config():
     config = PipelineClass.load_config_from_yaml(config_path)

     for step in config.steps:
-        step.extract_query = f"{prefix}/../../{step.extract_query}"
+        step.extract_source = f"{prefix}/../../{step.extract_source}"
     return config


@@ -31,7 +31,7 @@ def test_run_pipeline(extractor, pipeline_config, get_logger):
 def verify_output(get_logger, path):
     conn = duckdb.connect(str(Path(path)) + "/" + DB_NAME)

-    expected_tables = ["usage", "inventory"]
+    expected_tables = ["usage", "inventory", "random_data"]
     logger = get_logger
     for table in expected_tables:
         try:
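
The new random_data entry works because the Python step writes its table into the same DuckDB file the SQL steps use. A quick manual check might look like the sketch below; the database filename is a placeholder, since the real one comes from DB_NAME in pipeline.py:

import duckdb

# /tmp/extracts matches extract_folder in the test config; the filename is illustrative.
conn = duckdb.connect("/tmp/extracts/profiler_extract.db")
print(conn.execute("SELECT COUNT(*) FROM random_data").fetchone())
conn.close()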
tests/resources/assessments/db_extract.py

+78 -0 (new file)

@@ -0,0 +1,78 @@
+import pandas as pd
+import duckdb
+import argparse
+import json
+import sys
+import numpy as np
+import logging
+from datetime import datetime, timedelta
+
+
+def generate_random_dataset(size=10):
+    # Generate dates for the last 30 days
+    end_date = datetime.now()
+    start_date = end_date - timedelta(days=30)
+    dates = pd.date_range(start=start_date, end=end_date, periods=size)
+
+    data = {
+        'id': range(1, size + 1),
+        'date': dates,
+        'category': np.random.choice(['Low', 'Medium', 'High'], size),
+        'department': np.random.choice(['Sales', 'Marketing', 'Engineering', 'Support'], size),
+        'is_active': np.random.choice([True, False], size, p=[0.8, 0.2]),
+        'score': np.random.uniform(0, 100, size).round(2),
+    }
+
+    return pd.DataFrame(data)
+
+
+def execute():
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger(__name__)
+
+    parser = argparse.ArgumentParser(description='Generate and store random dataset in DuckDB')
+    parser.add_argument('--db-path', type=str, required=True, help='Path to DuckDB database file')
+    parser.add_argument(
+        '--credential-config-path', type=str, required=True, help='Path string containing credential configuration'
+    )
+    args = parser.parse_args()
+    credential_file = args.credential_config_path
+
+    if not credential_file.endswith('credentials.yml'):
+        msg = "Credential config file must have 'credentials.yml' extension"
+        # This is the output format expected by pipeline.py, which orchestrates this script
+        print(json.dumps({"status": "error", "message": msg}), file=sys.stderr)
+        raise ValueError(msg)
+
+    try:
+        df = generate_random_dataset()
+        logger.info(f'DataFrame columns: {df.columns}')
+        # Connect to DuckDB
+        conn = duckdb.connect(args.db_path)
+
+        # Create table with appropriate schema
+        conn.execute(
+            """
+            CREATE OR REPLACE TABLE random_data (
+                id INTEGER,
+                date TIMESTAMP,
+                category VARCHAR,
+                department VARCHAR,
+                is_active BOOLEAN,
+                score DOUBLE
+            )
+            """
+        )
+
+        conn.execute("INSERT INTO random_data SELECT * FROM df")
+        conn.close()
+        # This is the output format expected by pipeline.py, which orchestrates this script
+        print(json.dumps({"status": "success", "message": "Data loaded successfully"}))
+
+    except Exception as e:
+        print(json.dumps({"status": "error", "message": str(e)}), file=sys.stderr)
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    execute()
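
The script can also be exercised outside the pipeline, mirroring the subprocess.run call in pipeline.py. Both paths below are placeholders; the credentials path just has to end in credentials.yml to pass the script's check:

import subprocess

result = subprocess.run(
    [
        "python",
        "tests/resources/assessments/db_extract.py",
        "--db-path", "/tmp/extracts/profiler_extract.db",  # placeholder path
        "--credential-config-path", "/tmp/credentials.yml",  # placeholder path
    ],
    check=True,
    capture_output=True,
    text=True,
)
print(result.stdout)  # expect {"status": "success", "message": "Data loaded successfully"}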

tests/resources/assessments/pipeline_config.yml

+12 -6

@@ -3,20 +3,26 @@ version: "1.0"
 extract_folder: /tmp/extracts/
 steps:
   - name: inventory
-    type: inventory
-    extract_query: resources/assessments/inventory.sql
+    type: sql
+    extract_source: resources/assessments/inventory.sql
     mode: overwrite
     frequency: daily
     flag: active
   - name: usage
-    type: usage
-    extract_query: resources/assessments/usage.sql
+    type: sql
+    extract_source: resources/assessments/usage.sql
     mode: overwrite
     frequency: weekly
     flag: active
   - name: usage_2
-    type: usage_2
-    extract_query: resources/assessments/usage.sql
+    type: sql
+    extract_source: resources/assessments/usage.sql
     mode: overwrite
     frequency: daily
     flag: inactive
+  - name: random_data
+    type: python
+    extract_source: resources/assessments/db_extract.py
+    mode: overwrite
+    frequency: daily
+    flag: active
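
Loading this config goes through PipelineClass.load_config_from_yaml, as the integration test above shows. A minimal sketch; only the loader call is confirmed by this commit, and the import path assumes PipelineClass lives in the pipeline module shown earlier:

from databricks.labs.remorph.assessments.pipeline import PipelineClass

config = PipelineClass.load_config_from_yaml("tests/resources/assessments/pipeline_config.yml")
for step in config.steps:
    # Each step now carries extract_source instead of extract_query.
    print(step.name, step.type, step.extract_source)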
