"""Python package for statistical inference from non-probability samples"""
__version__ = "1.20"
import numpy as np
import pandas as pd
import sklearn
import pandas.api.types as types
from sklearn.compose import ColumnTransformer
from sklearn.compose import make_column_selector as column_selector
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import RobustScaler, OneHotEncoder
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import RidgeCV, LogisticRegressionCV
from sklearn.ensemble import HistGradientBoostingRegressor, HistGradientBoostingClassifier
from scipy.stats import bootstrap
sklearn.set_config(enable_metadata_routing = True)
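# Shared preprocessing for the linear/logistic models below: numeric columns
# are robust-scaled and median-imputed (with missingness indicators), while
# non-numeric columns are one-hot encoded, pooling categories rarer than 5%
# and routing unseen levels into the infrequent bucket.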
def default_preprocess(**kwargs):
return ColumnTransformer([
("numeric", Pipeline([
("normalizer", RobustScaler()),
("imputer", SimpleImputer(strategy = 'median', add_indicator = True, copy = False))
]), column_selector(dtype_include = 'number')),
("categorical",
OneHotEncoder(drop = 'if_binary', min_frequency = .05, handle_unknown = 'infrequent_if_exist'),
column_selector(dtype_exclude = 'number'))
], **kwargs)
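# Wraps a base estimator behind the default preprocessing. With scikit-learn
# metadata routing enabled (see set_config above), requesting sample_weight
# lets fit/score weights passed to the pipeline reach the final estimator.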
def make_preprocess_estimator(base_estimator, **kwargs):
if hasattr(base_estimator, "set_fit_request"):
base_estimator = base_estimator.set_fit_request(sample_weight = True)
if hasattr(base_estimator, "set_score_request"):
base_estimator = base_estimator.set_score_request(sample_weight = True)
return Pipeline([
("preprocess", default_preprocess(**kwargs)),
("estimator", base_estimator)
])
def logistic_classifier(**kwargs):
return make_preprocess_estimator(LogisticRegressionCV(cv = StratifiedKFold(shuffle = True, random_state = 0), scoring = 'neg_log_loss', max_iter = 1000, **kwargs))
def linear_regressor(**kwargs):
return make_preprocess_estimator(RidgeCV(**kwargs))
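# Dtypes the models can consume directly; anything else (e.g. object/string
# columns) is cast in place to pandas "category" by set_categories so the
# one-hot encoder and the boosting models (categorical_features = 'from_dtype')
# can handle it.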
valid_dtypes = (types.is_numeric_dtype, lambda dtype: isinstance(dtype, types.CategoricalDtype), types.is_datetime64_any_dtype, types.is_timedelta64_dtype)
def set_categories(data):
invalid_columns = [column for column, dtype in data.dtypes.items() if not any(is_dtype(dtype) for is_dtype in valid_dtypes)]
data[invalid_columns] = data[invalid_columns].astype("category", copy = False)
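# Gradient-boosting alternatives to the linear defaults; categorical columns
# are detected from their dtype (hence set_categories above), and sample
# weights are requested explicitly since these models are returned without
# the preprocessing pipeline.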
def boosting_classifier(**kwargs):
my_model = HistGradientBoostingClassifier(categorical_features = 'from_dtype', random_state = 0, early_stopping = True, max_iter = 1000, **kwargs)
return my_model.set_fit_request(sample_weight = True).set_score_request(sample_weight = True)
def boosting_regressor(**kwargs):
my_model = HistGradientBoostingRegressor(categorical_features = 'from_dtype', random_state = 0, early_stopping = True, max_iter = 1000, **kwargs)
return my_model.set_fit_request(sample_weight = True).set_score_request(sample_weight = True)
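# Calibration weighting in the raking/exponential style of Deville & Sarndal:
# Newton iterations solve for multipliers L such that the calibrated weights
# w_i = d_i * exp(x_i' L) reproduce the known population totals,
#     sum_i w_i x_i = T,
# for every calibration variable. When only the population size is known, the
# design weights d default to the constant N / n.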
def calibration_weights(sample, population_totals, weights_column = None, population_size = None, max_steps = 1000, tolerance = 1e-6):
if weights_column is not None:
valid = sample[weights_column].notna()
d = sample.loc[valid, weights_column]
X = sample.loc[valid, population_totals.index].to_numpy(dtype = 'float64')
X = X * d.to_numpy().reshape(-1, 1)
elif population_size is not None:
X = sample[population_totals.index].to_numpy(dtype = 'float64')
d = population_size / X.shape[0]
X = X * d
else:
raise ValueError("weights_column or population_size must be set")
T = population_totals.to_numpy()
L = np.zeros(X.shape[1])
w = np.ones(X.shape[0])
success = False
for step in range(max_steps):
L += np.dot(np.linalg.pinv(np.dot(np.multiply(X.T, w), X)), (T - np.dot(X.T, w)))
w = np.exp(np.dot(X, L))
loss = np.max(np.abs((np.dot(X.T, w) - T) / T))
if loss < tolerance:
success = True
break
    if not success: raise RuntimeError("Calibration did not converge")
return w * d
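# Propensity estimation: stack the non-probability sample (labelled True) on
# top of the probability sample (labelled False) and fit a classifier, with
# sample weights chosen so both pseudo-samples carry equal total weight.
# Returns the predicted probability of np-sample membership for every stacked
# row (np-sample rows first, then p-sample rows).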
def propensities(np_sample, p_sample, weights_column = None, covariates = None, model = None):
np_size = np_sample.shape[0]
p_size = p_sample.shape[0]
weights = np.ones(p_size) if weights_column is None else p_sample[weights_column]
if covariates is not None:
np_sample = np_sample.loc[:, covariates]
p_sample = p_sample.loc[:, covariates]
if model is None: model = logistic_classifier()
X = pd.concat((np_sample, p_sample), ignore_index = True, join = "inner", copy = False)
set_categories(X)
y = np.concatenate((np.ones(np_size, dtype = bool), np.zeros(p_size, dtype = bool)))
sample_weight = np.concatenate((np.repeat(np.sum(weights) / np_size, np_size), weights))
sample_weight /= np.mean(sample_weight)
model.fit(X, y, sample_weight = sample_weight)
return model.predict_proba(X)[:, tuple(model.classes_).index(True)]
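# Propensity score adjustment (PSA): each unit is weighted by the inverse
# odds (1 - p) / p of its estimated propensity, down-weighting np-sample
# units that look over-represented relative to the reference sample. When
# population_size is given, the np-sample weights are rescaled to sum to it.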
def psa_weights(np_sample, p_sample, population_size = None, weights_column = None, covariates = None, model = None):
np_size = np_sample.shape[0]
my_propensities = propensities(np_sample, p_sample, weights_column, covariates, model)
optimal_weights = (1 - my_propensities) / my_propensities
if population_size:
optimal_weights *= population_size / np.sum(optimal_weights[:np_size])
return {"np": optimal_weights[:np_size], "p": optimal_weights[np_size:]}
def matching_values(np_sample, p_sample, target_column, target_category = None, covariates = None, model = None, training_weight = None):
y = np_sample[target_column]
if y.isna().any(): raise ValueError("Missing values in target column")
is_numerical = types.is_numeric_dtype(y) and target_category is None
if target_category is not None: y = y == target_category
if model is None: model = linear_regressor() if is_numerical else logistic_classifier()
if covariates is None: covariates = np_sample.columns.intersection(p_sample.columns)
np_sample = np_sample.loc[:, covariates]
p_sample = p_sample.loc[:, covariates]
set_categories(np_sample)
set_categories(p_sample)
if training_weight is None:
model.fit(np_sample, y)
else:
training_weight = training_weight / np.mean(training_weight)
model.fit(np_sample, y, sample_weight = training_weight)
def predict(X):
if is_numerical:
return model.predict(X)
else:
probs = model.predict_proba(X)
if target_category is None:
return {"categories": model.classes_, "probs": probs}
else:
return probs[:, tuple(model.classes_).index(True)]
return {"p": predict(p_sample), "np": predict(np_sample)}
def doubly_robust_estimation(np_sample, p_sample, target_column, target_category = None, weights_column = None, covariates = None, psa_model = None, matching_model = None):
if not types.is_numeric_dtype(np_sample[target_column]) and target_category is None:
raise ValueError("target_category must be set when the target variable is categorical.")
if weights_column is None:
weights = None
else:
weights = p_sample[weights_column]
imputed_weights = psa_weights(np_sample, p_sample, None, weights_column, covariates, psa_model)["np"]
imputed_values = matching_values(np_sample, p_sample, target_column, target_category, covariates, matching_model)
original_values = np_sample[target_column] if target_category is None else np_sample[target_column] == target_category
return np.average(imputed_values["p"], weights = weights) + np.average(original_values - imputed_values["np"], weights = imputed_weights)
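# Model-assisted variant of matching: the outcome model is trained with PSA
# weights, so its fit is tilted towards the population rather than the raw
# non-probability sample.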
def training_values(np_sample, p_sample, target_column, target_category = None, weights_column = None, covariates = None, psa_model = None, matching_model = None):
training_weight = psa_weights(np_sample, p_sample, None, weights_column, covariates, psa_model)["np"]
return matching_values(np_sample, p_sample, target_column, target_category, covariates, matching_model, training_weight)
def iqr(array):
percentiles = np.nanpercentile(array, (25, 75), method = "median_unbiased")
return percentiles[1] - percentiles[0]
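# Kernel weighting (KW): each probability-sample unit spreads its design
# weight over np-sample units with similar estimated propensities, using a
# Gaussian kernel with Silverman's rule-of-thumb bandwidth
#     h = 0.9 * min(sd, IQR / 1.349) * n^(-1/5).
# Row-normalising the kernel matrix before summing ensures every p-sample
# unit's weight is fully allocated; the clip guards against all-zero rows.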
def kw_weights(np_sample, p_sample, population_size = None, weights_column = None, covariates = None, model = None):
np_size = np_sample.shape[0]
my_propensities = propensities(np_sample, p_sample, weights_column, covariates, model)
kernels = my_propensities[np_size:].reshape(-1, 1) - my_propensities[:np_size]
m = min(np.std(kernels), iqr(kernels) / 1.349)
h = 0.9 * m / pow(kernels.size, 1/5)
kernels **= 2
kernels /= -2*h*h
np.exp(kernels, out = kernels)
np.clip(kernels, 1.49012e-8, None, out = kernels)
kernels /= np.sum(kernels, axis = 1).reshape(-1, 1)
if weights_column is not None: kernels *= p_sample[weights_column].to_numpy().reshape(-1, 1)
kernels = np.sum(kernels, axis = 0)
if population_size: kernels *= population_size / np.sum(kernels)
return kernels
def estimation(values, weights = None, axis = None):
return np.average(values, weights = weights, axis = axis)
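# Bootstrap confidence interval for a (weighted) mean: values and weights are
# resampled jointly (paired = True). The BCa method is preferred, falling
# back to the cheaper percentile method for large samples.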
def confidence_interval(values, weights = None):
method = 'BCa' if len(values) <= 10000 else 'percentile'
data = (values,) if weights is None else (values, weights)
ci = bootstrap(data, estimation, paired = True, n_resamples = 1000, method = method, random_state = 0, vectorized = True).confidence_interval
return (ci.low, ci.high)
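# Full-pipeline bootstrap: row indices of each sample are resampled and the
# entire estimator (including model fitting) is re-run, so the interval also
# reflects model-fitting variability. Only 100 percentile resamples are used
# because each replicate is expensive.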
def advanced_confidence_interval(np_sample, p_sample, estimator):
    point_estimate = estimator(np_sample, p_sample)  # avoid shadowing the module-level estimation()
def resample_estimator(np_index, p_index):
return estimator(np_sample.iloc[np_index], p_sample.iloc[p_index])
ci = bootstrap((np.arange(np_sample.shape[0]), np.arange(p_sample.shape[0])), resample_estimator,
n_resamples = 100, method = 'percentile', random_state = 0, vectorized = False).confidence_interval
    return (point_estimate, ci.low, ci.high)
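# Illustrative end-to-end sketch on hypothetical synthetic data (not part of
# the library's API): the covariate x drives both selection into the
# non-probability sample and the outcome y, so the naive np-sample mean is
# biased upwards while the adjusted estimators should recover the population
# mean of roughly 2 * E[x] = 0.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    N = 100000                           # assumed population size
    x_np = rng.normal(1.0, 1.0, 2000)    # selection favours large x
    x_p = rng.normal(0.0, 1.0, 500)      # reference sample mirrors the population
    np_sample = pd.DataFrame({"x": x_np, "y": 2 * x_np + rng.normal(size = 2000)})
    p_sample = pd.DataFrame({"x": x_p, "w": np.full(500, N / 500)})
    w = psa_weights(np_sample, p_sample, N, "w", ["x"])["np"]
    print("naive:", estimation(np_sample["y"]))
    print("PSA:  ", estimation(np_sample["y"], w))
    print("KW:   ", estimation(np_sample["y"], kw_weights(np_sample, p_sample, N, "w", ["x"])))
    print("DR:   ", doubly_robust_estimation(np_sample, p_sample, "y", weights_column = "w", covariates = ["x"]))
    print("95% CI (PSA):", confidence_interval(np_sample["y"].to_numpy(), w))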