Skip to content

Commit 231c982

Browse files
committed
Prefect 2.0 Deployment
1 parent b5879f0 commit 231c982

File tree

2 files changed

+121
-55
lines changed

2 files changed

+121
-55
lines changed

Module 5 - MLOPs/4. Orchestrate ML Pipeline/README.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
> `version_1` - Breaking the Jupyter Notebook to Python Script (Basic Code without workflow management)
66
> `version_2` - Code with Prefect Workflow - Defining the workflow and running them
7-
> `version_3` - Dealing with the variables and monitoring the workflow with Prefect Cloud
7+
> `version_3` - Deployment and Scheduling tasks
88
99

1010
### Why Prefect?
@@ -72,4 +72,30 @@ Try 'uvicorn --help' for help.
7272
7373
Error: Got unexpected extra argument (prefect.orion.api.server:create_app)
7474
Orion stopped!
75+
```
76+
77+
### Deployment of Prefect Flow
78+
79+
- `work_queue_name` is used to submit the deployment to a specific work queue.
80+
- You don't need to create the work queue before using it; a work queue will be created if it doesn't exist.
81+
82+
```python
83+
from prefect.deployments import Deployment
84+
from prefect.orion.schemas.schedules import IntervalSchedule
85+
from datetime import timedelta
86+
87+
deployment = Deployment.build_from_flow(
88+
flow=main,
89+
name="model_training",
90+
schedule=IntervalSchedule(interval=timedelta(minutes=5)),
91+
work_queue_name="ml"
92+
)
93+
94+
deployment.apply()
95+
```
96+
97+
### Running an Agent
98+
99+
```
100+
$ prefect agent start --work-queue "ml"
75101
```

Module 5 - MLOPs/4. Orchestrate ML Pipeline/version_3/workflow_v3.py

Lines changed: 94 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,79 +2,119 @@
22
import pandas as pd
33
from sklearn.model_selection import train_test_split
44
from sklearn.preprocessing import StandardScaler
5-
from prefect import task, Flow, Parameter
6-
7-
8-
######################################################################
9-
# Now that we understand how to add task and execute the flow.
10-
# Lets have a look at how to deal with variables.
11-
# If you find yourself frequently experimenting with different values of
12-
# one variable, it’s ideal to turn that variable into a Parameter.
13-
#
14-
# Monitor your Workflow
15-
#
16-
# Login to prefect cloud
17-
# prefect auth login --key <YOUR-KEY>
18-
#
19-
# Run
20-
# prefect create project "Iris Project"
21-
# prefect agent local start
22-
# flow.register(project_name="Iris Project")
23-
######################################################################
24-
5+
from sklearn.neighbors import KNeighborsClassifier
6+
from sklearn.model_selection import GridSearchCV
7+
import mlflow
8+
from prefect import task, flow
259

2610
@task
def load_data(path: str, unwanted_cols: List) -> pd.DataFrame:
    """Read a CSV file and drop the columns that are not needed.

    Args:
        path: Location of the CSV file to read.
        unwanted_cols: Column labels to remove from the loaded frame.

    Returns:
        The loaded data without the unwanted columns.
    """
    frame = pd.read_csv(path)
    return frame.drop(columns=unwanted_cols)
3515

16+
3617
@task
def get_classes(target_data: pd.Series) -> List[str]:
    """Return the distinct values found in the target column.

    Args:
        target_data: The target (label) series.

    Returns:
        A list holding each unique value of ``target_data`` once.
    """
    unique_labels = target_data.unique()
    return list(unique_labels)
20+
3921

4022
@task
def get_scaler(data: pd.DataFrame) -> Any:
    """Fit a ``StandardScaler`` on ``data`` and return the fitted scaler.

    Args:
        data: Numerical features the scaler is fitted on.

    Returns:
        The fitted ``StandardScaler`` instance.
    """
    # ``fit`` returns the scaler itself, so it can be returned directly.
    return StandardScaler().fit(data)
4628

29+
30+
@task
def rescale_data(data: pd.DataFrame, scaler: Any) -> pd.DataFrame:
    """Apply an already-fitted scaler to ``data``.

    Args:
        data: Numerical features to rescale.
        scaler: Fitted object exposing ``transform``.

    Returns:
        A DataFrame of the rescaled values carrying the column names and
        index of ``data``.
    """
    # ``transform`` hands back a bare numpy ndarray, so the column names and
    # index have to be re-attached when rebuilding the DataFrame.
    scaled_values = scaler.transform(data)
    return pd.DataFrame(scaled_values, index=data.index, columns=data.columns)
5439

40+
5541
@task
def split_data(input_: pd.DataFrame, output_: pd.Series, test_data_ratio: float) -> Dict[str, Any]:
    """Split features and target into train and test partitions.

    Args:
        input_: Feature frame.
        output_: Target series aligned with ``input_``.
        test_data_ratio: Passed to ``train_test_split`` as ``test_size``.

    Returns:
        A dict with the keys ``X_TRAIN``, ``Y_TRAIN``, ``X_TEST`` and
        ``Y_TEST``.
    """
    # random_state is fixed so the split is reproducible between runs.
    x_train, x_test, y_train, y_test = train_test_split(
        input_,
        output_,
        test_size=test_data_ratio,
        random_state=0,
    )
    return dict(X_TRAIN=x_train, Y_TRAIN=y_train, X_TEST=x_test, Y_TEST=y_test)
6145

62-
# Defining the complete Flow
63-
with Flow('data-engineer') as flow:
46+
47+
@task
def find_best_model(X_train: pd.DataFrame, y_train: pd.Series, estimator: Any, parameters: List) -> Any:
    """Grid-search ``estimator`` over ``parameters`` inside an MLflow run.

    scikit-learn autologging is switched on for the duration of the search
    and switched off again afterwards, including when the search raises.

    Args:
        X_train: Training features.
        y_train: Training labels.
        estimator: Estimator handed to ``GridSearchCV``.
        parameters: Parameter grid handed to ``GridSearchCV`` as ``param_grid``.

    Returns:
        The fitted ``GridSearchCV`` object (5-fold CV, scored on accuracy).
    """
    # Enabling automatic MLflow logging for scikit-learn runs
    mlflow.sklearn.autolog(max_tuning_runs=None)
    try:
        with mlflow.start_run():
            clf = GridSearchCV(
                estimator=estimator,
                param_grid=parameters,
                scoring='accuracy',
                cv=5,
                return_train_score=True,
                verbose=1,
            )
            clf.fit(X_train, y_train)
    finally:
        # Disabling autologging here (rather than after the block) ensures a
        # failed search does not leave autologging enabled for later code.
        mlflow.sklearn.autolog(disable=True)

    return clf
67+
68+
69+
# Workflow
70+
@flow
def main(path: str='./data/iris.csv', target: str='Species', unwanted_cols: List[str]=['Id'], test_size: float=0.2):
    """Train and evaluate a KNN classifier end to end.

    Loads the CSV, separates the target column, splits into train/test,
    standardises the features with a scaler fitted on the training split
    only, grid-searches a ``KNeighborsClassifier`` and prints the best
    parameters and the test-set score.

    Args:
        path: CSV file to load.
        target: Name of the target column.
        unwanted_cols: Columns dropped right after loading.
        test_size: Share of the data held out for testing.
    """
    # NOTE(review): ``unwanted_cols`` has a mutable default; it is only read
    # here, never mutated by this function.
    mlflow.set_tracking_uri("sqlite:///mlflow.db")
    mlflow.set_experiment("Iris Species Prediction")

    # Load the data and separate the target from the input features
    frame = load_data(path=path, unwanted_cols=unwanted_cols)
    labels = frame[target]
    features = frame.drop([target], axis=1)

    # Unique classes; the result is not used further in this flow
    get_classes(target_data=labels)

    # Train / test split
    splits = split_data(input_=features, output_=labels, test_data_ratio=test_size)

    # Fit the scaler on the training split, then rescale both splits with it
    scaler = get_scaler(splits['X_TRAIN'])
    for part in ('X_TRAIN', 'X_TEST'):
        splits[part] = rescale_data(data=splits[part], scaler=scaler)

    # Model training
    knn = KNeighborsClassifier()
    search_space = [{'n_neighbors': list(range(1, 51)), 'p': [1, 2]}]
    best = find_best_model(splits['X_TRAIN'], splits['Y_TRAIN'], knn, search_space)
    print(best.best_params_)
    print(best.score(splits['X_TEST'], splits['Y_TEST']))
106+
107+
108+
# Deploy the main function
from datetime import timedelta

from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import IntervalSchedule

# Only build and register the deployment when this file is executed as a
# script. Without the guard, every import of this module (for example when
# the flow is loaded from this file to execute a run) would register the
# deployment again as an import side effect.
if __name__ == "__main__":
    deployment = Deployment.build_from_flow(
        flow=main,
        name="model_training",
        # Schedule a run of the flow every five minutes.
        schedule=IntervalSchedule(interval=timedelta(minutes=5)),
        # Runs are submitted to the "ml" work queue.
        work_queue_name="ml",
    )

    deployment.apply()

0 commit comments

Comments
 (0)