Commit c09b3ba

BrendanWalsh authored and mhamilton723 committed
chore: Adding Spark35 support
1 parent d9149d1 commit c09b3ba

File tree

45 files changed (+125 -92 lines)


README.md (+16)

@@ -112,6 +112,22 @@ In Microsoft Fabric notebooks SynapseML is already installed. To change the vers
 
 In Azure Synapse notebooks please place the following in the first cell of your notebook.
 
+- For Spark 3.5 Pools:
+
+```bash
+%%configure -f
+{
+  "name": "synapseml",
+  "conf": {
+    "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.3",
+    "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
+    "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
+    "spark.yarn.user.classpath.first": "true",
+    "spark.sql.parquet.enableVectorizedReader": "false"
+  }
+}
+```
+
 - For Spark 3.4 Pools:
 
 ```bash
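The added cell mirrors the existing Spark 3.4 block, pointing Spark 3.5 pools at the same synapseml_2.12:1.0.3 coordinates. A quick, illustrative sanity check (not part of this commit, and assuming a Scala notebook cell run after the `%%configure` cell) is to confirm the session version and that the package resolved:

```scala
// Illustrative follow-up cell, not part of this commit.
println(spark.version)  // expect a 3.5.x version on a Spark 3.5 pool

// Compiles only if the synapseml_2.12:1.0.3 jar made it onto the session classpath.
import com.microsoft.azure.synapse.ml.services.openai.OpenAIPrompt
```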

build.sbt (+2 -2)

@@ -7,7 +7,7 @@ import scala.xml.transform.{RewriteRule, RuleTransformer}
 import scala.xml.{Node => XmlNode, NodeSeq => XmlNodeSeq, _}
 
 val condaEnvName = "synapseml"
-val sparkVersion = "3.4.1"
+val sparkVersion = "3.5.0"
 name := "synapseml"
 ThisBuild / organization := "com.microsoft.azure"
 ThisBuild / scalaVersion := "2.12.17"
@@ -34,7 +34,7 @@ val extraDependencies = Seq(
   "com.jcraft" % "jsch" % "0.1.54",
   "org.apache.httpcomponents.client5" % "httpclient5" % "5.1.3",
   "org.apache.httpcomponents" % "httpmime" % "4.5.13",
-  "com.linkedin.isolation-forest" %% "isolation-forest_3.4.2" % "3.0.4"
+  "com.linkedin.isolation-forest" %% "isolation-forest_3.5.0" % "3.0.5"
     exclude("com.google.protobuf", "protobuf-java") exclude("org.apache.spark", "spark-mllib_2.12")
     exclude("org.apache.spark", "spark-core_2.12") exclude("org.apache.spark", "spark-avro_2.12")
     exclude("org.apache.spark", "spark-sql_2.12"),

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/bing/BingImageSearch.scala (+2 -2)

@@ -15,7 +15,7 @@ import org.apache.spark.injections.UDFUtils
 import org.apache.spark.ml.ComplexParamsReadable
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.{col, explode}
 import org.apache.spark.sql.types._
 import spray.json.DefaultJsonProtocol._
@@ -44,7 +44,7 @@ object BingImageSearch extends ComplexParamsReadable[BingImageSearch] with Seria
   ): Lambda = {
     Lambda({ df =>
       val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
-      val encoder = RowEncoder(outputSchema)
+      val encoder = ExpressionEncoder(outputSchema)
       df.toDF().mapPartitions { rows =>
         val futures = rows.map { row: Row =>
           (Future {
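This is the first instance of the encoder swap that repeats through the rest of the commit: wherever a Row encoder was built from a `StructType` with `RowEncoder(schema)`, the code now calls `ExpressionEncoder(schema)`, which on Spark 3.5 yields the `ExpressionEncoder[Row]` these transformers pass to `map`/`mapPartitions`. A minimal sketch of the pattern (illustrative helper and column names, not the repo's code):

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.BinaryType

// Illustrative only: widen the schema by one binary column, then map each
// partition under an explicit Row encoder built from the widened schema.
def withFetchedBytes(df: DataFrame, bytesCol: String)(fetch: Row => Array[Byte]): DataFrame = {
  val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
  val encoder = ExpressionEncoder(outputSchema)  // was RowEncoder(outputSchema) on Spark 3.4
  df.mapPartitions { rows =>
    rows.map(row => Row.fromSeq(row.toSeq :+ fetch(row)))
  }(encoder)
}
```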

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/openai/OpenAIPrompt.scala (+2 -2)

@@ -13,7 +13,7 @@ import org.apache.http.entity.AbstractHttpEntity
 import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, functions => F, types => T}
@@ -119,7 +119,7 @@ class OpenAIPrompt(override val uid: String) extends Transformer
       } else {
         row
       }
-    })(RowEncoder(df.schema))
+    })(ExpressionEncoder(df.schema))
   }
 
   override def transform(dataset: Dataset[_]): DataFrame = {

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeakerEmotionInference.scala (+2 -2)

@@ -12,7 +12,7 @@ import org.apache.http.entity.{AbstractHttpEntity, StringEntity}
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.ml.{ComplexParamsReadable, NamespaceInjections, PipelineModel, Transformer}
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types.{DataType, StringType, StructType}
 import spray.json.DefaultJsonProtocol.StringJsonFormat
@@ -93,7 +93,7 @@ class SpeakerEmotionInference(override val uid: String)
         converter(row.getAs[Row](row.fieldIndex(getOutputCol)))
       )
       new GenericRowWithSchema((row.toSeq.dropRight(1) ++ Seq(ssml)).toArray, newSchema): Row
-    })(RowEncoder({
+    })(ExpressionEncoder({
       newSchema
     }))
   })

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/SpeechToTextSDK.scala (+2 -2)

@@ -24,7 +24,7 @@ import org.apache.spark.injections.SConf
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -400,7 +400,7 @@ abstract class SpeechSDKBase extends Transformer
       ArrayType(responseTypeBinding.schema)
     }
 
-    val enc = RowEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
+    val enc = ExpressionEncoder(enrichedDf.schema.add(getOutputCol, addedSchema))
     val sc = df.sparkSession.sparkContext
     val bConf = sc.broadcast(new SConf(sc.hadoopConfiguration))
     val isUriAudio = df.schema(getAudioDataCol).dataType match {

cognitive/src/main/scala/com/microsoft/azure/synapse/ml/services/speech/TextToSpeech.scala (+2 -2)

@@ -15,7 +15,7 @@ import org.apache.hadoop.io.{IOUtils => HUtils}
 import org.apache.spark.ml.param.{Param, ParamMap}
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.util.SerializableConfiguration
@@ -152,7 +152,7 @@ class TextToSpeech(override val uid: String)
       }
       Row.fromSeq(row.toSeq ++ Seq(errorRow))
     }.get
-  }(RowEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
+  }(ExpressionEncoder(dataset.schema.add(getErrorCol, SpeechSynthesisError.schema)))
 }
 
 override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

core/src/main/scala/com/microsoft/azure/synapse/ml/core/env/PackageUtils.scala (+1 -1)

@@ -21,7 +21,7 @@ object PackageUtils {
   // Use a fixed version for local testing
   // val PackageMavenCoordinate = s"$PackageGroup:$PackageName:1.0.8"
 
-  private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.4.1"
+  private val AvroCoordinate = "org.apache.spark:spark-avro_2.12:3.5.0"
   val PackageRepository: String = SparkMLRepository
 
   // If testing onnx package with snapshots repo, make sure to switch to using

core/src/main/scala/com/microsoft/azure/synapse/ml/core/schema/SparkBindings.scala (+2 -2)

@@ -5,7 +5,7 @@ package com.microsoft.azure.synapse.ml.core.schema
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.StructType
 
 import scala.reflect.runtime.universe.TypeTag
@@ -14,7 +14,7 @@ abstract class SparkBindings[T: TypeTag] extends Serializable {
 
   lazy val schema: StructType = enc.schema
   private lazy val enc: ExpressionEncoder[T] = ExpressionEncoder[T]().resolveAndBind()
-  private lazy val rowEnc: ExpressionEncoder[Row] = RowEncoder(enc.schema).resolveAndBind()
+  private lazy val rowEnc: ExpressionEncoder[Row] = ExpressionEncoder(enc.schema).resolveAndBind()
 
   // WARNING: each time you use this function on a dataframe, you should make a new converter.
   // Spark does some magic that makes this leak memory if re-used on a
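SparkBindings is where both encoders meet: the typed `ExpressionEncoder[T]` supplies the schema, and the Row encoder derived from it handles the Row/InternalRow conversions the converters build on. A rough round-trip sketch under the Spark 3.5 spelling (illustrative schema and values, not the repo's code):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Illustrative schema; SparkBindings derives the real one from ExpressionEncoder[T].
val schema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))

val rowEnc = ExpressionEncoder(schema).resolveAndBind()  // was RowEncoder(schema).resolveAndBind()
val toInternal = rowEnc.createSerializer()
val fromInternal = rowEnc.createDeserializer()

val roundTripped: Row = fromInternal(toInternal(Row("ada", 36)))
```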

core/src/main/scala/com/microsoft/azure/synapse/ml/explainers/LIMEBase.scala (+2 -2)

@@ -14,7 +14,7 @@ import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -44,7 +44,7 @@ object LIMEUtils extends SLogging {
       case field if colsToSquish.contains(field.name) => StructField(field.name, ArrayType(field.dataType))
       case f => f
     })
-    val encoder = RowEncoder(schema)
+    val encoder = ExpressionEncoder(schema)
     val indiciesToSquish = colsToSquish.map(df.schema.fieldIndex)
     df.mapPartitions { it =>
       val isEmpty = it.isEmpty

core/src/main/scala/com/microsoft/azure/synapse/ml/featurize/text/MultiNGram.scala (+2 -2)

@@ -12,7 +12,7 @@ import org.apache.spark.ml._
 import org.apache.spark.ml.feature._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
@@ -56,7 +56,7 @@ class MultiNGram(override val uid: String)
         .map(col => row.getAs[Seq[String]](col))
         .reduce(_ ++ _)
       Row.fromSeq(row.toSeq :+ mergedNGrams)
-    }(RowEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
+    }(ExpressionEncoder(intermediateDF.schema.add(getOutputCol, ArrayType(StringType))))
       .drop(intermediateOutputCols: _*)
   }, dataset.columns.length)
 }

core/src/main/scala/com/microsoft/azure/synapse/ml/io/binary/BinaryFileReader.scala (+2 -2)

@@ -8,7 +8,7 @@ import com.microsoft.azure.synapse.ml.core.schema.BinaryFileSchema
 import com.microsoft.azure.synapse.ml.core.utils.AsyncUtils
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types.BinaryType
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
@@ -85,7 +85,7 @@ object BinaryFileReader {
                     timeout: Int
                    ): DataFrame = {
     val outputSchema = df.schema.add(bytesCol, BinaryType, nullable = true)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     val hconf = ConfUtils.getHConf(df)
 
     df.mapPartitions { rows =>

core/src/main/scala/com/microsoft/azure/synapse/ml/io/http/HTTPTransformer.scala (+2 -2)

@@ -13,7 +13,7 @@ import org.apache.spark.injections.UDFUtils
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.ml.{ComplexParamsReadable, ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -118,7 +118,7 @@ class HTTPTransformer(val uid: String)
   override def transform(dataset: Dataset[_]): DataFrame = {
     logTransform[DataFrame]({
       val df = dataset.toDF()
-      val enc = RowEncoder(transformSchema(df.schema))
+      val enc = ExpressionEncoder(transformSchema(df.schema))
       val colIndex = df.schema.fieldNames.indexOf(getInputCol)
       val fromRow = HTTPRequestData.makeFromRowConverter
       val toRow = HTTPResponseData.makeToRowConverter

core/src/main/scala/com/microsoft/azure/synapse/ml/io/image/ImageUtils.scala (+4 -4)

@@ -11,7 +11,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.ml.ImageInjections
 import org.apache.spark.ml.image.ImageSchema
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.{DataFrame, Row}
 
 import java.awt.color.ColorSpace
@@ -117,7 +117,7 @@ object ImageUtils {
 
   def readFromPaths(df: DataFrame, pathCol: String, imageCol: String = "image"): DataFrame = {
     val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     val hconf = ConfUtils.getHConf(df)
     df.mapPartitions { rows =>
       rows.map { row =>
@@ -133,7 +133,7 @@ object ImageUtils {
 
   def readFromBytes(df: DataFrame, pathCol: String, bytesCol: String, imageCol: String = "image"): DataFrame = {
     val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     df.mapPartitions { rows =>
       rows.map { row =>
         val path = row.getAs[String](pathCol)
@@ -150,7 +150,7 @@ object ImageUtils {
                     imageCol: String = "image",
                     dropPrefix: Boolean = false): DataFrame = {
     val outputSchema = df.schema.add(imageCol, ImageSchema.columnSchema)
-    val encoder = RowEncoder(outputSchema)
+    val encoder = ExpressionEncoder(outputSchema)
     df.mapPartitions { rows =>
       rows.map { row =>
         val encoded = row.getAs[String](bytesCol)

core/src/main/scala/com/microsoft/azure/synapse/ml/stages/MiniBatchTransformer.scala (+3 -3)

@@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.param.TransformerParam
 import org.apache.spark.ml.Transformer
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
-import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -35,7 +35,7 @@ trait MiniBatchBase extends Transformer with DefaultParamsWritable with Wrappabl
   def transform(dataset: Dataset[_]): DataFrame = {
     logTransform[DataFrame]({
       val outputSchema = transformSchema(dataset.schema)
-      implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
+      implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)
       dataset.toDF().mapPartitions { it =>
         if (it.isEmpty) {
           it
@@ -215,7 +215,7 @@ class FlattenBatch(val uid: String)
   override def transform(dataset: Dataset[_]): DataFrame = {
     logTransform[DataFrame]({
       val outputSchema = transformSchema(dataset.schema)
-      implicit val outputEncoder: ExpressionEncoder[Row] = RowEncoder(outputSchema)
+      implicit val outputEncoder: ExpressionEncoder[Row] = ExpressionEncoder(outputSchema)
 
       dataset.toDF().mapPartitions(it =>
         it.flatMap { rowOfLists =>

core/src/main/scala/com/microsoft/azure/synapse/ml/stages/PartitionConsolidator.scala (+2 -2)

@@ -9,7 +9,7 @@ import com.microsoft.azure.synapse.ml.logging.{FeatureNames, SynapseMLLogging}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable}
 import org.apache.spark.ml.{ComplexParamsWritable, Transformer}
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
@@ -39,7 +39,7 @@ class PartitionConsolidator(val uid: String)
       } else {
         Iterator()
       }
-    }(RowEncoder(dataset.schema))
+    }(ExpressionEncoder(dataset.schema))
   }, dataset.columns.length)
 }

core/src/main/scala/com/microsoft/azure/synapse/ml/train/ComputeModelStatistics.scala (+2 -2)

@@ -17,7 +17,7 @@ import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, Multiclas
 import org.apache.spark.mllib.linalg.{Matrices, Matrix}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
@@ -252,7 +252,7 @@ class ComputeModelStatistics(override val uid: String) extends Transformer
                            confusionMatrix: Matrix,
                            resultDF: DataFrame): DataFrame = {
     val schema = resultDF.schema.add(MetricConstants.ConfusionMatrix, SQLDataTypes.MatrixType)
-    resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(RowEncoder(schema))
+    resultDF.map { row => Row.fromSeq(row.toSeq :+ confusionMatrix.asML) }(ExpressionEncoder(schema))
   }
 
   private def selectAndCastToDF(dataset: Dataset[_],

core/src/main/scala/org/apache/spark/ml/source/image/PatchedImageFileFormat.scala (+2 -2)

@@ -13,7 +13,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.ml.image.ImageSchema
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._
@@ -118,7 +118,7 @@ class PatchedImageFileFormat extends ImageFileFormat with Serializable with Logg
       if (requiredSchema.isEmpty) {
         filteredResult.map(_ => emptyUnsafeRow)
       } else {
-        val converter = RowEncoder(requiredSchema)
+        val converter = ExpressionEncoder(requiredSchema)
         filteredResult.map(row => converter.createSerializer()(row))
       }
     }

core/src/main/scala/org/apache/spark/sql/execution/streaming/DistributedHTTPSource.scala (+3 -3)

@@ -7,7 +7,7 @@ import com.microsoft.azure.synapse.ml.io.http.{HTTPRequestData, HTTPResponseData
 import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 import org.apache.spark.sql.execution.streaming.continuous.HTTPSourceV2
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
@@ -218,7 +218,7 @@ class DistributedHTTPSource(name: String,
   private[spark] val infoSchema = new StructType()
     .add("machine", StringType).add("ip", StringType).add("id", StringType)
 
-  private[spark] val infoEnc = RowEncoder(infoSchema)
+  private[spark] val infoEnc = ExpressionEncoder(infoSchema)
 
   // Access point to run code on nodes through mapPartitions
   // TODO do this by hooking deeper into spark,
@@ -284,7 +284,7 @@ class DistributedHTTPSource(name: String,
       .map{ case (id, request) =>
         Row.fromSeq(Seq(Row(null, id, null), toRow(request))) //scalastyle:ignore null
       }.toIterator
-    }(RowEncoder(HTTPSourceV2.Schema))
+    }(ExpressionEncoder(HTTPSourceV2.Schema))
   }
 
   override def commit(end: OffsetV2): Unit = synchronized {
