Skip to content

Commit 02078f8

Browse files
authored
[COST-4571] Mechanism to delay summary for settings (#4933)
1 parent 59b5fc3 commit 02078f8

File tree

7 files changed

+199
-9
lines changed

7 files changed

+199
-9
lines changed
+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Copyright 2024 Red Hat Inc.
3+
# SPDX-License-Identifier: Apache-2.0
4+
#
5+
from collections import defaultdict
6+
7+
from django.db.models import F
8+
from django.db.models import Func
9+
10+
from api.models import Provider
11+
from api.utils import DateHelper
12+
from masu.processor.tasks import delayed_summarize_current_month
13+
from reporting.models import AWSTagsSummary
14+
from reporting.models import AzureTagsSummary
15+
from reporting.models import GCPTagsSummary
16+
from reporting.models import OCITagsSummary
17+
from reporting.provider.ocp.models import OCPTagsValues
18+
from reporting.provider.ocp.models import OCPUsageReportPeriod
19+
20+
21+
def resummarize_current_month_by_tag_keys(enabled_rows, schema_name):
    """Queue a delayed current-month resummary for providers that use the given tag keys.

    The rows are grouped by ``provider_type``. For each group, the providers
    whose current-month data contains any of the group's keys are looked up
    and handed to ``delayed_summarize_current_month``. Provider types that
    are neither in the cloud tag-summary mapping nor OCP are skipped.

    Args:
        enabled_rows: Iterable of enabled tag key rows; each row must expose
            ``provider_type`` and ``key``.
        schema_name: Str of the schema name to be used.

    Returns:
        The provider type of the last group iterated, or ``None`` when
        ``enabled_rows`` is empty.
    """
    start_date = DateHelper().this_month_start
    cloud_model_mapping = {
        Provider.PROVIDER_AWS: AWSTagsSummary,
        Provider.PROVIDER_AZURE: AzureTagsSummary,
        Provider.PROVIDER_GCP: GCPTagsSummary,
        Provider.PROVIDER_OCI: OCITagsSummary,
    }
    key_sorting = defaultdict(list)
    for row in enabled_rows:
        key_sorting[row.provider_type].append(row.key)

    # Pre-bind the loop variable so an empty ``enabled_rows`` returns None
    # instead of raising UnboundLocalError at the return statement.
    provider_type = None
    for provider_type, key_list in key_sorting.items():
        if model := cloud_model_mapping.get(provider_type):
            # Cloud providers: the tag summary rows link to the provider via the bill.
            provider_uuids = (
                model.objects.filter(key__in=key_list, cost_entry_bill__billing_period_start=start_date)
                .values_list("cost_entry_bill__provider__uuid", flat=True)
                .distinct()
            )
        elif provider_type == Provider.PROVIDER_OCP:
            # OCP: tag values carry an array of cluster ids, so expand it first
            # and then resolve the clusters to providers through the report period.
            clusters = (
                OCPTagsValues.objects.filter(key__in=key_list)
                .annotate(clusters=Func(F("cluster_ids"), function="unnest"))
                .values_list("clusters", flat=True)
                .distinct()
            )
            provider_uuids = (
                OCPUsageReportPeriod.objects.filter(cluster_id__in=clusters, report_period_start=start_date)
                .values_list("provider__uuid", flat=True)
                .distinct()
            )
        else:
            # No tag model is known for this provider type; nothing to resummarize.
            continue
        delayed_summarize_current_month(schema_name, list(provider_uuids), provider_type)

    return provider_type

koku/api/settings/tags/mapping/view.py

+10-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from api.settings.tags.mapping.serializers import AddChildSerializer
2020
from api.settings.tags.mapping.serializers import EnabledTagKeysSerializer
2121
from api.settings.tags.mapping.serializers import TagMappingSerializer
22+
from api.settings.tags.mapping.utils import resummarize_current_month_by_tag_keys
2223
from api.settings.utils import NonValidatedMultipleChoiceFilter
2324
from api.settings.utils import SettingsFilter
2425
from reporting.provider.all.models import EnabledTagKeys
@@ -112,9 +113,10 @@ def put(self, request):
112113
serializer = AddChildSerializer(data=request.data)
113114
serializer.is_valid(raise_exception=True)
114115
parent_row = EnabledTagKeys.objects.get(uuid=serializer.data.get("parent"))
115-
children = EnabledTagKeys.objects.filter(uuid__in=serializer.data.get("children"))
116-
tag_mappings = [TagMapping(parent=parent_row, child=child_row) for child_row in children]
116+
children_rows = list(EnabledTagKeys.objects.filter(uuid__in=serializer.data.get("children")))
117+
tag_mappings = [TagMapping(parent=parent_row, child=child_row) for child_row in children_rows]
117118
TagMapping.objects.bulk_create(tag_mappings)
119+
resummarize_current_month_by_tag_keys(children_rows, request.user.customer.schema_name)
118120
return Response(status=status.HTTP_204_NO_CONTENT)
119121

120122

@@ -123,11 +125,11 @@ class SettingsTagMappingChildRemoveView(APIView):
123125

124126
def put(self, request: Request):
125127
children_uuids = request.data.get("ids", [])
126-
if not EnabledTagKeys.objects.filter(uuid__in=children_uuids).exists():
128+
enabled_tags = EnabledTagKeys.objects.filter(uuid__in=children_uuids)
129+
if not enabled_tags.exists():
127130
return Response({"detail": "Invalid children UUIDs."}, status=status.HTTP_400_BAD_REQUEST)
128-
129131
TagMapping.objects.filter(child__in=children_uuids).delete()
130-
132+
resummarize_current_month_by_tag_keys(list(enabled_tags), request.user.customer.schema_name)
131133
return Response(status=status.HTTP_204_NO_CONTENT)
132134

133135

@@ -136,9 +138,9 @@ class SettingsTagMappingParentRemoveView(APIView):
136138

137139
def put(self, request: Request):
138140
parents_uuid = request.data.get("ids", [])
139-
if not EnabledTagKeys.objects.filter(uuid__in=parents_uuid).exists():
141+
parent_rows = EnabledTagKeys.objects.filter(uuid__in=parents_uuid)
142+
if not parent_rows.exists():
140143
return Response({"detail": "Invalid parents UUIDs."}, status=status.HTTP_400_BAD_REQUEST)
141-
142144
TagMapping.objects.filter(parent__in=parents_uuid).delete()
143-
145+
resummarize_current_month_by_tag_keys(list(parent_rows), request.user.customer.schema_name)
144146
return Response({"detail": "Parents deleted successfully."}, status=status.HTTP_204_NO_CONTENT)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from django_tenants.utils import tenant_context
2+
3+
from api.models import Provider
4+
from api.settings.tags.mapping.utils import resummarize_current_month_by_tag_keys
5+
from masu.test import MasuTestCase
6+
from reporting.provider.all.models import EnabledTagKeys
7+
from reporting_common.models import DelayedCeleryTasks
8+
9+
10+
class TestTagMappingUtils(MasuTestCase):
    """Tests for the tag mapping resummarization helper."""

    def setUp(self):
        """Build a provider type -> provider UUID lookup from the test providers."""
        super().setUp()
        providers_by_type = (
            (Provider.PROVIDER_AWS, self.aws_provider),
            (Provider.PROVIDER_AZURE, self.azure_provider),
            (Provider.PROVIDER_GCP, self.gcp_provider),
            (Provider.PROVIDER_OCI, self.oci_provider),
            (Provider.PROVIDER_OCP, self.ocp_provider),
        )
        self.test_matrix = {ptype: provider.uuid for ptype, provider in providers_by_type}

    def test_find_tag_key_providers(self):
        """Resummarizing one provider type's keys creates a delayed task for that provider."""
        with tenant_context(self.tenant):
            for ptype, uuid in self.test_matrix.items():
                with self.subTest(ptype=ptype, uuid=uuid):
                    type_rows = EnabledTagKeys.objects.filter(provider_type=ptype)
                    resummarize_current_month_by_tag_keys(list(type_rows), self.schema_name)
                    delayed = DelayedCeleryTasks.objects.filter(provider_uuid=uuid)
                    self.assertTrue(delayed.exists())

    def test_multiple_returns(self):
        """Resummarizing every enabled key creates a delayed task for each test provider."""
        with tenant_context(self.tenant):
            all_rows = list(EnabledTagKeys.objects.all())
            resummarize_current_month_by_tag_keys(all_rows, self.schema_name)
            for uuid in self.test_matrix.values():
                delayed = DelayedCeleryTasks.objects.filter(provider_uuid=uuid)
                self.assertTrue(delayed.exists())

koku/koku/celery.py

+9
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,15 @@ def validate_cron_expression(expresssion):
206206
"schedule": crontab(0, 0, day_of_month="15"),
207207
}
208208

209+
# Beat entry that periodically runs the delayed-task trigger. The polling
# interval comes from the environment (default "30") and is used as the step
# of the crontab minute field.
DELAYED_TASK_POLLING_MINUTES = ENVIRONMENT.get_value("DELAYED_TASK_POLLING_MINUTES", default="30")
trigger_delayed_tasks_schedule = crontab(minute=f"*/{DELAYED_TASK_POLLING_MINUTES}")
app.conf.beat_schedule["delayed_tasks_trigger"] = dict(
    task="masu.celery.tasks.trigger_delayed_tasks",
    schedule=trigger_delayed_tasks_schedule,
)
216+
217+
209218
# Celery timeout if broker is unavailable to avoid blocking indefinitely
210219
app.conf.broker_transport_options = {"max_retries": 4, "interval_start": 0, "interval_step": 0.5, "interval_max": 3}
211220

koku/masu/celery/tasks.py

+7
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from masu.util.oci.common import OCI_REPORT_TYPES
4949
from masu.util.ocp.common import OCP_REPORT_TYPES
5050
from reporting.models import TRINO_MANAGED_TABLES
51+
from reporting_common.models import DelayedCeleryTasks
5152
from sources.tasks import delete_source
5253

5354
LOG = logging.getLogger(__name__)
@@ -561,3 +562,9 @@ def get_celery_queue_items(self, queue_name=None, task_name=None):
561562
decoded_tasks[queue] = task_list
562563

563564
return decoded_tasks
565+
566+
567+
@celery_app.task(name="masu.celery.tasks.trigger_delayed_tasks", queue=GET_REPORT_FILES_QUEUE)
def trigger_delayed_tasks(*args, **kwargs):
    """Celery entry point that delegates to ``DelayedCeleryTasks.trigger_delayed_tasks``.

    Any positional or keyword arguments are accepted and ignored; nothing is
    returned.
    """
    DelayedCeleryTasks.trigger_delayed_tasks()

koku/masu/processor/tasks.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
from masu.util.oci.common import deduplicate_reports_for_oci
6262
from reporting.ingress.models import IngressReports
6363
from reporting_common.models import CostUsageReportStatus
64+
from reporting_common.models import DelayedCeleryTasks
6465
from reporting_common.states import ManifestState
6566
from reporting_common.states import ManifestStep
6667

@@ -108,6 +109,31 @@
108109
UPDATE_SUMMARY_TABLES_QUEUE_XL,
109110
]
110111

112+
UPDATE_SUMMARY_TABLES_TASK = "masu.processor.tasks.update_summary_tables"
113+
114+
115+
def delayed_summarize_current_month(schema_name: str, provider_uuids: list, provider_type: str):
    """Record a delayed current-month resummary task for each provider.

    One ``DelayedCeleryTasks`` entry is created (or has its timeout reset) per
    provider UUID. Each entry targets ``UPDATE_SUMMARY_TABLES_TASK`` with the
    schema as its positional argument and the provider type, provider UUID and
    current month start as keyword arguments.

    Args:
        schema_name: Schema the summary task will run against.
        provider_uuids: Provider UUIDs to schedule; an empty list is a no-op.
        provider_type: Provider type passed through to the summary task.
    """
    # Large customers are routed to the XL summary queue.
    queue = UPDATE_SUMMARY_TABLES_QUEUE_XL if is_customer_large(schema_name) else UPDATE_SUMMARY_TABLES_QUEUE
    # Loop-invariant: compute the month start once so every entry gets the same value.
    start_date = str(DateHelper().this_month_start)

    for provider_uuid in provider_uuids:
        delayed_task = DelayedCeleryTasks.create_or_reset_timeout(
            task_name=UPDATE_SUMMARY_TABLES_TASK,
            task_args=[schema_name],
            task_kwargs={
                "provider_type": provider_type,
                "provider_uuid": str(provider_uuid),
                "start_date": start_date,
            },
            provider_uuid=provider_uuid,
            queue_name=queue,
        )
        if schema_name == settings.QE_SCHEMA:
            # bypass the wait for QE
            # NOTE(review): presumably deleting the record is what starts the task
            # immediately -- confirm against DelayedCeleryTasks.
            delayed_task.delete()
136+
111137

112138
def record_all_manifest_files(manifest_id, report_files, tracing_id):
113139
"""Store all report file names for manifest ID."""
@@ -427,7 +453,7 @@ def summarize_reports( # noqa: C901
427453
).apply_async(queue=queue_name or fallback_queue)
428454

429455

430-
@celery_app.task(name="masu.processor.tasks.update_summary_tables", queue=UPDATE_SUMMARY_TABLES_QUEUE) # noqa: C901
456+
@celery_app.task(name=UPDATE_SUMMARY_TABLES_TASK, queue=UPDATE_SUMMARY_TABLES_QUEUE) # noqa: C901
431457
def update_summary_tables( # noqa: C901
432458
schema,
433459
provider_type,

koku/reporting_common/test_reporting_common.py

+49
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,22 @@
33
# SPDX-License-Identifier: Apache-2.0
44
#
55
"""Test Reporting Common."""
6+
from unittest.mock import patch
7+
68
from django.utils import timezone
9+
from django_tenants.utils import schema_context
710

11+
from api.models import Provider
12+
from api.utils import DateHelper
13+
from masu.processor.tasks import delayed_summarize_current_month
14+
from masu.processor.tasks import UPDATE_SUMMARY_TABLES_QUEUE
15+
from masu.processor.tasks import UPDATE_SUMMARY_TABLES_QUEUE_XL
16+
from masu.processor.tasks import UPDATE_SUMMARY_TABLES_TASK
817
from masu.test import MasuTestCase
918
from reporting_common.models import CombinedChoices
1019
from reporting_common.models import CostUsageReportManifest
1120
from reporting_common.models import CostUsageReportStatus
21+
from reporting_common.models import DelayedCeleryTasks
1222

1323

1424
class TestCostUsageReportStatus(MasuTestCase):
@@ -107,3 +117,42 @@ def test_set_failed_status(self):
107117
stats.update_status(CombinedChoices.FAILED)
108118
self.assertIsNotNone(stats.failed_status)
109119
self.assertEqual(stats.status, CombinedChoices.FAILED)
120+
121+
@patch("masu.processor.tasks.is_customer_large")
def test_delayed_summarize_current_month(self, mock_large_customer):
    """Each provider gets one delayed task with the expected name, args, kwargs and queue."""
    mock_large_customer.return_value = False
    test_matrix = {
        Provider.PROVIDER_AWS: self.aws_provider,
        Provider.PROVIDER_AZURE: self.azure_provider,
        Provider.PROVIDER_GCP: self.gcp_provider,
        Provider.PROVIDER_OCI: self.oci_provider,
        Provider.PROVIDER_OCP: self.ocp_provider,
    }
    count = 0
    for test_provider_type, test_provider in test_matrix.items():
        with self.subTest(test_provider_type=test_provider_type, test_provider=test_provider):
            with schema_context(self.schema):
                delayed_summarize_current_month(self.schema_name, [test_provider.uuid], test_provider_type)
                # Entries from earlier iterations persist, so the total grows by one each time.
                count += 1
                self.assertEqual(DelayedCeleryTasks.objects.all().count(), count)
                db_entry = DelayedCeleryTasks.objects.get(provider_uuid=test_provider.uuid)
                self.assertEqual(db_entry.task_name, UPDATE_SUMMARY_TABLES_TASK)
                # assertEqual, not assertTrue: assertTrue would treat the expected
                # dict as the failure message and never compare the kwargs.
                self.assertEqual(
                    db_entry.task_kwargs,
                    {
                        "provider_type": test_provider_type,
                        "provider_uuid": str(test_provider.uuid),
                        "start_date": str(DateHelper().this_month_start),
                    },
                )

                self.assertEqual(db_entry.task_args, [self.schema_name])
                self.assertEqual(db_entry.queue_name, UPDATE_SUMMARY_TABLES_QUEUE)
152+
@patch("masu.processor.tasks.is_customer_large")
def test_large_customer(self, mock_large_customer):
    """A customer flagged as large has its delayed task queued on the XL summary queue."""
    mock_large_customer.return_value = True
    aws_uuid = self.aws_provider.uuid
    delayed_summarize_current_month(self.schema_name, [aws_uuid], Provider.PROVIDER_AWS)
    with schema_context(self.schema):
        queued_task = DelayedCeleryTasks.objects.get(provider_uuid=aws_uuid)
        self.assertEqual(queued_task.queue_name, UPDATE_SUMMARY_TABLES_QUEUE_XL)

0 commit comments

Comments
 (0)