
Commit e66e67f

fix conflicts

1 parent 7cb557e

6 files changed: 13 additions & 16 deletions

sql/pipelines/pom.xml

Lines changed: 0 additions & 4 deletions

@@ -33,10 +33,6 @@
   <name>Spark Project Declarative Pipelines Library</name>
   <url>https://spark.apache.org/</url>
   <dependencies>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala

Lines changed: 2 additions & 2 deletions

@@ -68,14 +68,14 @@ case class ResolvedInput(input: Input, aliasIdentifier: AliasIdentifier)
 trait FlowFunction extends Logging {
 
   /**
-   * This function defines the transformations performed by a flow, expressed as a DataFrame.
+   * This function defines the transformations performed by a flow, expressed as a [[DataFrame]].
    *
    * @param allInputs the set of identifiers for all the [[Input]]s defined in the
    *                  [[DataflowGraph]].
    * @param availableInputs the list of all [[Input]]s available to this flow
    * @param configuration the spark configurations that apply to this flow.
    * @param queryContext The context of the query being evaluated.
-   * @return the inputs actually used, and the DataFrame expression for the flow
+   * @return the inputs actually used, and the [[DataFrame]] expression for the flow
    */
   def call(
       allInputs: Set[TableIdentifier],
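To make the documented contract concrete, here is a minimal, self-contained sketch of the same idea, not the real Spark Declarative Pipelines API: the flow function receives the identifiers of all inputs plus the inputs actually available, and returns both the inputs it used and the resulting DataFrame. FlowResult, SimpleFlowFunction, and FilterFlow are hypothetical names.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical result type: the inputs actually read, plus the DataFrame expression.
case class FlowResult(usedInputs: Set[TableIdentifier], df: DataFrame)

// Simplified stand-in for the FlowFunction trait described above.
trait SimpleFlowFunction {
  def call(
      allInputs: Set[TableIdentifier],
      availableInputs: Map[TableIdentifier, DataFrame],
      configuration: Map[String, String]): FlowResult
}

// Example flow: read one upstream table and keep only non-null values.
class FilterFlow(source: TableIdentifier) extends SimpleFlowFunction {
  override def call(
      allInputs: Set[TableIdentifier],
      availableInputs: Map[TableIdentifier, DataFrame],
      configuration: Map[String, String]): FlowResult = {
    val upstream = availableInputs(source)
    FlowResult(Set(source), upstream.filter("value IS NOT NULL"))
  }
}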

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala

Lines changed: 4 additions & 3 deletions

@@ -61,6 +61,7 @@ abstract class GraphExecution(
     triggerFor = streamTrigger
   )
 
+  // Listener to process events and metrics.
   private val streamListener = new StreamListener(env, graphForExecution)
 
   /**
@@ -186,7 +187,7 @@ abstract class GraphExecution(
    * If the function is called before the flow has not terminated yet, the behavior is undefined,
    * and may return [[UnexpectedRunFailure]].
    */
-  def getUpdateTerminationReason: RunTerminationReason
+  def getRunTerminationReason: RunTerminationReason
 
   def maxRetryAttemptsForFlow(flowName: TableIdentifier): Int = {
     val flow = graphForExecution.flow(flowName)
@@ -225,7 +226,7 @@ object GraphExecution extends Logging {
   sealed trait FlowExecutionStopReason {
     def cause: Throwable
     def flowDisplayName: String
-    def updateTerminationReason: RunTerminationReason
+    def runTerminationReason: RunTerminationReason
     def failureMessage: String
     // If true, we record this flow execution as STOPPED with a WARNING instead a FAILED with ERROR.
     def warnInsteadOfError: Boolean = false
@@ -240,7 +241,7 @@ object GraphExecution extends Logging {
       flowDisplayName: String,
       maxAllowedRetries: Int
   ) extends FlowExecutionStopReason {
-    override lazy val updateTerminationReason: RunTerminationReason = {
+    override lazy val runTerminationReason: RunTerminationReason = {
      QueryExecutionFailure(flowDisplayName, maxAllowedRetries, Option(cause))
    }
    override lazy val failureMessage: String = {
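The rename from updateTerminationReason to getRunTerminationReason/runTerminationReason follows one pattern across these files: each stop reason exposes the termination reason for the run, and the execution surfaces the overall result. A condensed, self-contained sketch of that pattern under simplified stand-ins (RunReason, RunSucceeded, RunFailed, StopReason, and RetriesExhausted are hypothetical names, not the real classes):

// Simplified stand-in for RunTerminationReason and its cases.
sealed trait RunReason
case object RunSucceeded extends RunReason
final case class RunFailed(flowDisplayName: String, cause: Option[Throwable]) extends RunReason

// Simplified stand-in for FlowExecutionStopReason after the rename.
sealed trait StopReason {
  def cause: Throwable
  def flowDisplayName: String
  def runTerminationReason: RunReason // previously updateTerminationReason
}

// One concrete stop reason: a flow exhausted its retries.
final case class RetriesExhausted(cause: Throwable, flowDisplayName: String) extends StopReason {
  override lazy val runTerminationReason: RunReason =
    RunFailed(flowDisplayName, Option(cause))
}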

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelinesErrors.scala

Lines changed: 1 addition & 1 deletion

@@ -155,7 +155,7 @@ object PipelinesErrors extends Logging {
       )
     }
     if (shouldRethrow) {
-      throw RunTerminationException(reason.updateTerminationReason)
+      throw RunTerminationException(reason.runTerminationReason)
     }
   }
 }

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecution.scala

Lines changed: 3 additions & 3 deletions

@@ -255,7 +255,7 @@ class TriggeredGraphExecution(
         }
       }
       if (allFlowsDone) {
-        onCompletion(getUpdateTerminationReason)
+        onCompletion(getRunTerminationReason)
       }
     }
 
@@ -403,7 +403,7 @@ class TriggeredGraphExecution(
 
   override def stop(): Unit = { stopInternal(stopTopologicalExecutionThread = true) }
 
-  override def getUpdateTerminationReason: RunTerminationReason = {
+  override def getRunTerminationReason: RunTerminationReason = {
     val success =
       pipelineState.valuesIterator.forall(TERMINAL_NON_FAILURE_STREAM_STATES.contains)
     if (success) {
@@ -421,7 +421,7 @@ class TriggeredGraphExecution(
       }
       .collectFirst {
         case (_, _, GraphExecution.StopFlowExecution(reason)) =>
-          reason.updateTerminationReason
+          reason.runTerminationReason
       }
 
     executionFailureOpt.getOrElse(UnexpectedRunFailure())
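The renamed getRunTerminationReason above can be read as: the run succeeded only if every flow ended in a terminal non-failure state; otherwise report the first recorded stop reason, falling back to UnexpectedRunFailure. A small stand-alone sketch of that aggregation, reusing the hypothetical RunReason/StopReason stand-ins from the earlier sketch plus a made-up FlowState enumeration (none of these are the real Spark types):

// Hypothetical flow states; only Completed and Skipped count as non-failure terminals.
sealed trait FlowState
case object Completed extends FlowState
case object Skipped extends FlowState
case object Failed extends FlowState

object RunOutcome {
  private val TerminalNonFailureStates: Set[FlowState] = Set(Completed, Skipped)

  // Mirrors the aggregation in getRunTerminationReason: success iff all flows
  // reached a terminal non-failure state, otherwise the first stop reason wins.
  // The getOrElse branch stands in for the UnexpectedRunFailure fallback.
  def terminationReason(
      states: Map[String, FlowState],
      stopReasons: Seq[StopReason]): RunReason = {
    if (states.valuesIterator.forall(TerminalNonFailureStates.contains)) {
      RunSucceeded
    } else {
      stopReasons.headOption
        .map(_.runTerminationReason)
        .getOrElse(RunFailed("unknown", None))
    }
  }
}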

sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/elements.scala

Lines changed: 3 additions & 3 deletions

@@ -67,9 +67,9 @@ trait GraphElement {
 trait Input extends GraphElement {
 
   /**
-   * Returns a DataFrame that is a result of loading data from this [[Input]].
+   * Returns a [[DataFrame]] that is a result of loading data from this [[Input]].
    * @param readOptions Type of input. Used to determine streaming/batch
-   * @return Streaming or batch DataFrame of this Input's data.
+   * @return Streaming or batch [[DataFrame]] of this Input's data.
    */
   def load(readOptions: InputReadOptions): DataFrame
 }
@@ -82,7 +82,7 @@ sealed trait Output {
 
   /**
    * Normalized storage location used for storing materializations for this [[Output]].
-   * If None, it means this [[Output]] has not been normalized yet.
+   * If [[None]], it means this [[Output]] has not been normalized yet.
    */
   def normalizedPath: Option[String]
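The documented Input.load contract (return a streaming or batch DataFrame depending on the read options) can be illustrated with a minimal, self-contained sketch; SimpleReadOptions, BatchRead, StreamingRead, SimpleInput, and TableInput are hypothetical names, not the real Spark Declarative Pipelines classes:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical read options used to choose between batch and streaming reads.
sealed trait SimpleReadOptions
case object BatchRead extends SimpleReadOptions
case object StreamingRead extends SimpleReadOptions

// Simplified stand-in for the Input trait documented above.
trait SimpleInput {
  def load(readOptions: SimpleReadOptions): DataFrame
}

// Reads a table either as a batch or as a streaming DataFrame.
class TableInput(spark: SparkSession, tableName: String) extends SimpleInput {
  override def load(readOptions: SimpleReadOptions): DataFrame = readOptions match {
    case BatchRead     => spark.read.table(tableName)
    case StreamingRead => spark.readStream.table(tableName)
  }
}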
