Commit b60d358: FindDataSourceTable Logical Resolution Rule
1 parent 6fd4753

1 file changed: docs/logical-analysis-rules/FindDataSourceTable.md (78 additions, 12 deletions)

---
title: FindDataSourceTable
---

# FindDataSourceTable Logical Resolution Rule

`FindDataSourceTable` is a [Catalyst rule](../catalyst/Rule.md) to [resolve UnresolvedCatalogRelation logical operators](#apply) (of Spark and Hive tables) in a logical query plan (`Rule[LogicalPlan]`).

`FindDataSourceTable` is used by the [Hive](../hive/HiveSessionStateBuilder.md#analyzer) and [Spark](../BaseSessionStateBuilder.md#analyzer) Analyzers as part of their [extendedResolutionRules](../Analyzer.md#extendedResolutionRules).
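
As a quick sanity check (a sketch for `spark-shell`; assumes an active `SparkSession` named `spark`), the rule can be found among the analyzer's extended resolution rules:

```scala
import org.apache.spark.sql.execution.datasources.FindDataSourceTable

// Confirm FindDataSourceTable is registered with this session's Analyzer
val registered = spark.sessionState.analyzer.extendedResolutionRules
  .exists(_.isInstanceOf[FindDataSourceTable])
assert(registered)
```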

## Creating Instance

`FindDataSourceTable` takes the following to be created:

* <span id="sparkSession"> [SparkSession](../SparkSession.md)

`FindDataSourceTable` is created when:

* `HiveSessionStateBuilder` is requested for the [Analyzer](../hive/HiveSessionStateBuilder.md#analyzer)
* `BaseSessionStateBuilder` is requested for the [Analyzer](../BaseSessionStateBuilder.md#analyzer)
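
Since `FindDataSourceTable` is a `Rule[LogicalPlan]` with a public constructor, it can also be created and applied by hand (a sketch; the [Demo](#demo) below does exactly this end to end):

```scala
import org.apache.spark.sql.execution.datasources.FindDataSourceTable

// Create the rule with the active SparkSession (assumed to be `spark`)
val findTables = new FindDataSourceTable(spark)
```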

## Execute Rule { #apply }

??? note "Rule"

    ```scala
    apply(
      plan: LogicalPlan): LogicalPlan
    ```

    `apply` is part of the [Rule](../catalyst/Rule.md#apply) abstraction.

`apply` traverses the given [LogicalPlan](../logical-operators/LogicalPlan.md) (from top to leaves) to resolve `UnresolvedCatalogRelation`s of the following logical operators:

1. [InsertIntoStatement](../logical-operators/InsertIntoStatement.md) with a non-streaming `UnresolvedCatalogRelation` of a [Spark (DataSource) table](../connectors/DDLUtils.md#isDatasourceTable)
1. [InsertIntoStatement](../logical-operators/InsertIntoStatement.md) with a non-streaming `UnresolvedCatalogRelation` of a Hive table
1. [AppendData](../logical-operators/AppendData.md) (that is not [by name](../logical-operators/AppendData.md#isByName)) with a [DataSourceV2Relation](../logical-operators/DataSourceV2Relation.md) of a [V1Table](../connector/V1Table.md)
1. A non-streaming `UnresolvedCatalogRelation` of a [Spark (DataSource) table](../connectors/DDLUtils.md#isDatasourceTable)
1. A non-streaming `UnresolvedCatalogRelation` of a Hive table
1. A streaming `UnresolvedCatalogRelation`
1. A `StreamingRelationV2` ([Spark Structured Streaming]({{ book.structured_streaming }}/logical-operators/StreamingRelationV2/)) over a streaming `UnresolvedCatalogRelation`

??? note "Streaming and Non-Streaming `UnresolvedCatalogRelation`s"
    The difference between a streaming and a non-streaming `UnresolvedCatalogRelation` is the [isStreaming](../logical-operators/LogicalPlan.md#isStreaming) flag, which is disabled (`false`) by default.

`apply`...FIXME
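
To watch the rule resolve a single non-streaming `UnresolvedCatalogRelation` (a sketch for `spark-shell`; `demo_t` is a throwaway table created only for this check):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.datasources.FindDataSourceTable

sql("CREATE TABLE demo_t (id LONG) USING parquet")

// SessionCatalog.lookupRelation returns a SubqueryAlias over an UnresolvedCatalogRelation
val unresolved = spark.sessionState.catalog.lookupRelation(TableIdentifier("demo_t"))

// For a Spark (DataSource) table the leaf becomes a LogicalRelation
// (for a Hive table it would become a HiveTableRelation)
val resolved = new FindDataSourceTable(spark).apply(unresolved)
resolved.collectLeaves().foreach(leaf => println(leaf.getClass.getSimpleName))
```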

### Create StreamingRelation { #getStreamingRelation }

```scala
getStreamingRelation(
  table: CatalogTable,
  extraOptions: CaseInsensitiveStringMap): StreamingRelation
```

`getStreamingRelation` creates a `StreamingRelation` ([Spark Structured Streaming]({{ book.structured_streaming }}/logical-operators/StreamingRelation/)) over a [DataSource](../DataSource.md#creating-instance) created with the following properties:

Property | Value
-|-
[DataSource provider](../DataSource.md#className) | The [provider](../CatalogTable.md#provider) of the given [CatalogTable](../CatalogTable.md)
[User-specified schema](../DataSource.md#userSpecifiedSchema) | The [schema](../CatalogTable.md#schema) of the given [CatalogTable](../CatalogTable.md)
[Options](../DataSource.md#options) | [DataSource options](../connectors/DataSourceUtils.md#generateDatasourceOptions) based on the given `extraOptions` and the [CatalogTable](../CatalogTable.md)
[CatalogTable](../DataSource.md#catalogTable) | The given [CatalogTable](../CatalogTable.md)

---

`getStreamingRelation` is used when:

* `FindDataSourceTable` is requested to resolve streaming `UnresolvedCatalogRelation`s
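
The construction in the table above amounts to roughly the following (a sketch; `mkStreamingRelation` is a hypothetical stand-in for the private helper):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils}
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Sketch: build a StreamingRelation over a DataSource with the four properties above
def mkStreamingRelation(
    sparkSession: SparkSession,
    table: CatalogTable,
    extraOptions: CaseInsensitiveStringMap): StreamingRelation =
  StreamingRelation(
    DataSource(
      sparkSession,
      className = table.provider.get,              // DataSource provider
      userSpecifiedSchema = Some(table.schema),    // user-specified schema
      options = DataSourceUtils.generateDatasourceOptions(extraOptions, table),
      catalogTable = Some(table)))                 // the given CatalogTable
```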

## Demo

```text
scala> :type spark
org.apache.spark.sql.SparkSession
```

```scala
// Example: InsertIntoTable with UnresolvedCatalogRelation
// Drop tables to make the example reproducible
val db = spark.catalog.currentDatabase
Seq("t1", "t2").foreach { t =>
  spark.sharedState.externalCatalog.dropTable(db, t, ignoreIfNotExists = true, purge = true)
}
```

```scala
// Create tables
sql("CREATE TABLE t1 (id LONG) USING parquet")
sql("CREATE TABLE t2 (id LONG) USING orc")
```

```text
import org.apache.spark.sql.catalyst.dsl.plans._
val plan = table("t1").insertInto(tableName = "t2", overwrite = true)
scala> println(plan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'UnresolvedRelation `t1`
```

```text
// Transform the logical plan with ResolveRelations logical rule first
// so UnresolvedRelations become UnresolvedCatalogRelations
import spark.sessionState.analyzer.ResolveRelations
val planWithUnresolvedCatalogRelations = ResolveRelations(plan)
scala> println(planWithUnresolvedCatalogRelations.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- 'SubqueryAlias t1
02    +- 'UnresolvedCatalogRelation `default`.`t1`, org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
```

```text
// Let's resolve UnresolvedCatalogRelations then
import org.apache.spark.sql.execution.datasources.FindDataSourceTable
val r = new FindDataSourceTable(spark)
val tablesResolvedPlan = r(planWithUnresolvedCatalogRelations)
scala> println(tablesResolvedPlan.numberedTreeString)
00 'InsertIntoTable 'UnresolvedRelation `t2`, true, false
01 +- SubqueryAlias t1
02    +- Relation[id#10L] parquet
```
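
The Analyzer applies `FindDataSourceTable` as part of regular query analysis, so the same resolution can be observed end to end (a sketch; assumes the `t1` table from above):

```scala
// The analyzed plan of a simple scan shows the resolved relation directly
val analyzed = spark.table("t1").queryExecution.analyzed
println(analyzed.numberedTreeString)
// expected: a parquet Relation leaf rather than an UnresolvedCatalogRelation
```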

<!---
## Review Me

## Executing Rule { #apply }

`apply` resolves `UnresolvedCatalogRelation`s for Spark (Data Source) and Hive tables:

* `apply` [creates LogicalRelation logical operators](#readDataSourceTable) for `UnresolvedCatalogRelation`s of Spark (Data Source) tables (incl. `InsertIntoTable`s)

* `apply` [creates HiveTableRelation logical operators](#readHiveTable) for `InsertIntoTable`s with an `UnresolvedCatalogRelation` of a Hive table or `UnresolvedCatalogRelation`s of a Hive table

=== [[readHiveTable]] Creating HiveTableRelation Logical Operator -- `readHiveTable` Internal Method

[source, scala]

readDataSourceTable(

If not available, `readDataSourceTable` [creates a new DataSource](../DataSource.md) for the [provider](../CatalogTable.md#provider) of the input `CatalogTable`, with an extra `path` option (based on the `locationUri` of the [storage](../CatalogTable.md#storage) of the input `CatalogTable`). `readDataSourceTable` then requests the `DataSource` to [resolve the relation and create a corresponding BaseRelation](../DataSource.md#resolveRelation), which is used to create a [LogicalRelation](../logical-operators/LogicalRelation.md) with the input [CatalogTable](../CatalogTable.md).
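
In code, the flow above amounts to roughly the following (a sketch only, not Spark's exact source; `sparkSession` and `table` are assumed, and the preceding caching step is omitted):

```scala
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}

// Extra `path` option from the locationUri of the table's storage
val pathOption = table.storage.locationUri.map("path" -> _.toString)
val dataSource = DataSource(
  sparkSession,
  className = table.provider.get,  // the provider of the CatalogTable
  options = table.storage.properties ++ pathOption,
  catalogTable = Some(table))
// Resolve the relation and wrap the BaseRelation in a LogicalRelation
LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
```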

NOTE: `readDataSourceTable` is used when `FindDataSourceTable` is requested to <<apply, resolve an UnresolvedCatalogRelation in a logical plan>> (for data source tables).
-->
