Merge pull request #561 from gbif/pipelines-client-update
Pipelines client update
fmendezh authored Apr 24, 2024
2 parents d776ca6 + 4ea43f3 commit 83ddbf1
Showing 8 changed files with 172 additions and 232 deletions.
@@ -20,6 +20,7 @@
 import org.gbif.registry.cli.datasetindex.batchindexer.DatasetBatchIndexer;
 import org.gbif.registry.cli.datasetindex.batchindexer.DatasetBatchIndexerConfiguration;
 import org.gbif.registry.cli.datasetindex.indexupdater.DatasetIndexUpdaterConfiguration;
+import org.gbif.registry.pipelines.issues.GithubApiClient;
 import org.gbif.registry.search.dataset.indexing.DatasetJsonConverter;
 import org.gbif.registry.search.dataset.indexing.EsDatasetRealtimeIndexer;
 import org.gbif.registry.search.dataset.indexing.checklistbank.ChecklistbankPersistenceServiceImpl;
@@ -231,6 +232,11 @@ public DatasetService datasetService(ClientBuilder clientBuilder) {
   public NetworkService networkService(ClientBuilder clientBuilder) {
     return clientBuilder.build(NetworkClient.class);
   }
+
+  @Bean
+  public GithubApiClient githubApiClient(ClientBuilder clientBuilder) {
+    return clientBuilder.build(GithubApiClient.class);
+  }
 }

 private static class CustomDateDeserializer extends DateDeserializers.DateDeserializer {
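GithubApiClient itself is not among the hunks shown here. Inferred from how this commit uses it (listIssues, createIssue, addIssueComment, updateIssueLabels, and the nested Issue/IssueComment/IssueResult/IssueLabels types), the interface is roughly shaped like the sketch below. Only the method names and arguments mirror the call sites in this diff; the annotations, request paths, and parameter types are assumptions, on the premise that GBIF's ClientBuilder builds clients from Spring-annotated interfaces.

import java.util.List;
import org.springframework.web.bind.annotation.*;

// Hypothetical sketch of GithubApiClient, inferred from the call sites in this
// commit; annotations, request paths and the long issue-number type are assumed.
public interface GithubApiClient {

  @GetMapping("issues")
  List<IssueResult> listIssues(
      @RequestParam("labels") List<String> labels,
      @RequestParam("state") String state,
      @RequestParam("page") int page,
      @RequestParam("per_page") int perPage);

  @PostMapping("issues")
  void createIssue(@RequestBody Issue issue);

  @PostMapping("issues/{number}/comments")
  void addIssueComment(@PathVariable("number") long number, @RequestBody IssueComment comment);

  @PutMapping("issues/{number}/labels")
  void updateIssueLabels(@PathVariable("number") long number, @RequestBody IssueLabels labels);

  // Nested DTOs (Issue, IssueComment, IssueResult, IssueLabels) are elided here;
  // the imports in this commit show they are inner types of GithubApiClient.
}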
registry-pipelines/pom.xml
10 changes: 0 additions & 10 deletions
@@ -51,16 +51,6 @@
       <groupId>org.gbif.occurrence</groupId>
       <artifactId>occurrence-ws-client</artifactId>
     </dependency>
-
-    <!-- okhttp client -->
-    <dependency>
-      <groupId>com.squareup.retrofit2</groupId>
-      <artifactId>retrofit</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.squareup.retrofit2</groupId>
-      <artifactId>converter-jackson</artifactId>
-    </dependency>
     <dependency>
       <groupId>com.squareup.okhttp3</groupId>
       <artifactId>okhttp</artifactId>
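The two removed Retrofit artifacts supported the old hand-rolled HTTP client; the GitHub client is now produced by the registry's ClientBuilder (see the githubApiClient bean above). For contrast, a Retrofit service of the kind being retired is wired roughly like this — a sketch of the general pattern, not the project's actual code, with GithubApiService standing in for the interface this commit deletes:

import retrofit2.Retrofit;
import retrofit2.converter.jackson.JacksonConverterFactory;

class RetrofitWiringSketch {
  // Illustrative only: retrofit + converter-jackson combine to build a proxy
  // for an annotated service interface such as the old GithubApiService.
  static GithubApiService buildOldStyleClient() {
    Retrofit retrofit =
        new Retrofit.Builder()
            .baseUrl("https://api.github.com/") // assumed base URL
            .addConverterFactory(JacksonConverterFactory.create()) // Jackson (de)serialization
            .build();
    return retrofit.create(GithubApiService.class);
  }
}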
registry-pipelines/src/main/java/org/gbif/registry/pipelines/DefaultRegistryPipelinesHistoryTrackingService.java
@@ -13,6 +13,32 @@
  */
 package org.gbif.registry.pipelines;

+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import freemarker.template.TemplateException;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
 import org.gbif.api.model.common.paging.Pageable;
 import org.gbif.api.model.common.paging.PagingResponse;
 import org.gbif.api.model.pipelines.*;
@@ -39,7 +66,10 @@
 import org.gbif.registry.mail.EmailSender;
 import org.gbif.registry.mail.pipelines.PipelinesEmailManager;
 import org.gbif.registry.persistence.mapper.pipelines.PipelineProcessMapper;
-import org.gbif.registry.pipelines.issues.GithubApiService;
+import org.gbif.registry.pipelines.issues.GithubApiClient;
+import org.gbif.registry.pipelines.issues.GithubApiClient.Issue;
+import org.gbif.registry.pipelines.issues.GithubApiClient.IssueComment;
+import org.gbif.registry.pipelines.issues.GithubApiClient.IssueResult;
 import org.gbif.registry.pipelines.issues.IssueCreator;
 import org.gbif.registry.pipelines.util.PredicateUtils;

@@ -81,13 +111,8 @@
 import com.google.common.collect.Ordering;

 import freemarker.template.TemplateException;
-import retrofit2.Call;
-import retrofit2.HttpException;
-import retrofit2.Response;
-
-import static org.gbif.registry.pipelines.issues.GithubApiService.Issue;
-import static org.gbif.registry.pipelines.issues.GithubApiService.IssueComment;
-import static org.gbif.registry.pipelines.issues.GithubApiService.IssueResult;
+import org.springframework.web.client.HttpClientErrorException;
+import org.springframework.web.client.HttpServerErrorException;

 /** Service that allows to re-run pipeline steps on a specific attempt. */
 @Service
@@ -119,8 +144,9 @@ public class DefaultRegistryPipelinesHistoryTrackingService
   private final ExecutorService executorService;
   private final EmailSender emailSender;
   private final PipelinesEmailManager pipelinesEmailManager;
-  private final GithubApiService githubApiService;
+  private final GithubApiClient githubApiClient;
   private final IssueCreator issueCreator;
+  private static final String DATASET_KEY_CANNOT_BE_NULL = "DatasetKey can't be null";

   public DefaultRegistryPipelinesHistoryTrackingService(
       @Qualifier("registryObjectMapper") ObjectMapper objectMapper,
@@ -129,7 +155,7 @@ public DefaultRegistryPipelinesHistoryTrackingService(
       @Lazy DatasetService datasetService,
       @Autowired EmailSender emailSender,
       @Autowired PipelinesEmailManager pipelinesEmailManager,
-      @Autowired GithubApiService githubApiService,
+      @Autowired GithubApiClient githubApiClient,
       @Autowired IssueCreator issueCreator,
       @Value("${pipelines.doAllThreads}") Integer threadPoolSize) {
     this.objectMapper = objectMapper;
@@ -138,7 +164,7 @@
     this.datasetService = datasetService;
     this.emailSender = emailSender;
     this.pipelinesEmailManager = pipelinesEmailManager;
-    this.githubApiService = githubApiService;
+    this.githubApiClient = githubApiClient;
     this.issueCreator = issueCreator;
     this.executorService =
         Optional.ofNullable(threadPoolSize)
@@ -341,7 +367,7 @@ public PagingResponse<PipelineProcess> history(Pageable pageable) {

   @Override
   public PagingResponse<PipelineProcess> history(UUID datasetKey, Pageable pageable) {
-    Objects.requireNonNull(datasetKey, "DatasetKey can't be null");
+    Objects.requireNonNull(datasetKey, DATASET_KEY_CANNOT_BE_NULL);

     long count = mapper.count(datasetKey, null);
     List<PipelineProcess> statuses = mapper.list(datasetKey, null, pageable);
@@ -363,7 +389,7 @@ public RunPipelineResponse runPipelineAttempt(
       String prefix,
       boolean markPreviousAttemptAsFailed,
       Set<String> interpretTypes) {
-    Objects.requireNonNull(datasetKey, "DatasetKey can't be null");
+    Objects.requireNonNull(datasetKey, DATASET_KEY_CANNOT_BE_NULL);
     Objects.requireNonNull(steps, "Steps can't be null");
     Objects.requireNonNull(reason, "Reason can't be null");
     Objects.requireNonNull(publisher, "No message publisher configured");
@@ -392,41 +418,7 @@ public RunPipelineResponse runPipelineAttempt(

     Map<StepType, PipelineBasedMessage> stepsToSend = new EnumMap<>(StepType.class);
     for (StepType stepName : prioritizeSteps(steps, dataset)) {
-      Optional<PipelineStep> latestStepOpt = getLatestSuccessfulStep(process, stepName);
-
-      if (!latestStepOpt.isPresent()) {
-        LOG.warn("Can't find latest successful step for the datasetKey {}", datasetKey);
-        continue;
-      }
-
-      PipelineStep step = latestStepOpt.get();
-      try {
-        PipelineBasedMessage message = null;
-
-        if (stepName == StepType.INTERPRETED_TO_INDEX
-            || stepName == StepType.HDFS_VIEW
-            || stepName == StepType.FRAGMENTER) {
-          message = createInterpretedMessage(prefix, step.getMessage(), stepName);
-        } else if (stepName == StepType.VERBATIM_TO_INTERPRETED) {
-          message = createVerbatimMessage(prefix, step.getMessage(), interpretTypes);
-        } else if (stepName == StepType.DWCA_TO_VERBATIM) {
-          message = createMessage(step.getMessage(), PipelinesDwcaMessage.class);
-        } else if (stepName == StepType.ABCD_TO_VERBATIM) {
-          message = createMessage(step.getMessage(), PipelinesAbcdMessage.class);
-        } else if (stepName == StepType.XML_TO_VERBATIM) {
-          message = createMessage(step.getMessage(), PipelinesXmlMessage.class);
-        } else if (stepName == StepType.EVENTS_VERBATIM_TO_INTERPRETED) {
-          message = createMessage(step.getMessage(), PipelinesEventsMessage.class);
-        } else if (stepName == StepType.EVENTS_INTERPRETED_TO_INDEX) {
-          message = createMessage(step.getMessage(), PipelinesEventsInterpretedMessage.class);
-        }
-
-        if (message != null) {
-          stepsToSend.put(stepName, message);
-        }
-      } catch (IOException ex) {
-        LOG.warn("Error reading message", ex);
-      }
+      processStep(stepName, process, prefix, interpretTypes, stepsToSend);
     }

     if (stepsToSend.isEmpty()) {
@@ -480,6 +472,45 @@
     return responseBuilder.build();
   }

+  private void processStep(StepType stepName, PipelineProcess process, String prefix,
+      Set<String> interpretTypes, Map<StepType, PipelineBasedMessage> stepsToSend) {
+    Optional<PipelineStep> latestStepOpt = getLatestSuccessfulStep(process, stepName);
+
+    if (!latestStepOpt.isPresent()) {
+      LOG.warn("Can't find latest successful step for the datasetKey {}", process.getDatasetKey());
+      return;
+    }
+
+    PipelineStep step = latestStepOpt.get();
+    try {
+      PipelineBasedMessage message = null;
+
+      if (stepName == StepType.INTERPRETED_TO_INDEX
+          || stepName == StepType.HDFS_VIEW
+          || stepName == StepType.FRAGMENTER) {
+        message = createInterpretedMessage(prefix, step.getMessage(), stepName);
+      } else if (stepName == StepType.VERBATIM_TO_INTERPRETED) {
+        message = createVerbatimMessage(prefix, step.getMessage(), interpretTypes);
+      } else if (stepName == StepType.DWCA_TO_VERBATIM) {
+        message = createMessage(step.getMessage(), PipelinesDwcaMessage.class);
+      } else if (stepName == StepType.ABCD_TO_VERBATIM) {
+        message = createMessage(step.getMessage(), PipelinesAbcdMessage.class);
+      } else if (stepName == StepType.XML_TO_VERBATIM) {
+        message = createMessage(step.getMessage(), PipelinesXmlMessage.class);
+      } else if (stepName == StepType.EVENTS_VERBATIM_TO_INTERPRETED) {
+        message = createMessage(step.getMessage(), PipelinesEventsMessage.class);
+      } else if (stepName == StepType.EVENTS_INTERPRETED_TO_INDEX) {
+        message = createMessage(step.getMessage(), PipelinesEventsInterpretedMessage.class);
+      }
+
+      if (message != null) {
+        stepsToSend.put(stepName, message);
+      }
+    } catch (IOException ex) {
+      LOG.warn("Error reading message", ex);
+    }
+  }
+
   @VisibleForTesting
   protected Set<StepType> getStepTypes(Set<StepType> stepsToSend) {
     Set<StepType> finalSteps =
@@ -542,7 +573,7 @@ private PipelineBasedMessage createInterpretedMessage(

   @Override
   public PipelineProcess get(UUID datasetKey, int attempt) {
-    Objects.requireNonNull(datasetKey, "DatasetKey can't be null");
+    Objects.requireNonNull(datasetKey, DATASET_KEY_CANNOT_BE_NULL);

     PipelineProcess process = mapper.getByDatasetAndAttempt(datasetKey, attempt);

@@ -564,7 +595,7 @@ public PagingResponse<PipelineProcess> getRunningPipelineProcess(Pageable pageable) {

   @Override
   public long createOrGet(UUID datasetKey, int attempt, String creator) {
-    Objects.requireNonNull(datasetKey, "DatasetKey can't be null");
+    Objects.requireNonNull(datasetKey, DATASET_KEY_CANNOT_BE_NULL);
     Objects.requireNonNull(creator, "Creator can't be null");

     PipelineProcess pipelineProcess = new PipelineProcess();
@@ -715,46 +746,50 @@ public void sendAbsentIndentifiersEmail(UUID datasetKey, int attempt, String message) {
   @Override
   public void notifyAbsentIdentifiers(
       UUID datasetKey, int attempt, long executionKey, String message) {
-    // check if there is an open issue for this dataset
-    List<IssueResult> existingIssues =
-        syncCall(
-            githubApiService.listIssues(
-                Collections.singletonList(datasetKey.toString()), "open", 1, 1));
-
-    if (existingIssues.isEmpty()) {
-      LOG.info(
-          "Creating absent identifiers GH issue, datasetKey {}, attmept {}, message: {}",
-          datasetKey,
-          attempt,
-          message);
-
-      // create new one
-      Issue issue =
-          issueCreator.createIdsValidationFailedIssue(datasetKey, attempt, executionKey, message);
-      syncCall(githubApiService.createIssue(issue));
-    } else {
-      IssueResult existing = existingIssues.get(0);
-
-      LOG.info(
-          "Updating absent identifiers GH issue with number {}, datasetKey {}, attmept {}, message: {}",
-          existing.getNumber(),
-          datasetKey,
-          attempt,
-          message);
-
-      // add comment with new cause
-      IssueComment issueComment =
-          issueCreator.createIdsValidationFailedIssueComment(
-              datasetKey, attempt, executionKey, message);
-      syncCall(githubApiService.addIssueComment(existing.getNumber(), issueComment));
-
-      // add labels to the existing issue
-      syncCall(
-          githubApiService.updateIssueLabels(
-              existing.getNumber(),
-              GithubApiService.IssueLabels.builder()
-                  .labels(issueCreator.updateLabels(existing, datasetKey, attempt))
-                  .build()));
-    }
+    try {
+      // check if there is an open issue for this dataset
+      List<IssueResult> existingIssues =
+          githubApiClient.listIssues(
+              Collections.singletonList(datasetKey.toString()), "open", 1, 1);
+
+      if (existingIssues.isEmpty()) {
+        LOG.info(
+            "Creating absent identifiers GH issue, datasetKey {}, attempt {}, message: {}",
+            datasetKey,
+            attempt,
+            message);
+
+        // create new one
+        Issue issue =
+            issueCreator.createIdsValidationFailedIssue(datasetKey, attempt, executionKey, message);
+        githubApiClient.createIssue(issue);
+      } else {
+        IssueResult existing = existingIssues.get(0);
+
+        LOG.info(
+            "Updating absent identifiers GH issue with number {}, datasetKey {}, attempt {}, message: {}",
+            existing.getNumber(),
+            datasetKey,
+            attempt,
+            message);
+
+        // add comment with new cause
+        IssueComment issueComment =
+            issueCreator.createIdsValidationFailedIssueComment(
+                datasetKey, attempt, executionKey, message);
+        githubApiClient.addIssueComment(existing.getNumber(), issueComment);
+
+        // add labels to the existing issue
+        githubApiClient.updateIssueLabels(
+            existing.getNumber(),
+            GithubApiClient.IssueLabels.builder()
+                .labels(issueCreator.updateLabels(existing, datasetKey, attempt))
+                .build());
+      }
+    } catch (HttpClientErrorException | HttpServerErrorException e) {
+      LOG.error("Error occurred calling GitHub API: {}", e.getMessage(), e);
+    } catch (Exception e) {
+      LOG.error("Error in notifyAbsentIdentifiers: {}", e.getMessage(), e);
+    }
   }

@@ -868,24 +903,13 @@ private List<Endpoint> prioritySortEndpoints(List<Endpoint> endpoints) {
   }

   private void setDatasetTitle(PipelineProcess process) {
-    if (process != null && process.getDatasetKey() != null) {
-      Dataset dataset = datasetService.get(process.getDatasetKey());
-      if (dataset != null) {
-        process.setDatasetTitle(dataset.getTitle());
-      }
-    }
-  }
-
-  private static <T> T syncCall(Call<T> call) {
-    try {
-      Response<T> response = call.execute();
-      if (response.isSuccessful()) {
-        return response.body();
-      }
-      LOG.error("Service responded with an error {}", response);
-      throw new HttpException(response); // Propagates the failed response
-    } catch (IOException ex) {
-      throw new IllegalStateException("Error executing call", ex);
-    }
+    if (process == null || process.getDatasetKey() == null) {
+      return;
+    }
+
+    Dataset dataset = datasetService.get(process.getDatasetKey());
+    if (dataset != null) {
+      process.setDatasetTitle(dataset.getTitle());
+    }
   }
 }
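The deleted syncCall helper existed because Retrofit returns a Call whose Response must be checked for success at every call site. The Spring-based client instead throws HttpClientErrorException (4xx) or HttpServerErrorException (5xx), which is why notifyAbsentIdentifiers above gains try/catch blocks rather than a wrapper. A minimal sketch of the new failure path, with an illustrative caller (the helper name and logging policy here are hypothetical):

import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;

class GithubCallSketch {
  // Illustrative only: with the Spring-backed client, HTTP failures surface as
  // exceptions, so no syncCall-style success check is needed per call.
  static void commentSafely(
      GithubApiClient client, long issueNumber, GithubApiClient.IssueComment comment) {
    try {
      client.addIssueComment(issueNumber, comment); // throws on non-2xx responses
    } catch (HttpClientErrorException e) {
      // 4xx: the request itself was rejected (bad credentials, unknown issue, ...)
      System.err.println("Client error " + e.getStatusCode() + ": " + e.getResponseBodyAsString());
    } catch (HttpServerErrorException e) {
      // 5xx: GitHub-side failure; retrying may succeed
      System.err.println("Server error " + e.getStatusCode());
    }
  }
}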