Skip to content

Commit

Permalink
[DCJ-457] Stairway can be built with a provided ThreadPoolTaskExecutor (
Browse files Browse the repository at this point in the history
#149)

* [DCJ-457] Stairway can be built with a provided ThreadPoolTaskExecutor

This allows calling services to bring their own instrumented executor to serve as their Stairway's threadpool.

If unspecified in StairwayBuilder, a DefaultThreadPoolTaskExecutor will be constructed and used.

DefaultThreadPoolTaskExecutor is a public class: calling services may elect to instantiate it themselves (e.g. a Spring Boot service would register this as a Bean for automatic actuator instrumentation).

* Also assert that DefaultThreadPoolTaskExecutors are running

* Docstring references default maxParallelFlights directly

If this value were to change in the future, the javadoc rendered view would reflect the change.

* StairwayImpl constructor executor init follows existing convention

Mainly because my usage of Optional.ofNullable…orElse… constructs a new default executor every time, and throws it away if the builder has one set.  This is discouraged.

I decided to adhere with the existing convention for initializing instance variables from the StairwayBuilder object, which makes the constructor easier to follow.  If it's updated in the future with Optional handling, that would be acceptable as long as it's updated completely.

* Update mavenLocal development docs

This matches TDR's current build file -- the useMavenLocal flag makes it a little easier to leverage locally published artifacts on demand.
  • Loading branch information
okotsopoulos authored Jul 1, 2024
1 parent c5a9f98 commit c4e3b26
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 64 deletions.
33 changes: 33 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,39 @@ match the values in the sql file above:
For folks working on Terra, the Stairway configuration is embedded within the component
configuration, so these steps are included in component developer setup.

## Local Publishing
When working on this library, it is often helpful to be able to quickly test out changes
in the context of a service repo (e.g. `terra-workspace-manager` or `terra-resource-buffer`)
running a local server.

Gradle makes this easy with a `mavenLocal` target for publishing and loading packages:

1. Publish from Stairway to your machine's local Maven cache.

```
./gradlew publishToMavenLocal
```

Your package will be in `~/.m2/repository`.
2. From the service repo, add `mavenLocal()` to the _first_ repository location
build.gradle file (e.g. before `mavenCentral()`).

```
# terra-workspace-manager/build.gradle
// If true, search local repository (~/.m2/repository/) first for dependencies.
def useMavenLocal = true
repositories {
if (useMavenLocal) {
mavenLocal() // must be listed first to take effect
}
mavenCentral()
...
```

That's it! Your service should pick up locally-published changes. If your changes involved bumping
this library's version, be careful to update version numbers accordingly.

## SourceClear

[SourceClear](https://srcclr.github.io) is a static analysis tool that scans a project's Java
Expand Down
1 change: 1 addition & 0 deletions stairway/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies {
implementation group: 'org.openapitools', name: 'jackson-databind-nullable', version: '0.2.6'

// Spring
implementation group: 'org.springframework', name: 'spring-context', version: '6.1.8'
implementation group: 'org.springframework', name: 'spring-web', version: '6.1.8'

// Annotations
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package bio.terra.stairway;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class DefaultThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

static final int DEFAULT_MAX_PARALLEL_FLIGHTS = 20;

/**
* An initialized {@link ThreadPoolTaskExecutor}.
*
* @param maxParallelFlights the desired pool size, honored if a positive integer, defaults to
* DEFAULT_MAX_PARALLEL_FLIGHTS otherwise.
*/
public DefaultThreadPoolTaskExecutor(Integer maxParallelFlights) {
super();
int poolSize = getPoolSize(maxParallelFlights);
super.setCorePoolSize(poolSize);
super.setMaxPoolSize(poolSize);
super.setKeepAliveSeconds(0);
super.setThreadNamePrefix("stairway-thread-");
super.initialize();
}

private int getPoolSize(Integer maxParallelFlights) {
if (maxParallelFlights == null || maxParallelFlights <= 0) {
return DEFAULT_MAX_PARALLEL_FLIGHTS;
}
return maxParallelFlights;
}
}
24 changes: 22 additions & 2 deletions stairway/src/main/java/bio/terra/stairway/StairwayBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* This builder class is the way to constructing a Stairway instance. For example,
Expand All @@ -21,6 +22,7 @@ public class StairwayBuilder {
private final List<StairwayHook> stairwayHooks = new ArrayList<>();
private Integer maxParallelFlights;
private Integer maxQueuedFlights;
private ThreadPoolTaskExecutor executor;
private Object applicationContext;
private ExceptionSerializer exceptionSerializer;
private String stairwayName;
Expand All @@ -29,8 +31,8 @@ public class StairwayBuilder {
private Duration completedFlightRetention;

/**
* Determines the size of the thread pool used for running Stairway flights. Default is
* DEFAULT_MAX_PARALLEL_FLIGHTS (20 at this moment)
* Determines the size of the thread pool used for running Stairway flights. Default is {@value
* DefaultThreadPoolTaskExecutor#DEFAULT_MAX_PARALLEL_FLIGHTS}.
*
* @param maxParallelFlights maximum parallel flights to run
* @return this
Expand Down Expand Up @@ -66,6 +68,24 @@ public Integer getMaxQueuedFlights() {
return maxQueuedFlights;
}

/**
* Allow a caller to provide their own {@link ThreadPoolTaskExecutor} to use as this Stairway's
* threadpool, e.g. as to supply an instrumented executor with custom metrics collection. Default
* is a new instance of {@link DefaultThreadPoolTaskExecutor}.
*
* @param executor to use as this Stairway's threadpool. If not supplied, one will be constructed
* to honor maxParallelFlights.
* @return this
*/
public StairwayBuilder executor(ThreadPoolTaskExecutor executor) {
this.executor = executor;
return this;
}

public ThreadPoolTaskExecutor getExecutor() {
return executor;
}

/**
* @param applicationContext application context passed along to Flight constructors. Default is
* null.
Expand Down
48 changes: 24 additions & 24 deletions stairway/src/main/java/bio/terra/stairway/impl/StairwayImpl.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bio.terra.stairway.impl;

import bio.terra.stairway.Control;
import bio.terra.stairway.DefaultThreadPoolTaskExecutor;
import bio.terra.stairway.ExceptionSerializer;
import bio.terra.stairway.Flight;
import bio.terra.stairway.FlightDebugInfo;
Expand All @@ -25,6 +26,7 @@
import jakarta.annotation.Nullable;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand All @@ -34,14 +36,14 @@
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* StairwayImpl holds the implementation of the Stairway library. This class is not intended for
* direct use by clients.
*/
public class StairwayImpl implements Stairway {
private static final Logger logger = LoggerFactory.getLogger(StairwayImpl.class);
private static final int DEFAULT_MAX_PARALLEL_FLIGHTS = 20;
private static final int DEFAULT_MAX_QUEUED_FLIGHTS = 2;
private static final int MIN_QUEUED_FLIGHTS = 0;
private static final int SCHEDULED_POOL_CORE_THREADS = 5;
Expand All @@ -50,8 +52,8 @@ public class StairwayImpl implements Stairway {
private final Object applicationContext;
private final ExceptionSerializer exceptionSerializer;
private final String stairwayName; // always identical to stairwayId
private final int maxParallelFlights;
private final int maxQueuedFlights;
private final ThreadPoolTaskExecutor executor;
private final WorkQueueManager queueManager;
private final HookWrapper hookWrapper;
private final Duration retentionCheckInterval;
Expand Down Expand Up @@ -83,11 +85,6 @@ public class StairwayImpl implements Stairway {
* @throws StairwayExecutionException on invalid input
*/
public StairwayImpl(StairwayBuilder builder) throws StairwayExecutionException {
this.maxParallelFlights =
(builder.getMaxParallelFlights() == null)
? DEFAULT_MAX_PARALLEL_FLIGHTS
: builder.getMaxParallelFlights();

this.maxQueuedFlights =
(builder.getMaxQueuedFlights() == null)
? DEFAULT_MAX_QUEUED_FLIGHTS
Expand All @@ -105,6 +102,11 @@ public StairwayImpl(StairwayBuilder builder) throws StairwayExecutionException {
? "stairway" + UUID.randomUUID().toString()
: builder.getStairwayName();

this.executor =
(builder.getExecutor() == null)
? new DefaultThreadPoolTaskExecutor(builder.getMaxParallelFlights())
: builder.getExecutor();

this.queueManager = new WorkQueueManager(this, builder.getWorkQueue());

this.applicationContext = builder.getApplicationContext();
Expand Down Expand Up @@ -215,7 +217,7 @@ public void recoverStairway(String stairwayName) throws InterruptedException {
}

private void configureThreadPools() {
threadPool = new StairwayThreadPool(maxParallelFlights);
threadPool = new StairwayThreadPool(executor);

scheduledPool = new ScheduledThreadPoolExecutor(SCHEDULED_POOL_CORE_THREADS);
// If we have retention settings then set up the regular flight cleaner
Expand Down Expand Up @@ -250,9 +252,11 @@ public boolean quietDown(long waitTimeout, TimeUnit unit) {

quietingDown.set(true);
queueManager.shutdown(workQueueWaitSeconds);
threadPool.shutdown();

ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
threadPoolExecutor.shutdown();
try {
return threadPool.awaitTermination(threadPoolWaitSeconds, unit);
return threadPoolExecutor.awaitTermination(threadPoolWaitSeconds, unit);
} catch (InterruptedException ex) {
return false;
}
Expand All @@ -272,7 +276,8 @@ public boolean terminate(long waitTimeout, TimeUnit unit)
throws StairwayException, InterruptedException {
quietingDown.set(true);
queueManager.shutdownNow();
List<Runnable> neverStartedFlights = threadPool.shutdownNow();
ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
List<Runnable> neverStartedFlights = threadPoolExecutor.shutdownNow();
for (Runnable flightRunnable : neverStartedFlights) {
FlightRunner flightRunner = (FlightRunner) flightRunnable;
FlightContextImpl flightContext = flightRunner.getFlightContext();
Expand All @@ -286,7 +291,7 @@ public boolean terminate(long waitTimeout, TimeUnit unit)
logger.warn("Unable to requeue never-started flight: " + flightDesc, ex);
}
}
return threadPool.awaitTermination(waitTimeout, unit);
return threadPoolExecutor.awaitTermination(waitTimeout, unit);
}

/**
Expand Down Expand Up @@ -404,15 +409,15 @@ public void submitWithDebugInfo(
public boolean spaceAvailable() {
logger.debug(
"Space available? active: "
+ threadPool.getActiveFlights()
+ executor.getActiveCount()
+ " of max: "
+ maxParallelFlights
+ executor.getMaxPoolSize()
+ " queueSize: "
+ threadPool.getQueuedFlights()
+ executor.getQueueSize()
+ " of max: "
+ maxQueuedFlights);
return ((threadPool.getActiveFlights() < maxParallelFlights)
|| (threadPool.getQueuedFlights() < maxQueuedFlights));
return ((executor.getActiveCount() < executor.getMaxPoolSize())
|| (executor.getQueueSize() < maxQueuedFlights));
}

/**
Expand Down Expand Up @@ -579,11 +584,6 @@ FlightDao getFlightDao() {
return flightDao;
}

// Exposed for work queue listener
ThreadPoolExecutor getThreadPool() {
return threadPool;
}

void exitFlight(FlightContextImpl context)
throws StairwayException,
DatabaseOperationException,
Expand Down Expand Up @@ -661,9 +661,9 @@ private void launchFlight(FlightContextImpl flightContext) {
if (logger.isDebugEnabled()) {
logger.debug(
"Stairway thread pool: "
+ threadPool.getActiveCount()
+ executor.getActiveCount()
+ " active from pool of "
+ threadPool.getPoolSize());
+ executor.getPoolSize());
}
logger.info("Launching flight " + flightContext.flightDesc());
threadPool.submitWithMdcAndFlightContext(runner, flightContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,15 @@
import bio.terra.stairway.FlightContext;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

class StairwayThreadPool extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(StairwayThreadPool.class);
AtomicInteger activeTasks;
class StairwayThreadPool {

StairwayThreadPool(int maxParallelFlights) {
super(
maxParallelFlights,
maxParallelFlights,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
activeTasks = new AtomicInteger();
}

int getActiveFlights() {
return activeTasks.get();
}
private final ThreadPoolTaskExecutor executor;

int getQueuedFlights() {
return getQueue().size();
StairwayThreadPool(ThreadPoolTaskExecutor executor) {
this.executor = executor;
}

/**
Expand All @@ -55,7 +36,7 @@ protected Future<?> submitWithMdcAndFlightContext(
MdcUtils.overwriteContext(initialContext);
}
};
return super.submit(flightRunnerWithMdc);
return executor.submit(flightRunnerWithMdc);
}

private void initializeFlightMdc(
Expand All @@ -67,16 +48,4 @@ private void initializeFlightMdc(
MdcUtils.addFlightContextToMdc(flightContext);
MdcUtils.removeStepContextFromMdc(flightContext);
}

protected void beforeExecute(Thread t, Runnable r) {
int active = activeTasks.incrementAndGet();
logger.debug("before: " + active);
super.beforeExecute(t, r);
}

protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
int active = activeTasks.decrementAndGet();
logger.debug("after: " + active);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package bio.terra.stairway;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.stream.Stream;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tag("unit")
class DefaultThreadPoolTaskExecutorTest {

private static Stream<Arguments> defaultThreadPoolTaskExecutor() {
return Stream.of(
Arguments.of(null, DefaultThreadPoolTaskExecutor.DEFAULT_MAX_PARALLEL_FLIGHTS),
Arguments.of(-1, DefaultThreadPoolTaskExecutor.DEFAULT_MAX_PARALLEL_FLIGHTS),
Arguments.of(0, DefaultThreadPoolTaskExecutor.DEFAULT_MAX_PARALLEL_FLIGHTS),
Arguments.of(1, 1),
Arguments.of(50, 50));
}

@ParameterizedTest
@MethodSource
void defaultThreadPoolTaskExecutor(Integer maxParallelFlights, Integer expectedPoolSize) {
var executor = new DefaultThreadPoolTaskExecutor(maxParallelFlights);
assertThat(executor.getCorePoolSize(), equalTo(expectedPoolSize));
assertThat(executor.getMaxPoolSize(), equalTo(expectedPoolSize));
assertThat(executor.getKeepAliveSeconds(), equalTo(0));
assertThat(executor.getThreadNamePrefix(), equalTo("stairway-thread-"));
assertTrue(executor.isRunning());
}
}
Loading

0 comments on commit c4e3b26

Please sign in to comment.