Skip to content

Commit bad0f7d

Browse files
renozhangMarcelo Vanzin
authored and
Marcelo Vanzin
committed
[SPARK-16095][YARN] Yarn cluster mode should report correct state to SparkLauncher
## What changes were proposed in this pull request? Yarn cluster mode should return correct state for SparkLauncher ## How was this patch tested? unit test Author: peng.zhang <[email protected]> Closes apache#13962 from renozhang/SPARK-16095-spark-launcher-wrong-state.
1 parent d17e5f2 commit bad0f7d

File tree

2 files changed

+31
-15
lines changed

2 files changed

+31
-15
lines changed

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

+8-1
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,14 @@ private[spark] class Client(
10801080
case YarnApplicationState.RUNNING =>
10811081
reportLauncherState(SparkAppHandle.State.RUNNING)
10821082
case YarnApplicationState.FINISHED =>
1083-
reportLauncherState(SparkAppHandle.State.FINISHED)
1083+
report.getFinalApplicationStatus match {
1084+
case FinalApplicationStatus.FAILED =>
1085+
reportLauncherState(SparkAppHandle.State.FAILED)
1086+
case FinalApplicationStatus.KILLED =>
1087+
reportLauncherState(SparkAppHandle.State.KILLED)
1088+
case _ =>
1089+
reportLauncherState(SparkAppHandle.State.FINISHED)
1090+
}
10841091
case YarnApplicationState.FAILED =>
10851092
reportLauncherState(SparkAppHandle.State.FAILED)
10861093
case YarnApplicationState.KILLED =>

yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

+23-14
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
120120
finalState should be (SparkAppHandle.State.FAILED)
121121
}
122122

123+
test("run Spark in yarn-cluster mode failure after sc initialized") {
124+
val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass))
125+
finalState should be (SparkAppHandle.State.FAILED)
126+
}
127+
123128
test("run Python application in yarn-client mode") {
124129
testPySpark(true)
125130
}
@@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener {
259264
}
260265
}
261266

267+
private object YarnClusterDriverWithFailure extends Logging with Matchers {
268+
def main(args: Array[String]): Unit = {
269+
val sc = new SparkContext(new SparkConf()
270+
.set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
271+
.setAppName("yarn test with failure"))
272+
273+
throw new Exception("exception after sc initialized")
274+
}
275+
}
276+
262277
private object YarnClusterDriver extends Logging with Matchers {
263278

264279
val WAIT_TIMEOUT_MILLIS = 10000
@@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers {
287302
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
288303
data should be (Set(1, 2, 3, 4))
289304
result = "success"
305+
306+
// Verify that the config archive is correctly placed in the classpath of all containers.
307+
val confFile = "/" + Client.SPARK_CONF_FILE
308+
assert(getClass().getResource(confFile) != null)
309+
val configFromExecutors = sc.parallelize(1 to 4, 4)
310+
.map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
311+
.collect()
312+
assert(configFromExecutors.find(_ == null) === None)
290313
} finally {
291314
Files.write(result, status, StandardCharsets.UTF_8)
292315
sc.stop()
293316
}
294317

295-
// Verify that the config archive is correctly placed in the classpath of all containers.
296-
val confFile = "/" + Client.SPARK_CONF_FILE
297-
assert(getClass().getResource(confFile) != null)
298-
val configFromExecutors = sc.parallelize(1 to 4, 4)
299-
.map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
300-
.collect()
301-
assert(configFromExecutors.find(_ == null) === None)
302-
303318
// verify log urls are present
304319
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
305320
assert(listeners.size === 1)
@@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers {
330345
}
331346

332347
private object YarnClasspathTest extends Logging {
333-
334-
var exitCode = 0
335-
336348
def error(m: String, ex: Throwable = null): Unit = {
337349
logError(m, ex)
338350
// scalastyle:off println
@@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging {
361373
} finally {
362374
sc.stop()
363375
}
364-
System.exit(exitCode)
365376
}
366377

367378
private def readResource(resultPath: String): Unit = {
@@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging {
374385
} catch {
375386
case t: Throwable =>
376387
error(s"loading test.resource to $resultPath", t)
377-
// set the exit code if not yet set
378-
exitCode = 2
379388
} finally {
380389
Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
381390
}

0 commit comments

Comments
 (0)