Skip to content

Conversation

@taoerman
Copy link
Member

@taoerman taoerman commented Nov 6, 2025

Summary

Implemented a license audit system for community channel submissions: added the AuditedSpecialPermissionsLicense model to store special permission license descriptions for admin review;

Created a Celery task audit_channel_licenses_task that identifies invalid licenses (e.g., "All Rights Reserved") and extracts special permissions license descriptions from published content;

Added a POST endpoint /api/channel/{id}/audit_licenses/ to trigger audits (requires editor/admin access)

Exposed the published_data field in channel API responses to include audit results.

All functionality is covered by tests, and all tests are passing.

References

Fixed #5448

Reviewer guidance

run unit tests

Copy link
Member

@AlexVelezLl AlexVelezLl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a few thoughts! I think the most important one is to use the channel content database for getting the included_licenses too!

Comment on lines 182 to 395
def _validate_audit_request(channel_id, user_id):
"""Validate user and channel for audit request."""
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
logger.error(f"User with id {user_id} does not exist")
return None, None

try:
channel = Channel.objects.get(pk=channel_id)
except Channel.DoesNotExist:
logger.error(f"Channel with id {channel_id} does not exist")
return None, None

if not channel.editors.filter(pk=user_id).exists() and not user.is_admin:
logger.error(
f"User {user_id} is not an editor of channel {channel_id} and is not an admin"
)
return None, None

if not channel.main_tree.published:
logger.error(f"Channel {channel_id} is not published")
return None, None

return user, channel


def _calculate_included_licenses(channel, published_data_version, channel_version):
"""Calculate and cache included_licenses if not already present."""
included_licenses = published_data_version.get("included_licenses")
if not included_licenses:
published_nodes = (
channel.main_tree.get_descendants()
.filter(published=True)
.exclude(kind_id=content_kinds.TOPIC)
)
license_ids = list(
published_nodes.exclude(license=None)
.values_list("license", flat=True)
.distinct()
)
included_licenses = sorted(set(license_ids))
published_data_version["included_licenses"] = included_licenses
logger.info(
f"Calculated included_licenses for channel {channel.id} version {channel_version}: {included_licenses}"
)
return included_licenses


def _check_invalid_licenses(included_licenses):
"""Check for invalid licenses (All Rights Reserved)."""
invalid_license_ids = []
try:
all_rights_reserved_license = License.objects.get(
license_name="All Rights Reserved"
)
if all_rights_reserved_license.id in included_licenses:
invalid_license_ids = [all_rights_reserved_license.id]
except License.DoesNotExist:
logger.warning("License 'All Rights Reserved' not found in database")
except License.MultipleObjectsReturned:
logger.warning("Multiple 'All Rights Reserved' licenses found, using first one")
all_rights_reserved_license = License.objects.filter(
license_name="All Rights Reserved"
).first()
if (
all_rights_reserved_license
and all_rights_reserved_license.id in included_licenses
):
invalid_license_ids = [all_rights_reserved_license.id]
return invalid_license_ids


def _process_special_permissions_licenses(channel_id, included_licenses):
"""Process special permissions licenses and return audited license IDs."""
try:
special_permissions_license = License.objects.get(
license_name="Special Permissions"
)
except License.DoesNotExist:
logger.warning("License 'Special Permissions' not found in database")
return []
except License.MultipleObjectsReturned:
logger.warning("Multiple 'Special Permissions' licenses found, using first one")
special_permissions_license = License.objects.filter(
license_name="Special Permissions"
).first()

if (
not special_permissions_license
or special_permissions_license.id not in included_licenses
):
return []

unversioned_db_filename = f"{channel_id}.sqlite3"
unversioned_db_storage_path = os.path.join(
settings.DB_ROOT, unversioned_db_filename
)

if not storage.exists(unversioned_db_storage_path):
logger.error(
f"Unversioned database not found at {unversioned_db_storage_path} for channel {channel_id}"
)
return None

with using_temp_migrated_content_database(unversioned_db_storage_path):
special_perms_nodes = KolibriContentNode.objects.filter(
license_name="Special Permissions"
).exclude(kind=content_kinds.TOPIC)

license_descriptions = (
special_perms_nodes.exclude(license_description__isnull=True)
.exclude(license_description="")
.values_list("license_description", flat=True)
.distinct()
)

audited_license_ids = []
for description in license_descriptions:
(
audited_license,
created,
) = AuditedSpecialPermissionsLicense.objects.get_or_create(
description=description, defaults={"distributable": False}
)
audited_license_ids.append(audited_license.id)
if created:
logger.info(
f"Created new AuditedSpecialPermissionsLicense for description: {description[:100]}"
)

return audited_license_ids


def _save_audit_results(
channel,
published_data_version,
invalid_license_ids,
special_permissions_license_ids,
user_id,
):
"""Save audit results to published_data and create change event."""
published_data_version["community_library_invalid_licenses"] = (
invalid_license_ids if invalid_license_ids else None
)
published_data_version["community_library_special_permissions"] = (
special_permissions_license_ids if special_permissions_license_ids else None
)

channel.save()

Change.create_change(
generate_update_event(
channel.id,
CHANNEL,
{"published_data": channel.published_data},
channel_id=channel.id,
),
applied=True,
created_by_id=user_id,
)


@app.task(bind=True, name="audit-channel-licenses")
def audit_channel_licenses_task(self, channel_id, user_id):
"""
Audits channel licenses for community library submission.
Checks for invalid licenses (All Rights Reserved) and special permissions licenses,
and updates the channel's published_data with audit results.
:type self: contentcuration.utils.celery.tasks.CeleryTask
:param channel_id: The channel ID to audit
:param user_id: The user ID requesting the audit
"""
user, channel = _validate_audit_request(channel_id, user_id)
if not user or not channel:
return

channel_version = channel.version
version_str = str(channel_version)

if version_str not in channel.published_data:
channel.published_data[version_str] = {}

published_data_version = channel.published_data[version_str]

included_licenses = _calculate_included_licenses(
channel, published_data_version, channel_version
)
invalid_license_ids = _check_invalid_licenses(included_licenses)
special_permissions_license_ids = _process_special_permissions_licenses(
channel_id, included_licenses
)

if special_permissions_license_ids is None:
# Database not found, save partial results
_save_audit_results(
channel, published_data_version, invalid_license_ids, None, user_id
)
return

_save_audit_results(
channel,
published_data_version,
invalid_license_ids,
special_permissions_license_ids,
user_id,
)

logger.info(
f"License audit completed for channel {channel_id} version {channel_version}. "
f"Invalid licenses: {invalid_license_ids}, "
f"Special permissions count: {len(special_permissions_license_ids) if special_permissions_license_ids else 0}"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we are introducing too much logic to this tasks module, and it may make this module harder to maintain. To prevent this, we may better consider moving all this logic to its own file, e.g. in contentcuration/contentcuration/utils/community_library_submission.py or even contentcuration/contentcuration/utils/audit_channel_licenses.py whatever option makes more sense!

And then in the tasks module, we would only do something like

@app.task(bind=True, name="audit-channel-licenses")
def audit_channel_licenses_task(self, channel_id, user_id):
  audit_channel_licenses(channel_id, user_id)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Alex, i've fixed.

Comment on lines 345 to 346
@app.task(bind=True, name="audit-channel-licenses")
def audit_channel_licenses_task(self, channel_id, user_id):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any benefit in binding the function to the celery task model? It seems we are not using the self argument, so we can perhaps just skip it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, Thanks!

Comment on lines 356 to 358
user, channel = _validate_audit_request(channel_id, user_id)
if not user or not channel:
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just skip this validation and let the permissions validation responsibility fall to the Viewset that will enqueue this task?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, Thanks!

invalid_license_ids = []
try:
all_rights_reserved_license = License.objects.get(
license_name="All Rights Reserved"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we can use the le_utils licenses object to avoid using magic strings:

from le_utils.constants import licenses
license_name=licenses.ALL_RIGHTS_RESERVED

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, Thanks!

Comment on lines 242 to 243
except License.MultipleObjectsReturned:
logger.warning("Multiple 'All Rights Reserved' licenses found, using first one")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can skip this case, the License model is populated during our loadconstants command using the le_utils licenses array https://github.com/taoerman/studio/blob/b168ea629714e7e5bddc0770c9fff80417210b29/contentcuration/contentcuration/management/commands/loadconstants.py#L49

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, Thanks!

Comment on lines 259 to 265
license_name="Special Permissions"
)
except License.DoesNotExist:
logger.warning("License 'Special Permissions' not found in database")
return []
except License.MultipleObjectsReturned:
logger.warning("Multiple 'Special Permissions' licenses found, using first one")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idem

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, Thanks!

Comment on lines 300 to 311
for description in license_descriptions:
(
audited_license,
created,
) = AuditedSpecialPermissionsLicense.objects.get_or_create(
description=description, defaults={"distributable": False}
)
audited_license_ids.append(audited_license.id)
if created:
logger.info(
f"Created new AuditedSpecialPermissionsLicense for description: {description[:100]}"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am worried that we may perform too many get_or_create operations, which means each one involves at least one database query. Another option is to use the bulk_create method and set the ignore_conflicts flag to true to prevent getting errors for duplicates.

https://docs.djangoproject.com/en/3.2/ref/models/querysets/#bulk-create

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used bulk_create instead, Thanks!

Comment on lines 209 to 228
def _calculate_included_licenses(channel, published_data_version, channel_version):
"""Calculate and cache included_licenses if not already present."""
included_licenses = published_data_version.get("included_licenses")
if not included_licenses:
published_nodes = (
channel.main_tree.get_descendants()
.filter(published=True)
.exclude(kind_id=content_kinds.TOPIC)
)
license_ids = list(
published_nodes.exclude(license=None)
.values_list("license", flat=True)
.distinct()
)
included_licenses = sorted(set(license_ids))
published_data_version["included_licenses"] = included_licenses
logger.info(
f"Calculated included_licenses for channel {channel.id} version {channel_version}: {included_licenses}"
)
return included_licenses
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To retrieve the included licenses, we should also use the unversioned database in storage, just as we do in the _process_special_permissions_licenses method. Then, to prevent us from having to download the channel database twice, we should do both operations within the same with using_temp_migrated_content_database context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, Thanks!

)
audited_license_ids.append(audited_license.id)
if created:
logger.info(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we may need to log the description strings here 😅

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, Thanks!

return Response(
{
"task_id": async_result.task_id,
"status": "enqueued",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can just return the "task_id" here. To prevent the creation of any hardcoded task status in the response of the api request.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, Thanks!

Copy link
Member

@AlexVelezLl AlexVelezLl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks super good @taoerman. Just a few things before merging. I have also manually tested it, and its working great!

Comment on lines 197 to 209
if special_permissions_license_ids is None:
save_audit_results(
channel, published_data_version, invalid_license_ids, None, user_id
)
return

save_audit_results(
channel,
published_data_version,
invalid_license_ids,
special_permissions_license_ids,
user_id,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that these call could be simplified to be

    save_audit_results(
        channel,
        published_data_version,
        invalid_license_ids,
        special_permissions_license_ids,
        user_id,
    )

always, since we are sending None if special_permissions_license_ids is None anyways.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it!

Comment on lines 32 to 35
versioned_db_path = os.path.join(
settings.DB_ROOT, f"{channel_id}-{channel_version}.sqlite3"
)
unversioned_db_path = os.path.join(settings.DB_ROOT, f"{channel_id}.sqlite3")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are already repeating the computation of these db_paths, could we create a util function called something like get_content_db_path(channel_id, version=None) that returns the db path of the versioned or unversioned channel.

And it would be nice if we can check other places where we are using this, and replace it with this new util function (e.g. here, but pretty sure there are other places)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool!! I've written a helper function get_content_db_path.

Comment on lines 62 to 65
if not db_path:
if included_licenses is None:
included_licenses = []
return included_licenses, None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the channel database does not exist, then an error should be raised because that means that this channel has missing/corrupted data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed!

Comment on lines 78 to 80
for license_name in license_names:
studio_license = License.objects.get(license_name=license_name)
license_ids.append(studio_license.id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here we use a License.objects.filter to make just one database query.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we've been using this module in several years. I think we should leave it untouched.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed changes!

Comment on lines 1 to 5
"""
All task functions decorated with `app.task` transform the function to an instance of
`contentcuration.utils.celery.tasks.CeleryTask`. See the methods of that class for enqueuing and fetching results of
the tasks.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we keep this module description? Thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, Thanks!

return invalid_license_ids


def save_audit_results(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is also a helper method, we can also use the _ prefix?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Create license audit task and special permissions model

2 participants