From 2d82be9815fca2eefacd95e392ca0389fa72acbd Mon Sep 17 00:00:00 2001 From: Christian Munz Date: Sun, 10 Nov 2024 01:36:41 +0100 Subject: [PATCH 1/8] Init & generate/flag missing values Signed-off-by: Christian Munz --- .../data_quality/missing_value_imputation.py | 165 ++++++++++++++++++ .../test_missing_value_imputation.py | 77 ++++++++ 2 files changed, 242 insertions(+) create mode 100644 src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py create mode 100644 tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py new file mode 100644 index 000000000..d0c28b831 --- /dev/null +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py @@ -0,0 +1,165 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pyspark.sql import DataFrame as PySparkDataFrame, functions as F +from pyspark.sql.functions import col, udf +from pyspark.sql.types import StringType, TimestampType, FloatType, ArrayType +from pyspark.sql.window import Window +from datetime import timedelta +from typing import List +from ...interfaces import WranglerBaseInterface +from ...._pipeline_utils.models import Libraries, SystemType + + +class MissingValueImputation(WranglerBaseInterface): + """ + TODO + + Args: + TODO + + Returns: + TODO + + Example + -------- + TODO + """ + + + df: PySparkDataFrame + + def __init__( + self, + df: PySparkDataFrame + ) -> None: + self.df = df + + @staticmethod + def system_type(): + """ + Attributes: + SystemType (Environment): Requires PYSPARK + """ + return SystemType.PYSPARK + + @staticmethod + def libraries(): + libraries = Libraries() + return libraries + + @staticmethod + def settings() -> dict: + return {} + + + def filter(self) -> PySparkDataFrame: + """ + Imputate missing values based on [] + """ + + if not all(col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"]): + raise ValueError("Columns not as expected") + + if self._is_column_type(self.df, "EventTime", StringType): + self.df = self.df.withColumn("EventTime", F.to_timestamp("EventTime")) + if self._is_column_type(self.df, "Value", StringType): + self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType())) + + dfs_by_source = self._split_by_source() + + flagged_dfs: List[PySparkDataFrame] = [] + + for source, df in dfs_by_source.items(): + flagged_df = self._flag_missing_values(df) + + print(flagged_df.show(truncate=False)) # Current testing + flagged_dfs.append(flagged_df) + + return self.df + + + def _flag_missing_values(self, df) -> PySparkDataFrame: + window_spec = Window.partitionBy("TagName").orderBy("EventTime") + + df = df.withColumn("prev_event_time", F.lag("EventTime").over(window_spec)) + df = df.withColumn("time_diff_seconds", + 
(F.unix_timestamp("EventTime") - F.unix_timestamp("prev_event_time"))) + + df_diff = df.filter(F.col("time_diff_seconds").isNotNull()) + interval_counts = df_diff.groupBy("time_diff_seconds").count() + most_frequent_interval = interval_counts.orderBy(F.desc("count")).first() + expected_interval = most_frequent_interval["time_diff_seconds"] if most_frequent_interval else None + + tolerance_percentage = 15 + tolerance = (expected_interval * tolerance_percentage) / 100 if expected_interval else 0 + + existing_timestamps = df.select("TagName", "EventTime").rdd \ + .map(lambda row: (row["TagName"], row["EventTime"])).groupByKey().collectAsMap() + + def generate_missing_timestamps(prev_event_time, event_time, tag_name): + # Check for first row + if prev_event_time is None or event_time is None or expected_interval is None: + return [] + + # Check against existing timestamps to avoid duplicates + tag_timestamps = set(existing_timestamps.get(tag_name, [])) + missing_timestamps = [] + current_time = prev_event_time + + while current_time < event_time: + next_expected_time = current_time + timedelta(seconds=expected_interval) + time_diff = abs((next_expected_time - event_time).total_seconds()) + if time_diff <= tolerance: + break + if next_expected_time not in tag_timestamps: + missing_timestamps.append(next_expected_time) + current_time = next_expected_time + + return missing_timestamps + + generate_missing_timestamps_udf = udf(generate_missing_timestamps, ArrayType(TimestampType())) + + df_with_missing = df.withColumn( + "missing_timestamps", + generate_missing_timestamps_udf("prev_event_time", "EventTime", "TagName") + ) + + df_missing_entries = df_with_missing.select( + "TagName", + F.explode("missing_timestamps").alias("EventTime"), + F.lit("Good").alias("Status"), + F.lit(float('nan')).cast(FloatType()).alias("Value") + ) + + df_combined = df.select("TagName", "EventTime", "Status", "Value").union(df_missing_entries).orderBy( + "EventTime") + + return df_combined + + + def _split_by_source(self) -> dict: + # + tag_names = self.df.select("TagName").distinct().collect() + tag_names = [row["TagName"] for row in tag_names] + source_dict = {tag: self.df.filter(col("TagName") == tag).orderBy("EventTime") for tag in tag_names} + + return source_dict + + + def _is_column_type(self, df, column_name, data_type): + # + type_ = df.schema[column_name] + + return isinstance(type_.dataType, data_type) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py new file mode 100644 index 000000000..12e8bc181 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py @@ -0,0 +1,77 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
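+# Unit test for MissingValueImputation. The "# Test value" row in the data
+# below is a reading that was removed on purpose so the imputation step has a
+# gap to detect; the second tag's rows are not enabled yet. For now the test
+# only asserts that filter() returns a DataFrame (value checks are still TODO).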
+import pytest + +from pyspark.sql import SparkSession +from pyspark.sql.dataframe import DataFrame +from pyspark.sql.types import StructType, StructField, StringType + + +from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.missing_value_imputation import ( + MissingValueImputation, +) + + +@pytest.fixture(scope="session") +def spark_session(): + return SparkSession.builder.master("local[2]").appName("test").getOrCreate() + + +def test_missing_value_imputation(spark_session: SparkSession): + + schema = StructType([ + StructField("TagName", StringType(), True), + StructField("EventTime", StringType(), True), + StructField("Status", StringType(), True), + StructField("Value", StringType(), True) + ]) + + data = [ + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), + #("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "0.340000004"), # Test value + ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "0.150000006"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "0.119999997"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "0.129999995"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "0.340000004"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 06:08:00", "Good", "5921.549805"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 06:08:00", "Good", "5921.549805"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 06:08:00", "Good", "5921.549805"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 06:08:00", "Good", "5921.549805"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), + #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), + ] + + df = spark_session.createDataFrame(data, schema=schema) + + missing_value_imputation = MissingValueImputation(df) + imputed_df = missing_value_imputation.filter() + + assert isinstance(imputed_df, DataFrame) + #TODO + From 69fddde882f933516dbceaa90f17075c1a4f350c Mon Sep 17 00:00:00 2001 From: Christian Munz Date: Sun, 10 Nov 2024 19:22:10 +0100 Subject: [PATCH 2/8] Flagging refinement, systemds & spline interpolation init Signed-off-by: Christian Munz --- environment.yml | 1 + .../data_quality/missing_value_imputation.py | 105 ++++++++++++++++-- .../test_missing_value_imputation.py | 100 ++++++++++++----- 3 
files changed, 170 insertions(+), 36 deletions(-) diff --git a/environment.yml b/environment.yml index 193d7dedd..59308eac2 100644 --- a/environment.yml +++ b/environment.yml @@ -86,3 +86,4 @@ dependencies: - pandas>=1.5.2,<2.2.0 - moto[s3]>=5.0.16,<6.0.0 - pyarrow>=14.0.1,<17.0.0 + - systemds>=3.0.0,<3.3.0 \ No newline at end of file diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py index d0c28b831..42dbd4fbd 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pyspark.sql import DataFrame as PySparkDataFrame, functions as F +from pyspark.sql import DataFrame as PySparkDataFrame, functions as F, Row from pyspark.sql.functions import col, udf from pyspark.sql.types import StringType, TimestampType, FloatType, ArrayType from pyspark.sql.window import Window +from scipy.interpolate import UnivariateSpline +from systemds.context import SystemDSContext +from systemds.operator.algorithm import imputeByFD, imputeByMean +import numpy as np from datetime import timedelta from typing import List from ...interfaces import WranglerBaseInterface @@ -37,7 +41,6 @@ class MissingValueImputation(WranglerBaseInterface): TODO """ - df: PySparkDataFrame def __init__( @@ -72,25 +75,109 @@ def filter(self) -> PySparkDataFrame: if not all(col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"]): raise ValueError("Columns not as expected") - if self._is_column_type(self.df, "EventTime", StringType): - self.df = self.df.withColumn("EventTime", F.to_timestamp("EventTime")) - if self._is_column_type(self.df, "Value", StringType): + if not self._is_column_type(self.df, "EventTime", TimestampType): + format_1 = "yyyy-MM-dd HH:mm:ss.SSS" + format_2 = "dd.MM.yyyy HH:mm" + if self._is_column_type(self.df, "EventTime", StringType): + # Attempt to parse the first format, then fallback to the second + self.df = self.df.withColumn( + "EventTime", + F.coalesce( + F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss.SSS"), + F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss") + ) + ) + if not self._is_column_type(self.df, "Value", FloatType): self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType())) dfs_by_source = self._split_by_source() - flagged_dfs: List[PySparkDataFrame] = [] + imputed_dfs: List[PySparkDataFrame] = [] for source, df in dfs_by_source.items(): + # Compute, insert and flag all the missing entries flagged_df = self._flag_missing_values(df) - print(flagged_df.show(truncate=False)) # Current testing - flagged_dfs.append(flagged_df) + #print(flagged_df.show(flagged_df.count(), False)) # Current testing + + # Impute the missing values of flagged entries + #imputed_df_DS = self._impute_missing_values_DS(flagged_df) + imputed_df_SP = self._impute_missing_values_SP(flagged_df) + # TODO + #imputed_dfs.append(imputed_df) return self.df + def _impute_missing_values_SP(self, df) -> PySparkDataFrame: + # Imputes missing values by Spline Interpolation + data = np.array(df.select("Value").rdd.flatMap(lambda x: x).collect(), dtype=float) + + mask = np.isnan(data) + + x_data = np.arange(len(data)) + y_data = data[~mask] + + # Spline interpolation (cubic 
smoothing spline) + spline = UnivariateSpline(x_data[~mask], y_data, s=0) + + # Impute missing values + data_imputed = data.copy() + data_imputed[mask] = spline(x_data[mask]) + + data_imputed_list = data_imputed.tolist() + + imputed_rdd = df.rdd.zipWithIndex().map(lambda row: Row( + TagName=row[0][0], + EventTime=row[0][1], + Status=row[0][2], + Value=float(data_imputed_list[row[1]]) + )) + imputed_df = imputed_rdd.toDF(df.schema) + + #print("Imputed DataFrame:") + #print(imputed_df.show(imputed_df.count(), False)) + + return imputed_df + + + def _impute_missing_values_DS(self, df) -> PySparkDataFrame: + # Impute missing values with Apache SystemDS functions: TODO + value_array = np.array(df.select("Value").rdd.flatMap(lambda x: x).collect()).reshape(-1, 1) + mask_array = np.array([0]) + + with SystemDSContext() as sds_context: + + X = sds_context.from_numpy(value_array) + mask = sds_context.from_numpy(mask_array) + + imputed_matrix_result = imputeByMean(X, mask) + + imputed_values: np.array(None) + + for node in imputed_matrix_result: + try: + imputed_values = node.compute() + except Exception as e: + print(e) + break + + imputed_values_list = imputed_values.tolist() + + imputed_rdd = df.rdd.zipWithIndex().map(lambda row: Row( + TagName=row[0][0], + EventTime=row[0][1], + Status=row[0][2], + Value=float(imputed_values_list[row[1]]) + )) + + imputed_df = imputed_rdd.toDF(df.schema) + + return imputed_df + + def _flag_missing_values(self, df) -> PySparkDataFrame: + window_spec = Window.partitionBy("TagName").orderBy("EventTime") df = df.withColumn("prev_event_time", F.lag("EventTime").over(window_spec)) @@ -102,7 +189,7 @@ def _flag_missing_values(self, df) -> PySparkDataFrame: most_frequent_interval = interval_counts.orderBy(F.desc("count")).first() expected_interval = most_frequent_interval["time_diff_seconds"] if most_frequent_interval else None - tolerance_percentage = 15 + tolerance_percentage = 5 tolerance = (expected_interval * tolerance_percentage) / 100 if expected_interval else 0 existing_timestamps = df.select("TagName", "EventTime").rdd \ diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py index 12e8bc181..4d2fd58d1 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py @@ -38,33 +38,79 @@ def test_missing_value_imputation(spark_session: SparkSession): ]) data = [ - ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", "0.340000004"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "0.150000006"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "0.129999995"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "0.119999997"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "0.129999995"), - #("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "0.340000004"), # Test value - ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "0.150000006"), - ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "0.129999995"), - ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "0.119999997"), - ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "0.129999995"), - ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "0.340000004"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 
06:08:00", "Good", "5921.549805"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 06:08:00", "Good", "5921.549805"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 06:08:00", "Good", "5921.549805"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 06:08:00", "Good", "5921.549805"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 05:14:00", "Good", "5838.216797"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 01:37:00", "Good", "5607.825684"), - #("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2024-01-02 00:26:00", "Good", "5563.708008"), + # Setup controlled Test + ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "2.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "4.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "5.0"), + #("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"), # Test values + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "7.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "8.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "9.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "10.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:13:46.000", "Good", "11.0"), # Tolerance Test + ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20.000", "Good", "12.0"), + #("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:54.000", "Good", "13.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "14.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 12:18:02.000", "Good", "15.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:36.000", "Good", "16.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 20:25:10.000", "Good", "17.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 00:28:44.000", "Good", "18.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 04:32:18.000", "Good", "19.0"), + # Real missing values + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:01:43", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:02:44", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:04:44", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:05:44", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:11:46", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:13:46", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:16:47", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:19:48", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:20:48", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:25:50", "Good", "4681.35791"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:26:50", 
"Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:27:50", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:28:50", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:31:51", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:32:52", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:42:52", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:42:54", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:43:54", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:44:54", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:45:54", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:46:55", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:47:55", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:51:56", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:52:56", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:55:57", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:56:58", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:57:58", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:59:59", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:00:59", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:05:01", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:10:02", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:11:03", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:13:06", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:17:07", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:18:07", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:20:07", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:21:07", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:25:09", "Good", "4676.456055"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:26:09", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:30:09", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:35:10", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:36:10", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:40:11", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:42:11", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:43:11", "Good", "4705.867676"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:44:11", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:46:11", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:47:11", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:53:13", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:54:13", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:55:13", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:56:13", "Good", "4700.96582"), ] df = spark_session.createDataFrame(data, schema=schema) From 8d12261e6cfc76e0a176dee337daba83c55a7d82 Mon Sep 17 00:00:00 2001 From: 
Christian Munz Date: Fri, 15 Nov 2024 23:57:51 +0100 Subject: [PATCH 3/8] documentation MVI Signed-off-by: Christian Munz --- .../data_quality/missing_value_imputation.py | 180 ++++++++++++------ .../test_missing_value_imputation.py | 2 +- 2 files changed, 121 insertions(+), 61 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py index 42dbd4fbd..a5f5ef3f3 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from pyspark.sql import DataFrame as PySparkDataFrame, functions as F, Row +from pyspark.sql import SparkSession, DataFrame as PySparkDataFrame, functions as F, Row from pyspark.sql.functions import col, udf from pyspark.sql.types import StringType, TimestampType, FloatType, ArrayType from pyspark.sql.window import Window @@ -28,25 +28,81 @@ class MissingValueImputation(WranglerBaseInterface): """ - TODO - - Args: - TODO - - Returns: - TODO + Imputes missing values in a univariate time series creating a continuous curve of data points. For that, the + time intervals of each individual source is calculated, to then insert empty records at the missing timestamps with + NaN values. Through spline interpolation the missing NaN values are calculated resulting in a consistent data set + and thus enhance your data quality. Example -------- - TODO + from pyspark.sql import SparkSession + from pyspark.sql.dataframe import DataFrame + from pyspark.sql.types import StructType, StructField, StringType + from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.missing_value_imputation import ( + MissingValueImputation, + ) + + @pytest.fixture(scope="session") + def spark_session(): + return SparkSession.builder.master("local[2]").appName("test").getOrCreate() + + spark = spark_session() + + schema = StructType([ + StructField("TagName", StringType(), True), + StructField("EventTime", StringType(), True), + StructField("Status", StringType(), True), + StructField("Value", StringType(), True) + ]) + + data = [ + # Setup controlled Test + ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "2.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "4.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "5.0"), + #("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"), # Test values + #("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "7.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "8.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "9.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "10.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:13:46.000", "Good", "11.0"), # Tolerance Test + ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20.000", "Good", "10.0"), + #("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:54.000", "Good", "9.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "8.0"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:01:43", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:02:44", "Good", 
"4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:04:44", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:05:44", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:11:46", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:13:46", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:16:47", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:19:48", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:20:48", "Good", "4691.161621"), + ] + + df = spark.createDataFrame(data, schema=schema) + + missing_value_imputation = MissingValueImputation(spark, df) + imputed_df = missing_value_imputation.filter() + + print(imputed_df.show(imputed_df.count(), False)) + + ``` + + Parameters: + df (DataFrame): Dataframe containing the raw data. """ df: PySparkDataFrame def __init__( self, + spark: SparkSession, df: PySparkDataFrame ) -> None: + self.spark = spark self.df = df @staticmethod @@ -67,49 +123,8 @@ def settings() -> dict: return {} - def filter(self) -> PySparkDataFrame: - """ - Imputate missing values based on [] - """ - - if not all(col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"]): - raise ValueError("Columns not as expected") - - if not self._is_column_type(self.df, "EventTime", TimestampType): - format_1 = "yyyy-MM-dd HH:mm:ss.SSS" - format_2 = "dd.MM.yyyy HH:mm" - if self._is_column_type(self.df, "EventTime", StringType): - # Attempt to parse the first format, then fallback to the second - self.df = self.df.withColumn( - "EventTime", - F.coalesce( - F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss.SSS"), - F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss") - ) - ) - if not self._is_column_type(self.df, "Value", FloatType): - self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType())) - - dfs_by_source = self._split_by_source() - - imputed_dfs: List[PySparkDataFrame] = [] - - for source, df in dfs_by_source.items(): - # Compute, insert and flag all the missing entries - flagged_df = self._flag_missing_values(df) - - #print(flagged_df.show(flagged_df.count(), False)) # Current testing - - # Impute the missing values of flagged entries - #imputed_df_DS = self._impute_missing_values_DS(flagged_df) - imputed_df_SP = self._impute_missing_values_SP(flagged_df) - # TODO - #imputed_dfs.append(imputed_df) - - return self.df - - - def _impute_missing_values_SP(self, df) -> PySparkDataFrame: + @staticmethod + def _impute_missing_values_sp(df) -> PySparkDataFrame: # Imputes missing values by Spline Interpolation data = np.array(df.select("Value").rdd.flatMap(lambda x: x).collect(), dtype=float) @@ -141,7 +156,8 @@ def _impute_missing_values_SP(self, df) -> PySparkDataFrame: return imputed_df - def _impute_missing_values_DS(self, df) -> PySparkDataFrame: + @staticmethod + def _impute_missing_values_ds(df) -> PySparkDataFrame: # Impute missing values with Apache SystemDS functions: TODO value_array = np.array(df.select("Value").rdd.flatMap(lambda x: x).collect()).reshape(-1, 1) mask_array = np.array([0]) @@ -176,8 +192,9 @@ def _impute_missing_values_DS(self, df) -> PySparkDataFrame: return imputed_df - def _flag_missing_values(self, df) -> PySparkDataFrame: - + @staticmethod + def _flag_missing_values(df) -> PySparkDataFrame: + # Calculates interval and inserts empty records at missing timestamps with NaN values window_spec = Window.partitionBy("TagName").orderBy("EventTime") df = 
df.withColumn("prev_event_time", F.lag("EventTime").over(window_spec)) @@ -236,8 +253,55 @@ def generate_missing_timestamps(prev_event_time, event_time, tag_name): return df_combined + @staticmethod + def _is_column_type(df, column_name, data_type): + # Helper method for data type checking + type_ = df.schema[column_name] + + return isinstance(type_.dataType, data_type) + + + def filter(self) -> PySparkDataFrame: + """ + Imputate missing values based on [Spline Interpolation, ] + """ + if not all(col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"]): + raise ValueError("Columns not as expected") + + if not self._is_column_type(self.df, "EventTime", TimestampType): + if self._is_column_type(self.df, "EventTime", StringType): + # Attempt to parse the first format, then fallback to the second + self.df = self.df.withColumn( + "EventTime", + F.coalesce( + F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss.SSS"), + F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss") + ) + ) + if not self._is_column_type(self.df, "Value", FloatType): + self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType())) + + dfs_by_source = self._split_by_source() + + imputed_dfs: List[PySparkDataFrame] = [] + + for source, df in dfs_by_source.items(): + # Compute, insert and flag all the missing entries + flagged_df = self._flag_missing_values(df) + + #print(flagged_df.show(flagged_df.count(), False)) # Current testing + + # Impute the missing values of flagged entries + #imputed_df_DS = self._impute_missing_values_ds(flagged_df) + imputed_df_SP = self._impute_missing_values_sp(flagged_df) + # TODO + #imputed_dfs.append(imputed_df) + + return self.df + + def _split_by_source(self) -> dict: - # + # Helper method to separate individual time series based on their source tag_names = self.df.select("TagName").distinct().collect() tag_names = [row["TagName"] for row in tag_names] source_dict = {tag: self.df.filter(col("TagName") == tag).orderBy("EventTime") for tag in tag_names} @@ -245,8 +309,4 @@ def _split_by_source(self) -> dict: return source_dict - def _is_column_type(self, df, column_name, data_type): - # - type_ = df.schema[column_name] - return isinstance(type_.dataType, data_type) diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py index 4d2fd58d1..ca5246dd3 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py @@ -115,7 +115,7 @@ def test_missing_value_imputation(spark_session: SparkSession): df = spark_session.createDataFrame(data, schema=schema) - missing_value_imputation = MissingValueImputation(df) + missing_value_imputation = MissingValueImputation(spark_session, df) imputed_df = missing_value_imputation.filter() assert isinstance(imputed_df, DataFrame) From c9d104f393077efaa35f0fa6134fac0217423500 Mon Sep 17 00:00:00 2001 From: Christian Munz Date: Sun, 17 Nov 2024 02:06:47 +0100 Subject: [PATCH 4/8] unit tests missing value imputation Signed-off-by: Christian Munz --- .../data_quality/missing_value_imputation.py | 18 +- .../test_missing_value_imputation.py | 192 +++++++++++++++++- 2 files changed, 197 insertions(+), 13 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py 
b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py index a5f5ef3f3..c51b75f42 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py @@ -292,12 +292,22 @@ def filter(self) -> PySparkDataFrame: #print(flagged_df.show(flagged_df.count(), False)) # Current testing # Impute the missing values of flagged entries - #imputed_df_DS = self._impute_missing_values_ds(flagged_df) - imputed_df_SP = self._impute_missing_values_sp(flagged_df) # TODO - #imputed_dfs.append(imputed_df) + #imputed_df_DS = self._impute_missing_values_ds(flagged_df) + imputed_df_sp = self._impute_missing_values_sp(flagged_df) + + imputed_df_sp = imputed_df_sp.withColumn("EventTime", col("EventTime").cast("string")) \ + .withColumn("Value", col("Value").cast("string")) + + #print(imputed_df_sp.show(imputed_df_sp.count(), False)) + + imputed_dfs.append(imputed_df_sp) + + result_df = imputed_dfs[0] + for df in imputed_dfs[1:]: + result_df = result_df.unionByName(df) - return self.df + return result_df def _split_by_source(self) -> dict: diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py index ca5246dd3..80956908a 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py @@ -15,6 +15,7 @@ from pyspark.sql import SparkSession from pyspark.sql.dataframe import DataFrame +from pyspark.sql.functions import col, unix_timestamp, abs as A from pyspark.sql.types import StructType, StructField, StringType @@ -37,8 +38,7 @@ def test_missing_value_imputation(spark_session: SparkSession): StructField("Value", StringType(), True) ]) - data = [ - # Setup controlled Test + test_data = [ ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"), ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "2.0"), ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"), @@ -52,9 +52,9 @@ def test_missing_value_imputation(spark_session: SparkSession): ("A2PS64V0J.:ZUX09R", "2024-01-02 20:13:46.000", "Good", "11.0"), # Tolerance Test ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20.000", "Good", "12.0"), #("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:54.000", "Good", "13.0"), - ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "14.0"), + #("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "14.0"), ("A2PS64V0J.:ZUX09R", "2024-01-03 12:18:02.000", "Good", "15.0"), - ("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:36.000", "Good", "16.0"), + #("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:36.000", "Good", "16.0"), ("A2PS64V0J.:ZUX09R", "2024-01-03 20:25:10.000", "Good", "17.0"), ("A2PS64V0J.:ZUX09R", "2024-01-04 00:28:44.000", "Good", "18.0"), ("A2PS64V0J.:ZUX09R", "2024-01-04 04:32:18.000", "Good", "19.0"), @@ -113,11 +113,185 @@ def test_missing_value_imputation(spark_session: SparkSession): ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:56:13", "Good", "4700.96582"), ] - df = spark_session.createDataFrame(data, schema=schema) + expected_data = [ + ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21", "Good", "1.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55", "Good", "2.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29", 
"Good", "3.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03", "Good", "4.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37", "Good", "5.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:10", "Good", "6.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45", "Good", "7.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11", "Good", "8.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42", "Good", "9.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12", "Good", "10.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:13:46", "Good", "11.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20", "Good", "12.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:50", "Good", "13.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:20", "Good", "14.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 12:18:02", "Good", "15.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:30", "Good", "16.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 20:25:10", "Good", "17.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 00:28:44", "Good", "18.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 04:32:18", "Good", "19.0"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:01:43", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:02:44", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:03:44", "Good", "4688.019"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:04:44", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:05:44", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:06:44", "Good", "4694.203"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:07:44", "Good", "4693.92"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:08:44", "Good", "4691.6475"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:09:44", "Good", "4688.722"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:10:44", "Good", "4686.481"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:11:46", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:12:46", "Good", "4688.637"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:13:46", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:14:46", "Good", "4691.4985"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:15:46", "Good", "4690.817"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:16:47", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:17:47", "Good", "4693.7354"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:18:47", "Good", "4696.372"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:19:48", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:20:48", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:21:48", "Good", "4684.8516"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:22:48", "Good", "4679.2305"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:23:48", "Good", "4675.784"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:24:48", "Good", "4675.998"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:25:50", "Good", "4681.358"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:26:50", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:27:50", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:28:50", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:29:50", "Good", "4691.056"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:30:50", "Good", "4694.813"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:31:51", "Good", "4696.0635"), + 
("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:32:52", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:33:52", "Good", "4685.6963"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:34:52", "Good", "4681.356"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:35:52", "Good", "4678.175"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:36:52", "Good", "4676.186"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:37:52", "Good", "4675.423"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:38:52", "Good", "4675.9185"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:39:52", "Good", "4677.707"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:40:52", "Good", "4680.8213"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:41:52", "Good", "4685.295"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:42:52", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:42:54", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:43:52", "Good", "4692.863"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:43:54", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:44:54", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:45:54", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:46:55", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:47:55", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:48:55", "Good", "4689.178"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:49:55", "Good", "4692.111"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:50:55", "Good", "4695.794"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:51:56", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:52:56", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:53:56", "Good", "4687.381"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:54:56", "Good", "4687.1104"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:55:57", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:56:58", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:57:58", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:58:58", "Good", "4693.161"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:59:59", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:00:59", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:01:59", "Good", "4688.2207"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:02:59", "Good", "4689.07"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:03:59", "Good", "4692.1904"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:05:01", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:06:01", "Good", "4699.3506"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:07:01", "Good", "4701.433"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:08:01", "Good", "4701.872"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:09:01", "Good", "4700.228"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:10:02", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:11:03", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:12:03", "Good", "4692.6973"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:13:06", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:14:06", "Good", "4695.113"), + 
("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:15:06", "Good", "4691.5415"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:16:06", "Good", "4689.0054"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:17:07", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:18:07", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:19:07", "Good", "4688.7515"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:20:07", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:21:07", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:22:07", "Good", "4700.935"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:23:07", "Good", "4687.808"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:24:07", "Good", "4675.1323"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:25:09", "Good", "4676.456"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:26:09", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:27:09", "Good", "4708.868"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:28:09", "Good", "4711.2476"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:29:09", "Good", "4707.2603"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:30:09", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:31:09", "Good", "4695.7764"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:32:09", "Good", "4692.5146"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:33:09", "Good", "4691.358"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:34:09", "Good", "4692.482"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:35:10", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:36:10", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:37:10", "Good", "4702.4126"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:38:10", "Good", "4700.763"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:39:10", "Good", "4697.9897"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:40:11", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:41:11", "Good", "4696.747"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:42:11", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:43:11", "Good", "4705.8677"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:44:11", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:45:11", "Good", "4695.9624"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:46:11", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:47:11", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:48:11", "Good", "4702.187"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:49:11", "Good", "4699.401"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:50:11", "Good", "4695.0015"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:51:11", "Good", "4691.3823"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:52:11", "Good", "4690.9385"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:53:13", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:54:13", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:55:13", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:56:13", "Good", "4700.966"), + ] + + test_df = spark_session.createDataFrame(test_data, schema=schema) + expected_df = spark_session.createDataFrame(expected_data, schema=schema) + + 
missing_value_imputation = MissingValueImputation(spark_session, test_df) + actual_df = missing_value_imputation.filter() + + #print(actual_df.show(actual_df.count(), False)) + + assert isinstance(actual_df, DataFrame) + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + + def assert_dataframe_similar(expected_df, actual_df, tolerance=1e-4, time_tolerance_seconds=5): + + expected_df = expected_df.orderBy(["TagName", "EventTime"]) + actual_df = actual_df.orderBy(["TagName", "EventTime"]) + + expected_df = expected_df.withColumn("Value", col("Value").cast("float")) + actual_df = actual_df.withColumn("Value", col("Value").cast("float")) + + for expected_row, actual_row in zip(expected_df.collect(), actual_df.collect()): + for expected_val, actual_val, column_name in zip(expected_row, actual_row, expected_df.columns): + if column_name == "Value": + assert abs(expected_val - actual_val) < tolerance, f"Value mismatch: {expected_val} != {actual_val}" + elif column_name == "EventTime": + expected_event_time = unix_timestamp(col("EventTime")).cast("timestamp") + actual_event_time = unix_timestamp(col("EventTime")).cast("timestamp") + + time_diff = A(expected_event_time.cast("long") - actual_event_time.cast("long")) + condition = time_diff <= time_tolerance_seconds + + mismatched_rows = expected_df.join(actual_df, on=["TagName", "EventTime"], how="inner") \ + .filter(~condition) + + assert mismatched_rows.count() == 0, f"EventTime mismatch: {expected_val} != {actual_val} (tolerance: {time_tolerance_seconds}s)" + else: + assert expected_val == actual_val, f"Mismatch in column '{column_name}': {expected_val} != {actual_val}" + + assert_dataframe_similar(expected_df, actual_df, tolerance=1e-4) - missing_value_imputation = MissingValueImputation(spark_session, df) - imputed_df = missing_value_imputation.filter() - assert isinstance(imputed_df, DataFrame) - #TODO From f91de962feec7b1296d20857162a9045c5755ba3 Mon Sep 17 00:00:00 2001 From: Christian Munz Date: Mon, 18 Nov 2024 19:16:34 +0100 Subject: [PATCH 5/8] refactorings missing value imputation Signed-off-by: Christian Munz --- .../data_quality/missing_value_imputation.py | 84 +++++-------------- .../test_missing_value_imputation.py | 6 -- 2 files changed, 21 insertions(+), 69 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py index c51b75f42..3373439b2 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py @@ -17,8 +17,6 @@ from pyspark.sql.types import StringType, TimestampType, FloatType, ArrayType from pyspark.sql.window import Window from scipy.interpolate import UnivariateSpline -from systemds.context import SystemDSContext -from systemds.operator.algorithm import imputeByFD, imputeByMean import numpy as np from datetime import timedelta from typing import List @@ -93,6 +91,8 @@ def spark_session(): Parameters: df (DataFrame): Dataframe containing the raw data. 
+ tolerance_percentage (int): Percentage value that indicates how much the time series data points may vary + in each interval """ df: PySparkDataFrame @@ -100,10 +100,12 @@ def spark_session(): def __init__( self, spark: SparkSession, - df: PySparkDataFrame + df: PySparkDataFrame, + tolerance_percentage: int = 5, ) -> None: self.spark = spark self.df = df + self.tolerance_percentage = tolerance_percentage @staticmethod def system_type(): @@ -125,21 +127,19 @@ def settings() -> dict: @staticmethod def _impute_missing_values_sp(df) -> PySparkDataFrame: - # Imputes missing values by Spline Interpolation + """ + Imputes missing values by Spline Interpolation + """ data = np.array(df.select("Value").rdd.flatMap(lambda x: x).collect(), dtype=float) - mask = np.isnan(data) x_data = np.arange(len(data)) y_data = data[~mask] - # Spline interpolation (cubic smoothing spline) spline = UnivariateSpline(x_data[~mask], y_data, s=0) - # Impute missing values data_imputed = data.copy() data_imputed[mask] = spline(x_data[mask]) - data_imputed_list = data_imputed.tolist() imputed_rdd = df.rdd.zipWithIndex().map(lambda row: Row( @@ -150,51 +150,15 @@ def _impute_missing_values_sp(df) -> PySparkDataFrame: )) imputed_df = imputed_rdd.toDF(df.schema) - #print("Imputed DataFrame:") - #print(imputed_df.show(imputed_df.count(), False)) - - return imputed_df - - - @staticmethod - def _impute_missing_values_ds(df) -> PySparkDataFrame: - # Impute missing values with Apache SystemDS functions: TODO - value_array = np.array(df.select("Value").rdd.flatMap(lambda x: x).collect()).reshape(-1, 1) - mask_array = np.array([0]) - - with SystemDSContext() as sds_context: - - X = sds_context.from_numpy(value_array) - mask = sds_context.from_numpy(mask_array) - - imputed_matrix_result = imputeByMean(X, mask) - - imputed_values: np.array(None) - - for node in imputed_matrix_result: - try: - imputed_values = node.compute() - except Exception as e: - print(e) - break - - imputed_values_list = imputed_values.tolist() - - imputed_rdd = df.rdd.zipWithIndex().map(lambda row: Row( - TagName=row[0][0], - EventTime=row[0][1], - Status=row[0][2], - Value=float(imputed_values_list[row[1]]) - )) - - imputed_df = imputed_rdd.toDF(df.schema) - return imputed_df @staticmethod - def _flag_missing_values(df) -> PySparkDataFrame: - # Calculates interval and inserts empty records at missing timestamps with NaN values + def _flag_missing_values(df, tolerance_percentage) -> PySparkDataFrame: + """ + Determines intervals of each respective source time series and inserts empty records at missing timestamps + with NaN values + """ window_spec = Window.partitionBy("TagName").orderBy("EventTime") df = df.withColumn("prev_event_time", F.lag("EventTime").over(window_spec)) @@ -206,7 +170,6 @@ def _flag_missing_values(df) -> PySparkDataFrame: most_frequent_interval = interval_counts.orderBy(F.desc("count")).first() expected_interval = most_frequent_interval["time_diff_seconds"] if most_frequent_interval else None - tolerance_percentage = 5 tolerance = (expected_interval * tolerance_percentage) / 100 if expected_interval else 0 existing_timestamps = df.select("TagName", "EventTime").rdd \ @@ -255,7 +218,9 @@ def generate_missing_timestamps(prev_event_time, event_time, tag_name): @staticmethod def _is_column_type(df, column_name, data_type): - # Helper method for data type checking + """ + Helper method for data type checking + """ type_ = df.schema[column_name] return isinstance(type_.dataType, data_type) @@ -286,21 +251,15 @@ def filter(self) -> 
PySparkDataFrame: imputed_dfs: List[PySparkDataFrame] = [] for source, df in dfs_by_source.items(): - # Compute, insert and flag all the missing entries - flagged_df = self._flag_missing_values(df) - - #print(flagged_df.show(flagged_df.count(), False)) # Current testing + # Determine, insert and flag all the missing entries + flagged_df = self._flag_missing_values(df, self.tolerance_percentage) # Impute the missing values of flagged entries - # TODO - #imputed_df_DS = self._impute_missing_values_ds(flagged_df) imputed_df_sp = self._impute_missing_values_sp(flagged_df) imputed_df_sp = imputed_df_sp.withColumn("EventTime", col("EventTime").cast("string")) \ .withColumn("Value", col("Value").cast("string")) - #print(imputed_df_sp.show(imputed_df_sp.count(), False)) - imputed_dfs.append(imputed_df_sp) result_df = imputed_dfs[0] @@ -311,12 +270,11 @@ def filter(self) -> PySparkDataFrame: def _split_by_source(self) -> dict: - # Helper method to separate individual time series based on their source + """ + Helper method to separate individual time series based on their source + """ tag_names = self.df.select("TagName").distinct().collect() tag_names = [row["TagName"] for row in tag_names] source_dict = {tag: self.df.filter(col("TagName") == tag).orderBy("EventTime") for tag in tag_names} return source_dict - - - diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py index 80956908a..bf626fbb5 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py @@ -18,7 +18,6 @@ from pyspark.sql.functions import col, unix_timestamp, abs as A from pyspark.sql.types import StructType, StructField, StringType - from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.missing_value_imputation import ( MissingValueImputation, ) @@ -258,8 +257,6 @@ def test_missing_value_imputation(spark_session: SparkSession): missing_value_imputation = MissingValueImputation(spark_session, test_df) actual_df = missing_value_imputation.filter() - #print(actual_df.show(actual_df.count(), False)) - assert isinstance(actual_df, DataFrame) assert expected_df.columns == actual_df.columns @@ -292,6 +289,3 @@ def assert_dataframe_similar(expected_df, actual_df, tolerance=1e-4, time_tolera assert expected_val == actual_val, f"Mismatch in column '{column_name}': {expected_val} != {actual_val}" assert_dataframe_similar(expected_df, actual_df, tolerance=1e-4) - - - From cd518871c6801ab9731a9da075ce611288a19333 Mon Sep 17 00:00:00 2001 From: Christian Munz Date: Mon, 18 Nov 2024 19:32:49 +0100 Subject: [PATCH 6/8] updated dependencies Signed-off-by: Christian Munz --- environment.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/environment.yml b/environment.yml index 59308eac2..67a5cabcd 100644 --- a/environment.yml +++ b/environment.yml @@ -85,5 +85,4 @@ dependencies: - eth-typing>=4.2.3,<5.0.0 - pandas>=1.5.2,<2.2.0 - moto[s3]>=5.0.16,<6.0.0 - - pyarrow>=14.0.1,<17.0.0 - - systemds>=3.0.0,<3.3.0 \ No newline at end of file + - pyarrow>=14.0.1,<17.0.0 \ No newline at end of file From 0a9fe2f7ad570266320749878ce033d2afb1d2d5 Mon Sep 17 00:00:00 2001 From: Christian Munz Date: Mon, 18 Nov 2024 19:43:40 +0100 Subject: [PATCH 7/8] refactorings Signed-off-by: Christian Munz 
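Exporting MissingValueImputation from the data_wranglers package root makes it importable without the full module path. A minimal usage sketch, assuming the package is importable as rtdip_sdk and a local Spark session is available (the tag name, timestamps and values below are made up for illustration, not taken from this change):

    from pyspark.sql import SparkSession
    from rtdip_sdk.pipelines.data_wranglers import MissingValueImputation

    spark = SparkSession.builder.master("local[2]").appName("mvi-example").getOrCreate()

    # Hourly readings for one hypothetical tag; the 02:00 reading is left out
    # on purpose so filter() has a gap to flag and spline-interpolate.
    df = spark.createDataFrame(
        [
            ("TAG_A", "2024-01-01 00:00:00.000", "Good", "1.0"),
            ("TAG_A", "2024-01-01 01:00:00.000", "Good", "2.0"),
            ("TAG_A", "2024-01-01 03:00:00.000", "Good", "4.0"),
            ("TAG_A", "2024-01-01 04:00:00.000", "Good", "5.0"),
            ("TAG_A", "2024-01-01 05:00:00.000", "Good", "6.0"),
        ],
        ["TagName", "EventTime", "Status", "Value"],
    )

    imputed_df = MissingValueImputation(spark, df).filter()
    imputed_df.show(truncate=False)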
--- src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py index ca8335aa2..2ed82eb4d 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py @@ -17,3 +17,4 @@ from .spark.data_quality.normalization.normalization_minmax import * from .spark.data_quality.normalization.normalization_zscore import * from .spark.data_quality.normalization.denormalization import * +from .spark.data_quality.missing_value_imputation import * From db7e33af0ea29bb5cf1c92bb265ea3532e0ab76d Mon Sep 17 00:00:00 2001 From: Christian Munz Date: Mon, 18 Nov 2024 20:08:48 +0100 Subject: [PATCH 8/8] reformatting Signed-off-by: Christian Munz --- .../data_quality/missing_value_imputation.py | 86 ++++++++++++------- .../test_missing_value_imputation.py | 67 ++++++++++----- 2 files changed, 102 insertions(+), 51 deletions(-) diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py index 3373439b2..d784787cc 100644 --- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py +++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py @@ -124,13 +124,14 @@ def libraries(): def settings() -> dict: return {} - @staticmethod def _impute_missing_values_sp(df) -> PySparkDataFrame: """ Imputes missing values by Spline Interpolation """ - data = np.array(df.select("Value").rdd.flatMap(lambda x: x).collect(), dtype=float) + data = np.array( + df.select("Value").rdd.flatMap(lambda x: x).collect(), dtype=float + ) mask = np.isnan(data) x_data = np.arange(len(data)) @@ -142,17 +143,18 @@ def _impute_missing_values_sp(df) -> PySparkDataFrame: data_imputed[mask] = spline(x_data[mask]) data_imputed_list = data_imputed.tolist() - imputed_rdd = df.rdd.zipWithIndex().map(lambda row: Row( - TagName=row[0][0], - EventTime=row[0][1], - Status=row[0][2], - Value=float(data_imputed_list[row[1]]) - )) + imputed_rdd = df.rdd.zipWithIndex().map( + lambda row: Row( + TagName=row[0][0], + EventTime=row[0][1], + Status=row[0][2], + Value=float(data_imputed_list[row[1]]), + ) + ) imputed_df = imputed_rdd.toDF(df.schema) return imputed_df - @staticmethod def _flag_missing_values(df, tolerance_percentage) -> PySparkDataFrame: """ @@ -162,22 +164,38 @@ def _flag_missing_values(df, tolerance_percentage) -> PySparkDataFrame: window_spec = Window.partitionBy("TagName").orderBy("EventTime") df = df.withColumn("prev_event_time", F.lag("EventTime").over(window_spec)) - df = df.withColumn("time_diff_seconds", - (F.unix_timestamp("EventTime") - F.unix_timestamp("prev_event_time"))) + df = df.withColumn( + "time_diff_seconds", + (F.unix_timestamp("EventTime") - F.unix_timestamp("prev_event_time")), + ) df_diff = df.filter(F.col("time_diff_seconds").isNotNull()) interval_counts = df_diff.groupBy("time_diff_seconds").count() most_frequent_interval = interval_counts.orderBy(F.desc("count")).first() - expected_interval = most_frequent_interval["time_diff_seconds"] if most_frequent_interval else None + expected_interval = ( + most_frequent_interval["time_diff_seconds"] + if most_frequent_interval + else None + ) - tolerance = (expected_interval * tolerance_percentage) / 100 
if expected_interval else 0 + tolerance = ( + (expected_interval * tolerance_percentage) / 100 if expected_interval else 0 + ) - existing_timestamps = df.select("TagName", "EventTime").rdd \ - .map(lambda row: (row["TagName"], row["EventTime"])).groupByKey().collectAsMap() + existing_timestamps = ( + df.select("TagName", "EventTime") + .rdd.map(lambda row: (row["TagName"], row["EventTime"])) + .groupByKey() + .collectAsMap() + ) def generate_missing_timestamps(prev_event_time, event_time, tag_name): # Check for first row - if prev_event_time is None or event_time is None or expected_interval is None: + if ( + prev_event_time is None + or event_time is None + or expected_interval is None + ): return [] # Check against existing timestamps to avoid duplicates @@ -196,26 +214,30 @@ def generate_missing_timestamps(prev_event_time, event_time, tag_name): return missing_timestamps - generate_missing_timestamps_udf = udf(generate_missing_timestamps, ArrayType(TimestampType())) + generate_missing_timestamps_udf = udf( + generate_missing_timestamps, ArrayType(TimestampType()) + ) df_with_missing = df.withColumn( "missing_timestamps", - generate_missing_timestamps_udf("prev_event_time", "EventTime", "TagName") + generate_missing_timestamps_udf("prev_event_time", "EventTime", "TagName"), ) df_missing_entries = df_with_missing.select( "TagName", F.explode("missing_timestamps").alias("EventTime"), F.lit("Good").alias("Status"), - F.lit(float('nan')).cast(FloatType()).alias("Value") + F.lit(float("nan")).cast(FloatType()).alias("Value"), ) - df_combined = df.select("TagName", "EventTime", "Status", "Value").union(df_missing_entries).orderBy( - "EventTime") + df_combined = ( + df.select("TagName", "EventTime", "Status", "Value") + .union(df_missing_entries) + .orderBy("EventTime") + ) return df_combined - @staticmethod def _is_column_type(df, column_name, data_type): """ @@ -225,12 +247,13 @@ def _is_column_type(df, column_name, data_type): return isinstance(type_.dataType, data_type) - def filter(self) -> PySparkDataFrame: """ Imputate missing values based on [Spline Interpolation, ] """ - if not all(col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"]): + if not all( + col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"] + ): raise ValueError("Columns not as expected") if not self._is_column_type(self.df, "EventTime", TimestampType): @@ -240,8 +263,8 @@ def filter(self) -> PySparkDataFrame: "EventTime", F.coalesce( F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss.SSS"), - F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss") - ) + F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss"), + ), ) if not self._is_column_type(self.df, "Value", FloatType): self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType())) @@ -257,8 +280,9 @@ def filter(self) -> PySparkDataFrame: # Impute the missing values of flagged entries imputed_df_sp = self._impute_missing_values_sp(flagged_df) - imputed_df_sp = imputed_df_sp.withColumn("EventTime", col("EventTime").cast("string")) \ - .withColumn("Value", col("Value").cast("string")) + imputed_df_sp = imputed_df_sp.withColumn( + "EventTime", col("EventTime").cast("string") + ).withColumn("Value", col("Value").cast("string")) imputed_dfs.append(imputed_df_sp) @@ -268,13 +292,15 @@ def filter(self) -> PySparkDataFrame: return result_df - def _split_by_source(self) -> dict: """ Helper method to separate individual time series based on their source """ tag_names = self.df.select("TagName").distinct().collect() tag_names = [row["TagName"] for 
row in tag_names] - source_dict = {tag: self.df.filter(col("TagName") == tag).orderBy("EventTime") for tag in tag_names} + source_dict = { + tag: self.df.filter(col("TagName") == tag).orderBy("EventTime") + for tag in tag_names + } return source_dict diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py index bf626fbb5..b45bb5e41 100644 --- a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py @@ -30,12 +30,14 @@ def spark_session(): def test_missing_value_imputation(spark_session: SparkSession): - schema = StructType([ - StructField("TagName", StringType(), True), - StructField("EventTime", StringType(), True), - StructField("Status", StringType(), True), - StructField("Value", StringType(), True) - ]) + schema = StructType( + [ + StructField("TagName", StringType(), True), + StructField("EventTime", StringType(), True), + StructField("Status", StringType(), True), + StructField("Value", StringType(), True), + ] + ) test_data = [ ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"), @@ -43,17 +45,22 @@ def test_missing_value_imputation(spark_session: SparkSession): ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"), ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "4.0"), ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "5.0"), - #("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"), # Test values + # ("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"), # Test values ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "7.0"), ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "8.0"), ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "9.0"), ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "10.0"), - ("A2PS64V0J.:ZUX09R", "2024-01-02 20:13:46.000", "Good", "11.0"), # Tolerance Test + ( + "A2PS64V0J.:ZUX09R", + "2024-01-02 20:13:46.000", + "Good", + "11.0", + ), # Tolerance Test ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20.000", "Good", "12.0"), - #("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:54.000", "Good", "13.0"), - #("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "14.0"), + # ("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:54.000", "Good", "13.0"), + # ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "14.0"), ("A2PS64V0J.:ZUX09R", "2024-01-03 12:18:02.000", "Good", "15.0"), - #("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:36.000", "Good", "16.0"), + # ("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:36.000", "Good", "16.0"), ("A2PS64V0J.:ZUX09R", "2024-01-03 20:25:10.000", "Good", "17.0"), ("A2PS64V0J.:ZUX09R", "2024-01-04 00:28:44.000", "Good", "18.0"), ("A2PS64V0J.:ZUX09R", "2024-01-04 04:32:18.000", "Good", "19.0"), @@ -262,7 +269,9 @@ def test_missing_value_imputation(spark_session: SparkSession): assert expected_df.columns == actual_df.columns assert expected_df.schema == actual_df.schema - def assert_dataframe_similar(expected_df, actual_df, tolerance=1e-4, time_tolerance_seconds=5): + def assert_dataframe_similar( + expected_df, actual_df, tolerance=1e-4, time_tolerance_seconds=5 + ): expected_df = expected_df.orderBy(["TagName", "EventTime"]) actual_df = actual_df.orderBy(["TagName", "EventTime"]) @@ -271,21 +280,37 @@ def assert_dataframe_similar(expected_df, actual_df, 
tolerance=1e-4, time_tolera actual_df = actual_df.withColumn("Value", col("Value").cast("float")) for expected_row, actual_row in zip(expected_df.collect(), actual_df.collect()): - for expected_val, actual_val, column_name in zip(expected_row, actual_row, expected_df.columns): + for expected_val, actual_val, column_name in zip( + expected_row, actual_row, expected_df.columns + ): if column_name == "Value": - assert abs(expected_val - actual_val) < tolerance, f"Value mismatch: {expected_val} != {actual_val}" + assert ( + abs(expected_val - actual_val) < tolerance + ), f"Value mismatch: {expected_val} != {actual_val}" elif column_name == "EventTime": - expected_event_time = unix_timestamp(col("EventTime")).cast("timestamp") - actual_event_time = unix_timestamp(col("EventTime")).cast("timestamp") + expected_event_time = unix_timestamp(col("EventTime")).cast( + "timestamp" + ) + actual_event_time = unix_timestamp(col("EventTime")).cast( + "timestamp" + ) - time_diff = A(expected_event_time.cast("long") - actual_event_time.cast("long")) + time_diff = A( + expected_event_time.cast("long") + - actual_event_time.cast("long") + ) condition = time_diff <= time_tolerance_seconds - mismatched_rows = expected_df.join(actual_df, on=["TagName", "EventTime"], how="inner") \ - .filter(~condition) + mismatched_rows = expected_df.join( + actual_df, on=["TagName", "EventTime"], how="inner" + ).filter(~condition) - assert mismatched_rows.count() == 0, f"EventTime mismatch: {expected_val} != {actual_val} (tolerance: {time_tolerance_seconds}s)" + assert ( + mismatched_rows.count() == 0 + ), f"EventTime mismatch: {expected_val} != {actual_val} (tolerance: {time_tolerance_seconds}s)" else: - assert expected_val == actual_val, f"Mismatch in column '{column_name}': {expected_val} != {actual_val}" + assert ( + expected_val == actual_val + ), f"Mismatch in column '{column_name}': {expected_val} != {actual_val}" assert_dataframe_similar(expected_df, actual_df, tolerance=1e-4)
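For reference, the numerical core that the test above exercises is plain SciPy cubic-spline interpolation over the NaN-flagged positions. The snippet below is an illustrative restatement of that step outside Spark; the sample array is invented, while the UnivariateSpline(s=0) call mirrors _impute_missing_values_sp in the patch.

# Illustrative sketch of the imputation step outside Spark; sample values are made up.
import numpy as np
from scipy.interpolate import UnivariateSpline

data = np.array([1.0, 2.0, np.nan, 4.0, 5.0, np.nan, 7.0], dtype=float)

mask = np.isnan(data)                       # positions flagged as missing
x_data = np.arange(len(data))               # evenly spaced index, as in the wrangler
spline = UnivariateSpline(x_data[~mask], data[~mask], s=0)  # interpolating cubic spline

data_imputed = data.copy()
data_imputed[mask] = spline(x_data[mask])   # fill only the flagged positions

print(data_imputed)  # -> [1. 2. 3. 4. 5. 6. 7.] (up to floating point error)

Passing s=0 makes the spline pass exactly through the observed samples, so existing values are preserved and only the flagged gaps are estimated, which is what the value tolerance of 1e-4 in the test relies on.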