Add AutoMitigator that can identify mitigations by searching. (#56)
Added support for a new feature, AutoMitigator.
This feature uses the AutoML framework FLAML to search across combinations of data mitigations and identify the sequence of mitigations that yields the best model on the dataset at hand.
The search space covers all available data mitigations and their configurable parameter values.
FLAML is also used as the AutoML-based estimator after the selected mitigations are applied.
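For orientation, a minimal usage sketch (not part of this commit; the CSV path and "label" column are hypothetical):

import pandas as pd
from raimitigations.automitigator import AutoMitigator

# Hypothetical dataset: any tabular DataFrame with a target column works here.
df = pd.read_csv("training_data.csv")  # assumed file, not part of this commit
X_train = df.drop(columns=["label"])
y_train = df["label"]

# Search over up to 2 stacked mitigations, sampling 5 configurations;
# FLAML fits an AutoML model on each mitigated dataset and reports its loss.
auto = AutoMitigator(max_mitigations=2, num_samples=5, use_ray=False)
auto.fit(X_train, y_train)
preds = auto.predict(X_train)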
yrajas authored Dec 17, 2023
1 parent 4d0a9e9 commit ad0445e
Showing 8 changed files with 692 additions and 0 deletions.
3 changes: 3 additions & 0 deletions raimitigations/automitigator/__init__.py
@@ -0,0 +1,3 @@
from .automitigator import AutoMitigator

__all__ = ["AutoMitigator"]
127 changes: 127 additions & 0 deletions raimitigations/automitigator/automitigator.py
@@ -0,0 +1,127 @@
import numpy as np
from functools import partial
from flaml import tune
from typing import Union, List, Optional
from pandas import DataFrame
from sklearn.base import BaseEstimator

from .searchspacebuilder import SearchSpaceBuilder
from .evaluator import Evaluator
from .automitigator_definitions import AutoMitigatorDefinitions as amd


class AutoMitigator(BaseEstimator):
"""
AutoMitigator is a class for automatically building and tuning a pipeline
for mitigating bias in a dataset.
:param int max_mitigations: The maximum number of mitigations to be applied to the dataset.
:param int num_samples: The number of samples to be generated for each hyperparameter configuration.
:param int time_budget_s: The time budget in seconds for the hyperparameter search.
:param bool use_ray: Whether to use Ray for parallelism.
:param dict tune_args: Keyword arguments to be passed to the tune.run method.
:param dict automl_args: Keyword arguments to be passed to the AutoML constructor.
"""

def __init__(
self,
max_mitigations: int = 1,
        num_samples: int = 5,
        time_budget_s: Optional[int] = None,
use_ray: bool = True,
tune_args: dict = {},
automl_args: dict = {},
):
self.max_mitigations = max_mitigations
self.num_samples = num_samples
self.time_budget_s = time_budget_s
self.use_ray = use_ray
self.tune_args = tune_args
self.automl_args = automl_args

def fit(self, X_train, y_train):
"""
Fit the model to the training data.
:param X_train: The training input samples.
:param y_train: The target values.
        :return: AutoMitigator object.
:rtype: raimitigations.automitigator.AutoMitigator
:raises ValueError: If the number of mitigations is less than 1.
:raises ValueError: If the number of samples is less than 1.
:raises ValueError: If it is not able to fit a model with the mitigations applied.
"""
if self.max_mitigations < 1:
raise ValueError("At least one mitigation is necessary")
if self.num_samples < 1:
raise ValueError("num_samples should be at least 1")

if self.automl_args is None or len(self.automl_args) == 0:
self.automl_args = {"task": "classification", "time_budget": 30, "metric": "log_loss", "early_stop": True}

        task = self.automl_args.get("task", "classification")
search_space = SearchSpaceBuilder(self.max_mitigations, task).build()
evaluator = Evaluator(automl_args=self.automl_args)

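        # Search over mitigation combinations: each trial applies the sampled
        # mitigations to the data and reports the loss of the AutoML model
        # trained on the result.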
analysis = tune.run(
partial(evaluator.evaluate, X_train, y_train),
config=search_space,
metric="loss",
mode="min",
num_samples=self.num_samples,
time_budget_s=self.time_budget_s,
search_alg="BlendSearch",
use_ray=self.use_ray,
**self.tune_args,
)

if analysis is None or analysis.best_trial is None:
raise ValueError("Failed to fit. Try adjusting the parameters.")

self._automl = analysis.best_result[amd.results_automl_key]
self._pipeline = analysis.best_result[amd.results_pipeline_key]
self._search_space = analysis.best_result[amd.search_space_key]

return self

def predict(
self,
        X: Union[np.ndarray, DataFrame, List[str], List[List[str]]],
**pred_kwargs,
):
"""
Predict the class for each sample in X.
        :param Union[np.ndarray, DataFrame, List[str], List[List[str]]] X: The input samples.
:param dict pred_kwargs: Keyword arguments to be passed to the predict method of the pipeline.
:return: The predicted classes.
        :raises ValueError: If the model has not been fitted.
"""
if not hasattr(self, "_pipeline"):
raise ValueError("You must fit the model before predicting")

return self._pipeline.predict(X, **pred_kwargs)

def predict_proba(
self,
        X: Union[np.ndarray, DataFrame, List[str], List[List[str]]],
**pred_kwargs,
):
"""
Predict the probability of each class for each sample in X.
        :param Union[np.ndarray, DataFrame, List[str], List[List[str]]] X: The input samples.
        :param dict pred_kwargs: Keyword arguments to be passed to the predict_proba method of the pipeline.
        :return: The predicted probabilities.
        :raises ValueError: If the model has not been fitted.
"""
if not hasattr(self, "_pipeline"):
raise ValueError("You must fit the model before predicting")

return self._pipeline.predict_proba(X, **pred_kwargs)
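If automl_args is left empty, fit falls back to a default classification setup (log_loss metric, 30-second AutoML budget, early stopping). A short sketch of overriding those pass-through arguments (the values shown are illustrative, not library defaults):

from raimitigations.automitigator import AutoMitigator

auto = AutoMitigator(
    max_mitigations=1,
    num_samples=10,
    time_budget_s=600,  # overall budget for the mitigation search, in seconds
    use_ray=False,      # run trials in-process instead of on a Ray cluster
    # Forwarded to flaml.AutoML; any FLAML AutoML setting can go here.
    automl_args={"task": "classification", "time_budget": 60, "metric": "roc_auc"},
)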
50 changes: 50 additions & 0 deletions raimitigations/automitigator/automitigator_definitions.py
@@ -0,0 +1,50 @@
class AutoMitigatorDefinitions:
search_space_key = "search_space"

mitigations_key = "mitigations"
mitigation_name_key = "name"
mitigation_type_key = "type"

cohort_key = "cohort"
all_cohort = "all"

synthesizer = "synthesizer"
synthesizer_epochs_key = "epochs"
synthesizer_model_key = "model"

rebalancer = "rebalancer"
rebalancer_strategy_key = "strategy"

scaler = "scaler"
scaler_name_key = "scaler_name"
standard_scaler = "standard_scaler"
robust_scaler = "robust_scaler"
quantile_scaler = "quantile_scaler"
power_scaler = "power_scaler"
normalize_scaler = "normalize_scaler"
minmax_scaler = "minmax_scaler"

imputer = "imputer"
imputer_name_key = "imputer_name"
basic_imputer = "basic"
iterative_imputer = "iterative"
knn_imputer = "knn"

feature_selector = "feature_selector"
selector_name_key = "selector_name"
sequential_selector = "sequential_selector"
correlated_feature_selector = "correlated_feature_selector"
cfs_num_corr_th_key = "num_corr_th"
cfs_num_pvalue_th_key = "num_pvalue_th"
cfs_cat_corr_th_key = "cat_corr_th"
cfs_cat_pvalue_th_key = "cat_pvalue_th"
catboost_selector = "catboost_selector"
cs_test_size_key = "test_size"
cs_algorithm_key = "algorithm"
cs_steps_key = "steps"

no_mitigation = "nomitigation"

results_loss_key = "loss"
results_automl_key = "automl"
results_pipeline_key = "pipeline"
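These keys describe the dictionaries that the tuner samples and that the evaluator consumes. For illustration, one sampled configuration might look like the following (values are examples only, mirroring the sample config noted in Evaluator.evaluate below):

config = {
    "search_space": {
        "cohort": "all",
        "mitigations": {
            # A rebalancer keeps its type/strategy at the top level of the action.
            "action0": {"name": "rebalancer", "type": 0, "strategy": 0},
            # A scaler nests its choice under the "type" key.
            "action1": {"name": "scaler", "type": {"scaler_name": "standard_scaler"}},
        },
    },
}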
212 changes: 212 additions & 0 deletions raimitigations/automitigator/evaluator.py
@@ -0,0 +1,212 @@
import math
import traceback
import raimitigations.dataprocessing as dp
from flaml import AutoML
from imblearn.pipeline import Pipeline

from .mitigation_actions import MitigationActions
from .automitigator_definitions import AutoMitigatorDefinitions as amd


class Evaluator:
"""
Evaluates a given set of mitigations on a given dataset.
"""

def __init__(self, automl_args=None) -> None:
self.automl_args = automl_args
self.pipeline_steps = []
self.pipeline = None

def _pipeline_append(self, step):
"""
Append a step to the pipeline.
Note: This is done to support older versions of sklearn < 1.1 (required to support python 3.7)
which doesn't support an empty pipeline to be initialized nor appending a step to an existing pipeline
without an estimator at the end.
:param sklearn.pipeline.Pipeline pipeline: The pipeline to add the step to
:param step: The step to add
"""
self.pipeline_steps.append(step)

def evaluate(self, train_x, train_y, search_config):
"""
Evaluates a given set of mitigations on a given dataset.
:param train_x: The training data
:param train_y: The training labels
:param dict search_config: The search configuration
:return: The results of the evaluation
:rtype: dict
"""
# Sample config
# config: {'search_space':
# {'cohort': 'all',
# 'mitigations':
# {'action0': {'type': 0, 'strategy': 0, 'name': 'rebalancer'}}}}

self.pipeline = None
self.pipeline_steps = []
search_space = search_config[amd.search_space_key]
cohort = search_space[amd.cohort_key]
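        # Only the full-dataset cohort is handled in this version; any other
        # cohort value falls through and the method returns None.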
if cohort == amd.all_cohort:
return self.mitigate_full_dataset(train_x, train_y, search_space)

def _process_feature_selector(self, selector_type):
"""
Process the feature selector
:param dict selector_type: The feature selector configuration
:raises ValueError: If the feature selector is unknown
"""
selector_name = selector_type[amd.selector_name_key]
if selector_name == amd.sequential_selector:
self._pipeline_append((amd.sequential_selector, dp.SeqFeatSelection()))
elif selector_name == amd.correlated_feature_selector:
self._pipeline_append(
(
amd.correlated_feature_selector,
dp.CorrelatedFeatures(
num_corr_th=selector_type[amd.cfs_num_corr_th_key],
num_pvalue_th=selector_type[amd.cfs_num_pvalue_th_key],
cat_corr_th=selector_type[amd.cfs_cat_corr_th_key],
cat_pvalue_th=selector_type[amd.cfs_cat_pvalue_th_key],
),
)
)
        elif selector_name == amd.catboost_selector:
            self._pipeline_append(
                (
                    amd.catboost_selector,
                    dp.CatBoostSelection(
                        test_size=selector_type[amd.cs_test_size_key],
                        algorithm=selector_type[amd.cs_algorithm_key],
                        steps=selector_type[amd.cs_steps_key],
                        verbose=False,
                    ),
                )
            )
        else:
            raise ValueError(f"Unknown feature selector: {selector_name}")

def mitigate_full_dataset(self, train_x, train_y, search_space):
"""
Evaluates a given set of mitigations on a given dataset.
:param train_x: The training data
:param train_y: The training labels
:param dict search_space: The search configuration
:return: The results of the evaluation
:rtype: dict
"""

mitigation_set = set()
for mitigation in search_space[amd.mitigations_key]:
config = search_space[amd.mitigations_key][mitigation]
mitigation_name = config[amd.mitigation_name_key]

            # Skip mitigations we have already seen, except for 'nomitigation',
            # so that combinations with fewer mitigations can also be evaluated
if (mitigation_name == amd.no_mitigation) or (mitigation_name not in mitigation_set):
mitigation_set.add(mitigation_name)
else:
continue

if mitigation_name == amd.synthesizer:
self._pipeline_append(
(
amd.synthesizer,
MitigationActions.get_synthesizer(
config[amd.synthesizer_epochs_key], config[amd.synthesizer_model_key]
),
)
)
elif mitigation_name == amd.rebalancer:
self._pipeline_append(
(
amd.rebalancer,
MitigationActions.get_rebalancer(
config[amd.mitigation_type_key], config[amd.rebalancer_strategy_key]
),
)
)
elif mitigation_name == amd.scaler:
self._process_scaler(config[amd.mitigation_type_key])
elif mitigation_name == amd.imputer:
self._process_imputer(config[amd.mitigation_type_key])
elif mitigation_name == amd.feature_selector:
self._process_feature_selector(config[amd.mitigation_type_key])
elif mitigation_name == amd.no_mitigation:
continue

fit_results = self._fit_model(train_x, train_y)
        fit_results[amd.search_space_key] = search_space
return fit_results

def _process_imputer(self, imputer_type):
"""
Process the imputer
:param dict imputer_type: The imputer configuration
:raises ValueError: If the imputer is unknown
"""
imputer_name = imputer_type[amd.imputer_name_key]
if imputer_name == amd.basic_imputer:
self._pipeline_append((amd.basic_imputer, dp.BasicImputer()))
elif imputer_name == amd.iterative_imputer:
self._pipeline_append((amd.iterative_imputer, dp.IterativeDataImputer()))
        elif imputer_name == amd.knn_imputer:
            self._pipeline_append((amd.knn_imputer, dp.KNNDataImputer()))
        else:
            raise ValueError(f"Unknown imputer: {imputer_name}")

def _process_scaler(self, scaler_type):
"""
Process the scaler
:param dict scaler_type: The scaler configuration
:raises ValueError: If the scaler is unknown
"""
scaler_name = scaler_type[amd.scaler_name_key]
if scaler_name == amd.standard_scaler:
self._pipeline_append((amd.standard_scaler, dp.DataStandardScaler()))
elif scaler_name == amd.robust_scaler:
self._pipeline_append((amd.robust_scaler, dp.DataRobustScaler()))
elif scaler_name == amd.quantile_scaler:
self._pipeline_append((amd.quantile_scaler, dp.DataQuantileTransformer()))
elif scaler_name == amd.power_scaler:
self._pipeline_append((amd.power_scaler, dp.DataPowerTransformer()))
elif scaler_name == amd.normalize_scaler:
self._pipeline_append((amd.normalize_scaler, dp.DataNormalizer()))
        elif scaler_name == amd.minmax_scaler:
            self._pipeline_append((amd.minmax_scaler, dp.DataMinMaxScaler()))
        else:
            raise ValueError(f"Unknown scaler: {scaler_name}")

def _fit_model(self, train_x, train_y):
"""
Fit the model
:param train_x: The training data
:param train_y: The training labels
        :return: Dictionary containing the best loss, the AutoML object, and the pipeline used
        :rtype: dict
        """
automl = AutoML(**self.automl_args)
self._pipeline_append(("automl", automl))
self.pipeline = Pipeline(self.pipeline_steps)

try:
self.pipeline.fit(train_x, train_y)
loss = automl.best_loss
except Exception as ex:
print(f"Evaluating pipeline {self.pipeline} caused error {ex} with trace {traceback.format_exc()}")
loss = math.inf

return {amd.results_loss_key: loss, amd.results_automl_key: automl, amd.results_pipeline_key: self.pipeline}
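The evaluator can also be exercised on its own with a hand-built configuration. A minimal sketch, assuming X_train and y_train as in the earlier example (the config values are illustrative):

from raimitigations.automitigator.evaluator import Evaluator

# Hand-built config mirroring the sampled shape documented in evaluate().
config = {
    "search_space": {
        "cohort": "all",
        "mitigations": {
            "action0": {"name": "imputer", "type": {"imputer_name": "basic"}},
        },
    }
}
evaluator = Evaluator(automl_args={"task": "classification", "time_budget": 30})
# Returns a dict with 'loss', 'automl', and 'pipeline' entries.
results = evaluator.evaluate(X_train, y_train, config)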