Skip to content

Commit 2475b35

Browse files
vladimirg-db authored and cloud-fan committed
[SPARK-50665][SQL] Substitute LocalRelation with ComparableLocalRelation in NormalizePlan
### What changes were proposed in this pull request?

Substitute `LocalRelation` with `ComparableLocalRelation` in `NormalizePlan`. `ComparableLocalRelation` has `Seq[Seq[Expression]]` instead of `Seq[InternalRow]`. The conversion happens through `Literal`s.

### Why are the changes needed?

`LocalRelation`'s data field is incomparable if it contains maps, because `ArrayBasedMapData` doesn't define `equals`: apache#13847

### Does this PR introduce _any_ user-facing change?

No. This is to compare logical plans in the single-pass Analyzer.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

copilot.nvim.

Closes apache#49287 from vladimirg-db/vladimirg-db/normalize-local-relation.

Authored-by: Vladimir Golubev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent a483dfd commit 2475b35

File tree

1 file changed

+33
-0
lines changed

1 file changed

+33
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/NormalizePlan.scala

Lines changed: 33 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.GetViewColumnByNameAndOrdinal
2121
import org.apache.spark.sql.catalyst.expressions._
2222
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
2323
import org.apache.spark.sql.catalyst.plans.logical._
24+
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
2425

2526
object NormalizePlan extends PredicateHelper {
2627
def apply(plan: LogicalPlan): LogicalPlan =
@@ -104,6 +105,8 @@ object NormalizePlan extends PredicateHelper {
104105
case Project(projectList, child) =>
105106
Project(normalizeProjectList(projectList), child)
106107
case c: KeepAnalyzedQuery => c.storeAnalyzedQuery()
108+
case localRelation: LocalRelation =>
109+
ComparableLocalRelation.fromLocalRelation(localRelation)
107110
}
108111
}
109112

@@ -134,3 +137,33 @@ object NormalizePlan extends PredicateHelper {
134137
case _ => condition // Don't reorder.
135138
}
136139
}
140+
141+
/**
 * Stand-in for [[LocalRelation]] that has a comparable `data` field.
 *
 * [[LocalRelation]]'s own `data` is incomparable when it contains maps, because
 * [[ArrayBasedMapData]] doesn't define `equals`. Here every row is kept as a sequence of
 * [[Expression]]s instead.
 *
 * @param output attributes produced by this relation
 * @param data rows of the relation, one inner sequence of expressions per row
 * @param isStreaming whether the relation is a streaming one
 * @param stream optional [[SparkDataStream]] associated with the relation
 */
case class ComparableLocalRelation(
    override val output: Seq[Attribute],
    data: Seq[Seq[Expression]],
    override val isStreaming: Boolean,
    stream: Option[SparkDataStream])
  extends LeafNode
150+
151+
object ComparableLocalRelation {

  /**
   * Builds a [[ComparableLocalRelation]] out of the given [[LocalRelation]].
   *
   * Every non-null row is unpacked using the data types of the relation's output attributes,
   * and each value is wrapped into a [[Literal]] of the matching type. A `null` row becomes an
   * empty sequence. `output`, `isStreaming` and `stream` are carried over unchanged.
   *
   * @param localRelation the relation to convert
   * @return the comparable counterpart of `localRelation`
   */
  def fromLocalRelation(localRelation: LocalRelation): ComparableLocalRelation = {
    val columnTypes = localRelation.output.map(_.dataType)

    val comparableRows: Seq[Seq[Expression]] = localRelation.data.map {
      case null =>
        Seq.empty[Expression]
      case row =>
        // Pair every extracted value with its column type so the literal is typed correctly.
        val values = row.toSeq(columnTypes)
        values.zip(columnTypes).map { case (value, dataType) => Literal(value, dataType) }
    }

    ComparableLocalRelation(
      output = localRelation.output,
      data = comparableRows,
      isStreaming = localRelation.isStreaming,
      stream = localRelation.stream
    )
  }
}

0 commit comments

Comments
 (0)