[SPARK-52823][SQL] Support DSv2 Join pushdown for Oracle connector #51519
Conversation
protected def caseConvert(tableName: String): String = tableName

protected def withConnection[T](f: Connection => T): T = {
Can we make this suite more generic and decouple it from JDBC, so it can be reused by non-JDBC connectors as well?
Maybe a class hierarchy like:
JoinPushdownIntegrationSuiteBase
JDBCJoinPushdownIntegrationSuiteBase extends JoinPushdownIntegrationSuiteBase
OracleJoinPushdownIntegrationSuiteBase extends JDBCJoinPushdownIntegrationSuiteBase
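For illustration, a minimal sketch of what the suggested hierarchy could look like; the member names below are placeholders, not the actual suite contents:

```scala
// Hypothetical shape of the suggested hierarchy; members are placeholders.
trait JoinPushdownIntegrationSuiteBase {
  // Connector-agnostic join pushdown tests and helpers would live here.
  def catalogName: String
}

trait JDBCJoinPushdownIntegrationSuiteBase extends JoinPushdownIntegrationSuiteBase {
  // JDBC-specific plumbing (URL, connections, dialect) would live here.
  def jdbcUrl: String
}

class OracleJoinPushdownIntegrationSuiteBase extends JDBCJoinPushdownIntegrationSuiteBase {
  override def catalogName: String = "oracle"
  override def jdbcUrl: String = "jdbc:oracle:thin:@//localhost:1521/freepdb1" // placeholder
}
```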
I would do it in a separate PR, if that's fine with you.
I am OK with that. It causes a bit of extra work for reviewers, but that is fine. Just keep in mind that we need another layer of abstraction, so we can do the follow-up more easily.
override val url = s"jdbc:h2:${tempDir.getCanonicalPath};user=testUser;password=testPass"
override val catalogName: String = "h2"
override val namespaceOpt: Option[String] = Some("test")
Why is it an Option? Most dialects must have a schema, right?
I've followed a similar design to `V2JDBCTest`, which has `namespaceOpt` as an Option. For example, `MsSqlServerIntegrationSuite` and `MySQLIntegrationSuite` don't override it.
I guess these tests will still work if we add a schema; I just wanted to be consistent with `V2JDBCTest`.
It is maybe personal preference, but I actually like to put the schema explicitly. It is a little confusing to me that we have tests that use just `catalog.table`, because it is unintuitive that the 2-part identifier means `catalog.table` instead of `schema.table`.
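For illustration (assuming a `SparkSession` named `spark`; catalog, schema, and table names are made up), the difference reads as:

```scala
// With a namespace the tests address tables with a 3-part identifier,
// without one they use a 2-part identifier that is easy to misread as schema.table.
spark.sql("SELECT * FROM h2.test.employees") // catalog.schema.table
spark.sql("SELECT * FROM h2.employees")      // catalog.table, not schema.table
```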
I don't have any preference here. I can make the schema not be an `Option` if it's a red flag here.
s"$catalogAndNamespace.${caseConvert(joinTableName1)}, " +
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
You can provide a sequence of relations here instead of doing string concatenation; it would fit your signature better.
Also, can you name the parameter here?
Suggested change:
expectedTables = Seq(
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}",
  s"$catalogAndNamespace.${caseConvert(joinTableName1)}"
)
Not really? If you have duplicate tables, like in my case, it's not asserted that the table is shown twice in the explain output. With a string, we are asserting that.
I meant to keep the same validation, but with a friendlier API that accepts a sequence of relations. We can still check whether all of those are in the explain output. Also, we can verify `ScanBuilderHolder` members more easily than with an arbitrary input string.
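A rough sketch of what such a Seq-based helper could look like (not the actual helper in this PR); it keeps the duplicate-table check by counting occurrences in the explain output:

```scala
// Hypothetical helper: verifies each expected relation appears in the explain output
// at least as many times as it occurs in the expected sequence.
def checkJoinPushedTables(explainOutput: String, expectedTables: Seq[String]): Unit = {
  expectedTables.distinct.foreach { table =>
    val expectedCount = expectedTables.count(_ == table)
    val actualCount = explainOutput.sliding(table.length).count(_ == table)
    assert(actualCount >= expectedCount,
      s"Expected '$table' at least $expectedCount time(s) in the explain output")
  }
}
```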
withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
  val df = sql(sqlQuery)

  checkJoinPushed(
We did not check whether column pruning actually happened; that can be done by checking the leaf relation's output.
Done now. UUIDs are used in the column names, so it's kind of hard to add an exact expected schema.
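One way to sidestep the UUID-suffixed names is to assert on the width of the leaf DSv2 relation instead of exact field names; a sketch (assumed helper, not the one added in this PR):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

// Checks column pruning by bounding the number of columns the leaf scan outputs,
// since the pushed-down join renames columns with UUID suffixes.
def checkPrunedLeafOutput(df: DataFrame, maxColumns: Int): Unit = {
  val scans = df.queryExecution.optimizedPlan.collect {
    case r: DataSourceV2ScanRelation => r
  }
  assert(scans.nonEmpty, "Expected a DSv2 scan in the optimized plan")
  assert(scans.forall(_.output.length <= maxColumns), "Column pruning did not happen")
}
```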
I see, haha, thanks.
if (f2.name.nonEmpty) {
  assert(f1.name == f2.name)
}
Is there some better way to do this? It is a very uncommon implementation.
Ideally I would do `dfSchema.equals(schema)`, but I can't since `dfSchema` will have random names. I don't see a nicer way of doing this. We can remove the `name` check though, so it is cleaner.
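For reference, a sketch of that comparison with the `name` check kept but only applied when an expected name is provided (assumed helper shape, not necessarily the final code):

```scala
import org.apache.spark.sql.types.StructType

// Compare schemas field by field; skip the name check when the expected name is empty,
// because join pushdown generates UUID-suffixed column names.
def assertSchemaMatches(actual: StructType, expected: StructType): Unit = {
  assert(actual.length == expected.length)
  actual.fields.zip(expected.fields).foreach { case (f1, f2) =>
    if (f2.name.nonEmpty) assert(f1.name == f2.name)
    assert(f1.dataType == f2.dataType)
    assert(f1.nullable == f2.nullable)
  }
}
```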
protected val supportsSamplePushdown: Boolean = true
protected val supportsFilterPushdown: Boolean = true
protected val supportsLimitPushdown: Boolean = true
protected val supportsAggregatePushdown: Boolean = true
protected val supportsSortPushdown: Boolean = true
protected val supportsOffsetPushdown: Boolean = true
protected val supportsColumnPruning: Boolean = true
protected val supportsJoinPushdown: Boolean = true
We are introducing state for these util methods. Can we avoid that? If we can make the util methods stateless, that would be better.
Well, it is not stateless since we are extending `ExplainSuiteHelper`, right? So I don't really think this is problematic. wdyt?
Hmm, it seems we only have util methods here?
https://github.com/urosstan-db/spark/blob/1570206f58bc0858e8936b42df8f7c9b34b661c2/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala#L33
I would like to make it simpler: the methods should check whether the filter is removed or not, while in derived suites we can call them with:
checkFilterPushed(df, pushed = pushdownSupported && pushdownEnabled)
It is beneficial to check whether filters are still in the plan when pushdown is not supported; sometimes pushdown rules/strategies just swallow nodes from the plan, causing correctness issues.
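A minimal sketch of that stateless shape (assumed implementation, not the exact helper in the PR): the caller passes `pushed`, and the negative branch asserts the `Filter` node is still present rather than silently dropped:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.plans.logical.Filter

// Stateless check: decide the expected behavior from the `pushed` argument instead of
// suite-level supportsFilterPushdown state.
def checkFilterPushed(df: DataFrame, pushed: Boolean = true): Unit = {
  val filters = df.queryExecution.optimizedPlan.collect { case f: Filter => f }
  if (pushed) {
    assert(filters.isEmpty, "Filter should have been pushed to the data source")
  } else {
    assert(filters.nonEmpty, "Filter should remain in the plan when pushdown is off")
  }
}
```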
Is it better now? I moved these into `JDBCV2JoinPushdownIntegrationSuiteBase`. I still don't agree that ExplainSuite is stateless, because if you go a level deeper you will see state; it's not really a pure util class.
As for checking whether filters are still in the plan when pushdown is not supported: I agree with that, so I changed it.
Regarding "I still don't agree that ExplainSuite is stateless because if you go a level deeper you will see state": good catch, I did not see those. Thanks.
Can we make `DataSourcePushdownTestUtils` stateless? It is not a hard blocker, just to avoid an accidental merge before comment resolution 😄
protected val supportsColumnPruning: Boolean = true
protected val supportsJoinPushdown: Boolean = true
Since we have a `JdbcDialect` instance in this suite, can we get these `supportsXYZ` values from it?
We have `supportsLimit`, `supportsOffset`, and `supportsJoin`; the others don't exist. I would leave it as it is, since it could confuse someone why the other variables don't use the dialect.
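For the three flags that do exist on the dialect, the idea could look roughly like this (trait and member names here are illustrative):

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Derive the capabilities that JdbcDialect exposes (supportsLimit, supportsOffset,
// and the supportsJoin flag added in this PR); the remaining supportsXYZ flags would
// stay as plain overridable vals.
trait DialectDerivedPushdownFlags {
  def jdbcUrl: String // e.g. "jdbc:oracle:thin:@//localhost:1521/freepdb1" (placeholder)
  protected lazy val dialect: JdbcDialect = JdbcDialects.get(jdbcUrl)
  protected lazy val supportsLimitPushdown: Boolean = dialect.supportsLimit
  protected lazy val supportsOffsetPushdown: Boolean = dialect.supportsOffset
  protected lazy val supportsJoinPushdown: Boolean = dialect.supportsJoin
}
```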
Thanks, merging to master!
### What changes were proposed in this pull request?
In #50921, Join pushdown was added for DSv2. In #51519, testing has been changed to be extensible for all the dialects.

With this PR, I am enabling DSv2 join pushdown for the Postgres connector as well. For this purpose, `PostgresDialect` now has `supportsJoin` equal to true. Also, inherited `JDBCV2JoinPushdownIntegrationSuiteBase` to test the Postgres connector.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?
Inner joins will be pushed down to the Postgres data source only if the `spark.sql.optimizer.datasourceV2JoinPushdown` SQL conf is set to true. Currently, the default value is false.

Previously, the Spark SQL query
```
SELECT tbl1.id, tbl1.name, tbl2.id
FROM postgresCatalog.tbl1 t1
JOIN postgresCatalog.tbl2 t2 ON t1.id = t2.id + 1
```
would produce the following Optimized plan:
```
== Optimized Logical Plan ==
Join Inner, (id#0 = (id#1 + 1))
:- Filter isnotnull(id#0)
:  +- RelationV2[id#0] postgresCatalog.tbl1
+- Filter isnotnull(id#1, name#2)
   +- RelationV2[id#1, name#2] postgresCatalog.tbl2
```
Now, with join pushdown enabled, the plan would be:
```
Project [ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c#3 AS id#0, ID#4 AS id#1, NAME#5 AS name#2]
+- RelationV2[ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c#3, ID#4, NAME#5] postgresCatalog.tbl1
```
When the join is pushed down, the physical plan will contain `PushedJoins` information, which is the array of all the tables joined. For example, in the above case it would be:
```
PushedJoins: [postgresCatalog.tbl1, postgresCatalog.tbl2]
```
The generated SQL query would be:
```
SELECT "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c", "ID", "NAME" FROM (
  SELECT "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c", "ID", "NAME" FROM (
    SELECT "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c", "ID", "NAME" FROM (
      SELECT "ID" AS "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c", "NAME" FROM "SYSTEM"."TBL1" WHERE ("ID" IS NOT NULL)
    ) join_subquery_4
    INNER JOIN (
      SELECT "ID" FROM "SYSTEM"."TBL2" WHERE ("ID" IS NOT NULL)
    ) join_subquery_5
    ON "ID_974bb0c2_a32c_4d5b_b6ee_745efa1f3a0c" = "ID"
  )
) SPARK_GEN_SUBQ_30
```

### How was this patch tested?
New tests.

### Was this patch authored or co-authored using generative AI tooling?

Closes #51594 from PetarVasiljevic-DB/support_join_for_postgres.

Authored-by: Petar Vasiljevic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
In #50921, Join pushdown was added for DSv2, and it was only enabled for the H2 dialect. With this PR, I am enabling DSv2 join pushdown for the Oracle connector as well.
For this purpose, `OracleDialect` now has `supportsJoin` equal to true (a rough sketch of this flag is shown after the list below). Also, changed the SQL query generation to use the `tableOrQuery` method instead of `options.tableOrQuery`.

The rest of the change is test only:
- Extracted pushdown test utilities from `V2JDBCTest` into the new trait `V2JDBCPushdownTestUtils`
- Added `JDBCJoinPushdownIntegrationSuite`, which can be used for testing other connectors as well
- Added `OracleJoinPushdownIntegrationSuite` as the first implementation of the trait
- Changed `JDBCV2JoinPushdownSuite` to inherit `JDBCJoinPushdownIntegrationSuite`
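As a rough sketch of the dialect-side switch (assuming `supportsJoin` is an overridable flag on `JdbcDialect`, as described above; the real `OracleDialect` has many more members):

```scala
import org.apache.spark.sql.jdbc.JdbcDialect

// Minimal custom dialect that opts in to join pushdown.
class MyOracleLikeDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
  override def supportsJoin: Boolean = true
}
```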
Why are the changes needed?
Does this PR introduce any user-facing change?
Inner joins will be pushed down to the Oracle data source only if the `spark.sql.optimizer.datasourceV2JoinPushdown` SQL conf is set to true. Currently, the default value is false.

Previously, a Spark SQL join query over two Oracle tables would produce an Optimized plan with a `Join` node over two `RelationV2` scans. Now, with join pushdown enabled, the plan collapses to a single `RelationV2` scan (a `Project` over one `RelationV2` node) whose output columns carry generated, UUID-suffixed names.
When the join is pushed down, the physical plan will contain `PushedJoins` information, which is the array of all the tables joined, e.g. `PushedJoins: [oracleCatalog.tbl1, oracleCatalog.tbl2]` (names illustrative). On the Oracle side, the join is executed through a single generated SQL query that wraps each join side in a subquery and joins them.
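As a rough usage sketch (assuming a `SparkSession` named `spark` with an Oracle JDBC catalog registered as `oracleCatalog`; schema and table names are made up):

```scala
// Enable DSv2 join pushdown (default is false).
spark.conf.set("spark.sql.optimizer.datasourceV2JoinPushdown", "true")

val df = spark.sql(
  """SELECT t1.id, t1.name, t2.id
    |FROM oracleCatalog.schema1.tbl1 t1
    |JOIN oracleCatalog.schema1.tbl2 t2 ON t1.id = t2.id + 1
    |""".stripMargin)

// With pushdown enabled, the scan node in the plan reports the joined tables, e.g.
// PushedJoins: [oracleCatalog.schema1.tbl1, oracleCatalog.schema1.tbl2]
df.explain("formatted")
```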
How was this patch tested?
New tests.
Was this patch authored or co-authored using generative AI tooling?