Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cleanup task #37

Merged
merged 4 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions invenio_swh/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from invenio_db import db

from invenio_swh.models import SWHDepositModel
from invenio_swh.models import SWHDepositModel, SWHDepositStatus


class SWHDeposit:
Expand Down Expand Up @@ -46,14 +46,7 @@ def get(cls, id_):
return cls(deposit)

@classmethod
def get_by_status(cls, status):
"""Get a swh deposit by status."""
with db.session.no_autoflush:
query = cls.model_cls.query.filter_by(status=status)
return [cls(deposit) for deposit in query.all()]

@classmethod
def get_record_deposit(cls, record_id):
def get_by_record_id(cls, record_id):
"""Get a local swh deposit by record id."""
with db.session.no_autoflush:
deposit = cls.model_cls.query.filter_by(object_uuid=record_id).one_or_none()
Expand All @@ -69,16 +62,38 @@ def id(self):
"""Returns the remote id of the swh deposit."""
return self.model.swh_deposit_id

@id.setter
def id(self, value):
"""Set the remote id of the swh deposit."""
self.model.swh_deposit_id = value

@property
def swhid(self):
"""Returns the software hash id of the swh deposit."""
return self.model.swhid

@swhid.setter
def swhid(self, value):
"""Set the software hash id of the swh deposit."""
self.model.swhid = value

@property
def status(self):
"""Returns the status of the swh deposit."""
return self.model.status

@status.setter
def status(self, value):
"""Set the status of the swh deposit."""
if isinstance(value, str) and value in [x.item for x in SWHDepositStatus]:
self.model.status = SWHDepositStatus(value)
elif isinstance(value, SWHDepositStatus):
self.model.status = value
else:
raise ValueError(
f"Invalid status value for Software Heritage deposit. Got: {value}"
)

def commit(self):
"""Commit the deposit to the database."""
if self.model is None:
Expand Down
2 changes: 1 addition & 1 deletion invenio_swh/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class SWHDepositModel(db.Model, Timestamp):
swhid = db.Column(db.String(1024), nullable=True)
"""Software Hash ID."""

swh_deposit_id = db.Column(db.String, nullable=True)
swh_deposit_id = db.Column(db.String, nullable=True) # TODO add an index
"""Software Heritage deposit id."""

status = db.Column(
Expand Down
2 changes: 1 addition & 1 deletion invenio_swh/serializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class SoftwareHeritageXMLSerializer(MarshmallowSerializer):
default_namespaces = {
"default": "https://doi.org/10.5063/SCHEMA/CODEMETA-2.0",
"atom": "http://www.w3.org/2005/Atom",
"swh": "https://www.softwareheritage.org/schema/2018/deposit"
"swh": "https://www.softwareheritage.org/schema/2018/deposit",
}

def __init__(self, namespaces=None, **kwargs):
Expand Down
90 changes: 42 additions & 48 deletions invenio_swh/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
# it under the terms of the MIT License; see LICENSE file for more details.
"""Invenio Software Heritage service."""

from collections.abc import Iterable

from flask import current_app
from invenio_records_resources.services.uow import RecordCommitOp, unit_of_work

from invenio_swh.api import SWHDeposit
from invenio_swh.controller import SWHController
from invenio_swh.errors import DepositFailed, InvalidRecord
from invenio_swh.errors import DepositFailed, DepositNotCreated, InvalidRecord
from invenio_swh.models import SWHDepositStatus
from invenio_swh.schema import SWHCodemetaSchema

Expand Down Expand Up @@ -60,35 +58,37 @@ def result_item(self, deposit: SWHDeposit):
"""Return a result item."""
return self.result_cls(deposit)

def __init__(self, swh_controller: SWHController):
def __init__(self, controller: SWHController):
"""Instantiate the service.

Injects the software heritage controller into the service.
"""
self.swh_controller = swh_controller
self.controller = controller

@unit_of_work()
def create(self, record, uow=None):
"""Create a new deposit.

If the controller fails to create the deposit, the transaction will be rolledback by the Unit of Work
and the deposit won't be created locally.
If the controller fails to create the deposit, it won't be created locally either.
"""
self.validate_record(record)

deposit = self.record_cls.create(record.id)

metadata = self.schema.dump(record)
swh_deposit = self.swh_controller.create_deposit(metadata)
deposit.model.swh_deposit_id = str(swh_deposit["deposit_id"])
deposit.model.status = SWHDepositStatus.CREATED
swh_deposit = self.controller.create_deposit(metadata)
deposit_id = swh_deposit.get("deposit_id")
if not deposit_id:
raise DepositNotCreated("Deposit id not returned by SWH.")

deposit = self.record_cls.create(record.id)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved deposit.create after the deposition is created on remote, otherwise we could end up with a deposition locally that doesn't exist in SWH.

deposit.id = str(deposit_id)
self.update_status(deposit, SWHDepositStatus.CREATED, uow=uow)

uow.register(RecordCommitOp(deposit))
return deposit

def get_record_deposit(self, record_id):
"""Return the deposit associated to a given record."""
deposit = self.record_cls.get_record_deposit(record_id)
deposit = self.record_cls.get_by_record_id(record_id)
return self.result_item(deposit)

def read(self, id_) -> SWHDepositResult:
Expand All @@ -103,14 +103,9 @@ def sync_status(self, id_, uow=None):
deposit = deposit_res.deposit
if not deposit:
return
if deposit.status == SWHDepositStatus.FAILED:
Copy link
Member Author

@alejandromumo alejandromumo Jul 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the check from sync_status, otherwise we can't "recover" a deposition. E.g.:

  • We create a deposit and start polling its status
  • It reaches the max retries for polling, we mark it as FAILED
  • Next day we want to sync the deposition status because it might have worked in SWH

raise DepositFailed("Deposit has already failed. Cannot sync status.")
res = self.swh_controller.fetch_deposit_status(deposit.id)
res = self.controller.fetch_deposit_status(deposit.id)
new_status = res.get("deposit_status")
if new_status in ("failed", "rejected", "expired"):
current_app.logger.warning("Deposit failed")
current_app.logger.warning(str(res))
self.handle_status_update(deposit, new_status)
self.update_status(deposit, new_status)

# Handle swhid created
swhid = res.get("deposit_swhid")
Expand All @@ -130,17 +125,18 @@ def complete(self, id_: int, uow=None):
:rtype: Deposit
:raises DepositFailed: If the deposit has already failed.
"""
deposit_res = self.read(id_)
deposit = deposit_res.deposit
if deposit.status == SWHDepositStatus.FAILED:
raise DepositFailed(
"Deposit has already failed. Cannot complete deposition."
)
try:
deposit_res = self.read(id_)
deposit = deposit_res.deposit
if deposit.status == SWHDepositStatus.FAILED:
raise DepositFailed("Deposit has already failed. Cannot complete deposition.")
self.swh_controller.complete_deposit(deposit.id)
deposit.model.status = SWHDepositStatus.WAITING
self.controller.complete_deposit(deposit.id)
self.update_status(deposit, SWHDepositStatus.WAITING, uow=uow)
except Exception as exc:
current_app.logger.exception(str(exc))
deposit.model.status = SWHDepositStatus.FAILED
uow.register(RecordCommitOp(deposit))
current_app.logger.exception("Deposit completion failed.")
self.update_status(deposit, SWHDepositStatus.FAILED, uow=uow)
return deposit

@unit_of_work()
Expand All @@ -161,19 +157,18 @@ def upload_files(self, id_, files, uow=None):
:return: The updated deposit.
:rtype: object
"""
deposit_res = self.read(id_)
deposit = deposit_res.deposit
try:
deposit_res = self.read(id_)
deposit = deposit_res.deposit
self.validate_files(files)
file = self._get_first_file(files)
fp = file.get_stream("rb")
file_metadata = file.file.dumps()
file_metadata["filename"] = file.file.key
self.swh_controller.update_deposit_files(deposit.id, fp, file_metadata)
self.controller.update_deposit_files(deposit.id, fp, file_metadata)
except Exception as exc:
current_app.logger.exception(str(exc))
deposit.model.status = SWHDepositStatus.FAILED
uow.register(RecordCommitOp(deposit))
self.update_status(deposit, SWHDepositStatus.FAILED, uow=uow)
return deposit

@unit_of_work()
Expand All @@ -191,16 +186,16 @@ def update_swhid(self, id_: int, swhid: str, uow=None) -> None:
:return: The updated deposit.
:rtype: object
"""
deposit_res = self.read(id_)
deposit = deposit_res.deposit
try:
deposit_res = self.read(id_)
deposit = deposit_res.deposit
deposit.model.swhid = swhid
deposit.model.status = SWHDepositStatus.SUCCESS
deposit.swhid = swhid
self.update_status(deposit, SWHDepositStatus.SUCCESS, uow=uow)
uow.register(RecordCommitOp(deposit))
except Exception as exc:
current_app.logger.exception(str(exc))
deposit.model.status = SWHDepositStatus.FAILED
self.update_status(deposit, SWHDepositStatus.FAILED, uow=uow)

uow.register(RecordCommitOp(deposit))
return deposit

def _parse_status(self, status):
Expand All @@ -216,9 +211,12 @@ def _parse_status(self, status):
raise ValueError(f"Invalid status: {status}")

@unit_of_work()
def handle_status_update(self, deposit: SWHDeposit, status, uow=None):
def update_status(self, deposit: SWHDeposit, status, uow=None):
"""Handle a status update of the deposit.

It can be used to update the status from the remote, by parsing the status to an internal status.
It can also be used to update the status to a new one.

:param deposit: The deposit to be updated.
:type deposit: SWHDeposit
:param status: The new status of the deposit.
Expand All @@ -227,13 +225,9 @@ def handle_status_update(self, deposit: SWHDeposit, status, uow=None):

"""
internal_status = self._parse_status(status)
if not internal_status:
current_app.logger.warning(
f"Got unkwnown deposit status from remote: {status}"
)
return
if deposit.model.status != internal_status:
deposit.model.status = internal_status
# Update the status if it has changed
if deposit.status != internal_status:
deposit.status = internal_status
uow.register(RecordCommitOp(deposit))

def _get_first_file(self, files_manager):
Expand Down
58 changes: 48 additions & 10 deletions invenio_swh/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
# Invenio-swh is free software; you can redistribute it and/or modify
# it under the terms of the MIT License; see LICENSE file for more details.
"""Celery tasks for Invenio / Software Heritage integration."""
from datetime import datetime, timedelta

from celery.app import shared_task
from flask import current_app
from invenio_access.permissions import system_identity
from invenio_rdm_records.proxies import current_rdm_records_service as record_service
from invenio_records_resources.services.uow import UnitOfWork

from invenio_swh.errors import DepositFailed, DepositWaiting, InvalidRecord
from invenio_swh.errors import DepositWaiting, InvalidRecord
from invenio_swh.models import SWHDepositStatus
from invenio_swh.proxies import current_swh_service as service

Expand All @@ -34,10 +36,9 @@ def process_published_record(pid):
pid (str): The record ID.

"""
record = record_service.read(system_identity, id_=pid)

try:
# Create the deposit in a separate transaction to store the deposit ID and possible failed status
record = record_service.read(system_identity, id_=pid)
# Create the deposit in a separate transaction. If it fails, no deposit is created locally.
deposit = service.create(record._record)
except Exception as exc:
# If it fails, the deposit was rolled back. We can create it later if the record is valid.
Expand All @@ -52,8 +53,9 @@ def process_published_record(pid):
service.complete(deposit.id, uow=uow)
uow.commit()
except Exception as exc:
# Don't retry the task if failed.
current_app.logger.exception("Failed to complete deposit archival.")
raise
return
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re-raising would retry this task, and the task would try to create the deposit again. We don't want that


poll_deposit.delay(str(deposit.id))

Expand Down Expand Up @@ -83,16 +85,52 @@ def poll_deposit(self, id_):
try:
deposit = service.read(id_).deposit
service.sync_status(deposit.id)
new_status = deposit.status
except DepositFailed:
# If the deposit already failed, we don't need to retry.
except Exception:
# Gracefully fail, the deposit can still be retried
pass

# If the deposit failed already, don't do anything else
if deposit.status == SWHDepositStatus.FAILED:
return

# Manually set status to FAILED on last retry.
# Celery has a bug where it doesn't raise MaxRetriesExceededError, therefore we need to check retries manually.
if self.request.retries == 5:
service.handle_status_update(deposit, SWHDepositStatus.FAILED)
service.update_status(deposit, SWHDepositStatus.FAILED)
return

if new_status == SWHDepositStatus.WAITING:
if deposit.status == SWHDepositStatus.WAITING:
raise DepositWaiting("Deposit is still waiting")


@shared_task()
def cleanup_depositions():
"""Cleanup old depositions."""
if not current_app.config.get("SWH_ENABLED"):
current_app.logger.warning(
"Sofware Heritage interation is not enabled, cleanup task can't run."
)
return
Deposit = service.record_cls
DepositModel = Deposit.model_cls
# query for records that are stuck in "waiting"
res = DepositModel.query.filter(
DepositModel.status == SWHDepositStatus.WAITING,
DepositModel.updated < datetime.now() - timedelta(days=1),
)

for _deposit in res:
# Wrap the deposit in its API object
deposit = service.record_cls.create(_deposit.object_uuid)
try:
service.sync_status(deposit.id)
except Exception as ex:
# If the sync failed for any reason, set the status to "FAILED"
try:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ugly nested try / catch, but service.update_status can also fail. In that case, we want to continue trying other deposits

service.update_status(deposit, SWHDepositStatus.FAILED)
except Exception as exc:
current_app.logger.exception(
"Failed to sync deposit status during cleanup.",
extra={"deposit": deposit.id},
)
pass # Gracefully handle update failure, the deposit will be retried in the future