Hotfix/#53 inteval filtering datetime datatype column #72

Merged
@@ -14,25 +14,27 @@
from datetime import timedelta

import pandas as pd
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame


from ...._pipeline_utils.models import Libraries, SystemType
from ...interfaces import WranglerBaseInterface


class IntervalFiltering(WranglerBaseInterface):
"""
Cleanses a DataFrame by removing rows outside a specified interval window.
Example:

Cleanses a DataFrame by removing rows outside a specified interval window. Supported time stamp columns are DateType and StringType.

Parameters:
spark (SparkSession): A SparkSession object.
df (DataFrame): PySpark DataFrame to be converted
interval (int): The interval length for cleansing.
interval_unit (str): 'hours', 'minutes', 'seconds' or 'milliseconds' to specify the unit of the interval.
time_stamp_column_name (str): The name of the column containing the time stamps. Default is 'EventTime'.
tolerance (int): The tolerance for the interval. Default is None.
"""

""" Default time stamp column name if not set in the constructor """
@@ -57,6 +59,68 @@ def __init__(
else:
self.time_stamp_column_name = time_stamp_column_name

def filter(self) -> DataFrame:
"""
Filters the DataFrame based on the interval
"""

if self.time_stamp_column_name not in self.df.columns:
raise ValueError(
f"Column {self.time_stamp_column_name} not found in the DataFrame."
)
is_string_time_stamp = isinstance(
self.df.schema[self.time_stamp_column_name].dataType, StringType
)

original_schema = self.df.schema
self.df = self.convert_column_to_timestamp().orderBy(
self.time_stamp_column_name
)

self.df.show()

tolerance_in_ms = None
if self.tolerance is not None:
tolerance_in_ms = self.get_time_delta(self.tolerance).total_seconds() * 1000
print(tolerance_in_ms)

time_delta_in_ms = self.get_time_delta(self.interval).total_seconds() * 1000

rows = self.df.collect()
last_time_stamp = rows[0][self.time_stamp_column_name]
first_row = rows[0].asDict()

first_row[self.time_stamp_column_name] = (
self.format_date_time_to_string(first_row[self.time_stamp_column_name])
if is_string_time_stamp
else first_row[self.time_stamp_column_name]
)

cleansed_df = [first_row]

for i in range(1, len(rows)):
current_row = rows[i]
current_time_stamp = current_row[self.time_stamp_column_name]

if self.check_if_outside_of_interval(
current_time_stamp, last_time_stamp, time_delta_in_ms, tolerance_in_ms
):
current_row_dict = current_row.asDict()
current_row_dict[self.time_stamp_column_name] = (
self.format_date_time_to_string(
current_row_dict[self.time_stamp_column_name]
)
if is_string_time_stamp
else current_row_dict[self.time_stamp_column_name]
)

cleansed_df.append(current_row_dict)
last_time_stamp = current_time_stamp

result_df = self.spark.createDataFrame(cleansed_df, schema=original_schema)

return result_df

@staticmethod
def system_type():
"""
@@ -82,6 +146,7 @@ def convert_column_to_timestamp(self) -> DataFrame:
except Exception as e:
raise ValueError(
f"Error converting column {self.time_stamp_column_name} to timestamp: {e}"
f"{self.df.schema[self.time_stamp_column_name].dataType} might be unsupported!"
)

def get_time_delta(self, value: int) -> timedelta:
@@ -121,54 +186,3 @@ def format_date_time_to_string(self, time_stamp: pd.Timestamp) -> str:
return time_stamp.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
except Exception as e:
raise ValueError(f"Error converting timestamp to string: {e}")

def filter(self) -> DataFrame:
"""
Filters the DataFrame based on the interval
"""

if self.time_stamp_column_name not in self.df.columns:
raise ValueError(
f"Column {self.time_stamp_column_name} not found in the DataFrame."
)

original_schema = self.df.schema
self.df = self.convert_column_to_timestamp().orderBy(
self.time_stamp_column_name
)

tolerance_in_ms = None
if self.tolerance is not None:
tolerance_in_ms = self.get_time_delta(self.tolerance).total_seconds() * 1000
print(tolerance_in_ms)

time_delta_in_ms = self.get_time_delta(self.interval).total_seconds() * 1000

rows = self.df.collect()
last_time_stamp = rows[0][self.time_stamp_column_name]
first_row = rows[0].asDict()
first_row[self.time_stamp_column_name] = self.format_date_time_to_string(
first_row[self.time_stamp_column_name]
)

cleansed_df = [first_row]

for i in range(1, len(rows)):
current_row = rows[i]
current_time_stamp = current_row[self.time_stamp_column_name]

if self.check_if_outside_of_interval(
current_time_stamp, last_time_stamp, time_delta_in_ms, tolerance_in_ms
):
current_row_dict = current_row.asDict()
current_row_dict[self.time_stamp_column_name] = (
self.format_date_time_to_string(
current_row_dict[self.time_stamp_column_name]
)
)
cleansed_df.append(current_row_dict)
last_time_stamp = current_time_stamp

result_df = self.spark.createDataFrame(cleansed_df, schema=original_schema)

return result_df
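Reviewer note: for anyone who wants to try the new filter() path locally, below is a minimal, hedged usage sketch. The constructor call mirrors the tests further down in this diff (spark, df, interval, interval_unit, with time_stamp_column_name defaulting to "EventTime" and tolerance to None); the import of IntervalFiltering is left as a comment because its module path is collapsed here, and the sample rows are invented for illustration.

from pyspark.sql import SparkSession

# Hedged usage sketch. Import IntervalFiltering from its module in this
# repository; the exact path is collapsed in this diff, so supply it yourself.

spark = SparkSession.builder.master("local[2]").appName("interval-filter-demo").getOrCreate()

# String timestamps are supported as well: filter() converts the column to a
# timestamp internally and formats the kept rows back to strings in the result.
df = spark.createDataFrame(
    [
        ("TAG_A", "2024-01-02 20:03:46.000"),
        ("TAG_A", "2024-01-02 20:33:46.000"),
        ("TAG_A", "2024-01-02 21:10:46.000"),
    ],
    ["TagName", "EventTime"],
)

interval_filtering_wrangler = IntervalFiltering(spark, df, 1, "hours")
filtered_df = interval_filtering_wrangler.filter()
filtered_df.show()

Note that filter() sorts the frame, collects every row to the driver and then rebuilds the DataFrame, so keep ad-hoc inputs small when experimenting.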
@@ -14,12 +14,9 @@
import logging

import pandas

from pandas import DataFrame
from datetime import datetime

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType


class DataFrameLogHandler(logging.Handler):
"""
@@ -41,19 +38,9 @@ class DataFrameLogHandler(logging.Handler):
"""

logs_df: DataFrame = None
spark = None

def __init__(self):
self.spark = SparkSession.builder.appName("Dataframe Log Handler").getOrCreate()
df_schema = StructType(
[
StructField("timestamp", DateType(), True),
StructField("name", StringType(), True),
StructField("level", StringType(), True),
StructField("message", StringType(), True),
]
)
self.logs_df = self.spark.createDataFrame([], schema=df_schema)
self.logs_df = DataFrame(columns=["timestamp", "name", "level", "message"])
super().__init__()

def emit(self, record: logging.LogRecord) -> None:
@@ -65,8 +52,10 @@ def emit(self, record: logging.LogRecord) -> None:
"message": record.msg,
}

new_log_df_row = self.spark.createDataFrame([log_entry])
self.logs_df = self.logs_df.union(new_log_df_row)
new_log_df_row = pandas.DataFrame(
log_entry, columns=["timestamp", "name", "level", "message"], index=[0]
)
self.logs_df = pandas.concat([self.logs_df, new_log_df_row], ignore_index=True)

def get_logs_as_df(self) -> DataFrame:
return self.logs_df
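As a quick, hedged illustration of the pandas-backed handler introduced above (the handler's module path is not visible in this diff, so the import is left to the reader):

import logging

# DataFrameLogHandler is the class patched above; import it from its module in
# this repository before running this sketch.

handler = DataFrameLogHandler()
logger = logging.getLogger("interval_filtering_demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("starting interval filtering")
logger.warning("tolerance not set, falling back to the interval only")

logs_df = handler.get_logs_as_df()   # now a pandas DataFrame, not a Spark DataFrame
print(logs_df.shape[0])              # 2 records emitted -> 2 rows
print(logs_df[["level", "message"]])

Because the accumulator is now pandas, downstream assertions switch from count() to shape[0], as reflected in the runtime log collector test at the end of this diff.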
@@ -11,6 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime

import pytest

from pyspark.sql import SparkSession
@@ -24,6 +26,10 @@ def spark_session():
return SparkSession.builder.master("local[2]").appName("test").getOrCreate()


def convert_to_datetime(date_time: str):
return datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S.%f")


def test_interval_detection_easy(spark_session: SparkSession):
expected_df = spark_session.createDataFrame(
[
@@ -302,3 +308,31 @@ def test_interval_tolerance(spark_session: SparkSession):
assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()


def test_interval_detection_date_time_columns(spark_session: SparkSession):
expected_df = spark_session.createDataFrame(
[
("A2PS64V0JR", convert_to_datetime("2024-01-02 20:03:46.000")),
("A2PS64asd.:ZUX09R", convert_to_datetime("2024-01-02 21:06:46.000")),
("A2PS64V0J.:ZUasdX09R", convert_to_datetime("2024-01-02 23:03:46.035")),
],
["TagName", "EventTime"],
)
df = spark_session.createDataFrame(
[
("A2PS64V0JR", convert_to_datetime("2024-01-02 20:03:46.000")),
("A2PS64asd.:ZUX09R", convert_to_datetime("2024-01-02 21:06:46.000")),
("A2PS64V0J.:ZUX09R", convert_to_datetime("2024-01-02 21:09:45.999")),
("A2PS64asd.:ZUX09R", convert_to_datetime("2024-01-02 21:12:46.030")),
("A2PS64V0J.:ZUasdX09R", convert_to_datetime("2024-01-02 23:03:46.035")),
],
["TagName", "EventTime"],
)

interval_filtering_wrangler = IntervalFiltering(spark_session, df, 1, "hours")
actual_df = interval_filtering_wrangler.filter()

assert expected_df.columns == actual_df.columns
assert expected_df.schema == actual_df.schema
assert expected_df.collect() == actual_df.collect()
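To make the rows kept in expected_df above less surprising, here is a short, self-contained worked example of the millisecond arithmetic behind the filtering. It is not the class's check_if_outside_of_interval helper (whose body, including the tolerance handling, is collapsed in this diff); it only illustrates why the 21:09:45.999 row falls inside a one-hour window of the previous kept row and is therefore dropped.

from datetime import datetime

fmt = "%Y-%m-%d %H:%M:%S.%f"
previous = datetime.strptime("2024-01-02 21:06:46.000", fmt)
candidate = datetime.strptime("2024-01-02 21:09:45.999", fmt)

# filter() works in milliseconds: interval and tolerance are converted via
# get_time_delta(...).total_seconds() * 1000 before the comparison.
elapsed_ms = (candidate - previous).total_seconds() * 1000  # ~179_999 ms
interval_ms = 1 * 60 * 60 * 1000                            # 1 hour = 3_600_000 ms

print(elapsed_ms < interval_ms)  # True -> the candidate row is filtered out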
@@ -14,7 +14,7 @@
import os

import pytest
from pyspark.sql import DataFrame
from pandas import DataFrame
from pyspark.sql import SparkSession

from src.sdk.python.rtdip_sdk.pipelines.logging.spark.runtime_log_collector import (
@@ -38,8 +38,8 @@ def spark():
spark.stop()


def test_logger_manager_basic_function(spark):
df = spark.createDataFrame([(1, "2024-02-11 00:00:00.000")], ["Index", "EventTime"])
def test_logger_manager_basic_function():
df = DataFrame()
monitor = IdentifyMissingDataInterval(
df=df,
interval="10s",
@@ -78,7 +78,7 @@ def test_df_output(spark, caplog):

result_df = log_collector.get_logs_as_df()

assert result_df.count() == 6
assert result_df.shape[0] == 6


def test_file_logging(spark, caplog):