diff --git a/environment.yml b/environment.yml
index 193d7dedd..67a5cabcd 100644
--- a/environment.yml
+++ b/environment.yml
@@ -85,4 +85,4 @@ dependencies:
   - eth-typing>=4.2.3,<5.0.0
   - pandas>=1.5.2,<2.2.0
   - moto[s3]>=5.0.16,<6.0.0
-  - pyarrow>=14.0.1,<17.0.0
+  - pyarrow>=14.0.1,<17.0.0
\ No newline at end of file
diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py
index ca8335aa2..2ed82eb4d 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/__init__.py
@@ -17,3 +17,4 @@
 from .spark.data_quality.normalization.normalization_minmax import *
 from .spark.data_quality.normalization.normalization_zscore import *
 from .spark.data_quality.normalization.denormalization import *
+from .spark.data_quality.missing_value_imputation import *
diff --git a/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py
new file mode 100644
index 000000000..d784787cc
--- /dev/null
+++ b/src/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/missing_value_imputation.py
@@ -0,0 +1,306 @@
+# Copyright 2022 RTDIP
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyspark.sql import SparkSession, DataFrame as PySparkDataFrame, functions as F, Row
+from pyspark.sql.functions import col, udf
+from pyspark.sql.types import StringType, TimestampType, FloatType, ArrayType
+from pyspark.sql.window import Window
+from scipy.interpolate import UnivariateSpline
+import numpy as np
+from datetime import timedelta
+from typing import List
+from ...interfaces import WranglerBaseInterface
+from ...._pipeline_utils.models import Libraries, SystemType
+
+
+class MissingValueImputation(WranglerBaseInterface):
+    """
+    Imputes missing values in a univariate time series, producing a continuous curve of data points. To do
+    this, the time interval of each individual source is determined and empty records with NaN values are
+    inserted at the missing timestamps. Spline interpolation then fills in the NaN values, resulting in a
+    consistent data set of enhanced quality.
+
+    Example
+    --------
+    ```python
+    from pyspark.sql import SparkSession
+    from pyspark.sql.types import StructType, StructField, StringType
+    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.missing_value_imputation import (
+        MissingValueImputation,
+    )
+
+    spark = SparkSession.builder.master("local[2]").appName("test").getOrCreate()
+
+    schema = StructType([
+        StructField("TagName", StringType(), True),
+        StructField("EventTime", StringType(), True),
+        StructField("Status", StringType(), True),
+        StructField("Value", StringType(), True)
+    ])
+
+    data = [
+        # Setup controlled test
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "2.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "4.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "5.0"),
+        # ("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"),  # Test values
+        # ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "7.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "8.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "9.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "10.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 20:13:46.000", "Good", "11.0"),  # Tolerance test
+        ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20.000", "Good", "10.0"),
+        # ("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:54.000", "Good", "9.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "8.0"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:01:43", "Good", "4686.259766"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:02:44", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:04:44", "Good", "4686.259766"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:05:44", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:11:46", "Good", "4686.259766"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:13:46", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:16:47", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:19:48", "Good", "4696.063477"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:20:48", "Good", "4691.161621"),
+    ]
+
+    df = spark.createDataFrame(data, schema=schema)
+
+    missing_value_imputation = MissingValueImputation(spark, df)
+    imputed_df = missing_value_imputation.filter()
+
+    imputed_df.show(imputed_df.count(), False)
+    ```
+
+    Parameters:
+        df (DataFrame): Dataframe containing the raw data.
+        tolerance_percentage (int): Percentage by which a gap between consecutive data points may deviate
+            from the expected interval of the series and still be treated as on time (default 5)
+    """
+
+    df: PySparkDataFrame
+
+    def __init__(
+        self,
+        spark: SparkSession,
+        df: PySparkDataFrame,
+        tolerance_percentage: int = 5,
+    ) -> None:
+        self.spark = spark
+        self.df = df
+        self.tolerance_percentage = tolerance_percentage
+
+    @staticmethod
+    def system_type():
+        """
+        Attributes:
+            SystemType (Environment): Requires PYSPARK
+        """
+        return SystemType.PYSPARK
+
+    @staticmethod
+    def libraries():
+        libraries = Libraries()
+        return libraries
+
+    @staticmethod
+    def settings() -> dict:
+        return {}
+
+    @staticmethod
+    def _impute_missing_values_sp(df) -> PySparkDataFrame:
+        """
+        Imputes missing values by spline interpolation
+        """
+        data = np.array(
+            df.select("Value").rdd.flatMap(lambda x: x).collect(), dtype=float
+        )
+        mask = np.isnan(data)
+
+        x_data = np.arange(len(data))
+        y_data = data[~mask]
+
+        spline = UnivariateSpline(x_data[~mask], y_data, s=0)
+
+        data_imputed = data.copy()
+        data_imputed[mask] = spline(x_data[mask])
+        data_imputed_list = data_imputed.tolist()
+
+        imputed_rdd = df.rdd.zipWithIndex().map(
+            lambda row: Row(
+                TagName=row[0][0],
+                EventTime=row[0][1],
+                Status=row[0][2],
+                Value=float(data_imputed_list[row[1]]),
+            )
+        )
+        imputed_df = imputed_rdd.toDF(df.schema)
+
+        return imputed_df
+
+    @staticmethod
+    def _flag_missing_values(df, tolerance_percentage) -> PySparkDataFrame:
+        """
+        Determines the interval of each respective source time series and inserts empty records at missing
+        timestamps with NaN values
+        """
+        window_spec = Window.partitionBy("TagName").orderBy("EventTime")
+
+        df = df.withColumn("prev_event_time", F.lag("EventTime").over(window_spec))
+        df = df.withColumn(
+            "time_diff_seconds",
+            (F.unix_timestamp("EventTime") - F.unix_timestamp("prev_event_time")),
+        )
+
+        df_diff = df.filter(F.col("time_diff_seconds").isNotNull())
+        interval_counts = df_diff.groupBy("time_diff_seconds").count()
+        most_frequent_interval = interval_counts.orderBy(F.desc("count")).first()
+        expected_interval = (
+            most_frequent_interval["time_diff_seconds"]
+            if most_frequent_interval
+            else None
+        )
+
+        tolerance = (
+            (expected_interval * tolerance_percentage) / 100 if expected_interval else 0
+        )
+
+        existing_timestamps = (
+            df.select("TagName", "EventTime")
+            .rdd.map(lambda row: (row["TagName"], row["EventTime"]))
+            .groupByKey()
+            .collectAsMap()
+        )
+
+        def generate_missing_timestamps(prev_event_time, event_time, tag_name):
+            # Check for first row
+            if (
+                prev_event_time is None
+                or event_time is None
+                or expected_interval is None
+            ):
+                return []
+
+            # Check against existing timestamps to avoid duplicates
+            tag_timestamps = set(existing_timestamps.get(tag_name, []))
+            missing_timestamps = []
+            current_time = prev_event_time
+
+            while current_time < event_time:
+                next_expected_time = current_time + timedelta(seconds=expected_interval)
+                time_diff = abs((next_expected_time - event_time).total_seconds())
+                if time_diff <= tolerance:
+                    break
+                if next_expected_time not in tag_timestamps:
+                    missing_timestamps.append(next_expected_time)
+                current_time = next_expected_time
+
+            return missing_timestamps
+
+        generate_missing_timestamps_udf = udf(
+            generate_missing_timestamps, ArrayType(TimestampType())
+        )
+
+        df_with_missing = df.withColumn(
+            "missing_timestamps",
+            generate_missing_timestamps_udf("prev_event_time", "EventTime", "TagName"),
+        )
+
+        df_missing_entries = df_with_missing.select(
+            "TagName",
F.explode("missing_timestamps").alias("EventTime"), + F.lit("Good").alias("Status"), + F.lit(float("nan")).cast(FloatType()).alias("Value"), + ) + + df_combined = ( + df.select("TagName", "EventTime", "Status", "Value") + .union(df_missing_entries) + .orderBy("EventTime") + ) + + return df_combined + + @staticmethod + def _is_column_type(df, column_name, data_type): + """ + Helper method for data type checking + """ + type_ = df.schema[column_name] + + return isinstance(type_.dataType, data_type) + + def filter(self) -> PySparkDataFrame: + """ + Imputate missing values based on [Spline Interpolation, ] + """ + if not all( + col_ in self.df.columns for col_ in ["TagName", "EventTime", "Value"] + ): + raise ValueError("Columns not as expected") + + if not self._is_column_type(self.df, "EventTime", TimestampType): + if self._is_column_type(self.df, "EventTime", StringType): + # Attempt to parse the first format, then fallback to the second + self.df = self.df.withColumn( + "EventTime", + F.coalesce( + F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss.SSS"), + F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss"), + ), + ) + if not self._is_column_type(self.df, "Value", FloatType): + self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType())) + + dfs_by_source = self._split_by_source() + + imputed_dfs: List[PySparkDataFrame] = [] + + for source, df in dfs_by_source.items(): + # Determine, insert and flag all the missing entries + flagged_df = self._flag_missing_values(df, self.tolerance_percentage) + + # Impute the missing values of flagged entries + imputed_df_sp = self._impute_missing_values_sp(flagged_df) + + imputed_df_sp = imputed_df_sp.withColumn( + "EventTime", col("EventTime").cast("string") + ).withColumn("Value", col("Value").cast("string")) + + imputed_dfs.append(imputed_df_sp) + + result_df = imputed_dfs[0] + for df in imputed_dfs[1:]: + result_df = result_df.unionByName(df) + + return result_df + + def _split_by_source(self) -> dict: + """ + Helper method to separate individual time series based on their source + """ + tag_names = self.df.select("TagName").distinct().collect() + tag_names = [row["TagName"] for row in tag_names] + source_dict = { + tag: self.df.filter(col("TagName") == tag).orderBy("EventTime") + for tag in tag_names + } + + return source_dict diff --git a/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py new file mode 100644 index 000000000..b45bb5e41 --- /dev/null +++ b/tests/sdk/python/rtdip_sdk/pipelines/data_wranglers/spark/data_quality/test_missing_value_imputation.py @@ -0,0 +1,316 @@ +# Copyright 2022 RTDIP +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import pytest
+
+from datetime import datetime
+
+from pyspark.sql import SparkSession
+from pyspark.sql.dataframe import DataFrame
+from pyspark.sql.functions import col
+from pyspark.sql.types import StructType, StructField, StringType
+
+from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_quality.missing_value_imputation import (
+    MissingValueImputation,
+)
+
+
+@pytest.fixture(scope="session")
+def spark_session():
+    return SparkSession.builder.master("local[2]").appName("test").getOrCreate()
+
+
+def test_missing_value_imputation(spark_session: SparkSession):
+
+    schema = StructType(
+        [
+            StructField("TagName", StringType(), True),
+            StructField("EventTime", StringType(), True),
+            StructField("Status", StringType(), True),
+            StructField("Value", StringType(), True),
+        ]
+    )
+
+    test_data = [
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "2.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "4.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "5.0"),
+        # ("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"),  # Test values
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "7.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "8.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", "9.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", "10.0"),
+        (
+            "A2PS64V0J.:ZUX09R",
+            "2024-01-02 20:13:46.000",
+            "Good",
+            "11.0",
+        ),  # Tolerance Test
+        ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20.000", "Good", "12.0"),
+        # ("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:54.000", "Good", "13.0"),
+        # ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:28.000", "Good", "14.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-03 12:18:02.000", "Good", "15.0"),
+        # ("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:36.000", "Good", "16.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-03 20:25:10.000", "Good", "17.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-04 00:28:44.000", "Good", "18.0"),
+        ("A2PS64V0J.:ZUX09R", "2024-01-04 04:32:18.000", "Good", "19.0"),
+        # Real missing values
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:01:43", "Good", "4686.259766"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:02:44", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:04:44", "Good", "4686.259766"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:05:44", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:11:46", "Good", "4686.259766"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:13:46", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:16:47", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:19:48", "Good", "4696.063477"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:20:48", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:25:50", "Good", "4681.35791"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:26:50", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:27:50", "Good", "4696.063477"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:28:50", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:31:51", "Good", "4696.063477"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:32:52", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:42:52", "Good", "4691.161621"),
+        ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:42:54", "Good", "4696.063477"),
00:42:54", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:43:54", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:44:54", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:45:54", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:46:55", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:47:55", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:51:56", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:52:56", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:55:57", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:56:58", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:57:58", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 00:59:59", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:00:59", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:05:01", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:10:02", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:11:03", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:13:06", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:17:07", "Good", "4691.161621"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:18:07", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:20:07", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:21:07", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:25:09", "Good", "4676.456055"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:26:09", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:30:09", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:35:10", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:36:10", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:40:11", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:42:11", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:43:11", "Good", "4705.867676"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:44:11", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:46:11", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:47:11", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:53:13", "Good", "4696.063477"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:54:13", "Good", "4700.96582"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:55:13", "Good", "4686.259766"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "31.12.2023 01:56:13", "Good", "4700.96582"), + ] + + expected_data = [ + ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21", "Good", "1.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55", "Good", "2.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29", "Good", "3.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03", "Good", "4.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37", "Good", "5.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:10", "Good", "6.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45", "Good", "7.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11", "Good", "8.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42", "Good", "9.0"), + ("A2PS64V0J.:ZUX09R", 
"2024-01-02 16:00:12", "Good", "10.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-02 20:13:46", "Good", "11.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 00:07:20", "Good", "12.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 04:10:50", "Good", "13.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 08:14:20", "Good", "14.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 12:18:02", "Good", "15.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 16:21:30", "Good", "16.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-03 20:25:10", "Good", "17.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 00:28:44", "Good", "18.0"), + ("A2PS64V0J.:ZUX09R", "2024-01-04 04:32:18", "Good", "19.0"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:01:43", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:02:44", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:03:44", "Good", "4688.019"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:04:44", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:05:44", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:06:44", "Good", "4694.203"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:07:44", "Good", "4693.92"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:08:44", "Good", "4691.6475"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:09:44", "Good", "4688.722"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:10:44", "Good", "4686.481"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:11:46", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:12:46", "Good", "4688.637"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:13:46", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:14:46", "Good", "4691.4985"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:15:46", "Good", "4690.817"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:16:47", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:17:47", "Good", "4693.7354"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:18:47", "Good", "4696.372"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:19:48", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:20:48", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:21:48", "Good", "4684.8516"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:22:48", "Good", "4679.2305"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:23:48", "Good", "4675.784"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:24:48", "Good", "4675.998"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:25:50", "Good", "4681.358"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:26:50", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:27:50", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:28:50", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:29:50", "Good", "4691.056"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:30:50", "Good", "4694.813"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:31:51", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:32:52", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:33:52", "Good", "4685.6963"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:34:52", "Good", "4681.356"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:35:52", "Good", "4678.175"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:36:52", "Good", "4676.186"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", 
"2023-12-31 00:37:52", "Good", "4675.423"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:38:52", "Good", "4675.9185"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:39:52", "Good", "4677.707"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:40:52", "Good", "4680.8213"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:41:52", "Good", "4685.295"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:42:52", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:42:54", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:43:52", "Good", "4692.863"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:43:54", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:44:54", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:45:54", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:46:55", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:47:55", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:48:55", "Good", "4689.178"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:49:55", "Good", "4692.111"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:50:55", "Good", "4695.794"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:51:56", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:52:56", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:53:56", "Good", "4687.381"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:54:56", "Good", "4687.1104"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:55:57", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:56:58", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:57:58", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:58:58", "Good", "4693.161"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 00:59:59", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:00:59", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:01:59", "Good", "4688.2207"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:02:59", "Good", "4689.07"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:03:59", "Good", "4692.1904"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:05:01", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:06:01", "Good", "4699.3506"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:07:01", "Good", "4701.433"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:08:01", "Good", "4701.872"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:09:01", "Good", "4700.228"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:10:02", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:11:03", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:12:03", "Good", "4692.6973"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:13:06", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:14:06", "Good", "4695.113"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:15:06", "Good", "4691.5415"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:16:06", "Good", "4689.0054"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:17:07", "Good", "4691.1616"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:18:07", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:19:07", "Good", "4688.7515"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", 
"2023-12-31 01:20:07", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:21:07", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:22:07", "Good", "4700.935"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:23:07", "Good", "4687.808"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:24:07", "Good", "4675.1323"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:25:09", "Good", "4676.456"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:26:09", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:27:09", "Good", "4708.868"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:28:09", "Good", "4711.2476"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:29:09", "Good", "4707.2603"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:30:09", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:31:09", "Good", "4695.7764"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:32:09", "Good", "4692.5146"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:33:09", "Good", "4691.358"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:34:09", "Good", "4692.482"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:35:10", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:36:10", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:37:10", "Good", "4702.4126"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:38:10", "Good", "4700.763"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:39:10", "Good", "4697.9897"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:40:11", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:41:11", "Good", "4696.747"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:42:11", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:43:11", "Good", "4705.8677"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:44:11", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:45:11", "Good", "4695.9624"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:46:11", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:47:11", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:48:11", "Good", "4702.187"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:49:11", "Good", "4699.401"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:50:11", "Good", "4695.0015"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:51:11", "Good", "4691.3823"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:52:11", "Good", "4690.9385"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:53:13", "Good", "4696.0635"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:54:13", "Good", "4700.966"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:55:13", "Good", "4686.26"), + ("-4O7LSSAM_3EA02:2GT7E02I_R_MP", "2023-12-31 01:56:13", "Good", "4700.966"), + ] + + test_df = spark_session.createDataFrame(test_data, schema=schema) + expected_df = spark_session.createDataFrame(expected_data, schema=schema) + + missing_value_imputation = MissingValueImputation(spark_session, test_df) + actual_df = missing_value_imputation.filter() + + assert isinstance(actual_df, DataFrame) + + assert expected_df.columns == actual_df.columns + assert expected_df.schema == actual_df.schema + + def assert_dataframe_similar( + expected_df, actual_df, tolerance=1e-4, time_tolerance_seconds=5 + ): + + expected_df = expected_df.orderBy(["TagName", "EventTime"]) + actual_df = 
+
+        expected_df = expected_df.withColumn("Value", col("Value").cast("float"))
+        actual_df = actual_df.withColumn("Value", col("Value").cast("float"))
+
+        for expected_row, actual_row in zip(expected_df.collect(), actual_df.collect()):
+            for expected_val, actual_val, column_name in zip(
+                expected_row, actual_row, expected_df.columns
+            ):
+                if column_name == "Value":
+                    assert (
+                        abs(expected_val - actual_val) < tolerance
+                    ), f"Value mismatch: {expected_val} != {actual_val}"
+                elif column_name == "EventTime":
+                    # Parse both timestamp strings and compare them directly, since the
+                    # tolerance-based flagging may shift imputed timestamps by a few seconds
+                    expected_time = datetime.fromisoformat(expected_val)
+                    actual_time = datetime.fromisoformat(actual_val)
+                    time_diff = abs((expected_time - actual_time).total_seconds())
+                    assert (
+                        time_diff <= time_tolerance_seconds
+                    ), f"EventTime mismatch: {expected_val} != {actual_val} (tolerance: {time_tolerance_seconds}s)"
+                else:
+                    assert (
+                        expected_val == actual_val
+                    ), f"Mismatch in column '{column_name}': {expected_val} != {actual_val}"
+
+    assert_dataframe_similar(expected_df, actual_df, tolerance=1e-4)
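
Note for reviewers: the core of `_impute_missing_values_sp` is a single scipy call, which can be sanity-checked without a Spark session. Below is a minimal standalone sketch of that step with made-up values (illustrative only, not part of the patch):

```python
# Standalone sketch of the spline step in _impute_missing_values_sp (no Spark).
# The sample values are illustrative and not taken from the test data.
import numpy as np
from scipy.interpolate import UnivariateSpline

# A univariate series where the flagging step has already inserted NaNs
# at the timestamps it detected as missing.
data = np.array([1.0, 2.0, 3.0, np.nan, np.nan, 6.0, 7.0, np.nan, 9.0])
mask = np.isnan(data)

x = np.arange(len(data))
# s=0 forces the spline through every known point (pure interpolation,
# no smoothing), matching the call in the PR.
spline = UnivariateSpline(x[~mask], data[~mask], s=0)

imputed = data.copy()
imputed[mask] = spline(x[mask])
print(imputed)  # NaNs replaced by values on the fitted curve
```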