Skip to content

Commit e82b72a

Browse files
Poll the count of running step executions
1 parent 21555c0 commit e82b72a

File tree

8 files changed

+143
-42
lines changed

8 files changed

+143
-42
lines changed

spring-batch-core/src/main/java/org/springframework/batch/core/BatchStatus.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.batch.core;
1818

19+
import java.util.Set;
20+
1921
/**
2022
* Enumeration representing the status of an execution.
2123
*
@@ -71,6 +73,8 @@ public enum BatchStatus {
7173
*/
7274
UNKNOWN;
7375

76+
public static final Set<BatchStatus> RUNNING_STATUSES = Set.of(STARTING, STARTED, STOPPING);
77+
7478
/**
7579
* Convenience method to return the higher value status of the statuses passed to the
7680
* method.
@@ -87,7 +91,7 @@ public static BatchStatus max(BatchStatus status1, BatchStatus status2) {
8791
* @return true if the status is STARTING, STARTED, STOPPING
8892
*/
8993
public boolean isRunning() {
90-
return this == STARTING || this == STARTED || this == STOPPING;
94+
return RUNNING_STATUSES.contains(this);
9195
}
9296

9397
/**

spring-batch-core/src/main/java/org/springframework/batch/core/explore/JobExplorer.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,7 @@
1818
import java.util.List;
1919
import java.util.Set;
2020

21-
import org.springframework.batch.core.JobExecution;
22-
import org.springframework.batch.core.JobInstance;
23-
import org.springframework.batch.core.JobParameters;
24-
import org.springframework.batch.core.StepExecution;
21+
import org.springframework.batch.core.*;
2522
import org.springframework.batch.core.launch.NoSuchJobException;
2623
import org.springframework.batch.item.ExecutionContext;
2724
import org.springframework.lang.Nullable;
@@ -87,6 +84,14 @@ default JobInstance getLastJobInstance(String jobName) {
8784
@Nullable
8885
StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable Long stepExecutionId);
8986

87+
/**
88+
* Find {@link StepExecution}s by IDs and parent {@link JobExecution} ID
89+
* @param jobExecutionId given job execution id
90+
* @param stepExecutionIds given step execution ids
91+
* @return collection of {@link StepExecution}
92+
*/
93+
Set<StepExecution> getStepExecutions(Long jobExecutionId, Set<Long> stepExecutionIds);
94+
9095
/**
9196
* @param instanceId {@link Long} The ID for the {@link JobInstance} to obtain.
9297
* @return the {@code JobInstance} that has this ID, or {@code null} if not found.
@@ -170,4 +175,11 @@ default JobExecution getLastJobExecution(JobInstance jobInstance) {
170175
*/
171176
long getJobInstanceCount(@Nullable String jobName) throws NoSuchJobException;
172177

178+
/**
179+
* Retrieve number of step executions that match the step execution ids and the batch statuses
180+
* @param stepExecutionIds given step execution ids
181+
* @param matchingBatchStatuses given batch statuses to match against
182+
* @return number of {@link StepExecution} matching the criteria
183+
*/
184+
long getStepExecutionCount(Set<Long> stepExecutionIds, Set<BatchStatus> matchingBatchStatuses);
173185
}

spring-batch-core/src/main/java/org/springframework/batch/core/explore/support/SimpleJobExplorer.java

+23-5
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616

1717
package org.springframework.batch.core.explore.support;
1818

19-
import org.springframework.batch.core.JobExecution;
20-
import org.springframework.batch.core.JobInstance;
21-
import org.springframework.batch.core.JobParameters;
22-
import org.springframework.batch.core.StepExecution;
19+
import org.springframework.batch.core.*;
2320
import org.springframework.batch.core.explore.JobExplorer;
2421
import org.springframework.batch.core.launch.NoSuchJobException;
2522
import org.springframework.batch.core.repository.dao.ExecutionContextDao;
@@ -147,7 +144,20 @@ public StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable L
147144
return stepExecution;
148145
}
149146

150-
@Nullable
147+
@Nullable
148+
@Override
149+
public Set<StepExecution> getStepExecutions(Long jobExecutionId, Set<Long> stepExecutionIds) {
150+
JobExecution jobExecution = jobExecutionDao.getJobExecution(jobExecutionId);
151+
if (jobExecution == null) {
152+
return null;
153+
}
154+
getJobExecutionDependencies(jobExecution);
155+
Set<StepExecution> stepExecutions = stepExecutionDao.getStepExecutions(jobExecution, stepExecutionIds);
156+
stepExecutions.forEach(this::getStepExecutionDependencies);
157+
return stepExecutions;
158+
}
159+
160+
@Nullable
151161
@Override
152162
public JobInstance getJobInstance(@Nullable Long instanceId) {
153163
return jobInstanceDao.getJobInstance(instanceId);
@@ -180,6 +190,14 @@ public long getJobInstanceCount(@Nullable String jobName) throws NoSuchJobExcept
180190
return jobInstanceDao.getJobInstanceCount(jobName);
181191
}
182192

193+
@Override
194+
public long getStepExecutionCount(Set<Long> stepExecutionIds, Set<BatchStatus> matchingBatchStatuses) {
195+
if (stepExecutionIds.isEmpty() || matchingBatchStatuses.isEmpty()) {
196+
return 0;
197+
}
198+
return stepExecutionDao.countStepExecutions(stepExecutionIds, matchingBatchStatuses);
199+
}
200+
183201
/**
184202
* @return instance of {@link JobInstanceDao}.
185203
* @since 5.1

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/AbstractJdbcBatchMetadataDao.java

+24
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package org.springframework.batch.core.repository.dao;
1818

1919
import java.sql.Types;
20+
import java.util.Collection;
21+
import java.util.Map;
22+
import java.util.stream.Collectors;
2023

2124
import org.springframework.beans.factory.InitializingBean;
2225
import org.springframework.jdbc.core.JdbcOperations;
@@ -51,6 +54,14 @@ protected String getQuery(String base) {
5154
return StringUtils.replace(base, "%PREFIX%", tablePrefix);
5255
}
5356

57+
protected String getQuery(String base, Map<String, Collection<?>> collectionParams) {
58+
String query = getQuery(base);
59+
for (Map.Entry<String, Collection<?>> collectionParam : collectionParams.entrySet()) {
60+
query = createParameterizedQuery(query, collectionParam.getKey(), collectionParam.getValue());
61+
}
62+
return query;
63+
}
64+
5465
protected String getTablePrefix() {
5566
return tablePrefix;
5667
}
@@ -85,4 +96,17 @@ public void afterPropertiesSet() throws Exception {
8596
Assert.state(jdbcTemplate != null, "JdbcOperations is required");
8697
}
8798

99+
/**
100+
* Replaces a given placeholder with a number of parameters (i.e. "?").
101+
*
102+
* @param sqlTemplate given sql template
103+
* @param placeholder placeholder that is being used for parameters
104+
* @param parameters collection of parameters with variable size
105+
*
106+
* @return sql query replaced with a number of parameters
107+
*/
108+
private static String createParameterizedQuery(String sqlTemplate, String placeholder, Collection<?> parameters) {
109+
String params = parameters.stream().map(p -> "?").collect(Collectors.joining(", "));
110+
return sqlTemplate.replace(placeholder, params);
111+
}
88112
}

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/JdbcStepExecutionDao.java

+34-1
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import java.util.Collection;
2727
import java.util.Iterator;
2828
import java.util.List;
29+
import java.util.Map;
30+
import java.util.Set;
2931
import java.util.concurrent.locks.Lock;
3032
import java.util.concurrent.locks.ReentrantLock;
33+
import java.util.stream.Stream;
3134

3235
import org.apache.commons.logging.Log;
3336
import org.apache.commons.logging.LogFactory;
@@ -92,7 +95,16 @@ public class JdbcStepExecutionDao extends AbstractJdbcBatchMetadataDao implement
9295

9396
private static final String GET_STEP_EXECUTION = GET_RAW_STEP_EXECUTIONS + " AND STEP_EXECUTION_ID = ?";
9497

95-
private static final String GET_LAST_STEP_EXECUTION = """
98+
private static final String GET_STEP_EXECUTIONS_BY_IDS = GET_RAW_STEP_EXECUTIONS + " and STEP_EXECUTION_ID IN (%STEP_EXECUTION_IDS%)";
99+
100+
private static final String COUNT_STEP_EXECUTIONS_BY_IDS_AND_STATUSES = """
101+
SELECT COUNT(*)
102+
FROM %PREFIX%STEP_EXECUTION SE
103+
WHERE SE.STEP_EXECUTION_ID IN (%STEP_EXECUTION_IDS%)
104+
AND SE.STATUS IN (%STEP_STATUSES%)
105+
""";
106+
107+
private static final String GET_LAST_STEP_EXECUTION = """
96108
SELECT SE.STEP_EXECUTION_ID, SE.STEP_NAME, SE.START_TIME, SE.END_TIME, SE.STATUS, SE.COMMIT_COUNT, SE.READ_COUNT, SE.FILTER_COUNT, SE.WRITE_COUNT, SE.EXIT_CODE, SE.EXIT_MESSAGE, SE.READ_SKIP_COUNT, SE.WRITE_SKIP_COUNT, SE.PROCESS_SKIP_COUNT, SE.ROLLBACK_COUNT, SE.LAST_UPDATED, SE.VERSION, SE.CREATE_TIME, JE.JOB_EXECUTION_ID, JE.START_TIME, JE.END_TIME, JE.STATUS, JE.EXIT_CODE, JE.EXIT_MESSAGE, JE.CREATE_TIME, JE.LAST_UPDATED, JE.VERSION
97109
FROM %PREFIX%JOB_EXECUTION JE
98110
JOIN %PREFIX%STEP_EXECUTION SE ON SE.JOB_EXECUTION_ID = JE.JOB_EXECUTION_ID
@@ -334,6 +346,16 @@ public StepExecution getStepExecution(JobExecution jobExecution, Long stepExecut
334346
}
335347
}
336348

349+
@Override
350+
@Nullable
351+
public Set<StepExecution> getStepExecutions(JobExecution jobExecution, Set<Long> stepExecutionIds) {
352+
List<StepExecution> executions = getJdbcTemplate().query(
353+
getQuery(GET_STEP_EXECUTIONS_BY_IDS, Map.of("%STEP_EXECUTION_IDS%", stepExecutionIds)),
354+
new StepExecutionRowMapper(jobExecution),
355+
Stream.concat(Stream.of(jobExecution.getId()), stepExecutionIds.stream()).toArray(Object[]::new));
356+
return Set.copyOf(executions);
357+
}
358+
337359
@Override
338360
public StepExecution getLastStepExecution(JobInstance jobInstance, String stepName) {
339361
List<StepExecution> executions = getJdbcTemplate().query(getQuery(GET_LAST_STEP_EXECUTION), (rs, rowNum) -> {
@@ -368,6 +390,17 @@ public long countStepExecutions(JobInstance jobInstance, String stepName) {
368390
jobInstance.getInstanceId(), stepName);
369391
}
370392

393+
@Override
394+
public long countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses) {
395+
return getJdbcTemplate().queryForObject(
396+
getQuery(
397+
COUNT_STEP_EXECUTIONS_BY_IDS_AND_STATUSES,
398+
Map.of("%STEP_EXECUTION_IDS%", stepExecutionIds, "%STEP_STATUSES%", matchingBatchStatuses)
399+
),
400+
Long.class,
401+
Stream.concat(stepExecutionIds.stream(), matchingBatchStatuses.stream().map(BatchStatus::name)).toArray(Object[]::new));
402+
}
403+
371404
/**
372405
* Delete the given step execution.
373406
* @param stepExecution the step execution to delete

spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/StepExecutionDao.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.springframework.batch.core.repository.dao;
1818

1919
import java.util.Collection;
20+
import java.util.Set;
2021

22+
import org.springframework.batch.core.BatchStatus;
2123
import org.springframework.batch.core.JobExecution;
2224
import org.springframework.batch.core.JobInstance;
2325
import org.springframework.batch.core.StepExecution;
@@ -62,6 +64,14 @@ public interface StepExecutionDao {
6264
@Nullable
6365
StepExecution getStepExecution(JobExecution jobExecution, Long stepExecutionId);
6466

67+
/**
68+
* Get a collection of {@link StepExecution} matching job execution and step execution ids.
69+
* @param jobExecution the parent job execution
70+
* @param stepExecutionIds the step execution ids
71+
* @return collection of {@link StepExecution}
72+
*/
73+
Set<StepExecution> getStepExecutions(JobExecution jobExecution, Set<Long> stepExecutionIds);
74+
6575
/**
6676
* Retrieve the last {@link StepExecution} for a given {@link JobInstance} ordered by
6777
* creation time and then id.
@@ -91,7 +101,15 @@ default long countStepExecutions(JobInstance jobInstance, String stepName) {
91101
throw new UnsupportedOperationException();
92102
}
93103

94-
/**
104+
/**
105+
* Count {@link StepExecution} that match the ids and statuses of them - avoid loading them into memory
106+
* @param stepExecutionIds given step execution ids
107+
* @param matchingBatchStatuses
108+
* @return the count of matching steps
109+
*/
110+
long countStepExecutions(Collection<Long> stepExecutionIds, Collection<BatchStatus> matchingBatchStatuses);
111+
112+
/**
95113
* Delete the given step execution.
96114
* @param stepExecution the step execution to delete
97115
* @since 5.0

spring-batch-core/src/test/java/org/springframework/batch/core/launch/support/CommandLineJobRunnerTests.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -517,12 +517,6 @@ public JobInstance getJobInstance(@Nullable Long instanceId) {
517517
throw new UnsupportedOperationException();
518518
}
519519

520-
@Nullable
521-
@Override
522-
public JobInstance getJobInstance(String jobName, JobParameters jobParameters) {
523-
throw new UnsupportedOperationException();
524-
}
525-
526520
@Nullable
527521
@Override
528522
public JobInstance getLastJobInstance(String jobName) {
@@ -551,6 +545,11 @@ public StepExecution getStepExecution(@Nullable Long jobExecutionId, @Nullable L
551545
throw new UnsupportedOperationException();
552546
}
553547

548+
@Override
549+
public Set<StepExecution> getStepExecutions(Long jobExecutionId, Set<Long> stepExecutionIds) {
550+
throw new UnsupportedOperationException();
551+
}
552+
554553
@Override
555554
public List<String> getJobNames() {
556555
throw new UnsupportedOperationException();
@@ -579,6 +578,10 @@ public long getJobInstanceCount(@Nullable String jobName) throws NoSuchJobExcept
579578
}
580579
}
581580

581+
@Override
582+
public long getStepExecutionCount(Set<Long> stepExecutionIds, Set<BatchStatus> matchingBatchStatuses) {
583+
throw new UnsupportedOperationException();
584+
}
582585
}
583586

584587
public static class StubJobParametersConverter implements JobParametersConverter {

spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java

+13-24
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.commons.logging.Log;
2929
import org.apache.commons.logging.LogFactory;
3030

31-
import org.springframework.batch.core.JobExecution;
31+
import org.springframework.batch.core.BatchStatus;
3232
import org.springframework.batch.core.Step;
3333
import org.springframework.batch.core.StepExecution;
3434
import org.springframework.batch.core.explore.JobExplorer;
@@ -251,29 +251,18 @@ protected Set<StepExecution> doHandle(StepExecution managerStepExecution,
251251

252252
private Set<StepExecution> pollReplies(final StepExecution managerStepExecution, final Set<StepExecution> split)
253253
throws Exception {
254-
final Set<StepExecution> result = new HashSet<>(split.size());
255-
256-
Callable<Set<StepExecution>> callback = () -> {
257-
Set<Long> currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());
258-
JobExecution jobExecution = jobExplorer.getJobExecution(managerStepExecution.getJobExecutionId());
259-
jobExecution.getStepExecutions()
260-
.stream()
261-
.filter(stepExecution -> currentStepExecutionIds.contains(stepExecution.getId()))
262-
.filter(stepExecution -> !result.contains(stepExecution))
263-
.filter(stepExecution -> !stepExecution.getStatus().isRunning())
264-
.forEach(result::add);
265-
266-
if (logger.isDebugEnabled()) {
267-
logger.debug(String.format("Currently waiting on %s partitions to finish", split.size()));
268-
}
269-
270-
if (result.size() == split.size()) {
271-
return result;
272-
}
273-
else {
274-
return null;
275-
}
276-
};
254+
Callable<Set<StepExecution>> callback = () -> {
255+
Set<Long> currentStepExecutionIds = split.stream().map(StepExecution::getId).collect(Collectors.toSet());
256+
long runningStepExecutions = jobExplorer.getStepExecutionCount(currentStepExecutionIds, BatchStatus.RUNNING_STATUSES);
257+
if(runningStepExecutions > 0 && !split.isEmpty()) {
258+
if(logger.isDebugEnabled()) {
259+
logger.debug(String.format("Currently waiting on %s out of %s partitions to finish", runningStepExecutions, split.size()));
260+
}
261+
return null;
262+
} else {
263+
return jobExplorer.getStepExecutions(managerStepExecution.getJobExecutionId(), currentStepExecutionIds);
264+
}
265+
};
277266

278267
Poller<Set<StepExecution>> poller = new DirectPoller<>(pollInterval);
279268
Future<Set<StepExecution>> resultsFuture = poller.poll(callback);

0 commit comments

Comments
 (0)