From 43785de6e77323f15bba91fac94706e2eee151f1 Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Sun, 26 Jan 2025 15:41:37 +0800
Subject: [PATCH] [SPARK-50925][ML][PYTHON][CONNECT] Support
 GeneralizedLinearRegression on Connect

### What changes were proposed in this pull request?
Support GeneralizedLinearRegression on Connect

### Why are the changes needed?
for feature parity

### Does this PR introduce _any_ user-facing change?
yes, new algorithm supported on connect

### How was this patch tested?
added test

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #49673 from zhengruifeng/ml_connect_glr.

Authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
---
 .../services/org.apache.spark.ml.Estimator    |   1 +
 .../services/org.apache.spark.ml.Transformer  |   1 +
 .../GeneralizedLinearRegression.scala         |   4 +-
 python/pyspark/ml/regression.py               |   2 +
 python/pyspark/ml/tests/test_regression.py    | 102 ++++++++++++++++++
 .../apache/spark/sql/connect/ml/MLUtils.scala |  28 +++++
 6 files changed, 137 insertions(+), 1 deletion(-)

diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
index 5d811598095b9..9c1a1f5a19a62 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator
@@ -28,6 +28,7 @@ org.apache.spark.ml.classification.GBTClassifier
 
 # regression
 org.apache.spark.ml.regression.LinearRegression
+org.apache.spark.ml.regression.GeneralizedLinearRegression
 org.apache.spark.ml.regression.DecisionTreeRegressor
 org.apache.spark.ml.regression.RandomForestRegressor
 org.apache.spark.ml.regression.GBTRegressor
diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
index d2a8d6036d4e1..3f1ae52aaaf60 100644
--- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
+++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer
@@ -44,6 +44,7 @@ org.apache.spark.ml.classification.GBTClassificationModel
 
 # regression
 org.apache.spark.ml.regression.LinearRegressionModel
+org.apache.spark.ml.regression.GeneralizedLinearRegressionModel
 org.apache.spark.ml.regression.DecisionTreeRegressionModel
 org.apache.spark.ml.regression.RandomForestRegressionModel
 org.apache.spark.ml.regression.GBTRegressionModel
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index dc0b553e2c91d..dea182902acec 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -1009,6 +1009,8 @@ class GeneralizedLinearRegressionModel private[ml] (
   with GeneralizedLinearRegressionBase with MLWritable
   with HasTrainingSummary[GeneralizedLinearRegressionTrainingSummary] {
 
+  private[ml] def this() = this(Identifiable.randomUID("glm"), Vectors.empty, Double.NaN)
+
   /**
    * Sets the link prediction (linear predictor) column name.
    *
@@ -1182,7 +1184,7 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegr
 @Since("2.0.0")
 class GeneralizedLinearRegressionSummary private[regression] (
     dataset: Dataset[_],
-    origModel: GeneralizedLinearRegressionModel) extends Serializable {
+    origModel: GeneralizedLinearRegressionModel) extends Summary with Serializable {
 
   import GeneralizedLinearRegression._
 
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index a3ab3ea675579..85800518473db 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -2795,6 +2795,7 @@ class GeneralizedLinearRegressionSummary(JavaWrapper):
 
     @property
     @since("2.0.0")
+    @try_remote_attribute_relation
     def predictions(self) -> DataFrame:
         """
         Predictions output by the model's `transform` method.
@@ -2850,6 +2851,7 @@ def residualDegreeOfFreedomNull(self) -> int:
         """
         return self._call_java("residualDegreeOfFreedomNull")
 
+    @try_remote_attribute_relation
     def residuals(self, residualsType: str = "deviance") -> DataFrame:
         """
         Get the residuals of the fitted model by type.
diff --git a/python/pyspark/ml/tests/test_regression.py b/python/pyspark/ml/tests/test_regression.py
index 604157e65c31a..16a94a6e0f67d 100644
--- a/python/pyspark/ml/tests/test_regression.py
+++ b/python/pyspark/ml/tests/test_regression.py
@@ -25,6 +25,10 @@
 from pyspark.ml.regression import (
     LinearRegression,
     LinearRegressionModel,
+    GeneralizedLinearRegression,
+    GeneralizedLinearRegressionModel,
+    GeneralizedLinearRegressionSummary,
+    GeneralizedLinearRegressionTrainingSummary,
     LinearRegressionSummary,
     LinearRegressionTrainingSummary,
     DecisionTreeRegressor,
@@ -163,6 +167,104 @@ def test_linear_regression(self):
             model2 = LinearRegressionModel.load(d)
             self.assertEqual(str(model), str(model2))
 
+    def test_generalized_linear_regression(self):
+        spark = self.spark
+        df = (
+            spark.createDataFrame(
+                [
+                    (1, 1.0, Vectors.dense(0.0, 0.0)),
+                    (2, 1.0, Vectors.dense(1.0, 2.0)),
+                    (3, 2.0, Vectors.dense(0.0, 0.0)),
+                    (4, 2.0, Vectors.dense(1.0, 1.0)),
+                ],
+                ["index", "label", "features"],
+            )
+            .coalesce(1)
+            .sortWithinPartitions("index")
+            .select("label", "features")
+        )
+
+        glr = GeneralizedLinearRegression(
+            family="gaussian",
+            link="identity",
+            linkPredictionCol="p",
+        )
+        glr.setRegParam(0.1)
+        glr.setMaxIter(1)
+        self.assertEqual(glr.getFamily(), "gaussian")
+        self.assertEqual(glr.getLink(), "identity")
+        self.assertEqual(glr.getLinkPredictionCol(), "p")
+        self.assertEqual(glr.getRegParam(), 0.1)
+        self.assertEqual(glr.getMaxIter(), 1)
+
+        model = glr.fit(df)
+        self.assertTrue(np.allclose(model.intercept, 1.543859649122807, atol=1e-4), model.intercept)
+        self.assertTrue(
+            np.allclose(model.coefficients.toArray(), [0.43859649, -0.35087719], atol=1e-4),
+            model.coefficients,
+        )
+        self.assertEqual(model.numFeatures, 2)
+
+        vec = Vectors.dense(1.0, 2.0)
+        pred = model.predict(vec)
+        self.assertTrue(np.allclose(pred, 1.280701754385965, atol=1e-4), pred)
+
+        expected_cols = ["label", "features", "p", "prediction"]
+
+        output = model.transform(df)
+        self.assertEqual(output.columns, expected_cols)
+        self.assertEqual(output.count(), 4)
+
+        # Model summary
+        self.assertTrue(model.hasSummary)
+
+        summary = model.summary
+        self.assertIsInstance(summary, GeneralizedLinearRegressionSummary)
+        self.assertIsInstance(summary, GeneralizedLinearRegressionTrainingSummary)
+        self.assertEqual(summary.numIterations, 1)
+        self.assertEqual(summary.numInstances, 4)
+        self.assertEqual(summary.rank, 3)
+        self.assertTrue(
+            np.allclose(
+                summary.tValues,
+                [0.3725037662281711, -0.49418209022924164, 2.6589353685797654],
+                atol=1e-4,
+            ),
+            summary.tValues,
+        )
+        self.assertTrue(
+            np.allclose(
+                summary.pValues,
+                [0.7729938686180984, 0.707802691825973, 0.22900885781807023],
+                atol=1e-4,
+            ),
+            summary.pValues,
+        )
+        self.assertEqual(summary.predictions.columns, expected_cols)
+        self.assertEqual(summary.predictions.count(), 4)
+        self.assertEqual(summary.residuals().columns, ["devianceResiduals"])
+        self.assertEqual(summary.residuals().count(), 4)
+
+        summary2 = model.evaluate(df)
+        self.assertIsInstance(summary2, GeneralizedLinearRegressionSummary)
+        self.assertNotIsInstance(summary2, GeneralizedLinearRegressionTrainingSummary)
+        self.assertEqual(summary2.numInstances, 4)
+        self.assertEqual(summary2.rank, 3)
+        self.assertEqual(summary.predictions.columns, expected_cols)
+        self.assertEqual(summary.predictions.count(), 4)
+        self.assertEqual(summary2.residuals().columns, ["devianceResiduals"])
+        self.assertEqual(summary2.residuals().count(), 4)
+
+        # Model save & load
+        with tempfile.TemporaryDirectory(prefix="generalized_linear_regression") as d:
+            glr.write().overwrite().save(d)
+            glr2 = GeneralizedLinearRegression.load(d)
+            self.assertEqual(str(glr), str(glr2))
+
+            model.write().overwrite().save(d)
+            model2 = GeneralizedLinearRegressionModel.load(d)
+            self.assertEqual(str(model), str(model2))
+
     def test_decision_tree_regressor(self):
         df = self.df
 
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
index d6e13d301c7e1..fbcbf8f3f2048 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala
@@ -532,6 +532,34 @@ private[ml] object MLUtils {
     (classOf[BinaryLogisticRegressionSummary], Set("scoreCol")),
 
     // Regression Models
+    (
+      classOf[GeneralizedLinearRegressionModel],
+      Set("intercept", "coefficients", "numFeatures", "evaluate")),
+    (
+      classOf[GeneralizedLinearRegressionSummary],
+      Set(
+        "aic",
+        "degreesOfFreedom",
+        "deviance",
+        "dispersion",
+        "nullDeviance",
+        "numInstances",
+        "predictionCol",
+        "predictions",
+        "rank",
+        "residualDegreeOfFreedom",
+        "residualDegreeOfFreedomNull",
+        "residuals")),
+    (
+      classOf[GeneralizedLinearRegressionTrainingSummary],
+      Set(
+        "numIterations",
+        "solver",
+        "tValues",
+        "pValues",
+        "coefficientStandardErrors",
+        "coefficientsWithStatistics",
+        "toString")),
     (classOf[LinearRegressionModel], Set("intercept", "coefficients", "scale", "evaluate")),
     (
       classOf[LinearRegressionSummary],
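
For reviewers who want to try the change end to end, a minimal usage sketch (not part of the patch) follows. It assumes a Spark Connect server is already running and reachable at `sc://localhost`; the endpoint and the toy data are illustrative assumptions. It exercises the estimator, the training summary, and the `residuals()` relation that this change routes through Connect.

```python
# Illustrative sketch only; the Connect endpoint and data are assumptions, not part of the patch.
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GeneralizedLinearRegression

# Connect to a running Spark Connect server (adjust the URL for your environment).
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

df = spark.createDataFrame(
    [
        (1.0, Vectors.dense(0.0, 0.0)),
        (1.0, Vectors.dense(1.0, 2.0)),
        (2.0, Vectors.dense(0.0, 0.0)),
        (2.0, Vectors.dense(1.0, 1.0)),
    ],
    ["label", "features"],
)

glr = GeneralizedLinearRegression(family="gaussian", link="identity", linkPredictionCol="p")
model = glr.fit(df)

# Model attributes and transform are now served by the Connect backend.
print(model.intercept, model.coefficients)
model.transform(df).show()

# Summary scalars and DataFrame-valued attributes (predictions, residuals) also resolve remotely.
summary = model.summary
print(summary.numIterations, summary.rank, summary.aic)
summary.residuals().show()
```

The same script runs unchanged against a classic (non-Connect) SparkSession, which is the feature-parity goal stated in the PR description.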