Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: pipeline spark k-sigma anomaly filtering #13

Merged
merged 4 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import mean, stddev, median, abs, col
from ...interfaces import MonitoringBaseInterface
from ...._pipeline_utils.models import Libraries, SystemType


class KSigmaAnomalyDetection(MonitoringBaseInterface):
    """
    Anomaly detection with the k-sigma method. This method either computes the mean and standard deviation, or the median and the median absolute deviation (MAD) of the data.
    The k-sigma method then filters out all data points that are k times the standard deviation away from the mean, or k times the MAD away from the median.
    Assuming a normal distribution, this method keeps around 99.7% of the data points when k=3 and use_median=False.

    Args:
        spark (SparkSession): The Spark session used by the pipeline.
        df (DataFrame): The input DataFrame to filter.
        column_names (list[str]): Columns to run the filter on (currently exactly one is supported).
        k_value (int): Number of deviations from the center beyond which a point is an anomaly. Defaults to 3.
        use_median (bool): If True use median/MAD; otherwise use mean/standard deviation. Defaults to False.

    Raises:
        Exception: If no column name is provided.
        NotImplementedError: If more than one column name is provided.
    """

    def __init__(
        self,
        spark: SparkSession,
        df: DataFrame,
        column_names: list[str],
        k_value: int = 3,
        use_median: bool = False,
    ) -> None:
        if len(column_names) == 0:
            raise Exception("You must provide at least one column name")
        if len(column_names) > 1:
            # NotImplementedError (not the NotImplemented sentinel, which is
            # not callable and would raise a TypeError instead).
            raise NotImplementedError("Multiple columns are not supported yet")
        self.column_names = column_names

        self.use_median = use_median
        self.spark = spark
        self.df = df
        self.k_value = k_value

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def filter_anomalies(self) -> DataFrame:
        """
        Filter anomalies based on the k-sigma rule.

        Returns:
            DataFrame: Rows whose value lies within
                [center - k * spread, center + k * spread], where center/spread
                are mean/stddev or median/MAD depending on ``use_median``.

        Raises:
            Exception: If the center or spread statistic cannot be computed
                (e.g. the aggregation returns no row).
        """
        column_name = self.column_names[0]

        if self.use_median:
            median_row = self.df.select(median(column_name)).first()
            if median_row is None:
                raise Exception("Couldn't calculate median value")
            mean_value = median_row[0]

            # MAD: median of absolute deviations from the median.
            deviation_row = self.df.agg(
                median(abs(col(column_name) - mean_value))
            ).first()
            if deviation_row is None:
                raise Exception("Couldn't calculate median absolute deviation")
            deviation = deviation_row[0]
        else:
            stats = self.df.select(mean(column_name), stddev(column_name)).first()
            if stats is None:
                raise Exception("Couldn't calculate mean value and standard deviation")

            mean_value = stats[0]
            deviation = stats[1]

        shift = self.k_value * deviation
        lower_bound = mean_value - shift
        upper_bound = mean_value + shift

        return self.df.filter(
            (self.df[column_name] >= lower_bound)
            & (self.df[column_name] <= upper_bound)
        )
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from pyspark.sql import SparkSession

from src.sdk.python.rtdip_sdk.pipelines.monitoring.spark.data_quality.k_sigma_anomaly_detection import (
KSigmaAnomalyDetection,
)

# Normal data drawn from roughly mean=10, stddev=5, with one appended
# anomaly (100) that both filtering methods are expected to remove.
# fmt: off
normal_input_values = [ 5.19811497, 8.34437927, 3.62104032, 10.02819525, 6.1183447 ,
                        20.10067378, 10.32313075, 14.090119 , 21.43078927, 2.76624332,
                        10.84089416, 1.90722629, 11.19750641, 13.70925639, 5.61011921,
                        4.50072694, 13.79440311, 13.30173747, 7.07183589, 12.79853139, 100]

# Same values with the anomaly (100) removed — the expected filter output.
normal_expected_values = [ 5.19811497, 8.34437927, 3.62104032, 10.02819525, 6.1183447 ,
                        20.10067378, 10.32313075, 14.090119 , 21.43078927, 2.76624332,
                        10.84089416, 1.90722629, 11.19750641, 13.70925639, 5.61011921,
                        4.50072694, 13.79440311, 13.30173747, 7.07183589, 12.79853139]
# fmt: on

# These values are tricky for the mean method, as the anomaly has a big effect on the mean
input_values = [1, 2, 3, 4, 20]
expected_values = [1, 2, 3, 4]


def test_filter_with_mean(spark_session: SparkSession):
    """Mean/stddev-based filtering removes the outlier from well-behaved data,
    but is expected to fail when a single anomaly drags the mean too far."""

    def as_rows(values):
        return [(float(v),) for v in values]

    # Well-behaved data: the single extreme point is filtered out.
    well_behaved_df = spark_session.createDataFrame(
        as_rows(normal_input_values), schema=["value"]
    )
    well_behaved_expected = spark_session.createDataFrame(
        as_rows(normal_expected_values), schema=["value"]
    )

    detector = KSigmaAnomalyDetection(
        spark_session,
        well_behaved_df,
        column_names=["value"],
        k_value=3,
        use_median=False,
    )
    assert well_behaved_expected.collect() == detector.filter_anomalies().collect()

    # Skewed data: the anomaly shifts the mean so much that the mean-based
    # method does NOT produce the ideal filtered result.
    skewed_df = spark_session.createDataFrame(as_rows(input_values), schema=["value"])
    skewed_expected = spark_session.createDataFrame(
        as_rows(expected_values), schema=["value"]
    )

    skewed_result = KSigmaAnomalyDetection(
        spark_session, skewed_df, column_names=["value"], k_value=3, use_median=False
    ).filter_anomalies()

    assert skewed_expected.collect() != skewed_result.collect()


def test_filter_with_median(spark_session: SparkSession):
    """Median/MAD-based filtering removes the outlier from well-behaved data,
    and, unlike the mean method, also handles mean-shifting anomalies."""
    # Test with normal data
    normal_input_df = spark_session.createDataFrame(
        [(float(num),) for num in normal_input_values], schema=["value"]
    )
    normal_expected_df = spark_session.createDataFrame(
        [(float(num),) for num in normal_expected_values], schema=["value"]
    )

    normal_filtered_df = KSigmaAnomalyDetection(
        spark_session,
        normal_input_df,
        column_names=["value"],
        k_value=3,
        use_median=True,
    ).filter_anomalies()

    # Removed leftover debug `.show()` calls that printed both frames to
    # stdout on every test run.
    assert normal_expected_df.collect() == normal_filtered_df.collect()

    # Test with data that has an anomaly that shifts the mean significantly;
    # the median method is robust to it and still filters correctly.
    input_df = spark_session.createDataFrame(
        [(float(num),) for num in input_values], schema=["value"]
    )
    expected_df = spark_session.createDataFrame(
        [(float(num),) for num in expected_values], schema=["value"]
    )

    filtered_df = KSigmaAnomalyDetection(
        spark_session, input_df, column_names=["value"], k_value=3, use_median=True
    ).filter_anomalies()

    assert expected_df.collect() == filtered_df.collect()
Loading