Skip to content

Commit f8fce47

Browse files
committed
adding warehouse message
gbif/pipelines#1078
1 parent 7d3bbfd commit f8fce47

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

src/main/java/org/gbif/common/messaging/DefaultMessageRegistry.java

+2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public class DefaultMessageRegistry implements MessageRegistry {
104104
messageToExchangeMappingInternal.put(CamtrapDpDownloadFinishedMessage.class, "crawler");
105105
messageToExchangeMappingInternal.put(DownloadLauncherMessage.class, "occurrence");
106106
messageToExchangeMappingInternal.put(DownloadCancelMessage.class, "occurrence");
107+
messageToExchangeMappingInternal.put(DataWarehouseMessage.class, "occurrence");
107108
MESSAGE_TO_EXCHANGE_MAPPING = Collections.unmodifiableMap(messageToExchangeMappingInternal);
108109

109110
Map<Class<? extends Message>, String> messageToRoutingKeyMapping = new HashMap<>();
@@ -186,6 +187,7 @@ public class DefaultMessageRegistry implements MessageRegistry {
186187
messageToRoutingKeyMapping.put(
187188
DownloadLauncherMessage.class, DownloadLauncherMessage.ROUTING_KEY);
188189
messageToRoutingKeyMapping.put(DownloadCancelMessage.class, DownloadCancelMessage.ROUTING_KEY);
190+
messageToRoutingKeyMapping.put(DataWarehouseMessage.class, DataWarehouseMessage.ROUTING_KEY);
189191
MESSAGE_TO_ROUTING_KEY_MAPPING = Collections.unmodifiableMap(messageToRoutingKeyMapping);
190192
}
191193

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package org.gbif.common.messaging.api.messages;
2+
3+
import org.gbif.api.model.pipelines.PipelinesWorkflow;
4+
import org.gbif.api.model.pipelines.StepType;
5+
6+
import java.util.Collections;
7+
import java.util.List;
8+
import java.util.Set;
9+
import java.util.UUID;
10+
11+
import com.fasterxml.jackson.annotation.JsonCreator;
12+
import com.fasterxml.jackson.annotation.JsonProperty;
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
15+
import lombok.SneakyThrows;
16+
17+
public class DataWarehouseMessage extends PipelinesHdfsViewMessage {
18+
19+
public DataWarehouseMessage() {}
20+
21+
@JsonCreator
22+
public DataWarehouseMessage(
23+
@JsonProperty("datasetUuid") UUID datasetUuid,
24+
@JsonProperty("attempt") int attempt,
25+
@JsonProperty("pipelineSteps") Set<String> pipelineSteps,
26+
@JsonProperty("runner") String runner,
27+
@JsonProperty("executionId") Long executionId) {
28+
super(datasetUuid, attempt, pipelineSteps, runner, executionId);
29+
}
30+
31+
@SneakyThrows
32+
public static void main(String[] args) {
33+
DataWarehouseMessage message = new DataWarehouseMessage(UUID.fromString("a92f4b3c-ae5c-45af-8dac-4a6a88d35ddd"), 25, Collections.singleton("DATA_WAREHOUSE"), "DISTRIBUTED" ,814400L);
34+
ObjectMapper mapper = new ObjectMapper();
35+
System.out.println(mapper.writeValueAsString(message));
36+
PipelinesWorkflow.Graph<StepType> workflow = PipelinesWorkflow.getOccurrenceWorkflow();
37+
List<PipelinesWorkflow.Graph<StepType>.Edge> edges = workflow.getNodeEdges(StepType.HDFS_VIEW);
38+
System.out.println(edges);
39+
}
40+
}

0 commit comments

Comments
 (0)