
Commit ef60c15

[COST-4283] OCP: set start-date based on data in file being processed (#5212)
1 parent 166d175 commit ef60c15

File tree: 5 files changed (+113 -46)

Makefile (+1 -1)

@@ -357,7 +357,7 @@ docker-up-min-no-build-with-listener: docker-up-min-no-build
 docker-up-db:
 	$(DOCKER_COMPOSE) up -d db
 	$(DOCKER_COMPOSE) up -d unleash
-	dev/scripts/setup_unleash.py
+	$(PYTHON) dev/scripts/setup_unleash.py

 docker-up-db-monitor:
 	$(DOCKER_COMPOSE) up --build -d grafana

koku/masu/external/kafka_msg_handler.py (+41 -13)

@@ -32,6 +32,7 @@

 from api.common import log_json
 from api.provider.models import Provider
+from api.utils import DateHelper
 from common.queues import get_customer_queue
 from common.queues import OCPQueue
 from kafka_utils.utils import extract_from_header

@@ -626,16 +627,18 @@ def summarize_manifest(report_meta, manifest_uuid):
     if not MANIFEST_ACCESSOR.manifest_ready_for_summary(manifest_id):
         return

-    new_report_meta = {
-        "schema": schema,
-        "schema_name": schema,
-        "provider_type": report_meta.get("provider_type"),
-        "provider_uuid": report_meta.get("provider_uuid"),
-        "manifest_id": manifest_id,
-        "manifest_uuid": manifest_uuid,
-        "start": start_date,
-        "end": end_date,
-    }
+    new_report_meta = [
+        {
+            "schema": schema,
+            "schema_name": schema,
+            "provider_type": report_meta.get("provider_type"),
+            "provider_uuid": report_meta.get("provider_uuid"),
+            "manifest_id": manifest_id,
+            "manifest_uuid": manifest_uuid,
+            "start": start_date,
+            "end": end_date,
+        }
+    ]
     if not (start_date or end_date):
         # we cannot process without start and end dates
         LOG.info(

@@ -644,9 +647,35 @@ def summarize_manifest(report_meta, manifest_uuid):
         return

     if "0001-01-01 00:00:00+00:00" not in [str(start_date), str(end_date)]:
+        dates = {
+            datetime.strptime(meta["meta_reportdatestart"], "%Y-%m-%d").date()
+            for meta in report_meta["ocp_files_to_process"].values()
+        }
+        min_date = min(dates)
+        max_date = max(dates)
+        # if we cross the month boundary, then we need to create 2 manifests:
+        # 1 for each month so that we summarize all the data correctly within the month bounds
+        if min_date.month != max_date.month:
+            dh = DateHelper()
+            new_report_meta[0]["start"] = min_date
+            new_report_meta[0]["end"] = dh.month_end(min_date)
+
+            new_report_meta.append(
+                {
+                    "schema": schema,
+                    "schema_name": schema,
+                    "provider_type": report_meta.get("provider_type"),
+                    "provider_uuid": report_meta.get("provider_uuid"),
+                    "manifest_id": manifest_id,
+                    "manifest_uuid": manifest_uuid,
+                    "start": dh.month_start(max_date),
+                    "end": max_date,
+                }
+            )
+
         # we have valid dates, so we can summarize the payload
         LOG.info(log_json(manifest_uuid, msg="summarizing ocp reports", context=context))
-        return summarize_reports.s([new_report_meta], ocp_processing_queue).apply_async(queue=ocp_processing_queue)
+        return summarize_reports.s(new_report_meta, ocp_processing_queue).apply_async(queue=ocp_processing_queue)

     cr_status = report_meta.get("cr_status", {})
     if data_collection_message := cr_status.get("reports", {}).get("data_collection_message", ""):

@@ -782,8 +811,7 @@ def process_messages(msg):
             )
         )
         process_complete = report_metas_complete(report_metas)
-        summary_task_id = summarize_manifest(report_meta, tracing_id)
-        if summary_task_id:
+        if summary_task_id := summarize_manifest(report_meta, tracing_id):
             LOG.info(log_json(tracing_id, msg=f"Summarization celery uuid: {summary_task_id}"))

     if status and not settings.DEBUG:
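
The core of this commit is the month-boundary split in summarize_manifest: start/end now come from the meta_reportdatestart of each file in the payload, and a payload that straddles two months is summarized as two manifests so each stays within its month bounds. Below is a minimal standalone sketch of that logic; the function name split_on_month_boundary is hypothetical, and calendar.monthrange stands in for koku's DateHelper.month_end/month_start helpers.

# Hypothetical helper mirroring the summarize_manifest change above; not koku's API.
import calendar
from datetime import date


def split_on_month_boundary(report_dates: set[date]) -> list[tuple[date, date]]:
    """Return one (start, end) range per month covered by the report dates."""
    min_date, max_date = min(report_dates), max(report_dates)
    if min_date.month == max_date.month:
        # all files fall within one month: a single summarization range
        return [(min_date, max_date)]
    # files cross a month boundary: close out the first month...
    first_end = min_date.replace(day=calendar.monthrange(min_date.year, min_date.month)[1])
    # ...and open the second range at the first day of the last month
    second_start = max_date.replace(day=1)
    return [(min_date, first_end), (second_start, max_date)]


# The dates used in test_summarize_manifest_dates below:
print(split_on_month_boundary({date(2024, 6, 17), date(2024, 7, 17)}))
# [(datetime.date(2024, 6, 17), datetime.date(2024, 6, 30)),
#  (datetime.date(2024, 7, 1), datetime.date(2024, 7, 17))]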

koku/masu/processor/parquet/parquet_report_processor.py (+20 -8)

@@ -202,6 +202,14 @@ def start_date(self, new_start_date):
             LOG.error(log_json(self.tracing_id, msg=msg, context=self.error_context), exc_info=ex)
             raise ParquetReportProcessorError(msg) from ex

+    @property
+    def bill_date(self):
+        return self.start_date.replace(day=1)
+
+    @property
+    def trino_table_exists_key(self):
+        return f"{self.report_type}|{self.bill_date}"
+
     @property
     def create_table(self):
         """Whether to create the Hive/Trino table"""

@@ -458,6 +466,13 @@ def convert_to_parquet(self):  # noqa: C901
             return parquet_base_filename, daily_data_frames

         for csv_filename in file_list:
+
+            # set start date based on data in the file being processed:
+            if self.provider_type == Provider.PROVIDER_OCI:
+                self.start_date = str(csv_filename).split(".")[1]
+            elif self.provider_type == Provider.PROVIDER_OCP:
+                self.start_date = self.ocp_files_to_process[csv_filename.stem]["meta_reportdatestart"]
+
             self.prepare_parquet_s3(Path(csv_filename))
             if self.provider_type == Provider.PROVIDER_OCP and self.report_type is None:
                 msg = "could not establish report type"

@@ -470,9 +485,7 @@ def convert_to_parquet(self):  # noqa: C901
                     )
                 )
                 raise ParquetReportProcessorError(msg)
-            if self.provider_type == Provider.PROVIDER_OCI:
-                file_specific_start_date = str(csv_filename).split(".")[1]
-                self.start_date = file_specific_start_date
+
             parquet_base_filename, daily_frame, success = self.convert_csv_to_parquet(csv_filename)
             daily_data_frames.extend(daily_frame)
             if self.provider_type not in (Provider.PROVIDER_AZURE):

@@ -498,16 +511,15 @@ def create_parquet_table(self, parquet_file, daily=False, partition_map=None):
         # Skip empty files, if we have no storage report data we can't create the table
         if parquet_file:
             processor = self._get_report_processor(parquet_file, daily=daily)
-            bill_date = self.start_date.replace(day=1)
             if not processor.schema_exists():
                 processor.create_schema()
             if not processor.table_exists():
                 processor.create_table(partition_map=partition_map)
-            self.trino_table_exists[self.report_type] = True
-            processor.get_or_create_postgres_partition(bill_date=bill_date)
+            self.trino_table_exists[self.trino_table_exists_key] = True
+            processor.get_or_create_postgres_partition(bill_date=self.bill_date)
             processor.sync_hive_partitions()
             if not daily:
-                processor.create_bill(bill_date=bill_date)
+                processor.create_bill(bill_date=self.bill_date)

     def check_required_columns_for_ingress_reports(self, col_names):
         LOG.info(log_json(msg="checking required columns for ingress reports", context=self._context))

@@ -570,7 +582,7 @@ def convert_csv_to_parquet(self, csv_filename: os.PathLike):  # noqa: C901
                 )
             )
             self.post_processor.finalize_post_processing()
-            if self.create_table and not self.trino_table_exists.get(self.report_type):
+            if self.create_table and not self.trino_table_exists.get(self.trino_table_exists_key):
                 self.create_parquet_table(parquet_filepath)

         except Exception as err:
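
The new bill_date and trino_table_exists_key properties exist because start_date is now reset per file: a single payload can span two bill months, so the "table already created" cache must be keyed by report type and bill month rather than report type alone, or the second month's table and partition work would be skipped. A self-contained sketch of that cache behavior; the class and values are illustrative, not koku's API.

# Illustrative only: why the cache key includes the bill month.
from datetime import date


class TableCacheSketch:
    def __init__(self, report_type):
        self.report_type = report_type
        self.start_date = None          # reset for every file processed
        self.trino_table_exists = {}    # cache of tables already created

    @property
    def bill_date(self):
        return self.start_date.replace(day=1)

    @property
    def trino_table_exists_key(self):
        return f"{self.report_type}|{self.bill_date}"


cache = TableCacheSketch("pod_usage")
cache.start_date = date(2024, 6, 17)
cache.trino_table_exists[cache.trino_table_exists_key] = True   # June table built
cache.start_date = date(2024, 7, 17)
# the July file misses the cache, so table/partition creation runs again:
assert not cache.trino_table_exists.get(cache.trino_table_exists_key)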

koku/masu/test/external/test_kafka_msg_handler.py (+34 -13)

@@ -10,6 +10,7 @@
 import shutil
 import tempfile
 import uuid
+from datetime import date
 from datetime import datetime
 from pathlib import Path
 from unittest.mock import patch

@@ -486,6 +487,7 @@ def test_summarize_manifest(self):
             "file": "/path/to/file.csv",
             "start": str(datetime.now()),
             "end": str(datetime.now()),
+            "ocp_files_to_process": {"filename": {"meta_reportdatestart": str(datetime.now().date())}},
         }

         with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=True):

@@ -500,31 +502,49 @@

     def test_summarize_manifest_dates(self):
         """Test report summarization."""
+        start_date = date(year=2024, month=6, day=17)
+        end_date = date(year=2024, month=7, day=17)
         report_meta = {
             "schema_name": "test_schema",
             "manifest_id": "1",
             "provider_uuid": uuid.uuid4(),
             "provider_type": "OCP",
             "compression": "UNCOMPRESSED",
             "file": "/path/to/file.csv",
-            "start": str(datetime.now()),
-            "end": str(datetime.now()),
-        }
-        expected_meta = {
-            "schema": report_meta.get("schema_name"),
-            "schema_name": report_meta.get("schema_name"),
-            "provider_type": report_meta.get("provider_type"),
-            "provider_uuid": report_meta.get("provider_uuid"),
-            "manifest_id": report_meta.get("manifest_id"),
-            "start": report_meta.get("start"),
-            "end": report_meta.get("end"),
-            "manifest_uuid": "1234",
+            "start": "2024-07-17 17:00:00.000000",
+            "end": "2024-07-17 17:00:00.000000",
+            "ocp_files_to_process": {
+                "filename1": {"meta_reportdatestart": str(start_date)},
+                "filename2": {"meta_reportdatestart": str(end_date)},
+            },
         }
+        expected_meta = [
+            {
+                "schema": report_meta.get("schema_name"),
+                "schema_name": report_meta.get("schema_name"),
+                "provider_type": report_meta.get("provider_type"),
+                "provider_uuid": report_meta.get("provider_uuid"),
+                "manifest_id": report_meta.get("manifest_id"),
+                "start": date(year=2024, month=6, day=17),
+                "end": date(year=2024, month=6, day=30),
+                "manifest_uuid": "1234",
+            },
+            {
+                "schema": report_meta.get("schema_name"),
+                "schema_name": report_meta.get("schema_name"),
+                "provider_type": report_meta.get("provider_type"),
+                "provider_uuid": report_meta.get("provider_uuid"),
+                "manifest_id": report_meta.get("manifest_id"),
+                "start": date(year=2024, month=7, day=1),
+                "end": date(year=2024, month=7, day=17),
+                "manifest_uuid": "1234",
+            },
+        ]

         with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=True):
             with patch("masu.external.kafka_msg_handler.summarize_reports.s") as mock_summarize_reports:
                 msg_handler.summarize_manifest(report_meta, self.manifest_id)
-                mock_summarize_reports.assert_called_with([expected_meta], OCPQueue.DEFAULT)
+                mock_summarize_reports.assert_called_with(expected_meta, OCPQueue.DEFAULT)

         with patch("masu.external.kafka_msg_handler.MANIFEST_ACCESSOR.manifest_ready_for_summary", return_value=False):
             with patch("masu.external.kafka_msg_handler.summarize_reports.s") as mock_summarize_reports:

@@ -860,6 +880,7 @@ def test_summarize_manifest_called_with_XL_queue(self):
             "manifest_id": "1",
             "start": str(datetime.now()),
             "end": str(datetime.now()),
+            "ocp_files_to_process": {"filename": {"meta_reportdatestart": str(date.today())}},
         }

         # Check when manifest is done
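
A note on the fixture dates: summarize_manifest parses meta_reportdatestart with the "%Y-%m-%d" format, which is exactly what str() of a Python date produces, so fixtures built via str(start_date) or str(datetime.now().date()) round-trip cleanly. A quick check:

# str(date) emits ISO format, matching the handler's strptime pattern.
from datetime import datetime

raw = str(datetime(2024, 6, 17).date())              # '2024-06-17'
parsed = datetime.strptime(raw, "%Y-%m-%d").date()   # round-trips cleanly
assert str(parsed) == raw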

koku/masu/test/processor/parquet/test_parquet_report_processor.py (+17 -11)

@@ -61,7 +61,7 @@ def setUp(self):
         self.manifest_id = CostUsageReportManifest.objects.filter(cluster_id__isnull=True).first().id
         self.ocp_manifest_id = CostUsageReportManifest.objects.filter(cluster_id__isnull=False).first().id
         self.start_date = self.today
-        self.report_name = "koku-1.csv.gz"
+        self.report_name = Path("koku-1.csv.gz")
         self.report_path = f"/my/{self.test_assembly_id}/{self.report_name}"
         self.report_processor = ParquetReportProcessor(
             schema_name=self.schema,

@@ -85,7 +85,17 @@ def setUp(self):
             provider_uuid=self.ocp_provider_uuid,
             provider_type=Provider.PROVIDER_OCP,
             manifest_id=self.manifest_id,
-            context={"tracing_id": self.tracing_id, "start_date": self.today, "create_table": True},
+            context={
+                "tracing_id": self.tracing_id,
+                "start_date": self.today,
+                "create_table": True,
+                "ocp_files_to_process": {
+                    self.report_name.stem: {
+                        "meta_reportdatestart": "2023-01-01",
+                        "meta_reportnumhours": "2",
+                    }
+                },
+            },
         )
         ingress_uuid = "882083b7-ea62-4aab-aa6a-f0d08d65ee2b"
         self.ingress_report_dict = {

@@ -271,15 +281,11 @@ def test_convert_to_parquet(self, mock_remove, mock_exists):
         self.assertEqual(file_name, "")
         self.assertTrue(data_frame.empty)

-        with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""):
-            with patch(
-                "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None
-            ):
-                with patch(
-                    "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"
-                ):
-                    with self.assertRaises(ParquetReportProcessorError):
-                        self.report_processor_ocp.convert_to_parquet()
+        with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch(
+            "masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.report_type", return_value=None
+        ), patch("masu.processor.parquet.parquet_report_processor.ParquetReportProcessor.prepare_parquet_s3"):
+            with self.assertRaises(ParquetReportProcessorError):
+                self.report_processor_ocp.convert_to_parquet()

         expected = "no split files to convert to parquet"
         with patch("masu.processor.parquet.parquet_report_processor.get_path_prefix", return_value=""), patch.object(
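
The last hunk flattens three nested with patch(...) blocks into a single multi-context with statement. On Python 3.10+ the same stack can also be parenthesized, which avoids long continuation lines; a runnable sketch against stdlib targets (the koku test patches real module paths instead):

# Multi-context `with` as used in the refactored test; targets here are
# stdlib functions so the snippet runs standalone (Python 3.10+ syntax).
from unittest.mock import patch

with (
    patch("os.path.exists", return_value=True),
    patch("os.path.getsize", return_value=0),
):
    import os.path

    assert os.path.exists("/definitely/not/a/real/path")       # patched: True
    assert os.path.getsize("/definitely/not/a/real/path") == 0  # patched: 0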
