Commit 8470cb6 (parent 4405843)

Commit message: done

30 files changed: +4175 −130 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 61 additions & 0 deletions
@@ -5854,6 +5854,67 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PIPELINES_STREAM_STATE_POLLING_INTERVAL: ConfigEntry[Long] = {
+    buildConf("spark.sql.pipelines.execution.streamstate.pollingInterval")
+      .doc(
+        "Interval at which the stream state is polled for changes. This is used to check " +
+        "if the stream has failed and needs to be restarted."
+      )
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(1)
+  }
+
+  val PIPELINES_WATCHDOG_MIN_RETRY_TIME_IN_SECONDS: ConfigEntry[Long] = {
+    buildConf("spark.sql.pipelines.execution.watchdog.minRetryTime")
+      .doc(
+        "Initial duration between the time when we notice a flow has failed and when we try to " +
+        "restart the flow. The interval between flow restarts doubles with every stream " +
+        "failure up to the maximum value set in `pipelines.execution.watchdog.maxRetryTime`."
+      )
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(5)
+  }
+
+  val PIPELINES_WATCHDOG_MAX_RETRY_TIME_IN_SECONDS: ConfigEntry[Long] = {
+    buildConf("spark.sql.pipelines.execution.watchdog.maxRetryTime")
+      .doc(
+        "Maximum time interval at which flows will be restarted."
+      )
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(3600)
+  }
+
+  val PIPELINES_MAX_CONCURRENT_FLOWS: ConfigEntry[Int] = {
+    buildConf("spark.sql.pipelines.execution.maxConcurrentFlows")
+      .doc(
+        "Max number of flows to execute at once. Used to tune performance for triggered " +
+        "pipelines. Has no effect on continuous pipelines."
+      )
+      .version("4.1.0")
+      .intConf
+      .createWithDefault(16)
+  }
+
+
+  val PIPELINES_TIMEOUT_MS_FOR_TERMINATION_JOIN_AND_LOCK: ConfigEntry[Long] = {
+    buildConf("spark.sql.pipelines.timeoutMsForTerminationJoinAndLock")
+      .doc("Timeout to grab a lock for stopping update - default is 1hr.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(60 * 60 * 1000)
+  }
+
+  val PIPELINES_MAX_FLOW_RETRY_ATTEMPTS: ConfigEntry[Int] = {
+    buildConf("spark.sql.pipelines.maxFlowRetryAttempts")
+      .doc("Maximum no. of times a flow can be retried")
+      .version("4.1.0")
+      .intConf
+      .createWithDefault(2)
+  }
+
   /**
    * Holds information about keys that have been deprecated.
    *
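The two watchdog settings above document an exponential backoff: the delay before restarting a failed flow starts at `minRetryTime` and doubles on every failure until it is capped at `maxRetryTime`. The snippet below is not part of the commit; it is a minimal, self-contained sketch of that documented policy, with hypothetical names, using the default values from the new configs.

// Standalone sketch (not from this commit) of the documented watchdog backoff:
// start at minRetryTime and double per failure, capped at maxRetryTime.
object WatchdogBackoffSketch {
  def retryDelaySeconds(consecutiveFailures: Int, minRetrySec: Long, maxRetrySec: Long): Long = {
    // Compute 2^failures as a Double to avoid Long overflow for large failure counts.
    val doubled = minRetrySec * math.pow(2.0, consecutiveFailures)
    math.min(doubled, maxRetrySec.toDouble).toLong
  }

  def main(args: Array[String]): Unit = {
    // With the defaults above (min = 5s, max = 3600s): 5, 10, 20, 40, ... capped at 3600.
    (0 to 12).foreach { n =>
      println(s"failure #$n -> ${retryDelaySeconds(n, minRetrySec = 5, maxRetrySec = 3600)}s")
    }
  }
}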

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/common/GraphStates.scala

Lines changed: 16 additions & 0 deletions
@@ -43,6 +43,22 @@ object FlowStatus {
   case object IDLE extends FlowStatus
 }
 
+sealed trait RunState
+
+object RunState {
+  // Run is currently executing queries.
+  case object RUNNING extends RunState
+
+  // Run is complete and all necessary resources are cleaned up.
+  case object COMPLETED extends RunState
+
+  // Run has run into an error that could not be recovered from.
+  case object FAILED extends RunState
+
+  // Run was canceled.
+  case object CANCELED extends RunState
+}
+
 // The type of the dataset.
 sealed trait DatasetType
 object DatasetType {
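`RunState` is a small sealed ADT for the lifecycle of a pipeline run. As a quick illustration (not part of the commit, and assuming the trait lives in `org.apache.spark.sql.pipelines.common` to match the file path), a caller could pattern-match on it to decide whether a run has reached a terminal state; the helper below is hypothetical.

import org.apache.spark.sql.pipelines.common.RunState

// Hypothetical helper, not part of the commit: RUNNING is the only
// non-terminal state; COMPLETED, FAILED and CANCELED are terminal.
object RunStateSketch {
  def isTerminal(state: RunState): Boolean = state match {
    case RunState.RUNNING => false
    case RunState.COMPLETED | RunState.FAILED | RunState.CANCELED => true
  }
}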

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DataflowGraph.scala

Lines changed: 1 addition & 2 deletions
@@ -204,14 +204,13 @@ class DataflowGraph(val flows: Seq[Flow], val tables: Seq[Table], val views: Seq
     validatePersistedViewSources()
     validateEveryDatasetHasFlow()
     validateTablesAreResettable()
-    validateAppendOnceFlows()
     inferredSchema
   }.failed
 
   /** Enforce every dataset has at least once input flow. For example its possible to define
    * streaming tables without a query; such tables should still have at least one flow
    * writing to it. */
-  def validateEveryDatasetHasFlow(): Unit = {
+  private def validateEveryDatasetHasFlow(): Unit = {
     (tables.map(_.identifier) ++ views.map(_.identifier)).foreach { identifier =>
       if (!flows.exists(_.destinationIdentifier == identifier)) {
         throw new AnalysisException(
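The rule behind `validateEveryDatasetHasFlow` is simple: a dataset is invalid if no flow names it as its destination. The following standalone sketch (not the pipeline API; the types here are simplified stand-ins) shows the same check in isolation.

// Simplified stand-ins for the real Flow/Table types, just to illustrate the rule
// enforced by validateEveryDatasetHasFlow: every dataset needs at least one flow
// writing to it.
object DatasetHasFlowSketch {
  final case class Dataset(identifier: String)
  final case class Flow(name: String, destinationIdentifier: String)

  def datasetsWithoutFlows(datasets: Seq[Dataset], flows: Seq[Flow]): Seq[Dataset] =
    datasets.filterNot(d => flows.exists(_.destinationIdentifier == d.identifier))

  def main(args: Array[String]): Unit = {
    val datasets = Seq(Dataset("silver"), Dataset("gold"))
    val flows = Seq(Flow("load_silver", destinationIdentifier = "silver"))
    // "gold" has no incoming flow, so validation would fail for it.
    println(datasetsWithoutFlows(datasets, flows).map(_.identifier)) // List(gold)
  }
}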
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala

Lines changed: 269 additions & 0 deletions
@@ -0,0 +1,269 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.pipelines.graph

import scala.jdk.CollectionConverters._
import scala.util.control.{NonFatal, NoStackTrace}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog.{
  CatalogV2Util,
  Identifier,
  TableCatalog,
  TableChange,
  TableInfo
}
import org.apache.spark.sql.connector.expressions.Expressions
import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers
import org.apache.spark.sql.pipelines.util.SchemaInferenceUtils.diffSchemas
import org.apache.spark.sql.pipelines.util.SchemaMergingUtils

/**
 * [[DatasetManager]] is responsible for materializing tables in the catalog based on the given
 * graph. For each table in the graph, it will create a table if none exists (or if this is a
 * full refresh), or merge the schema of an existing table to match the new flows writing to it.
 */
object DatasetManager extends Logging {

  /**
   * Wraps table materialization exceptions.
   *
   * The target use case of this exception is merely as a means to capture attribution -
   * 1. Indicate that the exception is associated with table materialization.
   * 2. Indicate which table materialization failed for.
   *
   * @param tableName The name of the table that failed to materialize.
   * @param cause The underlying exception that caused the materialization to fail.
   */
  case class TableMaterializationException(
      tableName: String,
      cause: Throwable
  ) extends Exception(cause)
      with NoStackTrace

  /**
   * Materializes the tables in the given graph. This method will create or update the tables
   * in the catalog based on the given graph and context.
   *
   * @param virtualizedConnectedGraphWithTables The connected graph.
   * @param context The context for the pipeline update.
   * @return The graph with materialized tables.
   */
  def materializeDatasets(
      virtualizedConnectedGraphWithTables: DataflowGraph,
      context: PipelineUpdateContext): DataflowGraph = {
    val (_, refreshTableIdentsSet, fullRefreshTableIdentsSet) = {
      DatasetManager.constructFullRefreshSet(virtualizedConnectedGraphWithTables.tables, context)
    }

    /** Return all the tables that need to be materialized from the given graph. */
    def tablesToMatz(graph: DataflowGraph): Seq[TableRefreshType] = {
      graph.tables
        .filter(t => fullRefreshTableIdentsSet.contains(t.identifier))
        .map(table => TableRefreshType(table, isFullRefresh = true)) ++
        graph.tables
          .filter(t => refreshTableIdentsSet.contains(t.identifier))
          .map(table => TableRefreshType(table, isFullRefresh = false))
    }

    val tablesToMaterialize =
      tablesToMatz(virtualizedConnectedGraphWithTables).map(t => t.table.identifier -> t).toMap

    // normalized graph where backing tables for all dataset have been created and datasets
    // are all marked as normalized.
    val virtualizedMaterializedConnectedGraphWithTables: DataflowGraph = try {
      DataflowGraphTransformer
        .withDataflowGraphTransformer(virtualizedConnectedGraphWithTables) { transformer =>
          transformer.transformTables { table =>
            if (tablesToMaterialize.keySet.contains(table.identifier)) {
              try {
                materializeTable(
                  virtualizedConnectedGraphWithTables,
                  table,
                  tablesToMaterialize(table.identifier).isFullRefresh,
                  context
                )
              } catch {
                case NonFatal(e) =>
                  throw TableMaterializationException(
                    table.displayName,
                    cause = e.addOrigin(table.origin)
                  )
              }
            } else {
              table
            }
          }
          // TODO: Publish persisted views to the metastore.
        }
        .getDataflowGraph
    } catch {
      case e: SparkException if e.getCause != null => throw e.getCause
    }

    virtualizedMaterializedConnectedGraphWithTables
  }

  /**
   * Materializes a table in the catalog. This method will create or update the table in the
   * catalog based on the given table and context.
   * @param virtualizedConnectedGraphWithTables The connected graph. Used to infer the table schema.
   * @param table The table to be materialized.
   * @param isFullRefresh Whether this table should be full refreshed or not.
   * @param context The context for the pipeline update.
   * @return The materialized table (with additional metadata set).
   */
  private def materializeTable(
      virtualizedConnectedGraphWithTables: DataflowGraph,
      table: Table,
      isFullRefresh: Boolean,
      context: PipelineUpdateContext
  ): Table = {
    logInfo(s"Materializing metadata for table ${table.identifier}.")
    val catalogManager = context.spark.sessionState.catalogManager
    val catalog = (table.identifier.catalog match {
      case Some(catalogName) =>
        catalogManager.catalog(catalogName)
      case None =>
        catalogManager.currentCatalog
    }).asInstanceOf[TableCatalog]

    val identifier =
      Identifier.of(Array(table.identifier.database.get), table.identifier.identifier)
    val outputSchema = table.specifiedSchema.getOrElse(
      virtualizedConnectedGraphWithTables.inferredSchema(table.identifier).asNullable
    )
    val mergedProperties = resolveTableProperties(table, identifier)

    val exists = catalog.tableExists(identifier)

    // Wipe the data if we need to
    if ((isFullRefresh || !table.isStreamingTableOpt.get) && exists) {
      context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
    }

    // Alter the table if we need to
    if (exists) {
      val existingSchema = catalog.loadTable(identifier).schema()

      val targetSchema = if (table.isStreamingTableOpt.get && !isFullRefresh) {
        SchemaMergingUtils.mergeSchemas(existingSchema, outputSchema)
      } else {
        outputSchema
      }

      val columnChanges = diffSchemas(existingSchema, targetSchema)
      val setProperties = mergedProperties.map { case (k, v) => TableChange.setProperty(k, v) }
      catalog.alterTable(identifier, (columnChanges ++ setProperties).toArray: _*)
    }

    // Create the table if we need to
    if (!exists) {
      catalog.createTable(
        identifier,
        new TableInfo.Builder()
          .withProperties(mergedProperties.asJava)
          .withColumns(CatalogV2Util.structTypeToV2Columns(outputSchema))
          .withPartitions(table.partitionCols.toSeq.flatten.map(Expressions.identity).toArray)
          .build()
      )
    }

    table.copy(
      normalizedPath =
        Option(catalog.loadTable(identifier).properties().get(TableCatalog.PROP_LOCATION))
    )
  }

  /**
   * Some fields on the [[Table]] object are represented as reserved table properties by the
   * catalog APIs. This method creates a table properties map that merges the user-provided
   * table properties with these reserved properties.
   */
  private def resolveTableProperties(table: Table, identifier: Identifier): Map[String, String] = {
    val validatedAndCanonicalizedProps =
      PipelinesTableProperties.validateAndCanonicalize(
        table.properties,
        warnFunction = s => logWarning(s)
      )

    val specialProps = Seq(
      (table.comment, "comment", TableCatalog.PROP_COMMENT),
      (table.format, "format", TableCatalog.PROP_PROVIDER)
    ).map {
      case (value, name, reservedPropKey) =>
        validatedAndCanonicalizedProps.get(reservedPropKey).foreach { pc =>
          if (value.isDefined && value.get != pc) {
            throw new IllegalArgumentException(
              s"For dataset $identifier, $name '${value.get}' does not match value '$pc' for " +
              s"reserved table property '$reservedPropKey''"
            )
          }
        }
        reservedPropKey -> value
    }
      .collect { case (key, Some(value)) => key -> value }

    validatedAndCanonicalizedProps ++ specialProps
  }

  /**
   * A case class that represents the type of refresh for a table.
   * @param table The table to be refreshed.
   * @param isFullRefresh Whether this table should be fully refreshed or not.
   */
  private case class TableRefreshType(table: Table, isFullRefresh: Boolean)

  /**
   * Constructs the set of tables that should be fully refreshed and the set of tables that
   * should be refreshed.
   */
  private def constructFullRefreshSet(
      graphTables: Seq[Table],
      context: PipelineUpdateContext
  ): (Seq[Table], Seq[TableIdentifier], Seq[TableIdentifier]) = {
    val (fullRefreshTablesSet, refreshTablesSet) = {
      val specifiedFullRefreshTables = context.fullRefreshTables.filter(graphTables)
      val specifiedRefreshTables = context.refreshTables.filter(graphTables)

      val (fullRefreshAllowed, fullRefreshNotAllowed) = specifiedFullRefreshTables.partition { t =>
        PipelinesTableProperties.resetAllowed.fromMap(t.properties)
      }

      val refreshTables = (specifiedRefreshTables ++ fullRefreshNotAllowed).filterNot { t =>
        fullRefreshAllowed.contains(t)
      }

      if (fullRefreshNotAllowed.nonEmpty) {
        logInfo(
          s"Skipping full refresh on some flows because " +
          s"${PipelinesTableProperties.resetAllowed.key} was set to false. Flows: " +
          s"$fullRefreshNotAllowed"
        )
      }

      (fullRefreshAllowed, refreshTables)
    }
    val allRefreshTables = fullRefreshTablesSet ++ refreshTablesSet
    val refreshTableIdentsSet = refreshTablesSet.map(_.identifier)
    val fullRefreshTableIdentsSet = fullRefreshTablesSet.map(_.identifier)
    (allRefreshTables, refreshTableIdentsSet, fullRefreshTableIdentsSet)
  }
}
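`constructFullRefreshSet` encodes one noteworthy policy: a table requested for full refresh whose table properties disallow resets (`PipelinesTableProperties.resetAllowed`) is demoted to a regular refresh rather than dropped from the update. The sketch below is not part of the commit; its types and boolean flag are simplified stand-ins for the real property lookup, isolating only that partitioning logic.

// Simplified stand-in (not from this commit) for the partitioning done in
// constructFullRefreshSet: tables requested for full refresh but whose table
// properties disallow resets are demoted to a regular refresh.
object RefreshSetSketch {
  final case class SketchTable(name: String, resetAllowed: Boolean)

  def split(
      requestedFullRefresh: Seq[SketchTable],
      requestedRefresh: Seq[SketchTable]): (Seq[SketchTable], Seq[SketchTable]) = {
    val (fullRefreshAllowed, fullRefreshNotAllowed) =
      requestedFullRefresh.partition(_.resetAllowed)
    // Demoted tables join the regular refresh set; anything already being
    // fully refreshed is excluded from it.
    val refresh = (requestedRefresh ++ fullRefreshNotAllowed)
      .filterNot(fullRefreshAllowed.contains)
    (fullRefreshAllowed, refresh)
  }

  def main(args: Array[String]): Unit = {
    val a = SketchTable("a", resetAllowed = true)
    val b = SketchTable("b", resetAllowed = false)
    val c = SketchTable("c", resetAllowed = true)
    println(split(requestedFullRefresh = Seq(a, b), requestedRefresh = Seq(c)))
    // -> (List(SketchTable(a,true)), List(SketchTable(c,true), SketchTable(b,false)))
  }
}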
