E2e pipeline with schedule #2851

Merged
merged 12 commits into from
Mar 13, 2025

agnesgaroux (Contributor) commented Mar 11, 2025

What does this change?

wellcomecollection/platform#5969
This brings together the extraction and loading of nodes and edges from the sources into the graph, and the ingestion of the resulting concepts data into Elasticsearch.

Once we've confirmed that this works nicely, I reckon we can remove catalogue_graph_bulk_loaders and catalogue_graph_extractors, which are replaced by the Map states of the concepts_pipeline.
Not sure whether we should keep catalogue_graph_pipeline around?
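
To make the Map-based flow easier to picture, here is a rough Python sketch that rebuilds the `concepts_pipeline` definition as a plain dict. It is reconstructed from the terraform plan output in this PR, not taken from the repository's Terraform, and the helper names (`state_machine_arn`, `map_state`, `concepts_pipeline_definition`, `execution_order`) are hypothetical. Field names such as `Payload` are copied from the plan as-is.

```python
# Illustrative reconstruction of the concepts_pipeline definition shown in the plan.
# All helper names are hypothetical; ARNs and field names are copied from the plan output.
ACCOUNT_ID = "760097843905"
REGION = "eu-west-1"
START_EXECUTION_SYNC = "arn:aws:states:::states:startExecution.sync:2"


def state_machine_arn(name):
    """Build the ARN of a state machine in the catalogue graph account."""
    return f"arn:aws:states:{REGION}:{ACCOUNT_ID}:stateMachine:{name}"


def map_state(task_name, target, next_state, max_concurrency=None):
    """A Map state whose single inner task starts `target` once per input item."""
    state = {
        "Type": "Map",
        "ItemProcessor": {
            "StartAt": task_name,
            "States": {
                task_name: {
                    "Type": "Task",
                    "Resource": START_EXECUTION_SYNC,
                    "Arguments": {
                        "StateMachineArn": state_machine_arn(target),
                        "Payload": "{% $states.input %}",
                    },
                    "End": True,
                }
            },
        },
        "Next": next_state,
    }
    if max_concurrency is not None:
        state["MaxConcurrency"] = max_concurrency
    return state


def concepts_pipeline_definition():
    """Extractors -> Bulk loaders (one at a time) -> Concepts ingestor -> Success."""
    return {
        "Comment": "Build the catalogue graph and ingest concepts into ES",
        "QueryLanguage": "JSONata",
        "StartAt": "Extractors",
        "States": {
            "Extractors": map_state(
                "Extract nodes and edges from source",
                "catalogue-graph-extractor",
                "Bulk loaders",
            ),
            "Bulk loaders": map_state(
                "Load Neptune graph from S3",
                "catalogue-graph-bulk-loader",
                "Concepts ingestor",
                max_concurrency=1,
            ),
            "Concepts ingestor": {
                "Type": "Task",
                "Resource": START_EXECUTION_SYNC,
                "Arguments": {
                    "StateMachineArn": state_machine_arn("catalogue-graph-ingestor")
                },
                "Next": "Success",
            },
            "Success": {"Type": "Succeed"},
        },
    }


def execution_order(definition):
    """Follow the top-level `Next` pointers from `StartAt` until a terminal state."""
    order = []
    name = definition["StartAt"]
    while name is not None:
        order.append(name)
        name = definition["States"][name].get("Next")
    return order
```

Calling `execution_order(concepts_pipeline_definition())` walks the top-level states in the order they would run. Only the bulk loaders Map sets `MaxConcurrency = 1`, so the loads into Neptune are the only step forced to run one item at a time.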

How to test

We have pipeline-2025-03-06, which is not yet live. We could change the schedule so that this runs against it as a test?
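
For a one-off test run, the execution input could be built in the same shape as the daily schedule's input in the plan output. This is a minimal sketch: the function names are hypothetical, and how the target index is selected is not shown in this PR, so it is left out.

```python
import json

# Entity types that appear in the plan output; treated here as the only valid values (an assumption).
VALID_ENTITY_TYPES = ("nodes", "edges")


def pipeline_item(label, transformer_type, entity_type):
    """One work item, matching the keys used in the schedule input of the plan."""
    if entity_type not in VALID_ENTITY_TYPES:
        raise ValueError(f"unknown entity_type: {entity_type!r}")
    return {
        "label": label,
        "transformer_type": transformer_type,
        "entity_type": entity_type,
    }


def daily_run_input():
    """The same two items the daily schedule passes: catalogue work nodes, then edges."""
    return {
        "MessageBody": [
            pipeline_item("Catalogue Work Nodes", "catalogue_works", "nodes"),
            pipeline_item("Catalogue Work Edges", "catalogue_works", "edges"),
        ]
    }


def to_execution_input(payload):
    """Serialise the payload to the JSON string a state machine execution expects as input."""
    return json.dumps(payload, sort_keys=True)
```

The string returned by `to_execution_input(daily_run_input())` could then be pasted into the Step Functions console or passed to whatever starts the test execution.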

How can we measure success?

The concepts pipeline runs end to end on the desired schedule.
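
As a sanity check on "the desired schedule", the two schedule expressions in the plan output can be read as follows: `20 14 ? * MON-THU *` fires at 14:20 UTC Monday to Thursday, and `20 9 ? 1/1 MON#1 *` fires at 09:20 UTC on the first Monday of each month. The sketch below encodes that reading in plain Python. It is my interpretation of the cron fields, not a general cron parser, and the function names are hypothetical.

```python
from datetime import datetime, timezone


def _as_utc(ts):
    """Normalise a timezone-aware datetime to UTC, the timezone set on both schedules."""
    if ts.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return ts.astimezone(timezone.utc)


def is_daily_run(ts):
    """True at 14:20 UTC on Monday to Thursday (reading of '20 14 ? * MON-THU *')."""
    ts = _as_utc(ts)
    # datetime.weekday(): Monday is 0, Thursday is 3
    return ts.weekday() <= 3 and (ts.hour, ts.minute) == (14, 20)


def is_monthly_run(ts):
    """True at 09:20 UTC on the first Monday of a month (reading of '20 9 ? 1/1 MON#1 *')."""
    ts = _as_utc(ts)
    # The first Monday of a month always falls on day 1 to 7.
    first_monday = ts.weekday() == 0 and ts.day <= 7
    return first_monday and (ts.hour, ts.minute) == (9, 20)
```

Under this reading, the first Monday of a month triggers both schedules: the monthly run at 09:20 UTC and the daily run at 14:20 UTC.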

Have we considered potential risks?

We should do a test run against an index that is not yet live.

terraform plan

Terraform will perform the following actions:

  # aws_iam_policy.state_machine_policy will be updated in-place
  ~ resource "aws_iam_policy" "state_machine_policy" {
        id               = "arn:aws:iam::760097843905:policy/terraform-20241220160948386000000001"
        name             = "terraform-20241220160948386000000001"
      ~ policy           = jsonencode(
            {
              - Statement = [
                  - {
                      - Action   = [
                          - "logs:CreateLogStream",
                          - "logs:PutLogEvents",
                        ]
                      - Effect   = "Allow"
                      - Resource = "*"
                    },
                  - {
                      - Action   = [
                          - "states:StartExecution",
                        ]
                      - Effect   = "Allow"
                      - Resource = [
                          - "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-extractor",
                          - "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-extractors",
                          - "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-bulk-loader",
                          - "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-bulk-loaders",
                          - "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-ingestor",
                        ]
                    },
                  - {
                      - Action   = [
                          - "lambda:InvokeFunction",
                        ]
                      - Effect   = "Allow"
                      - Resource = [
                          - "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-loader",
                          - "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-load-poller",
                          - "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-trigger",
                          - "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-loader",
                          - "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-indexer",
                        ]
                    },
                  - {
                      - Action   = [
                          - "ecs:RunTask",
                        ]
                      - Effect   = "Allow"
                      - Resource = [
                          - "arn:aws:ecs:eu-west-1:760097843905:task-definition/catalogue-graph_extractor:*",
                        ]
                    },
                  - {
                      - Action   = [
                          - "iam:PassRole",
                        ]
                      - Effect   = "Allow"
                      - Resource = [
                          - "arn:aws:iam::760097843905:role/catalogue-graph_extractor_execution_role",
                          - "arn:aws:iam::760097843905:role/catalogue-graph_extractor_task_role",
                        ]
                    },
                  - {
                      - Action   = [
                          - "events:PutTargets",
                          - "events:PutRule",
                          - "events:DescribeRule",
                        ]
                      - Effect   = "Allow"
                      - Resource = "arn:aws:events:eu-west-1:760097843905:rule/StepFunctions*"
                    },
                ]
              - Version   = "2012-10-17"
            }
        ) -> (known after apply)
        tags             = {}
        # (7 unchanged attributes hidden)
    }

  # aws_neptune_cluster.catalogue_graph_cluster will be updated in-place
  ~ resource "aws_neptune_cluster" "catalogue_graph_cluster" {
        id                                   = "catalogue-graph"
        tags                                 = {}
        # (31 unchanged attributes hidden)

      ~ serverless_v2_scaling_configuration {
          ~ max_capacity = 32 -> 16
            # (1 unchanged attribute hidden)
        }
    }

  # aws_scheduler_schedule.concept_pipeline_daily will be created
  + resource "aws_scheduler_schedule" "concept_pipeline_daily" {
      + arn                          = (known after apply)
      + group_name                   = (known after apply)
      + id                           = (known after apply)
      + name                         = "concept_pipeline_daily_run"
      + name_prefix                  = (known after apply)
      + schedule_expression          = "20 14 ? * MON-THU *"
      + schedule_expression_timezone = "UTC"
      + state                        = "ENABLED"

      + flexible_time_window {
          + mode = "OFF"
        }

      + target {
          + arn      = (known after apply)
          + input    = jsonencode(
                {
                  + MessageBody = [
                      + {
                          + entity_type      = "nodes"
                          + label            = "Catalogue Work Nodes"
                          + transformer_type = "catalogue_works"
                        },
                      + {
                          + entity_type      = "edges"
                          + label            = "Catalogue Work Edges"
                          + transformer_type = "catalogue_works"
                        },
                    ]
                }
            )
          + role_arn = "arn:aws:iam::760097843905:role/catalogue-graph-state-machine-execution-role"
        }
    }

  # aws_scheduler_schedule.concept_pipeline_monthly will be created
  + resource "aws_scheduler_schedule" "concept_pipeline_monthly" {
      + arn                          = (known after apply)
      + group_name                   = (known after apply)
      + id                           = (known after apply)
      + name                         = "concept_pipeline_monthly_run"
      + name_prefix                  = (known after apply)
      + schedule_expression          = "20 9 ? 1/1 MON#1 *"
      + schedule_expression_timezone = "UTC"
      + state                        = "ENABLED"

      + flexible_time_window {
          + mode = "OFF"
        }

      + target {
          + arn      = (known after apply)
          + input    = jsonencode(
                {
                  + MessageBody = [
                     .... all the inputs
                    ]
                }
            )
          + role_arn = "arn:aws:iam::760097843905:role/catalogue-graph-state-machine-execution-role"
        }
    }

  # aws_sfn_state_machine.catalogue_graph_bulk_loaders will be updated in-place
  ~ resource "aws_sfn_state_machine" "catalogue_graph_bulk_loaders" {
      ~ definition                = jsonencode(
          ~ {
              ~ States  = {
                  ~ "Load Catalogue Concept Edges"             = {
                      ~ Next       = "Success" -> "Load Catalogue Work Nodes"
                        # (3 unchanged attributes hidden)
                    }
                  + "Load Catalogue Work Edges"                = {
                      + Next       = "Success"
                      + Parameters = {
                          + Input           = {
                              + entity_type      = "edges"
                              + transformer_type = "catalogue_works"
                            }
                          + StateMachineArn = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-bulk-loader"
                        }
                      + Resource   = "arn:aws:states:::states:startExecution.sync:2"
                      + Type       = "Task"
                    }
                  + "Load Catalogue Work Nodes"                = {
                      + Next       = "Load Catalogue Work Edges"
                      + Parameters = {
                          + Input           = {
                              + entity_type      = "nodes"
                              + transformer_type = "catalogue_works"
                            }
                          + StateMachineArn = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-bulk-loader"
                        }
                      + Resource   = "arn:aws:states:::states:startExecution.sync:2"
                      + Type       = "Task"
                    }
                    # (20 unchanged attributes hidden)
                }
                # (2 unchanged attributes hidden)
            }
        )
        id                        = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-bulk-loaders"
        name                      = "catalogue-graph-bulk-loaders"
        tags                      = {}
        # (11 unchanged attributes hidden)

        # (3 unchanged blocks hidden)
    }

  # aws_sfn_state_machine.catalogue_graph_extractor will be updated in-place
  ~ resource "aws_sfn_state_machine" "catalogue_graph_extractor" {
      ~ definition                = jsonencode(
          ~ {
              ~ States        = {
                  ~ Extract = {
                      ~ Arguments = {
                          ~ Overrides            = {
                              ~ ContainerOverrides = [
                                  ~ {
                                      ~ Command = [
                                            # (4 unchanged elements hidden)
                                            "--stream-destination",
                                          ~ "{% $states.input.stream_destination %}" -> "s3",
                                        ]
                                        # (1 unchanged attribute hidden)
                                    },
                                ]
                            }
                            # (4 unchanged attributes hidden)
                        }
                        # (4 unchanged attributes hidden)
                    }
                    # (1 unchanged attribute hidden)
                }
                # (3 unchanged attributes hidden)
            }
        )
        id                        = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-extractor"
        name                      = "catalogue-graph-extractor"
        tags                      = {}
        # (11 unchanged attributes hidden)

        # (3 unchanged blocks hidden)
    }

  # aws_sfn_state_machine.catalogue_graph_extractors will be updated in-place
  ~ resource "aws_sfn_state_machine" "catalogue_graph_extractors" {
      ~ definition                = jsonencode(
          ~ {
              ~ States  = {
                  ~ "Extract Catalogue Concept Edges"             = {
                      ~ Next       = "Success" -> "Extract Catalogue Work Nodes"
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (2 unchanged attributes hidden)
                    }
                  ~ "Extract Catalogue Concept Nodes"             = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  + "Extract Catalogue Work Edges"                = {
                      + Next       = "Success"
                      + Parameters = {
                          + Input           = {
                              + "entity_type.$"      = "$$.Execution.Input.entity_type"
                              + "transformer_type.$" = "$$.Execution.Input.transformer_type"
                            }
                          + StateMachineArn = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-extractor"
                        }
                      + Resource   = "arn:aws:states:::states:startExecution.sync:2"
                      + Type       = "Task"
                    }
                  + "Extract Catalogue Work Nodes"                = {
                      + Next       = "Extract Catalogue Work Edges"
                      + Parameters = {
                          + Input           = {
                              + "entity_type.$"      = "$$.Execution.Input.entity_type"
                              + "transformer_type.$" = "$$.Execution.Input.transformer_type"
                            }
                          + StateMachineArn = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-extractor"
                        }
                      + Resource   = "arn:aws:states:::states:startExecution.sync:2"
                      + Type       = "Task"
                    }
                  ~ "Extract LoC Concept Edges"                   = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract LoC Concept Nodes"                   = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract LoC Location Edges"                  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract LoC Location Nodes"                  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract LoC Name Nodes"                      = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract MeSH Concept Edges"                  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract MeSH Concept Nodes"                  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract MeSH Location Nodes"                 = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked LoC Concept Edges"   = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked LoC Concept Nodes"   = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked LoC Location Edges"  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked LoC Location Nodes"  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked LoC Name Edges"      = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked LoC Name Nodes"      = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked MeSH Concept Edges"  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked MeSH Concept Nodes"  = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked MeSH Location Edges" = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                  ~ "Extract Wikidata Linked MeSH Location Nodes" = {
                      ~ Parameters = {
                          ~ Input           = {
                              - "sample_size.$"      = "$$.Execution.Input.sample_size"
                              - stream_destination   = "s3"
                                # (2 unchanged attributes hidden)
                            }
                            # (1 unchanged attribute hidden)
                        }
                        # (3 unchanged attributes hidden)
                    }
                    # (1 unchanged attribute hidden)
                }
                # (2 unchanged attributes hidden)
            }
        )
        id                        = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-extractors"
        name                      = "catalogue-graph-extractors"
        tags                      = {}
        # (11 unchanged attributes hidden)

        # (3 unchanged blocks hidden)
    }

  # aws_sfn_state_machine.concepts_pipeline will be created
  + resource "aws_sfn_state_machine" "concepts_pipeline" {
      + arn                       = (known after apply)
      + creation_date             = (known after apply)
      + definition                = jsonencode(
            {
              + Comment       = "Build the catalogue graph and ingest concepts into ES"
              + QueryLanguage = "JSONata"
              + StartAt       = "Extractors"
              + States        = {
                  + "Bulk loaders"      = {
                      + ItemProcessor  = {
                          + StartAt = "Load Neptune graph from S3"
                          + States  = {
                              + "Load Neptune graph from S3" = {
                                  + Arguments = {
                                      + Payload         = "{% $states.input %}"
                                      + StateMachineArn = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-bulk-loader"
                                    }
                                  + End       = true
                                  + Resource  = "arn:aws:states:::states:startExecution.sync:2"
                                  + Type      = "Task"
                                }
                            }
                        }
                      + MaxConcurrency = 1
                      + Next           = "Concepts ingestor"
                      + Type           = "Map"
                    }
                  + "Concepts ingestor" = {
                      + Arguments = {
                          + StateMachineArn = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-ingestor"
                        }
                      + Next      = "Success"
                      + Resource  = "arn:aws:states:::states:startExecution.sync:2"
                      + Type      = "Task"
                    }
                  + Extractors          = {
                      + ItemProcessor = {
                          + StartAt = "Extract nodes and edges from source"
                          + States  = {
                              + "Extract nodes and edges from source" = {
                                  + Arguments = {
                                      + Payload         = "{% $states.input %}"
                                      + StateMachineArn = "arn:aws:states:eu-west-1:760097843905:stateMachine:catalogue-graph-extractor"
                                    }
                                  + End       = true
                                  + Resource  = "arn:aws:states:::states:startExecution.sync:2"
                                  + Type      = "Task"
                                }
                            }
                        }
                      + Next          = "Bulk loaders"
                      + Type          = "Map"
                    }
                  + Success             = {
                      + Type = "Succeed"
                    }
                }
            }
        )
      + description               = (known after apply)
      + id                        = (known after apply)
      + name                      = "concepts-pipeline"
      + name_prefix               = (known after apply)
      + publish                   = false
      + revision_id               = (known after apply)
      + role_arn                  = "arn:aws:iam::760097843905:role/catalogue-graph-state-machine-execution-role"
      + state_machine_version_arn = (known after apply)
      + status                    = (known after apply)
      + tags_all                  = (known after apply)
      + type                      = "STANDARD"
      + version_description       = (known after apply)

      + encryption_configuration (known after apply)

      + logging_configuration (known after apply)

      + tracing_configuration (known after apply)
    }

Plan: 3 to add, 5 to change, 0 to destroy.

@agnesgaroux agnesgaroux requested a review from a team as a code owner March 11, 2025 09:31

github-actions bot commented Mar 11, 2025

☂️ Python Coverage

current status: ✅

Overall Coverage

| Lines | Covered | Coverage | Threshold | Status |
| ----- | ------- | -------- | --------- | ------ |
| 1833  | 1582    | 86%      | 0%        | 🟢     |

New Files

No new covered files...

Modified Files

No covered modified files...

updated for commit: e19b114 by action🐍

agnesgaroux (Contributor, Author) commented

Terraform detected the following changes made outside of Terraform since the last "terraform apply" which may have affected this plan:

  # module.bulk_load_poller_lambda.aws_lambda_function.main has changed
  ~ resource "aws_lambda_function" "main" {
      ~ code_sha256                    = "M8aZ2F3RuoAqsCJfXo19/TRn95/sIChRuo6XO2Q/NUk=" -> "45Ay+USv08CIkv5kM3Rr89aCtWPgPek+SLhTK3CO+Jk="
        id                             = "catalogue-graph-bulk-load-poller"
      ~ last_modified                  = "2025-02-27T11:07:39.000+0000" -> "2025-03-07T11:54:25.000+0000"
      ~ qualified_arn                  = "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-load-poller:21" -> "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-load-poller:23"
      ~ qualified_invoke_arn           = "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-load-poller:21/invocations" -> "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-load-poller:23/invocations"
      ~ source_code_size               = 20258183 -> 57579750
        tags                           = {}
      ~ version                        = "21" -> "23"
        # (23 unchanged attributes hidden)

        # (4 unchanged blocks hidden)
    }

  # module.bulk_loader_lambda.aws_lambda_function.main has changed
  ~ resource "aws_lambda_function" "main" {
      ~ code_sha256                    = "M8aZ2F3RuoAqsCJfXo19/TRn95/sIChRuo6XO2Q/NUk=" -> "45Ay+USv08CIkv5kM3Rr89aCtWPgPek+SLhTK3CO+Jk="
        id                             = "catalogue-graph-bulk-loader"
      ~ last_modified                  = "2025-02-27T11:07:29.000+0000" -> "2025-03-07T11:54:22.000+0000"
      ~ qualified_arn                  = "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-loader:21" -> "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-loader:23"
      ~ qualified_invoke_arn           = "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-loader:21/invocations" -> "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-bulk-loader:23/invocations"
      ~ source_code_size               = 20258183 -> 57579750
        tags                           = {}
      ~ version                        = "21" -> "23"
        # (23 unchanged attributes hidden)

        # (5 unchanged blocks hidden)
    }

  # module.indexer_lambda.aws_lambda_function.main has changed
  ~ resource "aws_lambda_function" "main" {
      ~ code_sha256                    = "M8aZ2F3RuoAqsCJfXo19/TRn95/sIChRuo6XO2Q/NUk=" -> "45Ay+USv08CIkv5kM3Rr89aCtWPgPek+SLhTK3CO+Jk="
        id                             = "catalogue-graph-indexer"
      ~ last_modified                  = "2025-02-27T11:07:49.000+0000" -> "2025-03-07T11:54:23.000+0000"
      ~ qualified_arn                  = "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-indexer:20" -> "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-indexer:22"
      ~ qualified_invoke_arn           = "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-indexer:20/invocations" -> "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-indexer:22/invocations"
      ~ source_code_size               = 20258183 -> 57579750
        tags                           = {}
      ~ version                        = "20" -> "22"
        # (23 unchanged attributes hidden)

        # (4 unchanged blocks hidden)
    }

  # module.ingestor_indexer_lambda.aws_iam_role.lambda has changed
  ~ resource "aws_iam_role" "lambda" {
        id                    = "lambda-role-catalogue-graph-ingestor-indexer"
        name                  = "lambda-role-catalogue-graph-ingestor-indexer"
        tags                  = {}
        # (12 unchanged attributes hidden)

      - inline_policy {
          - name   = "terraform-20250228152516428700000001" -> null
          - policy = jsonencode(
                {
                  - Statement = [
                      - {
                          - Action   = "secretsmanager:GetSecretValue"
                          - Effect   = "Allow"
                          - Resource = [
                              - "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/protocol",
                              - "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/private_host",
                              - "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/port",
                              - "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/concept_ingestor/api_key",
                            ]
                        },
                    ]
                  - Version   = "2012-10-17"
                }
            ) -> null
        }
      + inline_policy {
          + name   = "terraform-20250228152516428700000001"
          + policy = jsonencode(
                {
                  + Statement = [
                      + {
                          + Action   = "secretsmanager:GetSecretValue"
                          + Effect   = "Allow"
                          + Resource = [
                              + "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/protocol*",
                              + "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/private_host*",
                              + "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/port*",
                              + "arn:aws:secretsmanager:eu-west-1:760097843905:secret:elasticsearch/pipeline_storage_2024-11-18/concept_ingestor/api_key*",
                            ]
                        },
                    ]
                  + Version   = "2012-10-17"
                }
            )
        }

        # (3 unchanged blocks hidden)
    }

  # module.ingestor_indexer_lambda.aws_lambda_function.main has changed
  ~ resource "aws_lambda_function" "main" {
      ~ code_sha256                    = "cGkmlmTgZu9/B/hrvwA+xOMH1h+K+ZqVvYhrtHMsvIw=" -> "XVdXhj+aiZbysZsyuyEsqQwWMfuskbvTInHBGyrQ3PM="
        id                             = "catalogue-graph-ingestor-indexer"
      ~ last_modified                  = "2025-02-28T15:26:21.000+0000" -> "2025-02-28T15:35:44.000+0000"
      ~ qualified_arn                  = "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-indexer:18" -> "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-indexer:19"
      ~ qualified_invoke_arn           = "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-indexer:18/invocations" -> "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-indexer:19/invocations"
      ~ source_code_size               = 57563137 -> 57563156
        tags                           = {}
      ~ version                        = "18" -> "19"
        # (23 unchanged attributes hidden)

        # (5 unchanged blocks hidden)
    }

  # module.ingestor_loader_lambda.aws_lambda_function.main has changed
  ~ resource "aws_lambda_function" "main" {
      ~ code_sha256                    = "cGkmlmTgZu9/B/hrvwA+xOMH1h+K+ZqVvYhrtHMsvIw=" -> "XVdXhj+aiZbysZsyuyEsqQwWMfuskbvTInHBGyrQ3PM="
        id                             = "catalogue-graph-ingestor-loader"
      ~ last_modified                  = "2025-02-28T15:26:12.000+0000" -> "2025-02-28T15:35:34.000+0000"
      ~ qualified_arn                  = "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-loader:20" -> "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-loader:21"
      ~ qualified_invoke_arn           = "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-loader:20/invocations" -> "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-loader:21/invocations"
      ~ source_code_size               = 57563137 -> 57563156
        tags                           = {}
      ~ version                        = "20" -> "21"
        # (23 unchanged attributes hidden)

        # (5 unchanged blocks hidden)
    }

  # module.ingestor_trigger_lambda.aws_lambda_function.main has changed
  ~ resource "aws_lambda_function" "main" {
      ~ code_sha256                    = "cGkmlmTgZu9/B/hrvwA+xOMH1h+K+ZqVvYhrtHMsvIw=" -> "XVdXhj+aiZbysZsyuyEsqQwWMfuskbvTInHBGyrQ3PM="
        id                             = "catalogue-graph-ingestor-trigger"
      ~ last_modified                  = "2025-02-28T15:26:31.000+0000" -> "2025-02-28T15:35:53.000+0000"
      ~ qualified_arn                  = "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-trigger:18" -> "arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-trigger:19"
      ~ qualified_invoke_arn           = "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-trigger:18/invocations" -> "arn:aws:apigateway:eu-west-1:lambda:path/2015-03-31/functions/arn:aws:lambda:eu-west-1:760097843905:function:catalogue-graph-ingestor-trigger:19/invocations"
      ~ source_code_size               = 57563137 -> 57563156
        tags                           = {}
      ~ version                        = "18" -> "19"
        # (23 unchanged attributes hidden)

        # (5 unchanged blocks hidden)
    }

ItemProcessor = {
  StartAt = "Extract nodes and edges from source",
  States = {
    "Extract nodes and edges from source" = {

I think the extractors also need to run in a particular order, so this might not work as expected.

You may be able to express the steps more succinctly and parallelise some parts, but some extractors, specifically the Wikidata ones, rely on previous extractions (through the source), so those earlier steps must have completed first. cc @StepanBrychta
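
One way to keep the ordering is sketched below, assuming the enclosing state machine sets `QueryLanguage = "JSONata"` at the top level and that each item starts a separate extractor state machine. The variable `extractor_state_machine_arn`, the local names, and the labels and transformer types are all placeholders rather than values from this PR. With `MaxConcurrency = 1`, a Map state runs one iteration at a time in array order, so an extractor listed later only starts once the earlier ones have finished.

```hcl
# Sketch only. The variable, labels and transformer types are placeholders,
# not values taken from this PR.
variable "extractor_state_machine_arn" {
  type = string
}

locals {
  # Listed in dependency order: a later entry may read what an earlier one wrote.
  ordered_extractor_inputs = [
    { label = "Source A nodes", transformer_type = "source_a", entity_type = "nodes" },
    { label = "Source B nodes", transformer_type = "source_b", entity_type = "nodes" },
  ]

  # Intended to be placed under "States" in a jsonencode'd definition.
  ordered_extractors_state = {
    Type           = "Map"
    Items          = local.ordered_extractor_inputs
    MaxConcurrency = 1 # one iteration at a time, in array order
    ItemProcessor = {
      ProcessorConfig = { Mode = "INLINE" }
      StartAt         = "Extract nodes and edges from source"
      States = {
        "Extract nodes and edges from source" = {
          Type     = "Task"
          Resource = "arn:aws:states:::states:startExecution.sync:2"
          Arguments = {
            StateMachineArn = var.extractor_state_machine_arn
            Input           = "{% $states.input %}" # the current array item
          }
          End = true
        }
      }
    }
    End = true
  }
}
```

Independent sources could still run in parallel by splitting the inputs into several Map states chained in sequence, each with a higher `MaxConcurrency`.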


variable "state_machine_monthly_inputs" {
  type = list(object({ label : string, transformer_type : string, entity_type : string }))
  default = [

Rather than as variable defaults, I would keep these values in a local.
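
A minimal sketch of the local-based form, assuming the same three fields as the variable's object type. The entries are placeholders, not the PR's real inputs.

```hcl
# Sketch only: the entries are placeholders, not the PR's real inputs.
locals {
  state_machine_monthly_inputs = [
    {
      label            = "Example Source Nodes"
      transformer_type = "example_source"
      entity_type      = "nodes"
    },
    {
      label            = "Example Source Edges"
      transformer_type = "example_source"
      entity_type      = "edges"
    },
  ]
}
```

References would then change from `var.state_machine_monthly_inputs` to `local.state_machine_monthly_inputs`. Unlike a variable default, a local cannot be overridden by a caller or a `.tfvars` file, which suits values that are fixed parts of the pipeline definition. The trade-off is that locals carry no declared type constraint.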

@@ -36,7 +36,7 @@ resource "aws_sfn_state_machine" "catalogue_graph_extractor" {
"--entity-type",
"{% $states.input.entity_type %}",
"--stream-destination",
"{% $states.input.stream_destination %}"

@kenoir kenoir Mar 11, 2025

These variables are to allow the manual triggering of pipeline steps for testing.

@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from 94099b9 to 24d65ca Compare March 11, 2025 15:54
@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from cf5b866 to 4562bb4 Compare March 11, 2025 16:34
@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from 53e1c6f to 884ffec Compare March 11, 2025 16:38
@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from 7553b02 to d782888 Compare March 13, 2025 11:00
@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from 437f8c5 to a36bd75 Compare March 13, 2025 11:04
@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from aadbcc5 to b729d04 Compare March 13, 2025 11:07
@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from 262be31 to 86e396e Compare March 13, 2025 11:37
    "entity_type" : "edges"
  },
  {
    "label" : "Catalogue Concept Nodes",

I think these can be moved into the daily inputs: the available catalogue concepts may change from day to day as new works with those concepts are added.

@kenoir kenoir left a comment

A couple of comments, but otherwise looks good.

@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from 60a1d78 to 60f2e03 Compare March 13, 2025 11:57
@agnesgaroux agnesgaroux force-pushed the e2e-pipeline-with-schedule branch from 1bbc8e3 to e19b114 Compare March 13, 2025 12:32
@agnesgaroux agnesgaroux merged commit 9814517 into main Mar 13, 2025
4 checks passed
@agnesgaroux agnesgaroux deleted the e2e-pipeline-with-schedule branch March 13, 2025 13:21