Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added aeon_neuro/_wip/__init__.py
Empty file.
95 changes: 95 additions & 0 deletions aeon_neuro/_wip/classification/_nuttall_knn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
"""Cumstomed K-nearest neighbors classifier for Burg-type Nuttall-Strand algorithm."""

from sklearn.neighbors import KNeighborsClassifier
from aeon_neuro._wip.distances._get_optimal_weights import get_optimal_weight1, get_optimal_weight2
from aeon_neuro._wip.distances._riemannian_matrix import (
pairwise_riemannian_distance_1,
pairwise_riemannian_distance_2,
pairwise_weighted_riemannian_distance_1,
pairwise_weighted_riemannian_distance_2,
)


class NuttallKNN(KNeighborsClassifier):
"""Custom K-nearest neighbors classifier for Burg-type Nuttall-Strand algorithm.

Parameters
----------
mode : int 1 or 2
Choose a Riemannian distance function to be used, by default 1.
weighted : bool
If True, use optimal weighted Riemannian distance, by default False.
n_neighbors : int, optional
Number of neighbors to use, by default 3.
n_jobs : int, optional
Number of jobs to run in parallel, by default 1.

Examples
--------
>>> from aeon_neuro._wip.classification._nuttall_knn import NuttallKNN
>>> from aeon_neuro._wip.transformations.series._nuttall_strand import (
... NuttallStrand)
>>> from sklearn.metrics import accuracy_score
>>> from aeon.datasets import load_classification
>>> import numpy as np
>>> import warnings
>>> warnings.filterwarnings("ignore")
>>> n_freqs=25
>>> transformer = NuttallStrand(model_order=5, n_freqs=n_freqs, fmax=60)
>>> X, y= load_classification("EyesOpenShut")
>>> X_train = X[:56]
>>> X_test = X[56:58]
>>> y_train = y[:56]
>>> y_test = y[56:58]
>>> X_train_psd = []
>>> X_test_psd = []
>>> for i in range(X_train.shape[0]):
... X_train_psd.append(transformer.fit_transform(X_train[i]))
>>> for i in range(X_test.shape[0]):
... X_test_psd.append(transformer.fit_transform(X_test[i]))
>>> X_train_psd = np.array(X_train_psd)
>>> X_test_psd = np.array(X_test_psd)
>>> knn = NuttallKNN(mode=1, weighted=True, n_neighbors=1, n_jobs=-1)
>>> knn.fit(X_train_psd, y_train)
>>> y_pred = knn.predict(X_test_psd)
>>> accuracy_score(y_test, y_pred)
0.5
"""

def __init__(self, mode: int = 1, weighted: bool = False, n_neighbors=3, n_jobs=-1):
assert mode in [1, 2], "Mode must be 1 or 2"
self.mode = mode
self.weighted = weighted
super().__init__(metric='precomputed', n_neighbors=n_neighbors, n_jobs=n_jobs)

def fit(self, X, y):
self.X_train_ = X
distances = None
if self.mode == 1:
if self.weighted is False:
distances = pairwise_riemannian_distance_1(X)
else:
self.W_ = get_optimal_weight1(X, y)
distances = pairwise_weighted_riemannian_distance_1(X, self.W_)
else:
if self.weighted is False:
distances = pairwise_riemannian_distance_2(X)
else:
self.W_ = get_optimal_weight2(X, y)
distances = pairwise_weighted_riemannian_distance_2(X, self.W_)
super().fit(distances, y)


def predict(self, X):
distances = None
if self.mode == 1:
if self.weighted is False:
distances = pairwise_riemannian_distance_1(X, self.X_train_)
else:
distances = pairwise_weighted_riemannian_distance_1(X, self.W_, self.X_train_)
else:
if self.weighted is False:
distances = pairwise_riemannian_distance_2(X, self.X_train_)
else:
distances = pairwise_weighted_riemannian_distance_2(X, self.W_, self.X_train_)
return super().predict(distances)
Empty file.
160 changes: 160 additions & 0 deletions aeon_neuro/_wip/distances/_get_optimal_weights.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Calculate optimal weights for weighted Riemannian distance."""

import numpy as np
from scipy.linalg import sqrtm
from numpy.linalg import eig


def _get_inv(A, rcond_threshold=1e-10, cond_threshold=1e10):
r"""Calculate the inverse of a matrix."""
try:
# Calculate the condition number of the matrix
cond_A = np.linalg.cond(A)
if cond_A > cond_threshold:
# If the condition number is too large, use the pseudo-inverse
# print(f"Condition number is {cond_A:.2e}, using pinv instead of inv")
inv_A = np.linalg.pinv(A, rcond=rcond_threshold)
else:
# If the condition number is appropriate, calculate the inverse
inv_A = np.linalg.inv(A)
except np.linalg.LinAlgError:
# If the matrix is singular, use the pseudo-inverse
# print("Matrix is singular, using pinv instead of inv")
inv_A = np.linalg.pinv(A, rcond=rcond_threshold)

return inv_A


def _svd_w1(P_ik, P_jk):
P_half_ik = sqrtm(P_ik)
P_half_jk = sqrtm(P_jk)

# Singular Value Decomposition (SVD)
product_matrix = P_half_jk @ P_half_ik
# if product_matrix.dtype == np.complex256:
# product_matrix = product_matrix.astype(np.complex128)
V_ij1, Sigma_ij, V_ij2_H = np.linalg.svd(product_matrix)

# Construct unitary matrices
U_ik = V_ij1
U_jk = V_ij2_H.T.conj()

P_tilde_ik = P_half_ik @ U_ik
P_tilde_jk = P_half_jk @ U_jk

return P_tilde_ik, P_tilde_jk


def _compute_summation_matrices_w1(X_train_psd, y_train):
n_cases, k, M, M = X_train_psd.shape

M_S1 = np.zeros((M, M), dtype=np.complex128)
M_D1 = np.zeros((M, M), dtype=np.complex128)

# Compute the summation matrices
for i in range(n_cases):
for j in range(i+1, n_cases):
for m in range(k):
P_ik = X_train_psd[i, m]
P_jk = X_train_psd[j, m]

P_tilde_ik, P_tilde_jk = _svd_w1(P_ik, P_jk)

diff = P_tilde_ik - P_tilde_jk
diff_H = diff.conj().T

if y_train[i] == y_train[j]:
M_S1 += diff @ diff_H
else:
M_D1 += diff @ diff_H

return M_S1, M_D1


def _compute_summation_matrices_w2(X_train_psd, y_train):
n_cases, k, M, M = X_train_psd.shape

M_S1 = np.zeros((M, M), dtype=np.complex128)
M_D1 = np.zeros((M, M), dtype=np.complex128)

# Compute the summation matrices
for i in range(n_cases):
for j in range(i+1, n_cases):
for m in range(k):
P_ik = X_train_psd[i, m]
P_jk = X_train_psd[j, m]

P_half_ik = sqrtm(P_ik)
P_half_jk = sqrtm(P_jk)

diff = P_half_ik - P_half_jk
diff_H = diff.conj().T

if y_train[i] == y_train[j]:
M_S1 += diff @ diff_H
else:
M_D1 += diff @ diff_H

return M_S1, M_D1


def _eigenvector_decomposition(M_S1, M_D1, PCA_num):
# Perform eigenvalue decomposition on the matrix M_S1^-1 * M_D1
eigenvalues, eigenvectors = eig(_get_inv(M_S1) @ M_D1)

# Sort the eigenvectors based on the eigenvalues in descending order
idx = np.argsort(eigenvalues)[::-1]
top_eigenvectors = eigenvectors[:, idx[:PCA_num]]

return top_eigenvectors

def _compute_weighting_matrix(top_eigenvectors):
Omega = top_eigenvectors
W = Omega @ Omega.conj().T
return W


def get_optimal_weight1(X_train_psd, y_train):
"""Calculate optimal weights for weighted Riemannian distance 1.

Parameters
----------
X_train_psd : np.ndarray
Power spectral density matrices of shape (n_cases, n_freqs, n_channels, n_channels).
Get from Burg-type Nuttall-Strand transformation.
y_train : np.ndarray
Labels of shape (n_cases,).

Returns
-------
W1 : np.ndarray
Optimal weighting matrix of shape (n_channels, n_channels).
"""
PCA_num = X_train_psd.shape[-1]
M_S1, M_D1 = _compute_summation_matrices_w1(X_train_psd, y_train)
top_eigenvectors = _eigenvector_decomposition(M_S1, M_D1, PCA_num)
W1 = _compute_weighting_matrix(top_eigenvectors)
return W1


def get_optimal_weight2(X_train_psd, y_train):
"""Calculate optimal weights for weighted Riemannian distance 2.

Parameters
----------
X_train_psd : np.ndarray
Power spectral density matrices of shape (n_cases, n_freqs, n_channels, n_channels).
Get from Burg-type Nuttall-Strand transformation.
y_train : np.ndarray
Labels of shape (n_cases,).

Returns
-------
W2 : np.ndarray
Optimal weighting matrix of shape (n_channels, n_channels).
"""
PCA_num = X_train_psd.shape[-1]
M_S2, M_D2 = _compute_summation_matrices_w2(X_train_psd, y_train)
top_eigenvectors = _eigenvector_decomposition(M_S2, M_D2, PCA_num)
W2 = _compute_weighting_matrix(top_eigenvectors)
return W2
80 changes: 80 additions & 0 deletions aeon_neuro/_wip/distances/_riemannian.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Implement as a distance function here."""

import math

import numpy as np
from scipy.linalg import eig, logm


def riemannian_distance_a(
x: np.ndarray,
y: np.ndarray,
) -> float:
r"""Compute the Reimannian distance between two time series.

Parameters
----------
x : np.ndarray
First time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
y : np.ndarray
Second time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
"""
x_cov = np.cov(x)
y_cov = np.cov(y)
x_trace = np.trace(x_cov)
y_trace = np.trace(y_cov)
x_root = logm(x_cov)
c_temp = logm(np.matmul(x_root, np.matmul(y_cov, x_root)))
c = 2 * np.trace(c_temp)
return math.sqrt(x_trace + y_trace - c)


def riemannian_distance_b(
x: np.ndarray,
y: np.ndarray,
) -> float:
r"""Compute the Reimannian distance between two time series.

https://hal.science/hal-00820475/document
https://hal.science/hal-00602700/document

Parameters
----------
x : np.ndarray
First time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
y : np.ndarray
Second time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
"""
x_cov = np.cov(x)
y_cov = np.cov(y)
eigenvalues = eig(x_cov, y_cov)
distance = 0
for i in eigenvalues[0]:
distance += math.log(np.real(i)) ** 2
return math.sqrt(distance)


def riemannian_distance_c(
x: np.ndarray,
y: np.ndarray,
) -> float:
r"""Compute the Reimannian distance between two time series.

Parameters
----------
x : np.ndarray
First time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
y : np.ndarray
Second time series, either univariate, shape ``(n_timepoints,)``, or
multivariate, shape ``(n_channels, n_timepoints)``.
"""
x_cov = np.cov(x)
y_cov = np.cov(y)
x_inv_root = logm(np.linalg.inv(x_cov))
distance = logm(np.matmul(x_inv_root, np.matmul(y_cov, x_inv_root)))
return np.linalg.norm(distance)
Loading
Loading