Skip to content

Commit 80293e4

Browse files
committed
UpdateTerminationReason -> RunTerminationReason
1 parent 697cc73 commit 80293e4

File tree

5 files changed

+38
-58
lines changed

5 files changed

+38
-58
lines changed

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,9 @@ abstract class GraphExecution(
184184
/**
185185
* Returns the reason why this flow execution has terminated.
186186
* If the function is called before the flow has not terminated yet, the behavior is undefined,
187-
* and may return [[UnexpectedUpdateFailure]].
187+
* and may return [[UnexpectedRunFailure]].
188188
*/
189-
def getUpdateTerminationReason: UpdateTerminationReason
189+
def getUpdateTerminationReason: RunTerminationReason
190190

191191
def maxRetryAttemptsForFlow(flowName: TableIdentifier): Int = {
192192
val flow = graphForExecution.flow(flowName)
@@ -225,7 +225,7 @@ object GraphExecution extends Logging {
225225
sealed trait FlowExecutionStopReason {
226226
def cause: Throwable
227227
def flowDisplayName: String
228-
def updateTerminationReason: UpdateTerminationReason
228+
def updateTerminationReason: RunTerminationReason
229229
def failureMessage: String
230230
// If true, we record this flow execution as STOPPED with a WARNING instead a FAILED with ERROR.
231231
def warnInsteadOfError: Boolean = false
@@ -240,7 +240,7 @@ object GraphExecution extends Logging {
240240
flowDisplayName: String,
241241
maxAllowedRetries: Int
242242
) extends FlowExecutionStopReason {
243-
override lazy val updateTerminationReason: UpdateTerminationReason = {
243+
override lazy val updateTerminationReason: RunTerminationReason = {
244244
QueryExecutionFailure(flowDisplayName, maxAllowedRetries, Option(cause))
245245
}
246246
override lazy val failureMessage: String = {

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ object PipelinesErrors extends Logging {
155155
)
156156
}
157157
if (shouldRethrow) {
158-
throw UpdateTerminationException(reason.updateTerminationReason)
158+
throw RunTerminationException(reason.updateTerminationReason)
159159
}
160160
}
161161
}
Lines changed: 25 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -20,93 +20,73 @@ package org.apache.spark.sql.pipelines.graph
2020
import org.apache.spark.sql.catalyst.TableIdentifier
2121
import org.apache.spark.sql.pipelines.common.RunState
2222

23-
sealed trait UpdateTerminationReason {
23+
sealed trait RunTerminationReason {
2424

25-
/** Terminal state for the given update. */
25+
/** Terminal state for the given run. */
2626
def terminalState: RunState
2727

2828
/**
29-
* User visible message associated with update termination. This will also be set as the message
30-
* in the associated terminal update progress log.
29+
* User visible message associated with run termination. This will also be set as the message
30+
* in the associated terminal run progress log.
3131
*/
3232
def message: String
3333

3434
/**
35-
* Exception associated with the given update termination. This exception will be
36-
* included in the error details in the associated terminal update progress event.
35+
* Exception associated with the given run termination. This exception will be
36+
* included in the error details in the associated terminal run progress event.
3737
*/
3838
def cause: Option[Throwable]
39-
40-
/**
41-
* Whether this termination reason should override the partial execution failure thrown during
42-
* flow execution.
43-
*/
44-
def overridesPartialExecutionFailure: Boolean = false
4539
}
4640

4741
/**
48-
* Helper exception class that indicates that an update has to be terminated and
42+
* Helper exception class that indicates that a run has to be terminated and
4943
* tracks the associated termination reason.
5044
*/
51-
case class UpdateTerminationException(reason: UpdateTerminationReason) extends Exception
45+
case class RunTerminationException(reason: RunTerminationReason) extends Exception
5246

5347
// ===============================================================
54-
// ============ Graceful update termination states ===============
48+
// ============ Graceful run termination states ==================
5549
// ===============================================================
5650

57-
/** Indicates that a triggered update has successfully completed execution. */
58-
case class UpdateCompletion() extends UpdateTerminationReason {
51+
/** Indicates that a triggered run has successfully completed execution. */
52+
case class RunCompletion() extends RunTerminationReason {
5953
override def terminalState: RunState = RunState.COMPLETED
60-
override def message: String = s"Update is $terminalState."
54+
override def message: String = s"Run is $terminalState."
6155
override def cause: Option[Throwable] = None
6256
}
6357

64-
/**
65-
* Indicates that the update is being terminated since the schema for a given flow
66-
* changed during execution.
67-
*/
68-
case class UpdateSchemaChange(flowName: String, override val cause: Option[Throwable])
69-
extends UpdateTerminationReason {
70-
override def terminalState: RunState = RunState.CANCELED
71-
override def message: String =
72-
s"Update has been cancelled due to a schema change in $flowName, " +
73-
s"and will be restarted."
74-
75-
override def overridesPartialExecutionFailure: Boolean = true
76-
}
77-
7858
// ===============================================================
79-
// ======================= Update failures =======================
59+
// ======================= Run failures ==========================
8060
// ===============================================================
8161

82-
/** Indicates that an update entered the failed state.. */
83-
abstract sealed class UpdateFailure extends UpdateTerminationReason {
62+
/** Indicates that an run entered the failed state.. */
63+
abstract sealed class RunFailure extends RunTerminationReason {
8464

8565
/** Whether or not this failure is considered fatal / irrecoverable. */
8666
def isFatal: Boolean
8767

8868
override def terminalState: RunState = RunState.FAILED
8969
}
9070

91-
/** Indicates that update has failed due to a query execution failure. */
71+
/** Indicates that run has failed due to a query execution failure. */
9272
case class QueryExecutionFailure(
9373
flowName: String,
9474
maxRetries: Int,
9575
override val cause: Option[Throwable])
96-
extends UpdateFailure {
76+
extends RunFailure {
9777
override def isFatal: Boolean = false
9878

9979
override def message: String =
10080
if (maxRetries == 0) {
101-
s"Update is $terminalState since flow '$flowName' has failed."
81+
s"Run is $terminalState since flow '$flowName' has failed."
10282
} else {
103-
s"Update is $terminalState since flow '$flowName' has failed more " +
83+
s"Run is $terminalState since flow '$flowName' has failed more " +
10484
s"than $maxRetries times."
10585
}
10686
}
10787

10888
/** Abstract class used to identify failures related to failures stopping an operation/timeouts. */
109-
abstract class FailureStoppingOperation extends UpdateFailure {
89+
abstract class FailureStoppingOperation extends RunFailure {
11090

11191
/** Name of the operation that failed to stop. */
11292
def operation: String
@@ -120,22 +100,22 @@ case class FailureStoppingFlow(flowIdentifiers: Seq[TableIdentifier])
120100
override def message: String = {
121101
if (flowIdentifiers.nonEmpty) {
122102
val flowNamesToPrint = flowIdentifiers.map(_.toString).sorted.take(5).mkString(", ")
123-
s"Update is $terminalState since following flows have failed to stop: " +
103+
s"Run is $terminalState since following flows have failed to stop: " +
124104
s"$flowNamesToPrint."
125105
} else {
126-
s"Update is $terminalState since stopping flow execution has failed."
106+
s"Run is $terminalState since stopping flow execution has failed."
127107
}
128108
}
129109
override def cause: Option[Throwable] = None
130110
}
131111

132112
/**
133-
* Update could not be associated with a proper root cause.
113+
* Run could not be associated with a proper root cause.
134114
* This is not expected and likely indicates a bug.
135115
*/
136-
case class UnexpectedUpdateFailure() extends UpdateFailure {
116+
case class UnexpectedRunFailure() extends RunFailure {
137117
override def isFatal: Boolean = false
138118
override def message: String =
139-
s"Update $terminalState unexpectedly."
119+
s"Run $terminalState unexpectedly."
140120
override def cause: Option[Throwable] = None
141121
}

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
4444
class TriggeredGraphExecution(
4545
graphForExecution: DataflowGraph,
4646
env: PipelineUpdateContext,
47-
onCompletion: UpdateTerminationReason => Unit = _ => (),
47+
onCompletion: RunTerminationReason => Unit = _ => (),
4848
clock: Clock = new SystemClock()
4949
) extends GraphExecution(graphForExecution, env) {
5050

@@ -118,7 +118,7 @@ class TriggeredGraphExecution(
118118
case ex: Throwable =>
119119
logError(s"Exception thrown while stopping the update...", ex)
120120
} finally {
121-
onCompletion(UnexpectedUpdateFailure())
121+
onCompletion(UnexpectedRunFailure())
122122
}
123123
}
124124
)
@@ -393,7 +393,7 @@ class TriggeredGraphExecution(
393393
.map(_.get._1)
394394

395395
if (flowsFailedToStop.nonEmpty) {
396-
throw UpdateTerminationException(FailureStoppingFlow(flowsFailedToStop))
396+
throw RunTerminationException(FailureStoppingFlow(flowsFailedToStop))
397397
}
398398
}
399399

@@ -403,11 +403,11 @@ class TriggeredGraphExecution(
403403

404404
override def stop(): Unit = { stopInternal(stopTopologicalExecutionThread = true) }
405405

406-
override def getUpdateTerminationReason: UpdateTerminationReason = {
406+
override def getUpdateTerminationReason: RunTerminationReason = {
407407
val success =
408408
pipelineState.valuesIterator.forall(TERMINAL_NON_FAILURE_STREAM_STATES.contains)
409409
if (success) {
410-
return UpdateCompletion()
410+
return RunCompletion()
411411
}
412412

413413
val executionFailureOpt = failureTracker.iterator
@@ -424,7 +424,7 @@ class TriggeredGraphExecution(
424424
reason.updateTerminationReason
425425
}
426426

427-
executionFailureOpt.getOrElse(UnexpectedUpdateFailure())
427+
executionFailureOpt.getOrElse(UnexpectedRunFailure())
428428
}
429429
}
430430

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ class TriggeredGraphExecutionSuite extends ExecutionTest {
463463
expectedEventLevel = EventLevel.ERROR,
464464
msgChecker = msg =>
465465
msg.contains(
466-
"Update is FAILED since flow 'spark_catalog.test_db.branch_2' has failed more than " +
466+
"Run is FAILED since flow 'spark_catalog.test_db.branch_2' has failed more than " +
467467
"2 times"
468468
)
469469
)
@@ -901,7 +901,7 @@ class TriggeredGraphExecutionSuite extends ExecutionTest {
901901
expectedEventLevel = EventLevel.ERROR,
902902
msgChecker = msg =>
903903
msg.contains(
904-
"Update is FAILED since flow 'spark_catalog.test_db.a' has failed more than 2 times"
904+
"Run is FAILED since flow 'spark_catalog.test_db.a' has failed more than 2 times"
905905
)
906906
)
907907
}

0 commit comments

Comments
 (0)