Skip to content

Commit

Permalink
[SPARK-29682][SQL] Resolve conflicting attributes in Expand correctly
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR addresses issues where conflicting attributes in `Expand` are not correctly handled.

### Why are the changes needed?

```Scala
val numsDF = Seq(1, 2, 3, 4, 5, 6).toDF("nums")
val cubeDF = numsDF.cube("nums").agg(max(lit(0)).as("agcol"))
cubeDF.join(cubeDF, "nums").show
```
fails with the following exception:
```
org.apache.spark.sql.AnalysisException:
Failure when resolving conflicting references in Join:
'Join Inner
:- Aggregate [nums#38, spark_grouping_id#36], [nums#38, max(0) AS agcol#35]
:  +- Expand [List(nums#3, nums#37, 0), List(nums#3, null, 1)], [nums#3, nums#38, spark_grouping_id#36]
:     +- Project [nums#3, nums#3 AS nums#37]
:        +- Project [value#1 AS nums#3]
:           +- LocalRelation [value#1]
+- Aggregate [nums#38, spark_grouping_id#36], [nums#38, max(0) AS agcol#58]
   +- Expand [List(nums#3, nums#37, 0), List(nums#3, null, 1)], [nums#3, nums#38, spark_grouping_id#36]
                                                                         ^^^^^^^
      +- Project [nums#3, nums#3 AS nums#37]
         +- Project [value#1 AS nums#3]
            +- LocalRelation [value#1]

Conflicting attributes: nums#38
```
As you can see from the above plan, `num#38`, the output of `Expand` on the right side of `Join`, should have been handled to produce new attribute. Since the conflict is not resolved in `Expand`, the failure is happening upstream at `Aggregate`. This PR addresses handling conflicting attributes in `Expand`.

### Does this PR introduce any user-facing change?

Yes, the previous example now shows the following output:
```
+----+-----+-----+
|nums|agcol|agcol|
+----+-----+-----+
|   1|    0|    0|
|   6|    0|    0|
|   4|    0|    0|
|   2|    0|    0|
|   5|    0|    0|
|   3|    0|    0|
+----+-----+-----+
```
### How was this patch tested?

Added new unit test.

Closes apache#26441 from imback82/spark-29682.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
imback82 authored and cloud-fan committed Nov 14, 2019
1 parent b5a02d3 commit e46e487
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,18 @@ class Analyzer(
val newOutput = oldVersion.generatorOutput.map(_.newInstance())
(oldVersion, oldVersion.copy(generatorOutput = newOutput))

case oldVersion: Expand
if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
val producedAttributes = oldVersion.producedAttributes
val newOutput = oldVersion.output.map { attr =>
if (producedAttributes.contains(attr)) {
attr.newInstance()
} else {
attr
}
}
(oldVersion, oldVersion.copy(output = newOutput))

case oldVersion @ Window(windowExpressions, _, _, child)
if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
.nonEmpty =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,8 @@ case class Expand(
override lazy val references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))

override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)

// This operator can reuse attributes (for example making them null when doing a roll up) so
// the constraints of the child may no longer be valid.
override protected lazy val validConstraints: Set[Expression] = Set.empty[Expression]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3304,6 +3304,15 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
""".stripMargin).collect()
}
}

test("SPARK-29682: Conflicting attributes in Expand are resolved") {
val numsDF = Seq(1, 2, 3).toDF("nums")
val cubeDF = numsDF.cube("nums").agg(max(lit(0)).as("agcol"))

checkAnswer(
cubeDF.join(cubeDF, "nums"),
Row(1, 0, 0) :: Row(2, 0, 0) :: Row(3, 0, 0) :: Nil)
}
}

case class Foo(bar: Option[String])

0 comments on commit e46e487

Please sign in to comment.