-
Notifications
You must be signed in to change notification settings - Fork 1
Update the dlq reposting tool to not require zocalo #493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
751a462
Update the dlq reposting tool to not require zocalo
stephen-riggs 9e35e3a
Simplification is possible as don't need to store messages
stephen-riggs 8d9978d
Rename the tool
stephen-riggs e1984a2
Should add some tests
stephen-riggs a758da6
Try setting env in conftest
stephen-riggs c9d3bab
Better security config reading
stephen-riggs b4eab67
Small cleanups
stephen-riggs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| import argparse | ||
| import json | ||
| from datetime import datetime | ||
| from functools import partial | ||
| from pathlib import Path | ||
| from queue import Empty, Queue | ||
|
|
||
| import requests | ||
| from jose import jwt | ||
| from workflows.transport.pika_transport import PikaTransport | ||
|
|
||
| from murfey.util.config import security_from_file | ||
|
|
||
|
|
||
| def dlq_purge( | ||
| dlq_dump_path: Path, queue: str, rabbitmq_credentials: Path | ||
| ) -> list[Path]: | ||
| transport = PikaTransport() | ||
| transport.load_configuration_file(rabbitmq_credentials) | ||
| transport.connect() | ||
|
|
||
| queue_to_purge = f"dlq.{queue}" | ||
| idlequeue: Queue = Queue() | ||
| exported_messages = [] | ||
|
|
||
| def receive_dlq_message(header: dict, message: dict) -> None: | ||
| idlequeue.put_nowait("start") | ||
| header["x-death"][0]["time"] = datetime.timestamp(header["x-death"][0]["time"]) | ||
| filename = dlq_dump_path / f"{queue}-{header['message-id']}" | ||
| dlqmsg = {"header": header, "message": message} | ||
| with filename.open("w") as fh: | ||
| json.dump(dlqmsg, fh, indent=2, sort_keys=True) | ||
| print(f"Message {header['message-id']} exported to {filename}") | ||
| exported_messages.append(filename) | ||
| transport.ack(header) | ||
| idlequeue.put_nowait("done") | ||
|
|
||
| print("Looking for DLQ messages in " + queue_to_purge) | ||
| transport.subscribe( | ||
| queue_to_purge, | ||
| partial(receive_dlq_message), | ||
| acknowledgement=True, | ||
| ) | ||
| try: | ||
| while True: | ||
| idlequeue.get(True, 0.1) | ||
| except Empty: | ||
| print("Done dlq purge") | ||
| transport.disconnect() | ||
| return exported_messages | ||
|
|
||
|
|
||
| def handle_dlq_messages(messages_path: list[Path], rabbitmq_credentials: Path): | ||
| transport = PikaTransport() | ||
| transport.load_configuration_file(rabbitmq_credentials) | ||
| transport.connect() | ||
|
|
||
| for f, dlqfile in enumerate(messages_path): | ||
| if not dlqfile.is_file(): | ||
| continue | ||
| with open(dlqfile) as fh: | ||
| dlqmsg = json.load(fh) | ||
| header = dlqmsg["header"] | ||
| header["dlq-reinjected"] = "True" | ||
|
|
||
| drop_keys = { | ||
| "message-id", | ||
| "routing_key", | ||
| "redelivered", | ||
| "exchange", | ||
| "consumer_tag", | ||
| "delivery_mode", | ||
| } | ||
| clean_header = {k: str(v) for k, v in header.items() if k not in drop_keys} | ||
|
|
||
| destination = header.get("x-death", [{}])[0].get("queue") | ||
| transport.send( | ||
| destination, | ||
| dlqmsg["message"], | ||
| headers=clean_header, | ||
| ) | ||
| dlqfile.unlink() | ||
| print(f"Reinjected {dlqfile}\n") | ||
|
|
||
| transport.disconnect() | ||
|
|
||
|
|
||
| def handle_failed_posts(messages_path: list[Path], token: str): | ||
| """Deal with any messages that have been sent as failed client posts""" | ||
| for json_file in messages_path: | ||
| with open(json_file, "r") as json_data: | ||
| message = json.load(json_data) | ||
|
|
||
| if not message.get("message") or not message["message"].get("url"): | ||
| print(f"{json_file} is not a failed client post") | ||
| continue | ||
| dest = message["message"]["url"] | ||
| message_json = message["message"]["json"] | ||
|
|
||
| response = requests.post( | ||
| dest, json=message_json, headers={"Authorization": f"Bearer {token}"} | ||
| ) | ||
| if response.status_code != 200: | ||
| print(f"Failed to repost {json_file}") | ||
| else: | ||
| print(f"Reposted {json_file}") | ||
| json_file.unlink() | ||
|
|
||
|
|
||
| def run(): | ||
| """ | ||
| Method of checking and purging murfey queues on rabbitmq | ||
| Two types of messages are possible: | ||
| - failed client posts which need reposting to the murfey server API | ||
| - feedback messages that can be sent back to rabbitmq | ||
| """ | ||
| parser = argparse.ArgumentParser( | ||
| description="Purge and reinject failed murfey messages" | ||
| ) | ||
| parser.add_argument( | ||
| "-c", | ||
| "--config", | ||
| help="Security config file", | ||
| required=True, | ||
| ) | ||
| parser.add_argument( | ||
| "-u", | ||
| "--username", | ||
| help="Token username", | ||
| required=True, | ||
| ) | ||
| parser.add_argument( | ||
| "-d", "--dir", default="DLQ", help="Directory to export messages to" | ||
| ) | ||
| args = parser.parse_args() | ||
|
|
||
| # Read the security config file | ||
| security_config = security_from_file(args.config) | ||
|
|
||
| # Get the token to post to the api with | ||
| token = jwt.encode( | ||
| {"user": args.username}, | ||
| security_config.auth_key, | ||
| algorithm=security_config.auth_algorithm, | ||
| ) | ||
|
|
||
| # Purge the queue and repost/reinject any messages found | ||
| dlq_dump_path = Path(args.dir) | ||
| dlq_dump_path.mkdir(parents=True, exist_ok=True) | ||
| exported_messages = dlq_purge( | ||
| dlq_dump_path, | ||
| security_config.feedback_queue, | ||
| security_config.rabbitmq_credentials, | ||
| ) | ||
| handle_failed_posts(exported_messages, token) | ||
| handle_dlq_messages(exported_messages, security_config.rabbitmq_credentials) | ||
|
|
||
| # Clean up any created directories | ||
| try: | ||
| dlq_dump_path.rmdir() | ||
| except OSError: | ||
| print(f"Cannot remove {dlq_dump_path} as it is not empty") | ||
| print("Done") | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm understanding the function correctly, you're deleting the DLQ message after resubmission. Is it worth implementing flags such as in the cryoemservices implementation of this function to control whether to keep or delete DLQ messages upon resubmission?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main reason we need the tool in murfey is to handle failed API posts.
I've deliberately kept the tool simple, and with limited functionality for safety.
So it will only repost what it purges, and cannot be used to save or modify messages, or post other things to the API.
We could implement more flags, but it would just be duplicating functionality that can be got through the cryoemservices tool.