
Commit fd0ccff

Authored by kenoir, weco-bot and agnesgaroux
Monitor Catalogue Graph loader output (#2854)
* Parameterise the catalogue-graph ingestor trigger with pipeline date, so that ingestion runs can be namespaced and monitored on a per-pipeline basis (required for monitoring)
* Add monitor for catalogue graph ingestor trigger step
* Remove unused comma
* Add deployment to CI
* Ignore transformer_outputs
* Apply auto-formatting rules
* Monitor Catalogue Graph loader output
* fmt
* Add lambda to CI
* Resolve test issues
* Add loader_monitor tests
* fmt
* Add terraform for monitor lambda
* Add state machine step
* Correct policy name
* Correct name and payload
* Fixes for ingestor
* Get the input right
* Extra tweaks to get the pipeline into a working state
* Rename end index to record count
* Add convenience script to deploy all ingestor lambdas
* Address review comments (Co-Authored-By: agnesgaroux <[email protected]>)

Co-authored-by: Github on behalf of Wellcome Collection <[email protected]>
Co-authored-by: Agnes Garoux <[email protected]>
Co-authored-by: agnesgaroux <[email protected]>
1 parent 430e7a9 commit fd0ccff

22 files changed (+814, −83 lines)
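
The thread running through these diffs: every ingestion run is now identified by a (pipeline_date, job_id) pair, which namespaces both the loader's S3 output and the monitor's metric dimensions, so runs can be tracked per pipeline. A minimal sketch of that shape, with invented example values (the field and dimension names are taken from the diffs below):

# Hypothetical run identifiers; values invented for illustration.
pipeline_date = "2025-01-01"
job_id = "20250101T1200"

# The loader writes each parquet chunk under a per-pipeline, per-job prefix
# (see the ingestor_loader.py hunk below); a missing date falls back to "dev".
s3_object_key = f"{pipeline_date or 'dev'}/{job_id}/00000000-00001000.parquet"

# The loader monitor reports metrics with matching dimensions
# (see report_results in ingestor_loader_monitor.py below).
dimensions = {
    "pipeline_date": pipeline_date,
    "step": "ingestor_loader_monitor",
    "job_id": job_id,
}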

.github/workflows/catalogue-graph-ci.yml (+1)

@@ -55,6 +55,7 @@ jobs:
           catalogue-graph-ingestor-trigger-monitor,
           catalogue-graph-ingestor-indexer,
           catalogue-graph-ingestor-loader,
+          catalogue-graph-ingestor-loader-monitor,
         ]
     steps:
       - uses: actions/checkout@v3

catalogue_graph/scripts/build.sh (+17, −4)

@@ -18,19 +18,23 @@ S3_PREFIX="lambdas/catalogue_graph"
 ZIP_TARGET="${ROOT}/target/build.zip"
 TAG_DEFAULT="dev"
 PUSH=false
+SKIP_CONTAINER_BUILD=false
 
 # parse command line arguments
 while [[ $# -gt 0 ]]; do
   case "$1" in
     -t|--tag)
       TAG=${2:-$TAG_DEFAULT}
       echo "Using tag: $TAG"
-      shift
+      shift
       ;;
     -p|--push)
       PUSH=true
       echo "Will push build artifacts to AWS"
-      shift
+      ;;
+    -s|--skip-container-build)
+      SKIP_CONTAINER_BUILD=true
+      echo "Will skip building the container"
       ;;
     *)
       echo "Unknown option: $1"
@@ -46,6 +50,10 @@ cd "$ROOT"
 
 function build_zip() {( set -e
   local ZIP_TARGET=$1
+  # Ensure the target directory is clean
+  rm -rf target/tmp
+  rm -f $ZIP_TARGET
+
   mkdir -p target/tmp
 
   cp -r src/* target/tmp
@@ -83,9 +91,14 @@ function docker_compose {( set -e
 )}
 
 build_zip "$ZIP_TARGET"
-docker_compose "build" "extractor"
+
+if [ "$SKIP_CONTAINER_BUILD" == false ]; then
+  docker_compose "build" "extractor"
+fi
 
 if [ "$PUSH" == true ]; then
   upload_zip "$ZIP_TARGET"
-  docker_compose "push" "extractor"
+  if [ "$SKIP_CONTAINER_BUILD" == false ]; then
+    docker_compose "push" "extractor"
+  fi
 fi

New file (+29)

@@ -0,0 +1,29 @@
+#!/usr/bin/env bash
+
+# Convenience script to build and deploy all lambdas
+
+set -o errexit
+set -o nounset
+
+# set ROOT to the root of the project
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+ROOT+="$(dirname "$DIR")"
+
+LAMBDAS=(
+  "catalogue-graph-ingestor-loader"
+  "catalogue-graph-ingestor-loader-monitor"
+  "catalogue-graph-ingestor-trigger-monitor"
+  "catalogue-graph-ingestor-indexer"
+  "catalogue-graph-ingestor-trigger"
+)
+
+export AWS_PROFILE=platform-developer
+
+pushd $ROOT/..
+
+$ROOT/scripts/build.sh --push --skip-container-build
+
+for LAMBDA in "${LAMBDAS[@]}"; do
+  echo "Deploying $LAMBDA"
+  ./catalogue_graph/scripts/deploy_lambda_zip.sh $LAMBDA
+done

catalogue_graph/scripts/deploy_lambda_zip.sh (−1)

@@ -29,5 +29,4 @@ echo "Revision id: $REVISION_ID"
 echo "Awaiting function update"
 aws lambda wait function-updated \
   --function-name $LAMBDA_NAME
-
 echo "Done"

catalogue_graph/src/ingestor_indexer.py (+1)

@@ -25,6 +25,7 @@ class IngestorIndexerObject(BaseModel):
 
 class IngestorIndexerLambdaEvent(BaseModel):
     pipeline_date: str | None = INGESTOR_PIPELINE_DATE
+    job_id: str | None = None
     object_to_index: IngestorIndexerObject
 
 
catalogue_graph/src/ingestor_loader.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def handler(
9393
filename = (
9494
f"{str(event.start_offset).zfill(8)}-{str(event.end_index).zfill(8)}.parquet"
9595
)
96-
s3_object_key = f"{event.pipeline_date}/{event.job_id}/{filename}"
96+
s3_object_key = f"{event.pipeline_date or 'dev'}/{event.job_id}/{filename}"
9797
s3_uri = f"s3://{config.loader_s3_bucket}/{config.loader_s3_prefix}/{s3_object_key}"
9898

9999
extracted_data = extract_data(
@@ -108,6 +108,7 @@ def handler(
108108

109109
return IngestorIndexerLambdaEvent(
110110
pipeline_date=event.pipeline_date,
111+
job_id=event.job_id,
111112
object_to_index=result,
112113
)
113114

@@ -148,14 +149,15 @@ def local_handler() -> None:
148149
required=False,
149150
default="dev",
150151
)
152+
151153
args = parser.parse_args()
152154

153155
event = IngestorLoaderLambdaEvent(**args.__dict__)
154156
config = IngestorLoaderConfig(is_local=True)
155157

156158
result = handler(event, config)
157159

158-
pprint.pprint(result)
160+
pprint.pprint(result.model_dump())
159161

160162

161163
if __name__ == "__main__":
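
The "or 'dev'" fallback above is the behavioural fix in this file: previously a run without a pipeline_date interpolated the literal string "None" into the S3 key. A quick sketch of the new behaviour, with an invented job_id and chunk filename:

# Invented values: "job-123" and the filename are for illustration only.
for pipeline_date in ("2025-01-01", None):
    print(f"{pipeline_date or 'dev'}/job-123/00000000-00001000.parquet")
# -> 2025-01-01/job-123/00000000-00001000.parquet
# -> dev/job-123/00000000-00001000.parquet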

catalogue_graph/src/ingestor_loader_monitor.py (new file, +168)

@@ -0,0 +1,168 @@
+import boto3
+import smart_open
+from pydantic import BaseModel, typing
+
+from clients.metric_reporter import MetricReporter
+from config import INGESTOR_S3_BUCKET, INGESTOR_S3_PREFIX
+from ingestor_indexer import IngestorIndexerLambdaEvent
+from models.step_events import IngestorMonitorStepEvent
+
+
+class IngestorLoaderMonitorLambdaEvent(IngestorMonitorStepEvent):
+    events: list[IngestorIndexerLambdaEvent]
+
+
+class IngestorLoaderMonitorConfig(IngestorMonitorStepEvent):
+    loader_s3_bucket: str = INGESTOR_S3_BUCKET
+    loader_s3_prefix: str = INGESTOR_S3_PREFIX
+    percentage_threshold: float = 0.1
+
+    is_local: bool = False
+
+
+class LoaderReport(BaseModel):
+    pipeline_date: str
+    job_id: str
+    record_count: int
+    total_file_size: int
+
+
+def run_check(
+    event: IngestorLoaderMonitorLambdaEvent, config: IngestorLoaderMonitorConfig
+) -> LoaderReport:
+    pipeline_date = event.events[0].pipeline_date or "dev"
+    assert all([(e.pipeline_date or "dev") == pipeline_date for e in event.events]), (
+        "pipeline_date mismatch! Stopping."
+    )
+    job_id = event.events[0].job_id
+    assert all([e.job_id == job_id for e in event.events]), "job_id mismatch! Stopping."
+    force_pass = config.force_pass or event.force_pass
+
+    print(
+        f"Checking loader events for pipeline_date: {pipeline_date}:{job_id}, force_pass: {force_pass} ..."
+    )
+
+    # assert there are no empty content lengths
+    assert all([e.object_to_index.content_length for e in event.events]), (
+        "Empty content length found! Stopping."
+    )
+    sum_file_size = sum([(e.object_to_index.content_length or 0) for e in event.events])
+
+    # assert there are no empty record counts
+    assert all([e.object_to_index.record_count for e in event.events]), (
+        "Empty record count found! Stopping."
+    )
+    sum_record_count = sum(
+        [(e.object_to_index.record_count or 0) for e in event.events]
+    )
+
+    current_report = LoaderReport(
+        pipeline_date=pipeline_date,
+        job_id=job_id or "dev",
+        record_count=sum_record_count,
+        total_file_size=sum_file_size,
+    )
+
+    s3_report_name = "report.loader.json"
+    s3_url_current_job = f"s3://{config.loader_s3_bucket}/{config.loader_s3_prefix}/{pipeline_date}/{job_id}/{s3_report_name}"
+    s3_url_latest = f"s3://{config.loader_s3_bucket}/{config.loader_s3_prefix}/{pipeline_date}/{s3_report_name}"
+
+    # open with smart_open, check for file existence
+    latest_report = None
+    try:
+        with smart_open.open(s3_url_latest, "r") as f:
+            latest_report = LoaderReport.model_validate_json(f.read())
+
+    # if file does not exist, ignore
+    except (OSError, KeyError) as e:
+        print(f"No latest report found: {e}")
+
+    if latest_report is not None:
+        # check if the sum file size has changed by more than the threshold,
+        # we are ignoring the record count for now, as this will be the same as the trigger step
+        delta = current_report.total_file_size - latest_report.total_file_size
+        percentage = abs(delta) / latest_report.total_file_size
+
+        if percentage > config.percentage_threshold:
+            error_message = f"Percentage change {percentage} exceeds threshold {config.percentage_threshold}!"
+            if force_pass:
+                print(f"Force pass enabled: {error_message}, but continuing.")
+            else:
+                raise ValueError(error_message)
+        else:
+            print(
+                f"Percentage change {percentage} ({delta}/{latest_report.total_file_size}) is within threshold {config.percentage_threshold}."
+            )
+
+    transport_params = {"client": boto3.client("s3")}
+
+    # write the current report to s3 as latest
+    with smart_open.open(s3_url_latest, "w", transport_params=transport_params) as f:
+        f.write(current_report.model_dump_json())
+
+    # write the current report to s3 as job_id
+    with smart_open.open(
+        s3_url_current_job, "w", transport_params=transport_params
+    ) as f:
+        f.write(current_report.model_dump_json())
+
+    return current_report
+
+
+def report_results(
+    report: LoaderReport,
+    send_report: bool,
+) -> None:
+    dimensions = {
+        "pipeline_date": report.pipeline_date,
+        "step": "ingestor_loader_monitor",
+        "job_id": report.job_id,
+    }
+
+    print(f"Reporting results {report}, {dimensions} ...")
+    if send_report:
+        reporter = MetricReporter("catalogue_graph_ingestor")
+        reporter.put_metric_data(
+            metric_name="total_file_size",
+            value=report.total_file_size,
+            dimensions=dimensions,
+        )
+    else:
+        print("Skipping sending report metrics.")
+
+    return
+
+
+def handler(
+    event: IngestorLoaderMonitorLambdaEvent, config: IngestorLoaderMonitorConfig
+) -> None:
+    print("Checking output of ingestor_loader ...")
+    send_report = event.report_results or config.report_results
+
+    try:
+        report = run_check(event, config)
+        report_results(report, send_report)
+    except ValueError as e:
+        print(f"Check failed: {e}")
+        raise e
+
+    print("Check complete.")
+    return
+
+
+def lambda_handler(
+    event: list[IngestorIndexerLambdaEvent] | IngestorLoaderMonitorLambdaEvent,
+    context: typing.Any,
+) -> list[dict]:
+    handler_event = None
+    if isinstance(event, list):
+        handler_event = IngestorLoaderMonitorLambdaEvent(events=event)
+    else:
+        handler_event = event
+
+    handler(
+        event=handler_event,
+        config=IngestorLoaderMonitorConfig(),
+    )
+
+    return [e.model_dump() for e in handler_event.events]
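
To make the monitor's guard concrete, here is the threshold arithmetic from run_check with invented numbers (percentage_threshold defaults to 0.1, i.e. a 10% swing in total output size):

# Suppose the previous job's report.loader.json recorded 100 MB of parquet
# output and the current job produced 89 MB: an 11% drop.
latest_total_file_size = 100_000_000
current_total_file_size = 89_000_000

delta = current_total_file_size - latest_total_file_size  # -11_000_000
percentage = abs(delta) / latest_total_file_size          # 0.11

# 0.11 > 0.1, so run_check raises ValueError and the step fails, unless
# force_pass is set on the event or config, in which case the discrepancy
# is logged and the run continues.

Note also that lambda_handler accepts either a bare list of indexer events or an already-wrapped IngestorLoaderMonitorLambdaEvent, and returns the events unchanged, so the monitor can sit between the loader and indexer steps of the state machine without altering the payload that flows through.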

catalogue_graph/src/ingestor_local.py (+24, −8)

@@ -8,11 +8,14 @@
 from ingestor_indexer import (
     handler as indexer_handler,
 )
-from ingestor_loader import (
-    IngestorLoaderConfig,
+from ingestor_loader import IngestorLoaderConfig
+from ingestor_loader import handler as loader_handler
+from ingestor_loader_monitor import (
+    IngestorLoaderMonitorConfig,
+    IngestorLoaderMonitorLambdaEvent,
 )
-from ingestor_loader import (
-    handler as loader_handler,
+from ingestor_loader_monitor import (
+    handler as loader_monitor_handler,
 )
 from ingestor_trigger import (
     IngestorTriggerConfig,
@@ -25,7 +28,7 @@
     IngestorTriggerMonitorConfig,
 )
 from ingestor_trigger_monitor import (
-    handler as monitor_handler,
+    handler as trigger_monitor_handler,
 )
 
 
@@ -41,7 +44,7 @@ def main() -> None:
     parser.add_argument(
         "--pipeline-date",
         type=str,
-        help="The date to use for the pipeline, required.",
+        help='The pipeline that is being ingested to, will default to "None".',
         required=False,
     )
     parser.add_argument(
@@ -63,7 +66,12 @@ def main() -> None:
 
     args = parser.parse_args()
 
-    trigger_event = IngestorTriggerLambdaEvent(**args.__dict__)
+    trigger_event = IngestorTriggerLambdaEvent(
+        job_id=args.job_id,
+        pipeline_date=args.pipeline_date,
+    )
+    print(f"Processing pipeline for {trigger_event.pipeline_date}.")
+
     config = IngestorTriggerConfig(is_local=True)
     trigger_result = trigger_handler(trigger_event, config)
 
@@ -75,11 +83,19 @@ def main() -> None:
     trigger_monitor_config = IngestorTriggerMonitorConfig(
         is_local=True, force_pass=bool(args.force_pass)
     )
-    monitor_handler(trigger_result, trigger_monitor_config)
+    trigger_monitor_handler(trigger_result, trigger_monitor_config)
 
     loader_config = IngestorLoaderConfig(is_local=True)
     loader_results = [loader_handler(e, loader_config) for e in trigger_result_events]
 
+    if args.monitoring:
+        loader_monitor_config = IngestorLoaderMonitorConfig(is_local=True)
+        loader_monitor_event = IngestorLoaderMonitorLambdaEvent(
+            events=loader_results,
+            force_pass=bool(args.force_pass),
+        )
+        loader_monitor_handler(loader_monitor_event, loader_monitor_config)
+
     indexer_config = IngestorIndexerConfig(is_local=True)
     success_counts = [indexer_handler(e, indexer_config) for e in loader_results]
 
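
A closing note on the metrics: MetricReporter's implementation is not part of this commit, so the following is only a sketch of what report_results presumably emits, assuming the class wraps CloudWatch's PutMetricData (dimension values invented for illustration):

import boto3

cloudwatch = boto3.client("cloudwatch")
cloudwatch.put_metric_data(
    Namespace="catalogue_graph_ingestor",  # the name passed to MetricReporter
    MetricData=[
        {
            "MetricName": "total_file_size",
            "Value": 89_000_000.0,  # invented value
            "Dimensions": [
                {"Name": "pipeline_date", "Value": "2025-01-01"},
                {"Name": "step", "Value": "ingestor_loader_monitor"},
                {"Name": "job_id", "Value": "20250101T1200"},
            ],
        }
    ],
)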
