Commit 2b2a2a2

aokolnychyi authored and dongjoon-hyun committed
[SPARK-54004][SQL] Fix uncaching table by name without cascading
### What changes were proposed in this pull request?

This PR fixes uncaching table by name without cascading.

### Why are the changes needed?

These changes are needed to invalidate data cache correctly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with a test that previously failed.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52712 from aokolnychyi/spark-54004.

Authored-by: Anton Okolnychyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 5059bab commit 2b2a2a2
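
To make the failure concrete, here is a sketch of the scenario the new test covers. It assumes a session where `spark.sql.catalog.testcat` is registered as `InMemoryCatalog` (the suite below sets exactly that), and uses the `uncacheTableOrView` and `lookupCachedData` entry points of `CacheManager`; treat it as an illustrative sketch rather than a canonical repro.

```scala
// Assumes spark.sql.catalog.testcat is set to InMemoryCatalog
// (see the CachedTableSuite change below).
spark.sql("CREATE TABLE testcat.tbl (id INT, data STRING) USING foo")
spark.sql("CACHE TABLE testcat.tbl")

// Drop only the table's own cache entry, without cascading to dependents.
spark.sharedState.cacheManager.uncacheTableOrView(
  spark, Seq("testcat", "tbl"), cascade = false)

// Before SPARK-54004 the cached plan was wrapped in a SubqueryAlias, the
// name-based lookup missed it, and this entry incorrectly survived.
assert(spark.sharedState.cacheManager
  .lookupCachedData(spark.table("testcat.tbl")).isEmpty)
```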

File tree

3 files changed: +47 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 2 additions & 1 deletion

```diff
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -224,7 +225,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
     }
 
-    plan match {
+    EliminateSubqueryAliases(plan) match {
       case LogicalRelationWithTable(_, Some(catalogTable)) =>
         isSameName(catalogTable.identifier.nameParts)
```

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 41 additions & 0 deletions

```diff
@@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
 import scala.concurrent.duration._
 
 import org.apache.spark.{CleanerListener, SparkRuntimeException}
+import org.apache.spark.SparkConf
 import org.apache.spark.executor.DataReadMethod._
 import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
@@ -34,6 +35,7 @@ import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
+import org.apache.spark.sql.connector.catalog.InMemoryCatalog
 import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, RDDScanExec, SparkPlan, SparkPlanInfo}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEPropagateEmptyRelation}
 import org.apache.spark.sql.execution.columnar._
@@ -57,6 +59,9 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
   with AdaptiveSparkPlanHelper {
   import testImplicits._
 
+  override def sparkConf: SparkConf = super.sparkConf
+    .set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
+
   setupTestData()
 
   override def afterEach(): Unit = {
@@ -1853,4 +1858,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(!spark.catalog.tableExists("SPARK_52684"))
     }
   }
+
+  test("uncache DSv2 table via cache manager correctly uncaches views with logical plans") {
+    val t = "testcat.tbl"
+    withTable(t, "v") {
+      sql(s"CREATE TABLE $t (id int, data string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
+
+      // cache table
+      sql(s"CACHE TABLE $t")
+      assertCached(sql(s"SELECT * FROM $t"))
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, "a"), Row(2, "b")))
+
+      // create and cache view
+      spark.table(t).select("id").createOrReplaceTempView("v")
+      sql("SELECT * FROM v").cache()
+      assertCached(sql("SELECT * FROM v"))
+      checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
+
+      // must invalidate only table, view must remain cached (cascade = false)
+      spark.sharedState.cacheManager.uncacheTableOrView(
+        spark,
+        Seq("testcat", "tbl"),
+        cascade = false)
+      assertNotCached(sql(s"SELECT * FROM $t"))
+      assertCached(sql("SELECT * FROM v"))
+
+      // must invalidate view (cascade = true)
+      spark.sharedState.cacheManager.uncacheTableOrView(
+        spark,
+        Seq("testcat", "tbl"),
+        cascade = true)
+
+      // verify view is not cached anymore
+      assertNotCached(sql("SELECT * FROM v"))
+    }
+  }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala

Lines changed: 4 additions & 0 deletions

```diff
@@ -232,6 +232,10 @@ abstract class QueryTest extends PlanTest {
       s"level $storageLevel, but it doesn't.")
   }
 
+  def assertNotCached(query: Dataset[_]): Unit = {
+    assertCached(query, numCachedTables = 0)
+  }
+
   /**
   * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
   */
```
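
The new helper is plain sugar over the existing count-based `assertCached` overload. A hedged usage sketch inside a `QueryTest`-derived suite, where `v` is an assumed previously cached temp view:

```scala
// After uncaching, the query's plan should contain no cached data;
// assertNotCached verifies this by delegating to
// assertCached(query, numCachedTables = 0).
val df = sql("SELECT * FROM v")
assertNotCached(df)
```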
