Skip to content

Commit

Permalink
Merge pull request #42 from amosproj/feature/28_linear_regression
Browse files Browse the repository at this point in the history
Feature/28 linear regression
  • Loading branch information
mollle authored Nov 19, 2024
2 parents 59fc58d + a2c4f02 commit 9638ae5
Show file tree
Hide file tree
Showing 7 changed files with 569 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/machine_learning/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .spark.linear_regression import *
19 changes: 19 additions & 0 deletions src/sdk/python/rtdip_sdk/pipelines/machine_learning/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from abc import abstractmethod

from great_expectations.compatibility.pyspark import DataFrame

from ..interfaces import PipelineComponentBaseInterface


class MachineLearningInterface(PipelineComponentBaseInterface):
@abstractmethod
def __init__(self, df: DataFrame):
pass

@abstractmethod
def train(self):
return self

@abstractmethod
def predict(self, *args, **kwargs):
pass
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright 2022 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pyspark.sql import DataFrame
import pyspark.ml as ml
from pyspark.ml.evaluation import RegressionEvaluator
from ..interfaces import MachineLearningInterface
from ..._pipeline_utils.models import Libraries, SystemType


class LinearRegression(MachineLearningInterface):
"""
This function uses pyspark.ml.LinearRegression to train a linear regression model on time data.
And the uses the model to predict next values in the time series.
Args:
df (pyspark.sql.Dataframe): DataFrame containing the features and labels.
features_col (str): Name of the column containing the features (the input). Default is 'features'.
label_col (str): Name of the column containing the label (the input). Default is 'label'.
prediction_col (str): Name of the column to which the prediction will be written. Default is 'prediction'.
Returns:
PySparkDataFrame: Returns the original PySpark DataFrame without changes.
"""

def __init__(
self,
df: DataFrame,
features_col: str = "features",
label_col: str = "label",
prediction_col: str = "prediction",
) -> None:
self.df = df
self.features_col = features_col
self.label_col = label_col
self.prediction_col = prediction_col

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def split_data(self, train_ratio: float = 0.8):
"""
Splits the dataset into training and testing sets.
Args:
train_ratio (float): The ratio of the data to be used for training. Default is 0.8 (80% for training).
Returns:
DataFrame: Returns the training and testing datasets.
"""
train_df, test_df = self.df.randomSplit([train_ratio, 1 - train_ratio], seed=42)
return train_df, test_df

def train(self, train_df: DataFrame):
"""
Trains a linear regression model on the provided data.
"""
linear_regression = ml.regression.LinearRegression(
featuresCol=self.features_col,
labelCol=self.label_col,
predictionCol=self.prediction_col,
)

self.model = linear_regression.fit(train_df)
return self

def predict(self, prediction_df: DataFrame):
"""
Predicts the next values in the time series.
"""

return self.model.transform(
prediction_df,
)

def evaluate(self, test_df: DataFrame):
"""
Evaluates the trained model using RMSE.
Args:
test_df (DataFrame): The testing dataset to evaluate the model.
Returns:
float: The Root Mean Squared Error (RMSE) of the model.
"""
# Check the columns of the test DataFrame
print(f"Columns in test_df: {test_df.columns}")
test_df.show(5)

if self.prediction_col not in test_df.columns:
print(
f"Error: '{self.prediction_col}' column is missing in the test DataFrame."
)
return None

# Evaluator for RMSE
evaluator_rmse = RegressionEvaluator(
labelCol=self.label_col,
predictionCol=self.prediction_col,
metricName="rmse",
)
rmse = evaluator_rmse.evaluate(test_df)

# Evaluator for R²
evaluator_r2 = RegressionEvaluator(
labelCol=self.label_col, predictionCol=self.prediction_col, metricName="r2"
)
r2 = evaluator_r2.evaluate(test_df)

return rmse, r2
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .cols_to_vector import *
from .polynomial_features import *
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import DataFrame
from ...._pipeline_utils.models import Libraries, SystemType
from ...interfaces import TransformerInterface


class ColsToVector(TransformerInterface):
"""
Converts columns containing numbers to a column containing a vector.
Parameters:
df (DataFrame): PySpark DataFrame
input_cols (list[str]): List of columns to convert to a vector.
output_col (str): Name of the output column where the vector will be stored.
override_col (bool): If True, the output column can override an existing column.
"""

def __init__(
self,
df: DataFrame,
input_cols: list[str],
output_col: str,
override_col: bool = False,
) -> None:
self.input_cols = input_cols
self.output_col = output_col
self.override_col = override_col
self.df = df

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def pre_transform_validation(self):
if self.output_col in self.df.columns and not self.override_col:
return False
return True

def post_transform_validation(self):
return True

def transform(self):
if not self.pre_transform_validation():
raise ValueError(
f"Output column {self.output_col} already exists and override_col is set to False."
)

temp_col = (
f"{self.output_col}_temp" if self.output_col in self.df.columns else None
)
transformed_df = VectorAssembler(
inputCols=self.input_cols, outputCol=(temp_col or self.output_col)
).transform(self.df)

if temp_col:
return transformed_df.drop(self.output_col).withColumnRenamed(
temp_col, self.output_col
)
return transformed_df
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2024 RTDIP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pyspark.ml as ml
from pyspark.sql import DataFrame

from ...._pipeline_utils.models import Libraries, SystemType
from ...interfaces import TransformerInterface


class PolynomialFeatures(TransformerInterface):
"""
This transformer takes a vector column and generates polynomial combinations of the input features
up to the specified degree. For example, if the input vector is [a, b] and degree=2,
the output features will be [a, b, a^2, ab, b^2].
Parameters:
df (DataFrame): PySpark DataFrame
input_col (str): Name of the input column in the DataFrame that contains the feature vectors
output_col (str):
poly_degree (int): The degree of the polynomial features to generate
override_col (bool): If True, the output column can override an existing column.
"""

def __init__(
self,
df: DataFrame,
input_col: str,
output_col: str,
poly_degree: int,
override_col: bool = False,
):
self.df = df
self.input_col = input_col
self.output_col = output_col
self.poly_degree = poly_degree
self.override_col = override_col

@staticmethod
def system_type():
"""
Attributes:
SystemType (Environment): Requires PYSPARK
"""
return SystemType.PYSPARK

@staticmethod
def libraries():
libraries = Libraries()
return libraries

@staticmethod
def settings() -> dict:
return {}

def pre_transform_validation(self):
if not (self.input_col in self.df.columns):
return False
if self.output_col not in self.df.columns and not self.override_col:
return False
return isinstance(self.df.schema[self.input_col].dataType, ml.linalg.VectorUDT)

def post_transform_validation(self):
return True

def transform(self):
temp_col = (
f"{self.output_col}_temp" if self.output_col in self.df.columns else None
)
transformed_df = ml.feature.PolynomialExpansion(
degree=self.poly_degree,
inputCol=self.input_col,
outputCol=(temp_col or self.output_col),
).transform(self.df)

if temp_col:
return transformed_df.drop(self.output_col).withColumnRenamed(
temp_col, self.output_col
)
return transformed_df
Loading

0 comments on commit 9638ae5

Please sign in to comment.