
Commit 5df833c

[SPARK-54010] Support restart counter in Spark app
### What changes were proposed in this pull request?

This PR adds support for automatically resetting the restart counter based on application attempt duration. It introduces a new `restartCounterResetMillis` field in `RestartConfig` that resets the restart counter if an application attempt runs successfully for a specified duration before terminating. It also adds a unit test and enhances the existing `assertGeneratedCRDMatchesHelmChart` task to print a diff for readability.

### Why are the changes needed?

With this feature, users can distinguish persistent failures (quick consecutive crashes) from applications that run for long periods between failures.

### Does this PR introduce _any_ user-facing change?

A new optional configuration field `restartCounterResetMillis` is added to the `RestartConfig` spec.

### How was this patch tested?

Added a unit test that validates the restart counter works as expected.

### Was this patch authored or co-authored using generative AI tooling?

No
1 parent aaf1542 commit 5df833c
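
To make the intended semantics concrete, here is a minimal, self-contained sketch of the reset decision described above. It is illustrative only: `effectiveCounter`, its parameters, and the sample values are invented here and are not part of the operator's API.

```java
import java.time.Duration;

/** Illustrative sketch of the restart-counter reset decision; not operator code. */
final class RestartCounterResetSketch {

  /**
   * Returns the counter value that is compared against maxRestartAttempts.
   * A negative resetThresholdMillis (the default, -1) disables the reset.
   */
  static long effectiveCounter(long restartCounter, Duration attemptDuration, long resetThresholdMillis) {
    boolean reset =
        resetThresholdMillis >= 0L
            && attemptDuration.compareTo(Duration.ofMillis(resetThresholdMillis)) >= 0;
    return reset ? 0L : restartCounter;
  }

  public static void main(String[] args) {
    long oneHourMillis = 3_600_000L;
    // Attempt crashed after 5 minutes: the counter keeps accumulating toward the limit.
    System.out.println(effectiveCounter(2L, Duration.ofMinutes(5), oneHourMillis)); // 2
    // Attempt survived 2 hours before failing: the counter is treated as reset.
    System.out.println(effectiveCounter(2L, Duration.ofHours(2), oneHourMillis)); // 0
    // Default -1 keeps the original behavior: no reset, ever.
    System.out.println(effectiveCounter(2L, Duration.ofHours(2), -1L)); // 2
  }
}
```

The actual operator logic lives in `ApplicationStatus.terminateOrRestart` and `AttemptInfo.createNextAttemptInfo`, shown in the diffs below.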

File tree

8 files changed (+350, -6 lines)


build-tools/helm/spark-kubernetes-operator/crds/sparkapplications.spark.apache.org-v1.yaml

Lines changed: 7 additions & 0 deletions
```diff
@@ -17426,6 +17426,9 @@ spec:
             type: integer
           restartBackoffMillis:
             type: integer
+          restartCounterResetMillis:
+            default: -1
+            type: integer
           restartPolicy:
             enum:
             - Always
@@ -23735,6 +23738,8 @@ spec:
             properties:
               id:
                 type: integer
+              restartCounter:
+                type: integer
             type: object
           stateTransitionHistory:
             additionalProperties:
@@ -24895,6 +24900,8 @@ spec:
             properties:
               id:
                 type: integer
+              restartCounter:
+                type: integer
             type: object
           stateTransitionHistory:
             additionalProperties:
```

build-tools/helm/spark-kubernetes-operator/crds/sparkclusters.spark.apache.org-v1.yaml

Lines changed: 4 additions & 0 deletions
```diff
@@ -21802,6 +21802,8 @@ spec:
             properties:
               id:
                 type: integer
+              restartCounter:
+                type: integer
             type: object
           stateTransitionHistory:
             additionalProperties:
@@ -21842,6 +21844,8 @@ spec:
             properties:
               id:
                 type: integer
+              restartCounter:
+                type: integer
             type: object
           stateTransitionHistory:
             additionalProperties:
```

docs/spark_custom_resources.md

Lines changed: 25 additions & 0 deletions
````diff
@@ -234,6 +234,31 @@ restartConfig:
   restartBackoffMillis: 30000
 ```
 
+### Restart Counter reset
+
+The `restartCounterResetMillis` field controls automatic restart counter resets for long-running
+application attempts. When set to a non-negative value (in milliseconds), the operator resets the
+restart counter if an application attempt runs successfully for at least the specified duration
+before failing. This lets users cap the number of attempts when an app fails quickly (which may
+indicate an underlying issue other than the app itself) while still allowing indefinite restarts
+as long as each attempt survives the configured threshold.
+
+For example, setting
+
+```yaml
+
+restartConfig:
+  ## 1hr
+  restartCounterResetMillis: 3600000
+  maxRestartAttempts: 3
+
+```
+
+means the application can fail and restart up to 3 times, but if any attempt runs for more than
+1 hour, the counter resets to zero, allowing another 3 restart attempts.
+
+The default value is -1, which disables automatic counter resets.
+
 ### Timeouts
 
 It's possible to configure applications to be proactively terminated and resubmitted in particular
````

spark-operator-api/build.gradle

Lines changed: 25 additions & 2 deletions
```diff
@@ -89,8 +89,31 @@ tasks.register("assertGeneratedCRDMatchesHelmChart") {
       "${stagedCRDFileBase}sparkclusters.spark.apache.org-v1.yaml"
   ].execute().text.trim()
   if (generatedAppCRD != stagedAppCRD || generatedClusterCRD != stagedClusterCRD) {
-    throw new GradleException("Generated CRD yaml does not match the staged version in " +
-        "Helm Chart, please keep the chart updated.")
+    def errorMessage = new StringBuilder("Generated CRD yaml does not match the staged " +
+        "version in Helm Chart, please keep the chart updated.\n\n")
+
+    if (generatedAppCRD != stagedAppCRD) {
+      errorMessage.append("=== SparkApplication CRD Differences ===\n")
+      def appDiff = ["bash", "-c",
+          "diff -u <(echo '${generatedAppCRD.replace("'", "'\\''")}' " +
+          "| yq -P 'sort_keys(..)') <(echo '${stagedAppCRD.replace("'", "'\\''")}' " +
+          "| yq -P 'sort_keys(..)')"]
+          .execute().text
+      errorMessage.append(appDiff ?: "Unable to generate diff\n")
+      errorMessage.append("\n")
+    }
+
+    if (generatedClusterCRD != stagedClusterCRD) {
+      errorMessage.append("=== SparkCluster CRD Differences ===\n")
+      def clusterDiff = ["bash", "-c",
+          "diff -u <(echo '${generatedClusterCRD.replace("'", "'\\''")}' " +
+          "| yq -P 'sort_keys(..)') <(echo '${stagedClusterCRD.replace("'", "'\\''")}' " +
+          "| yq -P 'sort_keys(..)')"]
+          .execute().text
+      errorMessage.append(clusterDiff ?: "Unable to generate diff\n")
+    }
+
+    throw new GradleException(errorMessage.toString())
   }
  }
}
```

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/spec/RestartConfig.java

Lines changed: 5 additions & 0 deletions
```diff
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonInclude;
+import io.fabric8.generator.annotation.Default;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -37,4 +38,8 @@ public class RestartConfig {
   @Builder.Default protected RestartPolicy restartPolicy = RestartPolicy.Never;
   @Builder.Default protected Long maxRestartAttempts = 3L;
   @Builder.Default protected Long restartBackoffMillis = 30000L;
+
+  @Default("-1")
+  @Builder.Default
+  protected Long restartCounterResetMillis = -1L;
 }
```
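
Since `RestartConfig` is a Lombok `@Builder`/`@Data` POJO, the new field should be settable like the existing ones. The sketch below is a usage illustration under that assumption; the surrounding class and the chosen values are invented, and `RestartPolicy.Always` is taken from the enum values listed in the CRD above.

```java
import org.apache.spark.k8s.operator.spec.RestartConfig;
import org.apache.spark.k8s.operator.spec.RestartPolicy;

final class RestartConfigUsageSketch {
  public static void main(String[] args) {
    // Allow up to 3 consecutive quick failures, but reset the counter whenever
    // an attempt survives at least 1 hour before terminating.
    RestartConfig config =
        RestartConfig.builder()
            .restartPolicy(RestartPolicy.Always)
            .maxRestartAttempts(3L)
            .restartBackoffMillis(30000L)
            .restartCounterResetMillis(3600000L)
            .build();
    System.out.println(config.getRestartCounterResetMillis()); // 3600000
  }
}
```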

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/ApplicationStatus.java

Lines changed: 57 additions & 2 deletions
```diff
@@ -21,6 +21,10 @@
 
 import static org.apache.spark.k8s.operator.Constants.EXCEED_MAX_RETRY_ATTEMPT_MESSAGE;
 
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -115,7 +119,18 @@ public ApplicationStatus terminateOrRestart(
           currentAttemptSummary);
     }
 
-    if (currentAttemptSummary.getAttemptInfo().getId() >= restartConfig.getMaxRestartAttempts()) {
+    boolean resetRestartCounter = false;
+    if (restartConfig.getRestartCounterResetMillis() >= 0L) {
+      resetRestartCounter =
+          calculateCurrentAttemptDuration()
+                  .compareTo(Duration.ofMillis(restartConfig.getRestartCounterResetMillis()))
+              >= 0;
+    }
+
+    long effectiveAttemptId =
+        resetRestartCounter ? 0L : currentAttemptSummary.getAttemptInfo().getRestartCounter();
+
+    if (effectiveAttemptId >= restartConfig.getMaxRestartAttempts()) {
       String stateMessage =
           String.format(EXCEED_MAX_RETRY_ATTEMPT_MESSAGE, restartConfig.getMaxRestartAttempts());
       if (stateMessageOverride != null && !stateMessageOverride.isEmpty()) {
@@ -138,7 +153,9 @@ public ApplicationStatus terminateOrRestart(
           currentAttemptSummary);
     }
 
-    AttemptInfo nextAttemptInfo = currentAttemptSummary.getAttemptInfo().createNextAttemptInfo();
+    AttemptInfo nextAttemptInfo =
+        currentAttemptSummary.getAttemptInfo().createNextAttemptInfo(resetRestartCounter);
+
     ApplicationAttemptSummary nextAttemptSummary = new ApplicationAttemptSummary(nextAttemptInfo);
     ApplicationState state =
         new ApplicationState(ApplicationStateSummary.ScheduledToRestart, stateMessageOverride);
@@ -163,6 +180,44 @@ public ApplicationStatus terminateOrRestart(
     }
   }
 
+  /**
+   * Finds the first state of the current application attempt.
+   *
+   * <p>This method traverses the state transition history in reverse order to find the most recent
+   * initializing state (e.g., Submitted or ScheduledToRestart), which marks the beginning of the
+   * current attempt. If no initializing state is found, it returns the first entry in the history.
+   *
+   * @return The ApplicationState representing the start of the current attempt.
+   */
+  protected ApplicationState findFirstStateOfCurrentAttempt() {
+    List<Map.Entry<Long, ApplicationState>> entries =
+        new ArrayList<>(stateTransitionHistory.entrySet());
+    for (int k = entries.size() - 1; k >= 0; k--) {
+      Map.Entry<Long, ApplicationState> entry = entries.get(k);
+      if (entry.getValue().getCurrentStateSummary().isInitializing()) {
+        return entry.getValue();
+      }
+    }
+    return entries.get(0).getValue();
+  }
+
+  /**
+   * Calculates the duration of the current application attempt.
+   *
+   * <p>The duration is calculated as the time between the first state of the current attempt (as
+   * determined by {@link #findFirstStateOfCurrentAttempt()}) and the current state's last
+   * transition time. This is particularly useful for determining whether the restart counter should
+   * be reset based on the configured {@code restartCounterResetMillis}.
+   *
+   * @return A Duration representing the time elapsed since the start of the current attempt.
+   */
+  protected Duration calculateCurrentAttemptDuration() {
+    ApplicationState firstStateOfCurrentAttempt = findFirstStateOfCurrentAttempt();
+    return Duration.between(
+        Instant.parse(firstStateOfCurrentAttempt.getLastTransitionTime()),
+        Instant.parse(currentState.getLastTransitionTime()));
+  }
+
   /**
    * Creates an ApplicationState indicating that the application is terminated without releasing
    * resources.
```
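
For readers unfamiliar with the status model, the following standalone sketch mimics what `findFirstStateOfCurrentAttempt` and `calculateCurrentAttemptDuration` compute: walk the state transition history backwards to the most recent initializing state, then measure the elapsed time to the latest transition. The map shape, state names, and timestamps here are invented for illustration; only the traversal idea is taken from the diff above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Standalone illustration of the attempt-duration calculation; not operator code. */
final class AttemptDurationSketch {
  public static void main(String[] args) {
    // Hypothetical state transition history: id -> {state summary, last transition time}.
    TreeMap<Long, String[]> history = new TreeMap<>();
    history.put(1L, new String[] {"Submitted", "2024-01-01T00:00:00Z"});
    history.put(2L, new String[] {"RunningHealthy", "2024-01-01T00:05:00Z"});
    history.put(3L, new String[] {"ScheduledToRestart", "2024-01-01T01:00:00Z"});
    history.put(4L, new String[] {"Failed", "2024-01-01T03:30:00Z"});

    // Walk backwards to the most recent initializing state; fall back to the first entry.
    List<Map.Entry<Long, String[]>> entries = new ArrayList<>(history.entrySet());
    String attemptStart = entries.get(0).getValue()[1];
    for (int k = entries.size() - 1; k >= 0; k--) {
      String summary = entries.get(k).getValue()[0];
      if (summary.equals("Submitted") || summary.equals("ScheduledToRestart")) {
        attemptStart = entries.get(k).getValue()[1];
        break;
      }
    }

    // Duration from the start of the current attempt to the latest transition.
    String lastTransition = entries.get(entries.size() - 1).getValue()[1];
    Duration attemptDuration =
        Duration.between(Instant.parse(attemptStart), Instant.parse(lastTransition));
    System.out.println(attemptDuration); // PT2H30M: long enough to reset a 1-hour threshold
  }
}
```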

spark-operator-api/src/main/java/org/apache/spark/k8s/operator/status/AttemptInfo.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -26,6 +26,7 @@
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
+import lombok.Setter;
 import lombok.ToString;
 
 /** Information about an attempt. */
@@ -38,13 +39,14 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AttemptInfo {
   @Getter @Builder.Default protected final long id = 0L;
+  @Getter @Setter protected long restartCounter;
 
   /**
    * Creates a new AttemptInfo object representing the next attempt.
    *
    * @return A new AttemptInfo with an incremented ID.
    */
-  public AttemptInfo createNextAttemptInfo() {
-    return new AttemptInfo(id + 1L);
+  public AttemptInfo createNextAttemptInfo(boolean resetRestartCounter) {
+    return new AttemptInfo(id + 1L, resetRestartCounter ? 1L : restartCounter + 1);
   }
 }
```
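
To spell out the semantics of the new two-argument path: the attempt `id` always increments (preserving the full attempt history), while `restartCounter` either increments or restarts at 1 when the reset condition was met. The sketch below assumes an accessible two-argument constructor, inferred from the `new AttemptInfo(id + 1L, ...)` call in the diff; the wrapper class and values are illustrative.

```java
import org.apache.spark.k8s.operator.status.AttemptInfo;

final class AttemptInfoSketch {
  public static void main(String[] args) {
    AttemptInfo current = new AttemptInfo(2L, 2L); // third attempt, restart counter at 2

    // Quick failure: both the id and the restart counter move forward.
    AttemptInfo next = current.createNextAttemptInfo(false);
    System.out.println(next.getId() + " / " + next.getRestartCounter()); // 3 / 3

    // The previous attempt outlived restartCounterResetMillis: the counter restarts at 1.
    AttemptInfo afterReset = current.createNextAttemptInfo(true);
    System.out.println(afterReset.getId() + " / " + afterReset.getRestartCounter()); // 3 / 1
  }
}
```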
