3 changes: 2 additions & 1 deletion aeon/transformations/collection/imbalance/__init__.py
@@ -1,7 +1,8 @@
"""Supervised transformers to rebalance colelctions of time series."""

__all__ = ["ADASYN", "SMOTE", "OHIT"]
__all__ = ["ADASYN", "SMOTE", "OHIT", "ESMOTE"]

from aeon.transformations.collection.imbalance._adasyn import ADASYN
from aeon.transformations.collection.imbalance._esmote import ESMOTE
from aeon.transformations.collection.imbalance._ohit import OHIT
from aeon.transformations.collection.imbalance._smote import SMOTE
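
With the registry updated, the new transformer is importable directly from the subpackage. A minimal smoke test of the export (assuming the branch is installed):

    from aeon.transformations.collection import imbalance

    # ESMOTE now appears in the public export list alongside the other rebalancers
    assert "ESMOTE" in imbalance.__all__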
34 changes: 25 additions & 9 deletions aeon/transformations/collection/imbalance/_adasyn.py
@@ -9,8 +9,9 @@
# License: MIT
"""

from typing import Optional, Union

import numpy as np
from sklearn.utils import check_random_state

from aeon.transformations.collection.imbalance._smote import SMOTE

@@ -35,7 +36,7 @@ class ADASYN(SMOTE):
----------
random_state : int or None, optional (default=None)
Random seed for reproducibility.
k_neighbors : int, optional (default=5)
n_neighbors : int, optional (default=5)
Number of nearest neighbours used to construct synthetic samples.

References
@@ -55,12 +56,26 @@ class ADASYN(SMOTE):
>>> X_res, y_res = sampler.fit_transform(X, y)
"""

def __init__(self, random_state=None, k_neighbors=5):
super().__init__(random_state=random_state, k_neighbors=k_neighbors)
def __init__(
self,
n_neighbors: int = 5,
random_state=None,
distance: Union[str, callable] = "euclidean",
distance_params: Optional[dict] = None,
n_jobs: int = 1,
weights: Union[str, callable] = "uniform",
):
super().__init__(
random_state=random_state,
n_neighbors=n_neighbors,
distance=distance,
distance_params=distance_params,
n_jobs=n_jobs,
weights=weights,
)

def _transform(self, X, y=None):
X = np.squeeze(X, axis=1)
random_state = check_random_state(self.random_state)
X_resampled = [X.copy()]
y_resampled = [y.copy()]

@@ -70,8 +85,9 @@ def _transform(self, X, y=None):
continue
target_class_indices = np.flatnonzero(y == class_sample)
X_class = X[target_class_indices]
y_class = y[target_class_indices]

self.nn_.fit(X)
self.nn_.fit(X, y)
nns = self.nn_.kneighbors(X_class, return_distance=False)[:, 1:]
# The ratio is computed using a one-vs-rest manner. Using majority
# in multi-class would lead to slightly different results at the
@@ -97,14 +113,14 @@ def _transform(self, X, y=None):

# the nearest neighbors need to be fitted only on the current class
# to find the class NN to generate new samples
self.nn_.fit(X_class)
self.nn_.fit(X_class, y_class)
nns = self.nn_.kneighbors(X_class, return_distance=False)[:, 1:]

enumerated_class_indices = np.arange(len(target_class_indices))
rows = np.repeat(enumerated_class_indices, n_samples_generate)
cols = random_state.choice(n_neighbors, size=n_samples)
cols = self._random_state.choice(n_neighbors, size=n_samples)
diffs = X_class[nns[rows, cols]] - X_class[rows]
steps = random_state.uniform(size=(n_samples, 1))
steps = self._random_state.uniform(size=(n_samples, 1))
X_new = X_class[rows] + steps * diffs

X_new = X_new.astype(X.dtype)
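
A usage sketch of the widened ADASYN constructor; the data here is random and purely illustrative, and the parameter values are assumptions, not recommendations:

    import numpy as np

    from aeon.transformations.collection.imbalance import ADASYN

    rng = np.random.default_rng(0)
    X = rng.random((18, 1, 50))       # 18 univariate series of length 50
    y = np.array([0] * 12 + [1] * 6)  # class 1 is under-represented

    sampler = ADASYN(n_neighbors=3, distance="euclidean", weights="uniform")
    X_res, y_res = sampler.fit_transform(X, y)
    # class 1 is topped up towards the majority count of 12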
226 changes: 226 additions & 0 deletions aeon/transformations/collection/imbalance/_esmote.py
@@ -0,0 +1,226 @@
from collections import OrderedDict
from typing import Optional, Union

import numpy as np
from sklearn.utils import check_random_state

from aeon.clustering.averaging._ba_utils import _get_alignment_path
from aeon.transformations.collection import BaseCollectionTransformer
from aeon.transformations.collection.imbalance._single_class_knn import Single_Class_KNN
from aeon.utils.validation import check_n_jobs

__all__ = ["ESMOTE"]


class ESMOTE(BaseCollectionTransformer):
"""
Elastic Synthetic Minority Over-sampling Technique (ESMOTE).

A SMOTE variant for time series that finds nearest neighbours with an
elastic distance and generates synthetic minority samples along the
resulting alignment path.

Parameters
----------
n_neighbors : int, default=5
The number of nearest neighbors used to define the neighborhood from which
synthetic time series are generated.
distance : str or callable, default="twe"
The distance metric used for the nearest neighbor search and for the
alignment path of the synthetic time series.
weights : str or callable, default='uniform'
Mechanism for weighting a vote, one of ``'uniform'``, ``'distance'``, or a
callable function.
random_state : int, RandomState instance or None, default=None
If `int`, random_state is the seed used by the random number generator;
If `RandomState` instance, random_state is the random number generator;
If `None`, the random number generator is the `RandomState` instance used
by `np.random`.

See Also
--------
ADASYN

References
----------
.. [1] Chawla et al. SMOTE: synthetic minority over-sampling technique, Journal
of Artificial Intelligence Research 16(1): 321–357, 2002.
https://dl.acm.org/doi/10.5555/1622407.1622416
"""

_tags = {
"capability:multivariate": False,
"capability:unequal_length": False,
"capability:multithreading": True,
"requires_y": True,
}

def __init__(
self,
n_neighbors=5,
distance: Union[str, callable] = "twe",
distance_params: Optional[dict] = None,
weights: Union[str, callable] = "uniform",
n_jobs: int = 1,
random_state=None,
):
self.random_state = random_state
self.n_neighbors = n_neighbors
self.distance = distance
self.weights = weights
self.distance_params = distance_params
self.n_jobs = n_jobs

self._random_state = None
self._distance_params = distance_params or {}

self.nn_ = None
super().__init__()

def _fit(self, X, y=None):
self._random_state = check_random_state(self.random_state)
self._n_jobs = check_n_jobs(self.n_jobs)
self.nn_ = Single_Class_KNN(
n_neighbors=self.n_neighbors + 1,
distance=self.distance,
distance_params=self._distance_params,
weights=self.weights,
n_jobs=self.n_jobs,
)

# generate sampling target by targeting all classes except the majority
unique, counts = np.unique(y, return_counts=True)
target_stats = dict(zip(unique, counts))
n_sample_majority = max(target_stats.values())
class_majority = max(target_stats, key=target_stats.get)
sampling_strategy = {
key: n_sample_majority - value
for (key, value) in target_stats.items()
if key != class_majority
}
self.sampling_strategy_ = OrderedDict(sorted(sampling_strategy.items()))
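# e.g. y = [0, 0, 0, 0, 1, 1, 2] gives counts {0: 4, 1: 2, 2: 1}; class 0 is
# the majority, so sampling_strategy_ == {1: 2, 2: 3}, the number of
# synthetic cases needed to level each minority class with the majority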
return self

def _transform(self, X, y=None):
X_resampled = [X.copy()]
y_resampled = [y.copy()]

# for each minority class, the number of synthetic samples that must be generated
for class_sample, n_samples in self.sampling_strategy_.items():
if n_samples == 0:
continue
target_class_indices = np.flatnonzero(y == class_sample)
X_class = X[target_class_indices]
y_class = y[target_class_indices]

self.nn_.fit(X_class, y_class)
nns = self.nn_.kneighbors(X_class, return_distance=False)[:, 1:]
X_new, y_new = self._make_samples(
X_class,
y.dtype,
class_sample,
X_class,
nns,
n_samples,
1.0,
n_jobs=self.n_jobs,
)
X_resampled.append(X_new)
y_resampled.append(y_new)
X_synthetic = np.vstack(X_resampled)
y_synthetic = np.hstack(y_resampled)

return X_synthetic, y_synthetic

def _make_samples(
self, X, y_dtype, y_type, nn_data, nn_num, n_samples, step_size=1.0, n_jobs=1
):
samples_indices = self._random_state.randint(
low=0, high=nn_num.size, size=n_samples
)

steps = (
step_size
* self._random_state.uniform(low=0, high=1, size=n_samples)[:, np.newaxis]
)
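# decode each flat index into (seed sample, neighbour) coordinates of
# nn_num: the row picks the seed series, the column one of its neighbours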
rows = np.floor_divide(samples_indices, nn_num.shape[1])
cols = np.mod(samples_indices, nn_num.shape[1])
X_new = np.zeros((len(rows), *X.shape[1:]), dtype=X.dtype)
for count in range(len(rows)):
i = rows[count]
j = cols[count]
nn_ts = nn_data[nn_num[i, j]]
X_new[count] = self._generate_sample_use_elastic_distance(
X[i],
nn_ts,
distance=self.distance,
step=steps[count],
)

y_new = np.full(n_samples, fill_value=y_type, dtype=y_dtype)
return X_new, y_new

def _generate_sample_use_elastic_distance(
self,
curr_ts,
nn_ts,
distance,
step,
window: Union[float, None] = None,
g: float = 0.0,
epsilon: Union[float, None] = None,
nu: float = 0.001,
lmbda: float = 1.0,
independent: bool = True,
c: float = 1.0,
descriptor: str = "identity",
reach: int = 15,
warp_penalty: float = 1.0,
transformation_precomputed: bool = False,
transformed_x: Optional[np.ndarray] = None,
transformed_y: Optional[np.ndarray] = None,
return_bias=False,
):
"""
Generate a single synthetic sample using soft distance.

This is use soft distance to align the current time series with its nearest
neighbor, and then generate a synthetic sample by subtracting the aligned
nearest neighbor from the current time series.

# shape: (c, l) or (l)
# shape: (c, l) or (l)
"""
new_ts = curr_ts.copy()
alignment, _ = _get_alignment_path(
nn_ts,
curr_ts,
distance,
window,
g,
epsilon,
nu,
lmbda,
independent,
c,
descriptor,
reach,
warp_penalty,
transformation_precomputed,
transformed_x,
transformed_y,
)
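# alignment holds the (k, l) index pairs of the warping path; collect every
# nn_ts time point l aligned to each curr_ts time point k so that one can
# be drawn at random per time point below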
path_list = [[] for _ in range(curr_ts.shape[1])]
for k, l in alignment:
path_list[k].append(l)

empty_of_array = np.zeros_like(curr_ts, dtype=float) # shape: (c, l)

for k, l in enumerate(path_list):
key = self._random_state.choice(l)
# Compute difference for all channels at this time step
empty_of_array[:, k] = curr_ts[:, k] - nn_ts[:, key]

bias = step * empty_of_array
if return_bias:
return bias

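# subtracting the bias interpolates toward the aligned neighbour:
# new_ts = (1 - step) * curr_ts + step * aligned(nn_ts)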
new_ts = new_ts - bias
return new_ts
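
A minimal end-to-end sketch of the new transformer (random data, illustrative parameters; "twe" is the default elastic distance per the signature above):

    import numpy as np

    from aeon.transformations.collection.imbalance import ESMOTE

    rng = np.random.default_rng(0)
    X = rng.random((18, 1, 40))       # 18 univariate series of length 40
    y = np.array([0] * 12 + [1] * 6)  # class 1 is the minority

    sampler = ESMOTE(n_neighbors=3, distance="twe", random_state=0)
    X_res, y_res = sampler.fit_transform(X, y)
    # class 1 now matches the majority: 6 original + 6 synthetic cases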
24 changes: 24 additions & 0 deletions aeon/transformations/collection/imbalance/_single_class_knn.py
@@ -0,0 +1,24 @@
"""Wrapper of KNeighborsTimeSeriesClassifier named Single_Class_KNN.

It wraps the fit setup to ensure `_fit` is executed even when the dataset
contains only a single class.
"""

from aeon.classification.distance_based import KNeighborsTimeSeriesClassifier

__all__ = ["Single_Class_KNN"]


class Single_Class_KNN(KNeighborsTimeSeriesClassifier):
"""
KNN classifier for time series data, adapted to work with SMOTE.

This class is a wrapper around the original KNeighborsTimeSeriesClassifier
that allows the neighbour search to be fitted on single-class data.
"""

def _fit_setup(self, X, y):
# KNN can handle data where all labels are identical, so always return
# False for the single-class flag to ensure `_fit` runs
X, y, _ = super()._fit_setup(X, y)
return X, y, False
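
A sketch of what the override enables: fitting the neighbour search on a collection whose labels are all identical, which the parent's fit setup would otherwise treat as a degenerate single-class problem (random data, illustrative parameters):

    import numpy as np

    from aeon.transformations.collection.imbalance._single_class_knn import (
        Single_Class_KNN,
    )

    rng = np.random.default_rng(0)
    X = rng.random((8, 1, 30))
    y = np.zeros(8)  # one class only

    knn = Single_Class_KNN(n_neighbors=4, distance="twe")
    knn.fit(X, y)
    # drop the self-match in column 0; these neighbour indices drive ESMOTE
    nns = knn.kneighbors(X, return_distance=False)[:, 1:]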