
Commit 79fcd7c

SCHJonathan authored and sryza committed
sandy
1 parent cf668c0 commit 79fcd7c

4 files changed (+71 −87 lines)


sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala

Lines changed: 16 additions & 6 deletions
@@ -95,8 +95,10 @@ trait FlowExecution {
   /** Context about this pipeline update. */
   def updateContext: PipelineUpdateContext

-  implicit val executionContext: ExecutionContext =
+  /** The thread execution context for the current [[FlowExecution]]. */
+  implicit val executionContext: ExecutionContext = {
     ExecutionContext.fromExecutor(FlowExecution.threadPool)
+  }

   /**
    * Stops execution of this [[FlowExecution]]. If you override this, please be sure to
@@ -107,8 +109,15 @@ trait FlowExecution {
     stopped.set(true)
   }

+  /** Returns an optional exception that occurred during execution, if any. */
   def exception: Option[Throwable] = _future.flatMap(_.value).flatMap(_.failed.toOption)

+  /**
+   * Executes this PhysicalFlow synchronously to perform its intended update.
+   * This method should be overridden by subclasses to provide the actual execution logic.
+   *
+   * @return a Future that completes when the execution is finished or stopped.
+   */
   def executeInternal(): Future[Unit]

   /**
@@ -129,10 +138,7 @@ trait FlowExecution {
     executeInternal()
       .transform {
         case Success(_) => Success(ExecutionResult.FINISHED)
-        // Add origin to exceptions raised while executing a flow i.e. inside the `Future`
-        // created by the `executeInternal` method.
-        case Failure(e) =>
-          Failure(e)
+        case Failure(e) => Failure(e)
       }
       .map(_ => ExecutionResult.FINISHED)
       .recover {
@@ -155,8 +161,10 @@ trait FlowExecution {
 }

 object FlowExecution {
-  private val threadPool: ThreadPoolExecutor =
+  /** A thread pool used to execute [[FlowExecution]]s. */
+  private val threadPool: ThreadPoolExecutor = {
     ThreadUtils.newDaemonCachedThreadPool("FlowExecution")
+  }
 }

 /** A [[FlowExecution]] that processes data statefully using Structured Streaming. */
@@ -190,6 +198,7 @@ trait StreamingFlowExecution extends FlowExecution with Logging {
   }
 }

+/** A [[StreamingFlowExecution]] that writes a streaming DataFrame to a DLT [[Table]]. */
 class StreamingTableWrite(
     val identifier: TableIdentifier,
     val flow: ResolvedFlow,
@@ -217,6 +226,7 @@ class StreamingTableWrite(
   }
 }

+/** A [[FlowExecution]] that writes a batch DataFrame to a DLT [[Table]]. */
 class BatchFlowExecution(
     val identifier: TableIdentifier,
     val flow: ResolvedFlow,
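
The doc comments added above describe a small pattern: every flow execution shares one daemon thread pool, exposes it through an implicit ExecutionContext, and defers its real work to an executeInternal() future whose failure can be inspected afterwards. The following is a minimal, self-contained sketch of that pattern, not code from this commit; DemoFlowExecution and the plain JDK Executors call are illustrative stand-ins for Spark's FlowExecution and ThreadUtils.newDaemonCachedThreadPool.

import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{ExecutionContext, Future}

object DemoFlowExecution {
  // Shared pool of daemon threads, in the spirit of FlowExecution.threadPool.
  private val threadPool = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "DemoFlowExecution")
      t.setDaemon(true)
      t
    }
  })

  val sharedContext: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
}

trait DemoFlowExecution {
  // Futures created by executeInternal() run on the shared pool.
  implicit val executionContext: ExecutionContext = DemoFlowExecution.sharedContext

  // Subclasses supply the actual work; the future completes when execution finishes or stops.
  def executeInternal(): Future[Unit]

  // Surfaces a failure from a completed future, if any (mirrors the documented exception accessor).
  def exceptionOf(future: Future[Unit]): Option[Throwable] =
    future.value.flatMap(_.failed.toOption)
}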

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala

Lines changed: 8 additions & 53 deletions
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.pipelines.graph
 import java.util.concurrent.{ConcurrentHashMap, TimeoutException}

-import scala.annotation.unused
 import scala.concurrent.ExecutionContext
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success}

-import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
@@ -70,29 +68,18 @@ abstract class GraphExecution(
     triggerFor = streamTrigger
   )

-  val SERIAL_PLANNING = "SERIAL"
-
   // Listeners to process events and metrics.
   private val batchListener = new BatchListener()
   private val streamListener = new StreamListener(env, graphForExecution)

-  /**
-   * Run the given planning function `f` for each flow in `flows`.
-   */
-  protected def startPlanning(flows: Seq[ResolvedFlow])(
-      f: (ResolvedFlow, String) => Unit
-  ): Unit = {
-    flows.foreach(f(_, SERIAL_PLANNING))
-  }
-
   /**
    * Plans the logical [[ResolvedFlow]] into a [[FlowExecution]] and then starts executing it.
    * Implementation note: Thread safe
    *
    * @return None if the flow planner decided that there is no actual update required here.
    *         Otherwise returns the corresponding physical flow.
    */
-  def startFlow(flow: ResolvedFlow): Option[FlowExecution] = {
+  def planAndStartFlow(flow: ResolvedFlow): Option[FlowExecution] = {
     try {
       val physicalFlow = flowPlanner.plan(
         flow = graphForExecution.resolvedFlow(flow.identifier)
@@ -249,9 +236,12 @@ object GraphExecution extends Logging {

   // Set of states after checking the exception for flow execution retryability analysis.
   sealed trait FlowExecutionAction
+  /** Indicates that the flow execution should be retried. */
   case object RetryFlowExecution extends FlowExecutionAction
+  /** Indicates that the flow execution should be stopped with a specific reason. */
   case class StopFlowExecution(reason: FlowExecutionStopReason) extends FlowExecutionAction

+  /** Represents the reason why a flow execution should be stopped. */
   sealed trait FlowExecutionStopReason {
     def cause: Throwable
     def flowDisplayName: String
@@ -261,28 +251,10 @@ object GraphExecution extends Logging {
     def warnInsteadOfError: Boolean = false
   }

-  @unused
-  case class ReanalyzeFlowSchema(originalCause: Throwable, flowDisplayName: String)
-    extends FlowExecutionStopReason {
-    override lazy val updateTerminationReason: UpdateTerminationReason = {
-      UpdateSchemaChange(flowDisplayName, Option(originalCause))
-    }
-    // Schema change can be automatically retried to handle
-    override val warnInsteadOfError: Boolean = true
-    override lazy val failureMessage: String = {
-      s"Flow '$flowDisplayName' has encountered a schema change during execution and " +
-      s"terminated. A new update using the new schema will be automatically started."
-    }
-    // Override the cause to make it more friendly for tracking purpose
-    override lazy val cause: Throwable = {
-      new SparkException(
-        errorClass = "FLOW_SCHEMA_CHANGED",
-        messageParameters = Map("flowName" -> flowDisplayName),
-        cause = originalCause
-      )
-    }
-  }
-
+  /**
+   * Represents the [[FlowExecution]] should be stopped due to it failed with some retryable errors
+   * and has exhausted all the retry attempts.
+   */
   private case class MaxRetryExceeded(
       cause: Throwable,
       flowDisplayName: String,
@@ -297,23 +269,6 @@ object GraphExecution extends Logging {
     }
   }

-  @unused
-  case class NonRetryableException(cause: Throwable, flowDisplayName: String)
-    extends FlowExecutionStopReason {
-    override lazy val updateTerminationReason: UpdateTerminationReason = {
-      QueryExecutionFailure(
-        flowName = flowDisplayName,
-        // Set maxRetries to 0 to not mention maxRetries in the error message.
-        maxRetries = 0,
-        cause = Option(cause)
-      )
-    }
-    override lazy val failureMessage: String = {
-      s"Flow '$flowDisplayName' has FAILED due to a non-retryable exception and will not be " +
-      s"restarted in this update."
-    }
-  }
-
   /**
    * Analyze the exception thrown by flow execution and figure out if we should retry the execution,
    * or we need to reanalyze the flow entirely to resolve issues like schema changes.
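
The new one-line docs on RetryFlowExecution and StopFlowExecution summarize the retryability analysis: each exception either earns another attempt or produces a stop reason such as MaxRetryExceeded once the retry budget is spent. Below is a rough, self-contained sketch of that decision shape, assuming a trivial retryability predicate; RetryAnalysisSketch, decide, and the plain String reason are illustrative and differ from the real FlowExecutionStopReason hierarchy.

object RetryAnalysisSketch {
  sealed trait FlowExecutionAction
  case object RetryFlowExecution extends FlowExecutionAction
  final case class StopFlowExecution(reason: String, cause: Throwable) extends FlowExecutionAction

  // Retry transient failures until the budget is exhausted; otherwise stop with a reason.
  def decide(cause: Throwable, attemptsSoFar: Int, maxRetries: Int): FlowExecutionAction = {
    val retryable = !cause.isInstanceOf[InterruptedException] // stand-in for a real classification
    if (retryable && attemptsSoFar < maxRetries) RetryFlowExecution
    else StopFlowExecution(s"gave up after $attemptsSoFar attempt(s)", cause)
  }
}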

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala

Lines changed: 46 additions & 28 deletions
@@ -31,19 +31,16 @@ import org.apache.spark.sql.pipelines.util.ExponentialBackoffStrategy
 import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

-sealed trait StreamState
-
-object StreamState {
-  case object QUEUED extends StreamState
-  case object RUNNING extends StreamState
-  case object EXCLUDED extends StreamState
-  case object IDLE extends StreamState
-  case object SKIPPED extends StreamState
-  case object TERMINATED_WITH_ERROR extends StreamState
-  case object CANCELED extends StreamState
-  case object SUCCESSFUL extends StreamState
-}
-
+/**
+ * Executes all of the flows in the given graph in topological order. Each flow processes
+ * all available data before downstream flows are triggered.
+ *
+ * @param graphForExecution the graph to execute.
+ * @param env the context in which the graph is executed.
+ * @param onCompletion a callback to execute after all streams are done. The boolean
+ *                     argument is true if the execution was successful.
+ * @param clock a clock used to determine the time of execution.
+ */
 class TriggeredGraphExecution(
     graphForExecution: DataflowGraph,
     env: PipelineUpdateContext,
@@ -219,16 +216,12 @@ class TriggeredGraphExecution(
       flowsToStart.append(graphForExecution.resolvedFlow(flowIdentifier))
     }

-    val (batchFlowsToStart, otherFlowsToStart) = flowsToStart.partition { f =>
-      graphForExecution.resolvedFlow(f.identifier).isInstanceOf[CompleteFlow]
-    }
-
-    def startFlowWithPlanningMode(flow: ResolvedFlow, mode: String): Unit = {
+    def startFlow(flow: ResolvedFlow): Unit = {
       val flowIdentifier = flow.identifier
-      logInfo(s"Starting flow ${flow.identifier} in $mode mode")
+      logInfo(s"Starting flow ${flow.identifier}")
       env.flowProgressEventLogger.recordPlanningForBatchFlow(flow)
       try {
-        val flowStarted = startFlow(flow)
+        val flowStarted = planAndStartFlow(flow)
         if (flowStarted.nonEmpty) {
           pipelineState.put(flowIdentifier, StreamState.RUNNING)
           logInfo(s"Flow $flowIdentifier started.")
@@ -250,16 +243,12 @@ class TriggeredGraphExecution(
       }
     }

-    // start non-batch flows serially because the configs will be attached to the pipeline's spark
-    // session (source dataframe's spark session)
-    otherFlowsToStart.foreach(startFlowWithPlanningMode(_, SERIAL_PLANNING))
-
-    // only start MV flows in parallel if enabled
-    startPlanning(batchFlowsToStart.toSeq) { (flow, mode) =>
-      startFlowWithPlanningMode(flow, mode)
-    }
+    // start each flow serially
+    flowsToStart.foreach(startFlow)

     try {
+      // Put thread to sleep for the configured polling interval to avoid busy-waiting
+      // and holding one CPU core.
       Thread.sleep(pipelineConf.streamStatePollingInterval * 1000)
     } catch {
       case _: InterruptedException => return
@@ -453,6 +442,35 @@ case class TriggeredFailureInfo(

 object TriggeredGraphExecution {

+  // All possible states of a data stream for a flow
+  sealed trait StreamState
+  object StreamState {
+    // Stream is waiting on its parent tables to successfully finish processing
+    // data to start running, in triggered execution
+    case object QUEUED extends StreamState
+
+    // Stream is processing data
+    case object RUNNING extends StreamState
+
+    // Stream excluded if it's not selected in the partial graph update API call.
+    case object EXCLUDED extends StreamState
+
+    // Stream will not be rerun because it is a ONCE flow.
+    case object IDLE extends StreamState
+
+    // Stream will not be run due to parent tables not finishing successfully in triggered execution
+    case object SKIPPED extends StreamState
+
+    // Stream has been stopped with a fatal error
+    case object TERMINATED_WITH_ERROR extends StreamState
+
+    // Stream stopped before completion in triggered execution
+    case object CANCELED extends StreamState
+
+    // Stream successfully processed all available data in triggered execution
+    case object SUCCESSFUL extends StreamState
+  }
+
   /**
    * List of terminal states which we don't consider as failures.
    *
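
The class doc and the relocated StreamState values describe triggered execution: flows run in topological order, a flow only starts once all of its parents finished successfully, a failing flow terminates with an error, and its downstream flows are skipped. A condensed sketch of that bookkeeping follows, using only three of the states; TriggeredSketch, run, and the String flow names are illustrative, not the real DataflowGraph API.

object TriggeredSketch {
  sealed trait StreamState
  case object SUCCESSFUL extends StreamState
  case object TERMINATED_WITH_ERROR extends StreamState
  case object SKIPPED extends StreamState

  // Walks flows in topological order: a flow runs only if all of its parents succeeded;
  // a failed flow is TERMINATED_WITH_ERROR and everything downstream of it is SKIPPED.
  def run(
      topologicalOrder: Seq[String],
      parents: Map[String, Set[String]],
      execute: String => Boolean): Map[String, StreamState] = {
    topologicalOrder.foldLeft(Map.empty[String, StreamState]) { (states, flow) =>
      val parentsSucceeded =
        parents.getOrElse(flow, Set.empty[String]).forall(p => states.get(p).contains(SUCCESSFUL))
      val next =
        if (!parentsSucceeded) SKIPPED
        else if (execute(flow)) SUCCESSFUL
        else TERMINATED_WITH_ERROR
      states.updated(flow, next)
    }
  }
}

// Example: "b" depends on "a"; if "a" fails, "b" is skipped:
// TriggeredSketch.run(Seq("a", "b"), Map("b" -> Set("a")), execute = _ != "a")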

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ import org.apache.spark.sql.classic.{DataFrame, Dataset}
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
+import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution.StreamState
 import org.apache.spark.sql.pipelines.logging.EventLevel
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}