diff --git a/.github/workflows/tag-publish.yml b/.github/workflows/tag-publish.yml index 52650edb..1a0deddf 100644 --- a/.github/workflows/tag-publish.yml +++ b/.github/workflows/tag-publish.yml @@ -176,7 +176,7 @@ jobs: - name: Auth to GCP id: 'auth' - uses: google-github-actions/auth@v0 + uses: google-github-actions/auth@v2 with: token_format: 'access_token' workload_identity_provider: 'projects/1038484894585/locations/global/workloadIdentityPools/github-wi-pool/providers/github-wi-provider' @@ -184,7 +184,7 @@ jobs: # Install gcloud, `setup-gcloud` automatically picks up authentication from `auth`. - name: 'Set up Cloud SDK' - uses: 'google-github-actions/setup-gcloud@v0' + uses: google-github-actions/setup-gcloud@v2 - name: Explicitly auth Docker for Artifact Registry run: gcloud auth configure-docker $GOOGLE_DOCKER_REPOSITORY --quiet diff --git a/service/build.gradle b/service/build.gradle index 47ad118c..ab1df168 100644 --- a/service/build.gradle +++ b/service/build.gradle @@ -89,6 +89,7 @@ dependencies { // gcs implementation platform('com.google.cloud:libraries-bom:26.44.0') implementation 'com.google.cloud:google-cloud-storage' + implementation 'com.google.cloud:google-cloud-pubsub' liquibaseRuntime 'info.picocli:picocli:4.6.1' liquibaseRuntime 'org.postgresql:postgresql:42.6.1' diff --git a/service/src/main/java/bio/terra/pipelines/app/configuration/internal/NotificationConfiguration.java b/service/src/main/java/bio/terra/pipelines/app/configuration/internal/NotificationConfiguration.java new file mode 100644 index 00000000..32da2ce4 --- /dev/null +++ b/service/src/main/java/bio/terra/pipelines/app/configuration/internal/NotificationConfiguration.java @@ -0,0 +1,6 @@ +package bio.terra.pipelines.app.configuration.internal; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("pipelines.notifications") +public record NotificationConfiguration(String projectId, String topicId) {} diff --git a/service/src/main/java/bio/terra/pipelines/common/utils/FlightBeanBag.java b/service/src/main/java/bio/terra/pipelines/common/utils/FlightBeanBag.java index 163a88a2..d6d68c9a 100644 --- a/service/src/main/java/bio/terra/pipelines/common/utils/FlightBeanBag.java +++ b/service/src/main/java/bio/terra/pipelines/common/utils/FlightBeanBag.java @@ -9,6 +9,7 @@ import bio.terra.pipelines.dependencies.sam.SamService; import bio.terra.pipelines.dependencies.wds.WdsService; import bio.terra.pipelines.dependencies.workspacemanager.WorkspaceManagerService; +import bio.terra.pipelines.notifications.NotificationService; import bio.terra.pipelines.service.PipelineInputsOutputsService; import bio.terra.pipelines.service.PipelineRunsService; import bio.terra.pipelines.service.PipelinesService; @@ -38,6 +39,7 @@ public class FlightBeanBag { private final WorkspaceManagerService workspaceManagerService; private final RawlsService rawlsService; private final QuotasService quotasService; + private final NotificationService notificationService; private final ImputationConfiguration imputationConfiguration; private final CbasConfiguration cbasConfiguration; private final WdlPipelineConfiguration wdlPipelineConfiguration; @@ -54,6 +56,7 @@ public FlightBeanBag( CbasService cbasService, RawlsService rawlsService, QuotasService quotasService, + NotificationService notificationService, WorkspaceManagerService workspaceManagerService, ImputationConfiguration imputationConfiguration, CbasConfiguration cbasConfiguration, @@ -68,6 +71,7 @@ public FlightBeanBag( this.workspaceManagerService = workspaceManagerService; this.rawlsService = rawlsService; this.quotasService = quotasService; + this.notificationService = notificationService; this.imputationConfiguration = imputationConfiguration; this.cbasConfiguration = cbasConfiguration; this.wdlPipelineConfiguration = wdlPipelineConfiguration; diff --git a/service/src/main/java/bio/terra/pipelines/common/utils/StairwaySendFailedJobNotificationHook.java b/service/src/main/java/bio/terra/pipelines/common/utils/StairwaySendFailedJobNotificationHook.java new file mode 100644 index 00000000..14ccf7c6 --- /dev/null +++ b/service/src/main/java/bio/terra/pipelines/common/utils/StairwaySendFailedJobNotificationHook.java @@ -0,0 +1,56 @@ +package bio.terra.pipelines.common.utils; + +import static bio.terra.pipelines.common.utils.FlightUtils.flightMapKeyIsTrue; + +import bio.terra.pipelines.dependencies.stairway.JobMapKeys; +import bio.terra.pipelines.notifications.NotificationService; +import bio.terra.stairway.FlightContext; +import bio.terra.stairway.FlightMap; +import bio.terra.stairway.FlightStatus; +import bio.terra.stairway.HookAction; +import bio.terra.stairway.StairwayHook; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * A {@link StairwayHook} that sends a Job Failed Notification email via pubsub/Thurloe upon flight + * failure. + * + *
This hook action will only run if the flight's input parameters contain the JobMapKeys key for + * DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK and the flight's status is not SUCCESS. + * + *
The JobMapKeys key for PIPELINE_NAME is required to send the notification.
+ */
+@Component
+public class StairwaySendFailedJobNotificationHook implements StairwayHook {
+ private final NotificationService notificationService;
+ private static final Logger logger =
+ LoggerFactory.getLogger(StairwaySendFailedJobNotificationHook.class);
+
+ public StairwaySendFailedJobNotificationHook(NotificationService notificationService) {
+ this.notificationService = notificationService;
+ }
+
+ @Override
+ public HookAction endFlight(FlightContext context) {
+
+ FlightMap inputParameters = context.getInputParameters();
+
+ if (flightMapKeyIsTrue(inputParameters, JobMapKeys.DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK)
+ && context.getFlightStatus() != FlightStatus.SUCCESS) {
+ logger.info(
+ "Flight has status {}, sending failed job notification email", context.getFlightStatus());
+
+ FlightUtils.validateRequiredEntries(inputParameters, JobMapKeys.USER_ID);
+
+ UUID jobId = UUID.fromString(context.getFlightId());
+ String userId = inputParameters.get(JobMapKeys.USER_ID, String.class);
+
+ // send email notification
+ notificationService.configureAndSendPipelineRunFailedNotification(jobId, userId, context);
+ }
+ return HookAction.CONTINUE;
+ }
+}
diff --git a/service/src/main/java/bio/terra/pipelines/db/entities/PipelineRun.java b/service/src/main/java/bio/terra/pipelines/db/entities/PipelineRun.java
index c5513bf6..5aa49472 100644
--- a/service/src/main/java/bio/terra/pipelines/db/entities/PipelineRun.java
+++ b/service/src/main/java/bio/terra/pipelines/db/entities/PipelineRun.java
@@ -69,6 +69,9 @@ public class PipelineRun {
@Column(name = "description")
private String description;
+ @Column(name = "quota_consumed")
+ private Integer quotaConsumed;
+
/** Constructor for in progress or complete PipelineRun. */
public PipelineRun(
UUID jobId,
@@ -83,7 +86,8 @@ public PipelineRun(
Instant created,
Instant updated,
CommonPipelineRunStatusEnum status,
- String description) {
+ String description,
+ Integer quotaConsumed) {
this.jobId = jobId;
this.userId = userId;
this.pipelineId = pipelineId;
@@ -97,6 +101,7 @@ public PipelineRun(
this.updated = updated;
this.status = status;
this.description = description;
+ this.quotaConsumed = quotaConsumed;
}
/** Constructor for creating a new GCP pipeline run. Timestamps are auto-generated. */
diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobMapKeys.java b/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobMapKeys.java
index 7ff217c6..7ee108b1 100644
--- a/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobMapKeys.java
+++ b/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobMapKeys.java
@@ -19,6 +19,8 @@ public class JobMapKeys {
"do_set_pipeline_run_status_failed_hook";
public static final String DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK =
"do_increment_metrics_failed_counter_hook";
+ public static final String DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK =
+ "do_send_job_failure_notification_hook";
JobMapKeys() {
throw new IllegalStateException("Attempted to instantiate utility class JobMapKeys");
diff --git a/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobService.java b/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobService.java
index 76ca289a..36a0d628 100644
--- a/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobService.java
+++ b/service/src/main/java/bio/terra/pipelines/dependencies/stairway/JobService.java
@@ -13,6 +13,7 @@
import bio.terra.pipelines.common.utils.FlightBeanBag;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.common.utils.StairwayFailedMetricsCounterHook;
+import bio.terra.pipelines.common.utils.StairwaySendFailedJobNotificationHook;
import bio.terra.pipelines.common.utils.StairwaySetPipelineRunStatusHook;
import bio.terra.pipelines.dependencies.stairway.exception.*;
import bio.terra.pipelines.dependencies.stairway.model.EnumeratedJob;
@@ -112,6 +113,8 @@ public void initialize() {
.addHook(new StairwayLoggingHook())
.addHook(new MonitoringHook(openTelemetry))
.addHook(new StairwayFailedMetricsCounterHook())
+ .addHook(
+ new StairwaySendFailedJobNotificationHook(flightBeanBag.getNotificationService()))
.addHook(new StairwaySetPipelineRunStatusHook(flightBeanBag.getPipelineRunsService()))
.exceptionSerializer(new StairwayExceptionSerializer(objectMapper)));
}
diff --git a/service/src/main/java/bio/terra/pipelines/notifications/BaseTeaspoonsJobNotification.java b/service/src/main/java/bio/terra/pipelines/notifications/BaseTeaspoonsJobNotification.java
new file mode 100644
index 00000000..f91bba07
--- /dev/null
+++ b/service/src/main/java/bio/terra/pipelines/notifications/BaseTeaspoonsJobNotification.java
@@ -0,0 +1,34 @@
+package bio.terra.pipelines.notifications;
+
+import lombok.Getter;
+
+/** Base class for Teaspoons job notifications. Contains common fields for all job notifications. */
+@Getter
+public abstract class BaseTeaspoonsJobNotification {
+ protected String notificationType;
+ protected String recipientUserId;
+ protected String pipelineDisplayName;
+ protected String jobId;
+ protected String timeSubmitted;
+ protected String timeCompleted;
+ protected String quotaRemaining;
+ protected String quotaConsumedByJob;
+ protected String userDescription;
+
+ protected BaseTeaspoonsJobNotification(
+ String recipientUserId,
+ String pipelineDisplayName,
+ String jobId,
+ String timeSubmitted,
+ String timeCompleted,
+ String quotaRemaining,
+ String userDescription) {
+ this.recipientUserId = recipientUserId;
+ this.pipelineDisplayName = pipelineDisplayName;
+ this.jobId = jobId;
+ this.timeSubmitted = timeSubmitted;
+ this.timeCompleted = timeCompleted;
+ this.quotaRemaining = quotaRemaining;
+ this.userDescription = userDescription;
+ }
+}
diff --git a/service/src/main/java/bio/terra/pipelines/notifications/NotificationService.java b/service/src/main/java/bio/terra/pipelines/notifications/NotificationService.java
new file mode 100644
index 00000000..26a8279f
--- /dev/null
+++ b/service/src/main/java/bio/terra/pipelines/notifications/NotificationService.java
@@ -0,0 +1,162 @@
+package bio.terra.pipelines.notifications;
+
+import static bio.terra.pipelines.app.controller.JobApiUtils.buildApiErrorReport;
+
+import bio.terra.pipelines.app.configuration.internal.NotificationConfiguration;
+import bio.terra.pipelines.db.entities.Pipeline;
+import bio.terra.pipelines.db.entities.PipelineRun;
+import bio.terra.pipelines.db.entities.UserQuota;
+import bio.terra.pipelines.generated.model.ApiErrorReport;
+import bio.terra.pipelines.service.PipelineRunsService;
+import bio.terra.pipelines.service.PipelinesService;
+import bio.terra.pipelines.service.QuotasService;
+import bio.terra.stairway.FlightContext;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Optional;
+import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service to encapsulate the logic for composing and sending email notifications to users about
+ * completed pipeline runs. Works with the Terra Thurloe service via PubSub messages.
+ */
+@Service
+public class NotificationService {
+ private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);
+
+ private final PipelineRunsService pipelineRunsService;
+ private final PipelinesService pipelinesService;
+ private final QuotasService quotasService;
+ private final PubsubService pubsubService;
+ private final NotificationConfiguration notificationConfiguration;
+ private final ObjectMapper objectMapper;
+
+ public NotificationService(
+ PipelineRunsService pipelineRunsService,
+ PipelinesService pipelinesService,
+ QuotasService quotasService,
+ PubsubService pubsubService,
+ NotificationConfiguration notificationConfiguration,
+ ObjectMapper objectMapper) {
+ this.pipelineRunsService = pipelineRunsService;
+ this.pipelinesService = pipelinesService;
+ this.quotasService = quotasService;
+ this.pubsubService = pubsubService;
+ this.notificationConfiguration = notificationConfiguration;
+ this.objectMapper = objectMapper;
+ }
+
+ /**
+ * Pull together the common fields for a notification.
+ *
+ * @param jobId the job id
+ * @param userId the user id
+ * @param context the flight context (only needed for failed notifications)
+ * @param isSuccess whether the notification is for a succeeded job; if false, creates a failed
+ * notification
+ * @return the base notification object
+ */
+ private BaseTeaspoonsJobNotification createTeaspoonsJobNotification(
+ UUID jobId, String userId, FlightContext context, boolean isSuccess) {
+ PipelineRun pipelineRun = pipelineRunsService.getPipelineRun(jobId, userId);
+ Pipeline pipeline = pipelinesService.getPipelineById(pipelineRun.getPipelineId());
+ String pipelineDisplayName = pipeline.getDisplayName();
+
+ // if flight fails before quota steps on user's first run, there won't be a row for them yet
+ // in the quotas table
+ UserQuota userQuota =
+ quotasService.getOrCreateQuotaForUserAndPipeline(userId, pipeline.getName());
+ String quotaRemaining = String.valueOf(userQuota.getQuota() - userQuota.getQuotaConsumed());
+
+ if (isSuccess) { // succeeded
+ return new TeaspoonsJobSucceededNotification(
+ userId,
+ pipelineDisplayName,
+ jobId.toString(),
+ formatInstantToReadableString(pipelineRun.getCreated()),
+ formatInstantToReadableString(pipelineRun.getUpdated()),
+ pipelineRun.getQuotaConsumed().toString(),
+ quotaRemaining,
+ pipelineRun.getDescription());
+ } else { // failed
+ // get exception
+ Optional We expect this method to be called by the final step of a flight, at which point we assume
* that the pipeline_run has completed successfully. Therefore, we do not do any checks on the
@@ -296,12 +298,13 @@ public PipelineRun startPipelineRunInDb(UUID jobId, String userId) {
*/
@WriteTransaction
public PipelineRun markPipelineRunSuccessAndWriteOutputs(
- UUID jobId, String userId, Map This step expects the JobMapKeys.USER_ID in the input parameters and
+ * ImputationJobMapKeys.PIPELINE_RUN_OUTPUTS and ImputationJobMapKeys.EFFECTIVE_QUOTA_CONSUMED in
+ * the working map.
+ */
public class CompletePipelineRunStep implements Step {
private final PipelineRunsService pipelineRunsService;
private final Logger logger = LoggerFactory.getLogger(CompletePipelineRunStep.class);
@@ -21,6 +28,8 @@ public CompletePipelineRunStep(PipelineRunsService pipelineRunsService) {
}
@Override
+ @SuppressWarnings("java:S2259") // suppress warning for possible NPE when unboxing quotaConsumed,
+ // since we do validate that quotaConsumed is not null in `validateRequiredEntries`
public StepResult doStep(FlightContext flightContext) {
// validate and extract parameters from input map
var inputParameters = flightContext.getInputParameters();
@@ -31,11 +40,17 @@ public StepResult doStep(FlightContext flightContext) {
// validate and extract parameters from working map
var workingMap = flightContext.getWorkingMap();
- FlightUtils.validateRequiredEntries(workingMap, ImputationJobMapKeys.PIPELINE_RUN_OUTPUTS);
+ FlightUtils.validateRequiredEntries(
+ workingMap,
+ ImputationJobMapKeys.PIPELINE_RUN_OUTPUTS,
+ ImputationJobMapKeys.EFFECTIVE_QUOTA_CONSUMED);
Map This step expects JobMapKeys.USER_ID in the input parameters.
+ */
+public class SendJobSucceededNotificationStep implements Step {
+ private final NotificationService notificationService;
+ private final Logger logger = LoggerFactory.getLogger(SendJobSucceededNotificationStep.class);
+
+ public SendJobSucceededNotificationStep(NotificationService notificationService) {
+ this.notificationService = notificationService;
+ }
+
+ @Override
+ public StepResult doStep(FlightContext flightContext) {
+ // we place the entire logic of this step in a try-catch so that it cannot fail
+ try {
+ // validate and extract parameters from input map
+ var inputParameters = flightContext.getInputParameters();
+ FlightUtils.validateRequiredEntries(inputParameters, JobMapKeys.USER_ID);
+
+ UUID jobId = UUID.fromString(flightContext.getFlightId());
+ String userId = inputParameters.get(JobMapKeys.USER_ID, String.class);
+
+ // send email notification
+ notificationService.configureAndSendPipelineRunSucceededNotification(jobId, userId);
+ } catch (Exception e) {
+ logger.error("Failed to send email notification", e);
+ }
+ return StepResult.getStepResultSuccess();
+ }
+
+ @Override
+ public StepResult undoStep(FlightContext flightContext) {
+ // nothing to undo
+ return StepResult.getStepResultSuccess();
+ }
+}
diff --git a/service/src/main/resources/application.yml b/service/src/main/resources/application.yml
index 26f682fc..2bd58604 100644
--- a/service/src/main/resources/application.yml
+++ b/service/src/main/resources/application.yml
@@ -34,6 +34,9 @@ env:
imputation:
# the default currently points to the GCP dev workspace teaspoons-imputation-dev/teaspoons_imputation_dev_storage_workspace_20240726
storageWorkspaceStorageUrl: ${IMPUTATION_STORAGE_WORKSPACE_STORAGE_URL:gs://fc-secure-10efd4d7-392a-4e9e-89ea-d6629fbb06cc}
+ notifications:
+ projectId: ${NOTIFICATION_PROJECT_ID:broad-dsde-dev}
+ topicId: ${NOTIFICATION_TOPIC_ID:workbench-notifications-dev}
# Below here is non-deployment-specific
@@ -155,6 +158,10 @@ pipelines:
quotaConsumedPollingIntervalSeconds: 60
quotaConsumedUseCallCaching: false
+ notifications:
+ projectId: ${env.pipelines.notifications.projectId}
+ topicId: ${env.pipelines.notifications.topicId}
+
terra.common:
kubernetes:
in-kubernetes: ${env.kubernetes.in-kubernetes} # whether to use a pubsub queue for Stairway; if false, use a local queue
diff --git a/service/src/main/resources/db/changelog.xml b/service/src/main/resources/db/changelog.xml
index 0d3316ec..0e0a32e7 100644
--- a/service/src/main/resources/db/changelog.xml
+++ b/service/src/main/resources/db/changelog.xml
@@ -8,6 +8,7 @@
> pageResponse =
new PageResponse<>(
List.of(
@@ -757,7 +764,8 @@ private PipelineRun getPipelineRunPreparing(String description) {
createdTime,
updatedTime,
CommonPipelineRunStatusEnum.PREPARING,
- description);
+ description,
+ null);
}
/** helper method to create a PipelineRun object for a running job */
@@ -775,11 +783,16 @@ private PipelineRun getPipelineRunRunning() {
createdTime,
updatedTime,
CommonPipelineRunStatusEnum.RUNNING,
- TestUtils.TEST_PIPELINE_DESCRIPTION_1);
+ TestUtils.TEST_PIPELINE_DESCRIPTION_1,
+ null);
}
- /** helper method to create a PipelineRun object for a completed job. */
- private PipelineRun getPipelineRunWithStatus(CommonPipelineRunStatusEnum status) {
+ /**
+ * helper method to create a PipelineRun object for a completed job, specifying the status and
+ * quotaConsumed.
+ */
+ private PipelineRun getPipelineRunWithStatusAndQuotaConsumed(
+ CommonPipelineRunStatusEnum status, Integer quotaConsumed) {
return new PipelineRun(
newJobId,
testUser.getSubjectId(),
@@ -793,6 +806,7 @@ private PipelineRun getPipelineRunWithStatus(CommonPipelineRunStatusEnum status)
createdTime,
updatedTime,
status,
- TestUtils.TEST_PIPELINE_DESCRIPTION_1);
+ TestUtils.TEST_PIPELINE_DESCRIPTION_1,
+ quotaConsumed);
}
}
diff --git a/service/src/test/java/bio/terra/pipelines/notifications/NotificationServiceTest.java b/service/src/test/java/bio/terra/pipelines/notifications/NotificationServiceTest.java
new file mode 100644
index 00000000..5bd3d6ad
--- /dev/null
+++ b/service/src/test/java/bio/terra/pipelines/notifications/NotificationServiceTest.java
@@ -0,0 +1,308 @@
+package bio.terra.pipelines.notifications;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import bio.terra.pipelines.app.configuration.internal.NotificationConfiguration;
+import bio.terra.pipelines.common.utils.CommonPipelineRunStatusEnum;
+import bio.terra.pipelines.db.entities.Pipeline;
+import bio.terra.pipelines.db.entities.PipelineRun;
+import bio.terra.pipelines.db.entities.UserQuota;
+import bio.terra.pipelines.db.repositories.PipelineRunsRepository;
+import bio.terra.pipelines.dependencies.rawls.RawlsServiceApiException;
+import bio.terra.pipelines.service.PipelineRunsService;
+import bio.terra.pipelines.service.PipelinesService;
+import bio.terra.pipelines.service.QuotasService;
+import bio.terra.pipelines.testutils.BaseEmbeddedDbTest;
+import bio.terra.pipelines.testutils.TestUtils;
+import bio.terra.stairway.FlightContext;
+import bio.terra.stairway.StepResult;
+import bio.terra.stairway.StepStatus;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.UUID;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.mock.mockito.MockBean;
+
+class NotificationServiceTest extends BaseEmbeddedDbTest {
+ @InjectMocks @Autowired NotificationService notificationService;
+ @Autowired PipelineRunsService pipelineRunsService;
+ @Autowired PipelineRunsRepository pipelineRunsRepository;
+ @Autowired PipelinesService pipelinesService;
+ @Autowired QuotasService quotasService;
+ @Autowired NotificationConfiguration notificationConfiguration;
+ @Autowired ObjectMapper objectMapper;
+ @MockBean PubsubService pubsubService;
+ @Mock private FlightContext flightContext;
+
+ UUID testJobId = TestUtils.TEST_NEW_UUID;
+ String testUserId = TestUtils.TEST_USER_ID_1;
+ Integer testQuotaConsumedByJob = 1000;
+ String testUserDescription = TestUtils.TEST_USER_PROVIDED_DESCRIPTION;
+ String testErrorMessage = "test error message";
+
+ @Test
+ void formatDateTime() {
+ Instant instant = Instant.parse("2021-08-25T12:34:56.789Z");
+ String formattedDateTime = notificationService.formatInstantToReadableString(instant);
+ assertEquals("Wed, 25 Aug 2021 12:34:56 GMT", formattedDateTime);
+ }
+
+ @Test
+ void configureAndSendPipelineRunSucceededNotification() throws IOException {
+ Pipeline pipeline = pipelinesService.getPipelineById(1L);
+ PipelineRun writtenPipelineRun =
+ createCompletedPipelineRunInDb(pipeline, CommonPipelineRunStatusEnum.SUCCEEDED);
+
+ // initialize and set user quota
+ UserQuota userQuota =
+ quotasService.getOrCreateQuotaForUserAndPipeline(testUserId, pipeline.getName());
+ UserQuota updatedUserQuota =
+ quotasService.updateQuotaConsumed(userQuota, testQuotaConsumedByJob);
+ int expectedQuotaRemaining = updatedUserQuota.getQuota() - updatedUserQuota.getQuotaConsumed();
+
+ String stringifiedJobSucceededNotification =
+ objectMapper.writeValueAsString(
+ new TeaspoonsJobSucceededNotification(
+ testUserId,
+ pipeline.getDisplayName(),
+ testJobId.toString(),
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getCreated()),
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getUpdated()),
+ testQuotaConsumedByJob.toString(),
+ String.valueOf(expectedQuotaRemaining),
+ testUserDescription));
+ // success is a void method
+ doNothing()
+ .when(pubsubService)
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobSucceededNotification);
+
+ notificationService.configureAndSendPipelineRunSucceededNotification(testJobId, testUserId);
+
+ // verify that the pubsub method was called
+ verify(pubsubService, times(1))
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobSucceededNotification);
+ }
+
+ @Test
+ void configureAndSendPipelineRunSucceededNotificationIOException() throws IOException {
+ Pipeline pipeline = pipelinesService.getPipelineById(1L);
+ createCompletedPipelineRunInDb(pipeline, CommonPipelineRunStatusEnum.SUCCEEDED);
+
+ doThrow(new IOException()).when(pubsubService).publishMessage(any(), any(), any());
+
+ // exception should be caught
+ assertDoesNotThrow(
+ () ->
+ notificationService.configureAndSendPipelineRunSucceededNotification(
+ testJobId, testUserId));
+ }
+
+ @Test
+ void configureAndSendPipelineRunFailedNotification() throws IOException {
+ Pipeline pipeline = pipelinesService.getPipelineById(1L);
+ PipelineRun writtenPipelineRun =
+ createCompletedPipelineRunInDb(pipeline, CommonPipelineRunStatusEnum.FAILED);
+
+ // initialize and set a custom user quota value. this is not quota consumed by the job.
+ int customUserQuota = 2000;
+ UserQuota userQuota =
+ quotasService.getOrCreateQuotaForUserAndPipeline(testUserId, pipeline.getName());
+ UserQuota updatedUserQuota = quotasService.updateQuotaConsumed(userQuota, customUserQuota);
+ int expectedQuotaRemaining = updatedUserQuota.getQuota() - updatedUserQuota.getQuotaConsumed();
+
+ when(flightContext.getFlightId()).thenReturn(testJobId.toString());
+ RawlsServiceApiException rawlsServiceApiException =
+ new RawlsServiceApiException(testErrorMessage);
+ StepResult stepResultFailedWithException =
+ new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, rawlsServiceApiException);
+ when(flightContext.getResult()).thenReturn(stepResultFailedWithException);
+
+ String stringifiedJobFailedNotification =
+ objectMapper.writeValueAsString(
+ new TeaspoonsJobFailedNotification(
+ testUserId,
+ pipeline.getDisplayName(),
+ testJobId.toString(),
+ testErrorMessage,
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getCreated()),
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getUpdated()),
+ String.valueOf(expectedQuotaRemaining),
+ testUserDescription));
+ // success is a void method
+ doNothing()
+ .when(pubsubService)
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobFailedNotification);
+
+ notificationService.configureAndSendPipelineRunFailedNotification(
+ testJobId, testUserId, flightContext);
+
+ // verify that the pubsub method was called
+ verify(pubsubService, times(1))
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobFailedNotification);
+ }
+
+ @Test
+ void configureAndSendPipelineRunFailedNotificationNoUserQuota() throws IOException {
+ Pipeline pipeline = pipelinesService.getPipelineById(1L);
+ PipelineRun writtenPipelineRun =
+ createCompletedPipelineRunInDb(pipeline, CommonPipelineRunStatusEnum.FAILED);
+
+ // don't initialize user in user_quota table
+ int expectedQuotaRemaining =
+ quotasService.getPipelineQuota(pipeline.getName()).getDefaultQuota();
+
+ when(flightContext.getFlightId()).thenReturn(testJobId.toString());
+ RawlsServiceApiException rawlsServiceApiException =
+ new RawlsServiceApiException(testErrorMessage);
+ StepResult stepResultFailedWithException =
+ new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, rawlsServiceApiException);
+ when(flightContext.getResult()).thenReturn(stepResultFailedWithException);
+
+ String stringifiedJobFailedNotification =
+ objectMapper.writeValueAsString(
+ new TeaspoonsJobFailedNotification(
+ testUserId,
+ pipeline.getDisplayName(),
+ testJobId.toString(),
+ testErrorMessage,
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getCreated()),
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getUpdated()),
+ String.valueOf(expectedQuotaRemaining),
+ testUserDescription));
+ // success is a void method
+ doNothing()
+ .when(pubsubService)
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobFailedNotification);
+
+ notificationService.configureAndSendPipelineRunFailedNotification(
+ testJobId, testUserId, flightContext);
+
+ // verify that the pubsub method was called
+ verify(pubsubService, times(1))
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobFailedNotification);
+ }
+
+ @Test
+ void configureAndSendPipelineRunFailedNotificationWithoutException() throws IOException {
+ Pipeline pipeline = pipelinesService.getPipelineById(1L);
+ PipelineRun writtenPipelineRun =
+ createCompletedPipelineRunInDb(pipeline, CommonPipelineRunStatusEnum.FAILED);
+
+ // don't initialize user in user_quota table
+ int expectedQuotaRemaining =
+ quotasService.getPipelineQuota(pipeline.getName()).getDefaultQuota();
+
+ when(flightContext.getFlightId()).thenReturn(testJobId.toString());
+ StepResult stepResultFailedWithoutException =
+ new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL);
+ when(flightContext.getResult()).thenReturn(stepResultFailedWithoutException);
+
+ String stringifiedJobFailedNotification =
+ objectMapper.writeValueAsString(
+ new TeaspoonsJobFailedNotification(
+ testUserId,
+ pipeline.getDisplayName(),
+ testJobId.toString(),
+ "Unknown error",
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getCreated()),
+ notificationService.formatInstantToReadableString(writtenPipelineRun.getUpdated()),
+ String.valueOf(expectedQuotaRemaining),
+ testUserDescription));
+ // success is a void method
+ doNothing()
+ .when(pubsubService)
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobFailedNotification);
+
+ notificationService.configureAndSendPipelineRunFailedNotification(
+ testJobId, testUserId, flightContext);
+
+ // verify that the pubsub method was called
+ verify(pubsubService, times(1))
+ .publishMessage(
+ notificationConfiguration.projectId(),
+ notificationConfiguration.topicId(),
+ stringifiedJobFailedNotification);
+ }
+
+ @Test
+ void configureAndSendPipelineRunFailedNotificationIOException() throws IOException {
+ Pipeline pipeline = pipelinesService.getPipelineById(1L);
+ createCompletedPipelineRunInDb(pipeline, CommonPipelineRunStatusEnum.FAILED);
+
+ when(flightContext.getFlightId()).thenReturn(testJobId.toString());
+ RawlsServiceApiException rawlsServiceApiException =
+ new RawlsServiceApiException(testErrorMessage);
+ StepResult stepResultFailedWithException =
+ new StepResult(StepStatus.STEP_RESULT_FAILURE_FATAL, rawlsServiceApiException);
+ when(flightContext.getResult()).thenReturn(stepResultFailedWithException);
+
+ doThrow(new IOException()).when(pubsubService).publishMessage(any(), any(), any());
+
+ // exception should be caught
+ assertDoesNotThrow(
+ () ->
+ notificationService.configureAndSendPipelineRunFailedNotification(
+ testJobId, testUserId, flightContext));
+ }
+
+ /**
+ * Helper method for tests to create a completed pipeline run in the database.
+ *
+ * @param pipeline the pipeline to create the run for
+ * @param statusEnum the status of the pipeline run
+ * @return the completed pipeline run
+ */
+ private PipelineRun createCompletedPipelineRunInDb(
+ Pipeline pipeline, CommonPipelineRunStatusEnum statusEnum) {
+ PipelineRun completedPipelineRun =
+ new PipelineRun(
+ testJobId,
+ testUserId,
+ pipeline.getId(),
+ pipeline.getWdlMethodVersion(),
+ pipeline.getWorkspaceId(),
+ pipeline.getWorkspaceBillingProject(),
+ pipeline.getWorkspaceName(),
+ pipeline.getWorkspaceStorageContainerName(),
+ pipeline.getWorkspaceGoogleProject(),
+ null, // timestamps auto generated by db
+ null,
+ statusEnum,
+ testUserDescription,
+ testQuotaConsumedByJob);
+
+ return pipelineRunsRepository.save(completedPipelineRun);
+ }
+}
diff --git a/service/src/test/java/bio/terra/pipelines/notifications/PubsubServiceTest.java b/service/src/test/java/bio/terra/pipelines/notifications/PubsubServiceTest.java
new file mode 100644
index 00000000..42e05eb7
--- /dev/null
+++ b/service/src/test/java/bio/terra/pipelines/notifications/PubsubServiceTest.java
@@ -0,0 +1,44 @@
+package bio.terra.pipelines.notifications;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import bio.terra.pipelines.testutils.BaseEmbeddedDbTest;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.springframework.beans.factory.annotation.Autowired;
+
+class PubsubServiceTest extends BaseEmbeddedDbTest {
+ @Autowired PubsubService pubsubService;
+
+ @Test
+ void initPublisher() throws IOException {
+ TopicName topicName = TopicName.of("projectId", "topicId");
+ try (MockedStatic