Skip to content

Commit 9814517

Browse files
authored
Merge pull request #2851 from wellcomecollection/e2e-pipeline-with-schedule
E2e pipeline with schedule
2 parents d83562a + c107dd9 commit 9814517

8 files changed

+407
-59
lines changed

catalogue_graph/terraform/iam_state_machines.tf

+11-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ resource "aws_iam_role" "state_machine_execution_role" {
88
{
99
Effect = "Allow",
1010
Principal = {
11-
Service = "states.amazonaws.com"
11+
Service = [
12+
"states.amazonaws.com",
13+
"scheduler.amazonaws.com"
14+
]
1215
},
1316
Action = "sts:AssumeRole"
1417
}
@@ -31,9 +34,15 @@ resource "aws_iam_policy" "state_machine_policy" {
3134
Resource = [
3235
aws_sfn_state_machine.catalogue_graph_extractor.arn,
3336
aws_sfn_state_machine.catalogue_graph_extractors.arn,
37+
aws_sfn_state_machine.catalogue_graph_extractors_monthly.arn,
38+
aws_sfn_state_machine.catalogue_graph_extractors_daily.arn,
3439
aws_sfn_state_machine.catalogue_graph_bulk_loader.arn,
3540
aws_sfn_state_machine.catalogue_graph_bulk_loaders.arn,
36-
aws_sfn_state_machine.catalogue_graph_ingestor.arn
41+
aws_sfn_state_machine.catalogue_graph_bulk_loaders_monthly.arn,
42+
aws_sfn_state_machine.catalogue_graph_bulk_loaders_daily.arn,
43+
aws_sfn_state_machine.catalogue_graph_ingestor.arn,
44+
aws_sfn_state_machine.concepts_pipeline_monthly.arn,
45+
aws_sfn_state_machine.concepts_pipeline_daily.arn
3746
]
3847
},
3948
{

catalogue_graph/terraform/locals.tf

+116
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,122 @@ locals {
1717

1818
# This is a hint, the ingestors might need to be in the pipeline stack!
1919
pipeline_date = "2024-11-18"
20+
21+
concepts_pipeline_inputs_monthly = [
22+
{
23+
"label" : "LoC Concept Nodes",
24+
"transformer_type" : "loc_concepts",
25+
"entity_type" : "nodes"
26+
},
27+
{
28+
"label" : "LoC Location Nodes",
29+
"transformer_type" : "loc_locations",
30+
"entity_type" : "nodes"
31+
},
32+
{
33+
"label" : "LoC Name Nodes",
34+
"transformer_type" : "loc_names",
35+
"entity_type" : "nodes"
36+
},
37+
{
38+
"label" : "LoC Concept Edges",
39+
"transformer_type" : "loc_concepts",
40+
"entity_type" : "edges"
41+
},
42+
{
43+
"label" : "LoC Location Edges",
44+
"transformer_type" : "loc_locations",
45+
"entity_type" : "edges"
46+
},
47+
{
48+
"label" : "MeSH Concept Nodes",
49+
"transformer_type" : "mesh_concepts",
50+
"entity_type" : "nodes"
51+
},
52+
{
53+
"label" : "MeSH Location Nodes",
54+
"transformer_type" : "mesh_locations",
55+
"entity_type" : "nodes"
56+
},
57+
{
58+
"label" : "MeSH Concept Edges",
59+
"transformer_type" : "mesh_concepts",
60+
"entity_type" : "edges"
61+
},
62+
{
63+
"label" : "Wikidata Linked LoC Concept Nodes",
64+
"transformer_type" : "wikidata_linked_loc_concepts",
65+
"entity_type" : "nodes"
66+
},
67+
{
68+
"label" : "Wikidata Linked LoC Location Nodes",
69+
"transformer_type" : "wikidata_linked_loc_locations",
70+
"entity_type" : "nodes"
71+
},
72+
{
73+
"label" : "Wikidata Linked LoC Name Nodes",
74+
"transformer_type" : "wikidata_linked_loc_names",
75+
"entity_type" : "nodes"
76+
},
77+
{
78+
"label" : "Wikidata Linked LoC Concept Edges",
79+
"transformer_type" : "wikidata_linked_loc_concepts",
80+
"entity_type" : "edges"
81+
},
82+
{
83+
"label" : "Wikidata Linked LoC Location Edges",
84+
"transformer_type" : "wikidata_linked_loc_locations",
85+
"entity_type" : "edges"
86+
},
87+
{
88+
"label" : "Wikidata Linked LoC Name Edges",
89+
"transformer_type" : "wikidata_linked_loc_names",
90+
"entity_type" : "edges"
91+
},
92+
{
93+
"label" : "Wikidata Linked MeSH Concept Nodes",
94+
"transformer_type" : "wikidata_linked_mesh_concepts",
95+
"entity_type" : "nodes"
96+
},
97+
{
98+
"label" : "Wikidata Linked MeSH Location Nodes",
99+
"transformer_type" : "wikidata_linked_mesh_locations",
100+
"entity_type" : "nodes"
101+
},
102+
{
103+
"label" : "Wikidata Linked MeSH Concept Edges",
104+
"transformer_type" : "wikidata_linked_mesh_concepts",
105+
"entity_type" : "edges"
106+
},
107+
{
108+
"label" : "Wikidata Linked MeSH Location Edges",
109+
"transformer_type" : "wikidata_linked_mesh_locations",
110+
"entity_type" : "edges"
111+
}
112+
]
113+
114+
concepts_pipeline_inputs_daily = [
115+
{
116+
"label" : "Catalogue Concept Nodes",
117+
"transformer_type" : "catalogue_concepts",
118+
"entity_type" : "nodes"
119+
},
120+
{
121+
"label" : "Catalogue Concept Edges",
122+
"transformer_type" : "catalogue_concepts",
123+
"entity_type" : "edges"
124+
},
125+
{
126+
"label" : "Catalogue Work Nodes",
127+
"transformer_type" : "catalogue_works",
128+
"entity_type" : "nodes"
129+
},
130+
{
131+
"label" : "Catalogue Work Edges",
132+
"transformer_type" : "catalogue_works",
133+
"entity_type" : "edges"
134+
},
135+
]
20136
}
21137

22138
data "aws_vpc" "vpc" {

catalogue_graph/terraform/state_machine_bulk_loaders.tf

+59
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,63 @@ resource "aws_sfn_state_machine" "catalogue_graph_bulk_loaders" {
2727

2828
})
2929
}
30+
resource "aws_sfn_state_machine" "catalogue_graph_bulk_loaders_monthly" {
31+
name = "catalogue-graph-bulk-loaders_monthly"
32+
role_arn = aws_iam_role.state_machine_execution_role.arn
33+
34+
definition = jsonencode({
35+
Comment = "Trigger the catalogue-graph-bulk-loader state machine in sequence for each combination of inputs."
36+
StartAt = "Load ${local.concepts_pipeline_inputs_monthly[0].label}"
37+
States = merge(tomap({
38+
for index, task_input in local.concepts_pipeline_inputs_monthly :
39+
"Load ${task_input.label}" => {
40+
Type = "Task"
41+
Resource = "arn:aws:states:::states:startExecution.sync:2",
42+
Parameters = {
43+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_bulk_loader.arn
44+
Input = {
45+
"transformer_type" = task_input.transformer_type,
46+
"entity_type" = task_input.entity_type
47+
}
48+
}
49+
Next = index == length(local.concepts_pipeline_inputs_monthly) - 1 ? "Success" : "Load ${local.concepts_pipeline_inputs_monthly[index + 1].label}"
50+
}
51+
}), {
52+
Success = {
53+
Type = "Succeed"
54+
}
55+
})
56+
})
57+
}
58+
59+
resource "aws_sfn_state_machine" "catalogue_graph_bulk_loaders_daily" {
60+
name = "catalogue-graph-bulk-loaders_daily"
61+
role_arn = aws_iam_role.state_machine_execution_role.arn
62+
63+
definition = jsonencode({
64+
Comment = "Trigger the catalogue-graph-bulk-loader state machine in sequence for each combination of inputs."
65+
StartAt = "Load ${local.concepts_pipeline_inputs_daily[0].label}"
66+
States = merge(tomap({
67+
for index, task_input in local.concepts_pipeline_inputs_daily :
68+
"Load ${task_input.label}" => {
69+
Type = "Task"
70+
Resource = "arn:aws:states:::states:startExecution.sync:2",
71+
Parameters = {
72+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_bulk_loader.arn
73+
Input = {
74+
"transformer_type" = task_input.transformer_type,
75+
"entity_type" = task_input.entity_type
76+
}
77+
}
78+
Next = index == length(local.concepts_pipeline_inputs_daily) - 1 ? "Success" : "Load ${local.concepts_pipeline_inputs_daily[index + 1].label}"
79+
}
80+
}), {
81+
Success = {
82+
Type = "Succeed"
83+
}
84+
})
85+
})
86+
}
87+
88+
3089

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
resource "aws_sfn_state_machine" "concepts_pipeline_monthly" {
2+
name = "concepts-pipeline_monthly"
3+
role_arn = aws_iam_role.state_machine_execution_role.arn
4+
5+
definition = jsonencode({
6+
Comment = "Extract raw concepts from external sources, transform them into nodes and edges, and load them into the graph",
7+
StartAt = "Extractors"
8+
States = {
9+
"Extractors" = {
10+
Type = "Task"
11+
Resource = "arn:aws:states:::states:startExecution.sync:2",
12+
Parameters = {
13+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_extractors_monthly.arn
14+
}
15+
Next = "Bulk loaders"
16+
},
17+
"Bulk loaders" = {
18+
Type = "Task"
19+
Resource = "arn:aws:states:::states:startExecution.sync:2",
20+
Parameters = {
21+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_bulk_loaders_monthly.arn
22+
}
23+
Next = "Success"
24+
},
25+
Success = {
26+
Type = "Succeed"
27+
}
28+
}
29+
})
30+
}
31+
32+
resource "aws_sfn_state_machine" "concepts_pipeline_daily" {
33+
name = "concepts-pipeline_daily"
34+
role_arn = aws_iam_role.state_machine_execution_role.arn
35+
36+
definition = jsonencode({
37+
Comment = "Extract concepts from catalogue works, load them into the graph, and ingests into ES index.",
38+
StartAt = "Extractors"
39+
States = {
40+
"Extractors" = {
41+
Type = "Task"
42+
Resource = "arn:aws:states:::states:startExecution.sync:2",
43+
Parameters = {
44+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_extractors_daily.arn
45+
}
46+
Next = "Bulk loaders"
47+
},
48+
"Bulk loaders" = {
49+
Type = "Task"
50+
Resource = "arn:aws:states:::states:startExecution.sync:2",
51+
Parameters = {
52+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_bulk_loaders_daily.arn
53+
}
54+
Next = "Concepts ingestor"
55+
},
56+
"Concepts ingestor" = {
57+
Type = "Task"
58+
Resource = "arn:aws:states:::states:startExecution.sync:2",
59+
Parameters = {
60+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_ingestor.arn,
61+
}
62+
Next = "Success"
63+
},
64+
Success = {
65+
Type = "Succeed"
66+
}
67+
}
68+
})
69+
}
70+
71+
72+

catalogue_graph/terraform/state_machine_extractors.tf

+63
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,66 @@ resource "aws_sfn_state_machine" "catalogue_graph_extractors" {
2929
})
3030
})
3131
}
32+
resource "aws_sfn_state_machine" "catalogue_graph_extractors_monthly" {
33+
name = "catalogue-graph-extractors_monthly"
34+
role_arn = aws_iam_role.state_machine_execution_role.arn
35+
36+
definition = jsonencode({
37+
Comment = "Extract raw concepts from external sources, transform them into nodes and edges, and stream them into an S3 bucket."
38+
StartAt = "Extract ${local.concepts_pipeline_inputs_monthly[0].label}"
39+
40+
States = merge(tomap({
41+
for index, task_input in local.concepts_pipeline_inputs_monthly :
42+
"Extract ${task_input.label}" => {
43+
Type = "Task"
44+
Resource = "arn:aws:states:::states:startExecution.sync:2",
45+
Parameters = {
46+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_extractor.arn
47+
Input = {
48+
"stream_destination" : "s3",
49+
"transformer_type.$" : "$$.Execution.Input.transformer_type",
50+
"entity_type.$" : "$$.Execution.Input.entity_type",
51+
"sample_size.$" : "$$.Execution.Input.sample_size"
52+
}
53+
}
54+
Next = index == length(local.concepts_pipeline_inputs_monthly) - 1 ? "Success" : "Extract ${local.concepts_pipeline_inputs_monthly[index + 1].label}"
55+
}
56+
}), {
57+
Success = {
58+
Type = "Succeed"
59+
}
60+
})
61+
})
62+
}
63+
64+
resource "aws_sfn_state_machine" "catalogue_graph_extractors_daily" {
65+
name = "catalogue-graph-extractors_daily"
66+
role_arn = aws_iam_role.state_machine_execution_role.arn
67+
68+
definition = jsonencode({
69+
Comment = "Extract concepts from catalogue works, transform them into nodes and edges, and stream them into an S3 bucket."
70+
StartAt = "Extract ${local.concepts_pipeline_inputs_daily[0].label}"
71+
72+
States = merge(tomap({
73+
for index, task_input in local.concepts_pipeline_inputs_daily :
74+
"Extract ${task_input.label}" => {
75+
Type = "Task"
76+
Resource = "arn:aws:states:::states:startExecution.sync:2",
77+
Parameters = {
78+
StateMachineArn = aws_sfn_state_machine.catalogue_graph_extractor.arn
79+
Input = {
80+
"stream_destination" : "s3",
81+
"transformer_type.$" : "$$.Execution.Input.transformer_type",
82+
"entity_type.$" : "$$.Execution.Input.entity_type",
83+
"sample_size.$" : "$$.Execution.Input.sample_size"
84+
}
85+
}
86+
Next = index == length(local.concepts_pipeline_inputs_daily) - 1 ? "Success" : "Extract ${local.concepts_pipeline_inputs_daily[index + 1].label}"
87+
}
88+
}), {
89+
Success = {
90+
Type = "Succeed"
91+
}
92+
})
93+
})
94+
}

0 commit comments

Comments
 (0)