@@ -587,10 +587,13 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
587
587
Fetches latest processed date based on daily csv files and clears relevant s3 files
588
588
"""
589
589
# We do this if we have multiple workers running different files for a single manifest.
590
- processing_date = ReportManifestDBAccessor ().get_manifest_daily_start_date (manifest_id )
590
+ manifest_accessor = ReportManifestDBAccessor ()
591
+ manifest = manifest_accessor .get_manifest_by_id (manifest_id )
592
+ processing_date = manifest_accessor .get_manifest_daily_start_date (manifest_id )
591
593
if processing_date :
592
- # Prevent other works running trino queries until all files are removed.
593
- clear_s3_files (csv_s3_path , provider_uuid , processing_date , "manifestid" , manifest_id , context , request_id )
594
+ if not manifest_accessor .get_s3_parquet_cleared (manifest ):
595
+ # Prevent other works running trino queries until all files are removed.
596
+ clear_s3_files (csv_s3_path , provider_uuid , processing_date , "manifestid" , manifest_id , context , request_id )
594
597
return processing_date
595
598
processing_date = start_date
596
599
try :
@@ -610,7 +613,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
610
613
process_date - datetime .timedelta (days = 3 ) if process_date .day > 3 else process_date .replace (day = 1 )
611
614
)
612
615
# Set processing date for all workers
613
- processing_date = ReportManifestDBAccessor () .set_manifest_daily_start_date (manifest_id , processing_date )
616
+ processing_date = manifest_accessor .set_manifest_daily_start_date (manifest_id , processing_date )
614
617
# Try to clear s3 files for dates. Small edge case, we may have parquet files even without csvs
615
618
clear_s3_files (csv_s3_path , provider_uuid , processing_date , "manifestid" , manifest_id , context , request_id )
616
619
except (EndpointConnectionError , ClientError , AttributeError , ValueError ):
@@ -626,7 +629,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
626
629
bucket = settings .S3_BUCKET_NAME ,
627
630
),
628
631
)
629
- processing_date = ReportManifestDBAccessor () .set_manifest_daily_start_date (manifest_id , processing_date )
632
+ processing_date = manifest_accessor .set_manifest_daily_start_date (manifest_id , processing_date )
630
633
to_delete = get_s3_objects_not_matching_metadata (
631
634
request_id ,
632
635
csv_s3_path ,
@@ -635,8 +638,7 @@ def get_or_clear_daily_s3_by_date(csv_s3_path, provider_uuid, start_date, end_da
635
638
context = context ,
636
639
)
637
640
delete_s3_objects (request_id , to_delete , context )
638
- manifest = ReportManifestDBAccessor ().get_manifest_by_id (manifest_id )
639
- ReportManifestDBAccessor ().mark_s3_csv_cleared (manifest )
641
+ manifest_accessor .mark_s3_csv_cleared (manifest )
640
642
LOG .info (
641
643
log_json (msg = "removed csv files, marked manifest csv cleared and parquet not cleared" , context = context )
642
644
)
@@ -853,22 +855,11 @@ def clear_s3_files(
853
855
s3_prefixes .append (parquet_ocp_on_cloud_path_s3 + path )
854
856
to_delete = []
855
857
for prefix in s3_prefixes :
856
- for obj_summary in _get_s3_objects (prefix ):
857
- try :
858
- existing_object = obj_summary .Object ()
859
- metadata_value = existing_object .metadata .get (metadata_key )
860
- if str (metadata_value ) != str (manifest_id ):
861
- to_delete .append (existing_object .key )
862
- except (ClientError ) as err :
863
- LOG .warning (
864
- log_json (
865
- request_id ,
866
- msg = "unable to get matching object, likely deleted by another worker" ,
867
- context = context ,
868
- bucket = settings .S3_BUCKET_NAME ,
869
- ),
870
- exc_info = err ,
871
- )
858
+ to_delete .extend (
859
+ get_s3_objects_not_matching_metadata (
860
+ request_id , prefix , metadata_key = metadata_key , metadata_value_check = str (manifest_id ), context = context
861
+ )
862
+ )
872
863
delete_s3_objects (request_id , to_delete , context )
873
864
manifest_accessor = ReportManifestDBAccessor ()
874
865
manifest = manifest_accessor .get_manifest_by_id (manifest_id )
0 commit comments