Skip to content

Commit 6d80d1d

Browse files
authored
Forecast preprocessing schema (#772)
2 parents ae91e02 + 021cb04 commit 6d80d1d

File tree

7 files changed

+145
-30
lines changed

ads/opctl/operator/lowcode/anomaly/model/anomaly_dataset.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
merge_category_columns,
1111
)
1212
from ads.opctl.operator.lowcode.common.data import AbstractData
13-
from ads.opctl.operator.lowcode.common.data import AbstractData
1413
from ads.opctl.operator.lowcode.anomaly.utils import get_frequency_of_datetime
1514
from ads.opctl import logger
1615
import pandas as pd

ads/opctl/operator/lowcode/anomaly/operator_config.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,21 @@ class TestData(InputData):
3636
"""Class representing operator specification test data details."""
3737

3838

39+
@dataclass(repr=True)
40+
class PreprocessingSteps(DataClassSerializable):
41+
"""Class representing preprocessing steps for operator."""
42+
43+
missing_value_imputation: bool = True
44+
outlier_treatment: bool = False
45+
46+
47+
@dataclass(repr=True)
48+
class DataPreprocessor(DataClassSerializable):
49+
"""Class representing operator specification preprocessing details."""
50+
51+
enabled: bool = True
52+
steps: PreprocessingSteps = field(default_factory=PreprocessingSteps)
53+
3954
@dataclass(repr=True)
4055
class AnomalyOperatorSpec(DataClassSerializable):
4156
"""Class representing operator specification."""
@@ -74,7 +89,9 @@ def __post_init__(self):
7489
self.generate_inliers if self.generate_inliers is not None else False
7590
)
7691
self.model_kwargs = self.model_kwargs or dict()
77-
92+
self.preprocessing = (
93+
self.preprocessing if self.preprocessing is not None else DataPreprocessor(enabled=True)
94+
)
7895

7996
@dataclass(repr=True)
8097
class AnomalyOperatorConfig(OperatorConfig):

ads/opctl/operator/lowcode/anomaly/schema.yaml

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,11 +307,23 @@ spec:
307307
description: "When provided, target_category_columns [list] indexes the data into multiple related datasets for anomaly detection"
308308

309309
preprocessing:
310-
type: boolean
310+
type: dict
311311
required: false
312-
default: true
313-
meta:
314-
description: "preprocessing and feature engineering can be disabled using this flag, Defaults to true"
312+
schema:
313+
enabled:
314+
type: boolean
315+
required: false
316+
default: true
317+
meta:
318+
description: "preprocessing and feature engineering can be disabled using this flag, Defaults to true"
319+
steps:
320+
type: dict
321+
required: false
322+
schema:
323+
missing_value_imputation:
324+
type: boolean
325+
required: false
326+
default: true
315327

316328
generate_report:
317329
type: boolean

ads/opctl/operator/lowcode/common/transformations.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,20 +58,26 @@ def run(self, data):
5858
clean_df = self._format_datetime_col(clean_df)
5959
clean_df = self._set_multi_index(clean_df)
6060

61-
if self.name == "historical_data":
62-
try:
63-
clean_df = self._missing_value_imputation_hist(clean_df)
64-
except Exception as e:
65-
logger.debug(f"Missing value imputation failed with {e.args}")
66-
if self.preprocessing:
67-
try:
68-
clean_df = self._outlier_treatment(clean_df)
69-
except Exception as e:
70-
logger.debug(f"Outlier Treatment failed with {e.args}")
71-
else:
72-
logger.debug("Skipping outlier treatment as preprocessing is disabled")
73-
elif self.name == "additional_data":
74-
clean_df = self._missing_value_imputation_add(clean_df)
61+
if self.preprocessing and self.preprocessing.enabled:
62+
if self.name == "historical_data":
63+
if self.preprocessing.steps.missing_value_imputation:
64+
try:
65+
clean_df = self._missing_value_imputation_hist(clean_df)
66+
except Exception as e:
67+
logger.debug(f"Missing value imputation failed with {e.args}")
68+
else:
69+
logger.info("Skipping missing value imputation because it is disabled")
70+
if self.preprocessing.steps.outlier_treatment:
71+
try:
72+
clean_df = self._outlier_treatment(clean_df)
73+
except Exception as e:
74+
logger.debug(f"Outlier Treatment failed with {e.args}")
75+
else:
76+
logger.info("Skipping outlier treatment because it is disabled")
77+
elif self.name == "additional_data":
78+
clean_df = self._missing_value_imputation_add(clean_df)
79+
else:
80+
logger.info("Skipping all preprocessing steps because preprocessing is disabled")
7581
return clean_df
7682

7783
def _remove_trailing_whitespace(self, df):

ads/opctl/operator/lowcode/forecast/operator_config.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,22 @@ class DateTimeColumn(DataClassSerializable):
2929
format: str = None
3030

3131

32+
@dataclass(repr=True)
33+
class PreprocessingSteps(DataClassSerializable):
34+
"""Class representing preprocessing steps for operator."""
35+
36+
missing_value_imputation: bool = True
37+
outlier_treatment: bool = True
38+
39+
40+
@dataclass(repr=True)
41+
class DataPreprocessor(DataClassSerializable):
42+
"""Class representing operator specification preprocessing details."""
43+
44+
enabled: bool = True
45+
steps: PreprocessingSteps = field(default_factory=PreprocessingSteps)
46+
47+
3248
@dataclass(repr=True)
3349
class Tuning(DataClassSerializable):
3450
"""Class representing operator specification tuning details."""
@@ -54,7 +70,7 @@ class ForecastOperatorSpec(DataClassSerializable):
5470
global_explanation_filename: str = None
5571
local_explanation_filename: str = None
5672
target_column: str = None
57-
preprocessing: bool = None
73+
preprocessing: DataPreprocessor = field(default_factory=DataPreprocessor)
5874
datetime_column: DateTimeColumn = field(default_factory=DateTimeColumn)
5975
target_category_columns: List[str] = field(default_factory=list)
6076
generate_report: bool = None
@@ -79,7 +95,7 @@ def __post_init__(self):
7995
self.confidence_interval_width = self.confidence_interval_width or 0.80
8096
self.report_filename = self.report_filename or "report.html"
8197
self.preprocessing = (
82-
self.preprocessing if self.preprocessing is not None else True
98+
self.preprocessing if self.preprocessing is not None else DataPreprocessor(enabled=True)
8399
)
84100
# For Report Generation. When user doesn't specify defaults to True
85101
self.generate_report = (

ads/opctl/operator/lowcode/forecast/schema.yaml

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -286,11 +286,27 @@ spec:
286286
default: target
287287

288288
preprocessing:
289-
type: boolean
289+
type: dict
290290
required: false
291-
default: true
292-
meta:
293-
description: "preprocessing and feature engineering can be disabled using this flag, Defaults to true"
291+
schema:
292+
enabled:
293+
type: boolean
294+
required: false
295+
default: true
296+
meta:
297+
description: "preprocessing and feature engineering can be disabled using this flag, Defaults to true"
298+
steps:
299+
type: dict
300+
required: false
301+
schema:
302+
missing_value_imputation:
303+
type: boolean
304+
required: false
305+
default: true
306+
outlier_treatment:
307+
type: boolean
308+
required: false
309+
default: true
294310

295311
generate_explanations:
296312
type: boolean

tests/operators/forecast/test_errors.py

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
ForecastInputDataError,
2727
)
2828
from ads.opctl.operator.cmd import run
29-
29+
import math
3030

3131
NUM_ROWS = 1000
3232
NUM_SERIES = 10
@@ -172,8 +172,9 @@ def run_yaml(tmpdirname, yaml_i, output_data_path):
172172
run(yaml_i, backend="operator.local", debug=True)
173173
subprocess.run(f"ls -a {output_data_path}", shell=True)
174174

175-
test_metrics = pd.read_csv(f"{tmpdirname}/results/test_metrics.csv")
176-
print(test_metrics)
175+
if 'test_data' in yaml_i['spec']:
176+
test_metrics = pd.read_csv(f"{tmpdirname}/results/test_metrics.csv")
177+
print(test_metrics)
177178
train_metrics = pd.read_csv(f"{tmpdirname}/results/metrics.csv")
178179
print(train_metrics)
179180

@@ -185,6 +186,7 @@ def populate_yaml(
185186
additional_data_path=None,
186187
test_data_path=None,
187188
output_data_path=None,
189+
preprocessing=None,
188190
):
189191
if historical_data_path is None:
190192
historical_data_path, additional_data_path, test_data_path = setup_rossman()
@@ -204,7 +206,8 @@ def populate_yaml(
204206
yaml_i["spec"]["datetime_column"]["name"] = "Date"
205207
yaml_i["spec"]["target_category_columns"] = ["Store"]
206208
yaml_i["spec"]["horizon"] = HORIZON
207-
209+
if preprocessing:
210+
yaml_i["spec"]["preprocessing"] = preprocessing
208211
if generate_train_metrics:
209212
yaml_i["spec"]["generate_metrics"] = generate_train_metrics
210213
if model == "autots":
@@ -372,6 +375,7 @@ def test_0_series(operator_setup, model):
372375
historical_data_path=historical_data_path,
373376
additional_data_path=additional_data_path,
374377
test_data_path=test_data_path,
378+
preprocessing={"enabled": False}
375379
)
376380
with pytest.raises(DataMismatchError):
377381
run_yaml(
@@ -429,6 +433,49 @@ def test_invalid_dates(operator_setup, model):
429433
)
430434

431435

436+
def test_disabling_outlier_treatment(operator_setup):
437+
tmpdirname = operator_setup
438+
NUM_ROWS = 100
439+
hist_data_0 = pd.concat(
440+
[
441+
HISTORICAL_DATETIME_COL[: NUM_ROWS - HORIZON],
442+
TARGET_COL[: NUM_ROWS - HORIZON],
443+
],
444+
axis=1,
445+
)
446+
outliers = [1000, -800]
447+
hist_data_0.at[40, 'Sales'] = outliers[0]
448+
hist_data_0.at[75, 'Sales'] = outliers[1]
449+
historical_data_path, additional_data_path, test_data_path = setup_artificial_data(
450+
tmpdirname, hist_data_0
451+
)
452+
453+
yaml_i, output_data_path = populate_yaml(
454+
tmpdirname=tmpdirname,
455+
model="arima",
456+
historical_data_path=historical_data_path
457+
)
458+
yaml_i["spec"].pop("target_category_columns")
459+
yaml_i["spec"].pop("additional_data")
460+
461+
# running default pipeline where outlier will be treated
462+
run_yaml(tmpdirname=tmpdirname, yaml_i=yaml_i, output_data_path=output_data_path)
463+
forecast_without_outlier = pd.read_csv(f"{tmpdirname}/results/forecast.csv")
464+
input_vals_without_outlier = set(forecast_without_outlier['input_value'])
465+
assert all(
466+
item not in input_vals_without_outlier for item in outliers), "forecast file should not contain any outliers"
467+
468+
# switching off outlier_treatment
469+
preprocessing_steps = {"missing_value_imputation": True, "outlier_treatment": False}
470+
preprocessing = {"enabled": True, "steps": preprocessing_steps}
471+
yaml_i["spec"]["preprocessing"] = preprocessing
472+
run_yaml(tmpdirname=tmpdirname, yaml_i=yaml_i, output_data_path=output_data_path)
473+
forecast_with_outlier = pd.read_csv(f"{tmpdirname}/results/forecast.csv")
474+
input_vals_with_outlier = set(forecast_with_outlier['input_value'])
475+
assert all(
476+
item in input_vals_with_outlier for item in outliers), "forecast file should contain all the outliers"
477+
478+
432479
@pytest.mark.parametrize("model", MODELS)
433480
def test_2_series(operator_setup, model):
434481
# Test w and w/o add data
@@ -454,12 +501,14 @@ def split_df(df):
454501
historical_data_path, additional_data_path, test_data_path = setup_artificial_data(
455502
tmpdirname, hist_data, add_data, test_data
456503
)
504+
preprocessing_steps = {"missing_value_imputation": True, "outlier_treatment": False}
457505
yaml_i, output_data_path = populate_yaml(
458506
tmpdirname=tmpdirname,
459507
model=model,
460508
historical_data_path=historical_data_path,
461509
additional_data_path=additional_data_path,
462510
test_data_path=test_data_path,
511+
preprocessing={"enabled": True, "steps": preprocessing_steps}
463512
)
464513
with pytest.raises(DataMismatchError):
465514
# 4 columns in historical data, but only 1 cat col specified

0 commit comments

Comments (0)