Skip to content

Commit

Permalink
TSPS-140 Send email notifications for succeeded and failed jobs (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmorgantaylor authored Jan 3, 2025
1 parent 9567a35 commit aa748b5
Show file tree
Hide file tree
Showing 33 changed files with 1,209 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tag-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,15 +176,15 @@ jobs:

- name: Auth to GCP
id: 'auth'
uses: google-github-actions/auth@v0
uses: google-github-actions/auth@v2
with:
token_format: 'access_token'
workload_identity_provider: 'projects/1038484894585/locations/global/workloadIdentityPools/github-wi-pool/providers/github-wi-provider'
service_account: 'dsp-artifact-registry-push@dsp-artifact-registry.iam.gserviceaccount.com'

# Install gcloud, `setup-gcloud` automatically picks up authentication from `auth`.
- name: 'Set up Cloud SDK'
uses: 'google-github-actions/setup-gcloud@v0'
uses: google-github-actions/setup-gcloud@v2

- name: Explicitly auth Docker for Artifact Registry
run: gcloud auth configure-docker $GOOGLE_DOCKER_REPOSITORY --quiet
Expand Down
1 change: 1 addition & 0 deletions service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ dependencies {
// gcs
implementation platform('com.google.cloud:libraries-bom:26.44.0')
implementation 'com.google.cloud:google-cloud-storage'
implementation 'com.google.cloud:google-cloud-pubsub'

liquibaseRuntime 'info.picocli:picocli:4.6.1'
liquibaseRuntime 'org.postgresql:postgresql:42.6.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package bio.terra.pipelines.app.configuration.internal;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Notification settings bound from configuration properties under the {@code
 * pipelines.notifications} prefix.
 *
 * @param projectId project id used when publishing notification messages (presumably the Google
 *     Cloud project that owns the Pub/Sub topic; confirm against deployment config)
 * @param topicId topic id used when publishing notification messages (presumably the Pub/Sub
 *     topic name; confirm against deployment config)
 */
@ConfigurationProperties("pipelines.notifications")
public record NotificationConfiguration(String projectId, String topicId) {}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import bio.terra.pipelines.dependencies.sam.SamService;
import bio.terra.pipelines.dependencies.wds.WdsService;
import bio.terra.pipelines.dependencies.workspacemanager.WorkspaceManagerService;
import bio.terra.pipelines.notifications.NotificationService;
import bio.terra.pipelines.service.PipelineInputsOutputsService;
import bio.terra.pipelines.service.PipelineRunsService;
import bio.terra.pipelines.service.PipelinesService;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class FlightBeanBag {
private final WorkspaceManagerService workspaceManagerService;
private final RawlsService rawlsService;
private final QuotasService quotasService;
private final NotificationService notificationService;
private final ImputationConfiguration imputationConfiguration;
private final CbasConfiguration cbasConfiguration;
private final WdlPipelineConfiguration wdlPipelineConfiguration;
Expand All @@ -54,6 +56,7 @@ public FlightBeanBag(
CbasService cbasService,
RawlsService rawlsService,
QuotasService quotasService,
NotificationService notificationService,
WorkspaceManagerService workspaceManagerService,
ImputationConfiguration imputationConfiguration,
CbasConfiguration cbasConfiguration,
Expand All @@ -68,6 +71,7 @@ public FlightBeanBag(
this.workspaceManagerService = workspaceManagerService;
this.rawlsService = rawlsService;
this.quotasService = quotasService;
this.notificationService = notificationService;
this.imputationConfiguration = imputationConfiguration;
this.cbasConfiguration = cbasConfiguration;
this.wdlPipelineConfiguration = wdlPipelineConfiguration;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package bio.terra.pipelines.common.utils;

import static bio.terra.pipelines.common.utils.FlightUtils.flightMapKeyIsTrue;

import bio.terra.pipelines.dependencies.stairway.JobMapKeys;
import bio.terra.pipelines.notifications.NotificationService;
import bio.terra.stairway.FlightContext;
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.FlightStatus;
import bio.terra.stairway.HookAction;
import bio.terra.stairway.StairwayHook;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * {@link StairwayHook} that asks {@link NotificationService} to send a "job failed" notification
 * when a flight ends without succeeding.
 *
 * <p>The hook only acts when the flight's input parameters flag {@link
 * JobMapKeys#DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK} as true and the flight's final status is
 * anything other than {@link FlightStatus#SUCCESS}.
 *
 * <p>The input parameters must also contain {@link JobMapKeys#USER_ID}; it is validated before the
 * notification is sent and identifies the recipient.
 */
@Component
public class StairwaySendFailedJobNotificationHook implements StairwayHook {
  private static final Logger logger =
      LoggerFactory.getLogger(StairwaySendFailedJobNotificationHook.class);

  private final NotificationService notificationService;

  public StairwaySendFailedJobNotificationHook(NotificationService notificationService) {
    this.notificationService = notificationService;
  }

  /**
   * Runs at the end of every flight and sends the failure notification when applicable.
   *
   * @param context the context of the flight that just ended
   * @return always {@link HookAction#CONTINUE}
   */
  @Override
  public HookAction endFlight(FlightContext context) {
    FlightMap flightInputs = context.getInputParameters();

    // Nothing to do unless the flight opted in to this hook and did not succeed.
    boolean hookRequested =
        flightMapKeyIsTrue(flightInputs, JobMapKeys.DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK);
    if (!hookRequested || context.getFlightStatus() == FlightStatus.SUCCESS) {
      return HookAction.CONTINUE;
    }

    logger.info(
        "Flight has status {}, sending failed job notification email", context.getFlightStatus());

    // The recipient is taken from the flight inputs, so it must be present.
    FlightUtils.validateRequiredEntries(flightInputs, JobMapKeys.USER_ID);

    // The flight id doubles as the pipeline run's job id.
    UUID jobId = UUID.fromString(context.getFlightId());
    String userId = flightInputs.get(JobMapKeys.USER_ID, String.class);

    notificationService.configureAndSendPipelineRunFailedNotification(jobId, userId, context);
    return HookAction.CONTINUE;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class PipelineRun {
@Column(name = "description")
private String description;

@Column(name = "quota_consumed")
private Integer quotaConsumed;

/** Constructor for in progress or complete PipelineRun. */
public PipelineRun(
UUID jobId,
Expand All @@ -83,7 +86,8 @@ public PipelineRun(
Instant created,
Instant updated,
CommonPipelineRunStatusEnum status,
String description) {
String description,
Integer quotaConsumed) {
this.jobId = jobId;
this.userId = userId;
this.pipelineId = pipelineId;
Expand All @@ -97,6 +101,7 @@ public PipelineRun(
this.updated = updated;
this.status = status;
this.description = description;
this.quotaConsumed = quotaConsumed;
}

/** Constructor for creating a new GCP pipeline run. Timestamps are auto-generated. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public class JobMapKeys {
"do_set_pipeline_run_status_failed_hook";
public static final String DO_INCREMENT_METRICS_FAILED_COUNTER_HOOK =
"do_increment_metrics_failed_counter_hook";
public static final String DO_SEND_JOB_FAILURE_NOTIFICATION_HOOK =
"do_send_job_failure_notification_hook";

JobMapKeys() {
throw new IllegalStateException("Attempted to instantiate utility class JobMapKeys");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import bio.terra.pipelines.common.utils.FlightBeanBag;
import bio.terra.pipelines.common.utils.PipelinesEnum;
import bio.terra.pipelines.common.utils.StairwayFailedMetricsCounterHook;
import bio.terra.pipelines.common.utils.StairwaySendFailedJobNotificationHook;
import bio.terra.pipelines.common.utils.StairwaySetPipelineRunStatusHook;
import bio.terra.pipelines.dependencies.stairway.exception.*;
import bio.terra.pipelines.dependencies.stairway.model.EnumeratedJob;
Expand Down Expand Up @@ -112,6 +113,8 @@ public void initialize() {
.addHook(new StairwayLoggingHook())
.addHook(new MonitoringHook(openTelemetry))
.addHook(new StairwayFailedMetricsCounterHook())
.addHook(
new StairwaySendFailedJobNotificationHook(flightBeanBag.getNotificationService()))
.addHook(new StairwaySetPipelineRunStatusHook(flightBeanBag.getPipelineRunsService()))
.exceptionSerializer(new StairwayExceptionSerializer(objectMapper)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package bio.terra.pipelines.notifications;

import lombok.Getter;

/**
 * Base class for Teaspoons job notifications. Contains common fields for all job notifications.
 *
 * <p>All fields are Strings and are exposed through Lombok-generated getters. Two fields, {@code
 * notificationType} and {@code quotaConsumedByJob}, are not assigned by this class's constructor;
 * presumably concrete subclasses set them (confirm against the subclasses).
 *
 * <p>NOTE(review): instances of this type are serialized elsewhere in the service, so renaming or
 * reordering members may change the serialized payload; verify before restructuring.
 */
@Getter
public abstract class BaseTeaspoonsJobNotification {
// Not set by the constructor below; left for subclasses to populate.
protected String notificationType;
protected String recipientUserId;
protected String pipelineDisplayName;
protected String jobId;
protected String timeSubmitted;
protected String timeCompleted;
protected String quotaRemaining;
// Not set by the constructor below; left for subclasses to populate.
protected String quotaConsumedByJob;
protected String userDescription;

/**
 * Populates the fields shared by every job notification.
 *
 * @param recipientUserId id of the user the notification is addressed to
 * @param pipelineDisplayName display name of the pipeline the job ran
 * @param jobId id of the job, as a string
 * @param timeSubmitted submission time of the job, already formatted as a string
 * @param timeCompleted completion time of the job, already formatted as a string
 * @param quotaRemaining the user's remaining quota, as a string
 * @param userDescription the description attached to the job
 */
protected BaseTeaspoonsJobNotification(
String recipientUserId,
String pipelineDisplayName,
String jobId,
String timeSubmitted,
String timeCompleted,
String quotaRemaining,
String userDescription) {
this.recipientUserId = recipientUserId;
this.pipelineDisplayName = pipelineDisplayName;
this.jobId = jobId;
this.timeSubmitted = timeSubmitted;
this.timeCompleted = timeCompleted;
this.quotaRemaining = quotaRemaining;
this.userDescription = userDescription;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package bio.terra.pipelines.notifications;

import static bio.terra.pipelines.app.controller.JobApiUtils.buildApiErrorReport;

import bio.terra.pipelines.app.configuration.internal.NotificationConfiguration;
import bio.terra.pipelines.db.entities.Pipeline;
import bio.terra.pipelines.db.entities.PipelineRun;
import bio.terra.pipelines.db.entities.UserQuota;
import bio.terra.pipelines.generated.model.ApiErrorReport;
import bio.terra.pipelines.service.PipelineRunsService;
import bio.terra.pipelines.service.PipelinesService;
import bio.terra.pipelines.service.QuotasService;
import bio.terra.stairway.FlightContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

/**
 * Builds notifications about finished pipeline runs (succeeded or failed), serializes them to JSON
 * and publishes them through {@link PubsubService} to the project/topic given by {@link
 * NotificationConfiguration}. The published messages are intended for the Terra Thurloe service,
 * which sends the actual emails.
 */
@Service
public class NotificationService {
  private static final Logger logger = LoggerFactory.getLogger(NotificationService.class);

  private final PipelineRunsService pipelineRunsService;
  private final PipelinesService pipelinesService;
  private final QuotasService quotasService;
  private final PubsubService pubsubService;
  private final NotificationConfiguration notificationConfiguration;
  private final ObjectMapper objectMapper;

  public NotificationService(
      PipelineRunsService pipelineRunsService,
      PipelinesService pipelinesService,
      QuotasService quotasService,
      PubsubService pubsubService,
      NotificationConfiguration notificationConfiguration,
      ObjectMapper objectMapper) {
    this.pipelineRunsService = pipelineRunsService;
    this.pipelinesService = pipelinesService;
    this.quotasService = quotasService;
    this.pubsubService = pubsubService;
    this.notificationConfiguration = notificationConfiguration;
    this.objectMapper = objectMapper;
  }

  /**
   * Gather everything a notification needs (pipeline run, pipeline display name, remaining quota)
   * and build either a succeeded or a failed notification from it.
   *
   * @param jobId the job id
   * @param userId the user id
   * @param context the flight context; only read when building a failed notification
   * @param isSuccess true to build a succeeded notification, false to build a failed one
   * @return the notification object
   */
  private BaseTeaspoonsJobNotification createTeaspoonsJobNotification(
      UUID jobId, String userId, FlightContext context, boolean isSuccess) {
    PipelineRun run = pipelineRunsService.getPipelineRun(jobId, userId);
    Pipeline pipeline = pipelinesService.getPipelineById(run.getPipelineId());
    String displayName = pipeline.getDisplayName();

    // A flight can fail before the quota steps of a user's first run, in which case the quotas
    // table has no row for them yet; hence get-or-create rather than a plain lookup.
    UserQuota quota =
        quotasService.getOrCreateQuotaForUserAndPipeline(userId, pipeline.getName());
    String quotaRemaining = String.valueOf(quota.getQuota() - quota.getQuotaConsumed());

    if (!isSuccess) {
      String errorMessage = resolveErrorMessage(context);
      return new TeaspoonsJobFailedNotification(
          userId,
          displayName,
          jobId.toString(),
          errorMessage,
          formatInstantToReadableString(run.getCreated()),
          formatInstantToReadableString(run.getUpdated()),
          quotaRemaining,
          run.getDescription());
    }

    return new TeaspoonsJobSucceededNotification(
        userId,
        displayName,
        jobId.toString(),
        formatInstantToReadableString(run.getCreated()),
        formatInstantToReadableString(run.getUpdated()),
        run.getQuotaConsumed().toString(),
        quotaRemaining,
        run.getDescription());
  }

  /**
   * Derive the user-facing error message for a failed flight from the exception stored in the
   * flight result, falling back to a generic message when no exception is recorded.
   *
   * @param context the flight context of the failed flight
   * @return the error message to include in the failed notification
   */
  private String resolveErrorMessage(FlightContext context) {
    Optional<Exception> flightException = context.getResult().getException();
    if (flightException.isEmpty()) {
      logger.error(
          "No exception found in flight result for flight {} with status {}",
          context.getFlightId(),
          context.getFlightStatus());
      return "Unknown error";
    }
    // use same logic that the status endpoint uses
    ApiErrorReport report = buildApiErrorReport(flightException.get());
    return report.getMessage();
  }

  /**
   * Format an Instant as a date time string in UTC using the RFC-1123 date-time formatter, such as
   * 'Tue, 3 Jun 2008 11:05:30 GMT'.
   *
   * @param dateTime the Instant to format
   * @return the formatted date time string
   */
  protected String formatInstantToReadableString(Instant dateTime) {
    return DateTimeFormatter.RFC_1123_DATE_TIME.format(dateTime.atZone(ZoneId.of("UTC")));
  }

  /**
   * Build the notification, serialize it to JSON and publish it to the configured project/topic.
   * An {@link IOException} raised while serializing or publishing is logged with the supplied
   * message and not rethrown; any other exception propagates to the caller.
   *
   * @param jobId the job id
   * @param userId the user id
   * @param context the flight context, or null for a succeeded notification
   * @param isSuccess whether to send a succeeded (true) or failed (false) notification
   * @param errorLogMessage message to log if an IOException occurs
   */
  private void buildAndPublishNotification(
      UUID jobId, String userId, FlightContext context, boolean isSuccess, String errorLogMessage) {
    try {
      String projectId = notificationConfiguration.projectId();
      String topicId = notificationConfiguration.topicId();
      String payload =
          objectMapper.writeValueAsString(
              createTeaspoonsJobNotification(jobId, userId, context, isSuccess));
      pubsubService.publishMessage(projectId, topicId, payload);
    } catch (IOException e) {
      logger.error(errorLogMessage, e);
    }
  }

  /**
   * Configure and send a notification that a job has succeeded.
   *
   * @param jobId the job id
   * @param userId the user id
   */
  public void configureAndSendPipelineRunSucceededNotification(UUID jobId, String userId) {
    buildAndPublishNotification(
        jobId, userId, null, true, "Error sending pipelineRunSucceeded notification");
  }

  /**
   * Configure and send a notification that a job has failed.
   *
   * @param jobId the job id
   * @param userId the user id
   * @param context the flight context
   */
  public void configureAndSendPipelineRunFailedNotification(
      UUID jobId, String userId, FlightContext context) {
    buildAndPublishNotification(
        jobId, userId, context, false, "Error sending pipelineRunFailed notification");
  }
}
Loading

0 comments on commit aa748b5

Please sign in to comment.