Skip to content

Commit aaea473

Browse files
authored
style: Additional logging around file transformation, business rules and error reports (#36)
* style: Additional logging around file transformation, business rules and error reports * style: remove unnecessary f string * refactor: further logging details * refactor: tweaked some logging messages following review
1 parent 6f6d218 commit aaea473

File tree

6 files changed

+33
-3
lines changed

6 files changed

+33
-3
lines changed

src/dve/core_engine/backends/base/rules.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,13 +360,15 @@ def apply_sync_filters(
360360

361361
messages: Messages = []
362362
for entity_name, filter_rules in filters_by_entity.items():
363+
self.logger.info(f"Applying filters to {entity_name}")
363364
entity = entities[entity_name]
364365

365366
filter_column_names: list[str] = []
366367
unmodified_entities = {entity_name: entity}
367368
modified_entities = {entity_name: entity}
368369

369370
for rule in filter_rules:
371+
self.logger.info(f"Applying filter {rule.reporting.code}")
370372
if rule.reporting.emit == "record_failure":
371373
column_name = f"filter_{uuid4().hex}"
372374
filter_column_names.append(column_name)
@@ -411,7 +413,12 @@ def apply_sync_filters(
411413
if not success:
412414
return messages, False
413415

416+
self.logger.info(f"Filter {rule.reporting.code} found {len(temp_messages)} issues")
417+
414418
if filter_column_names:
419+
self.logger.info(
420+
f"Filtering records from entity {entity_name} for error code {rule.reporting.code}" # pylint: disable=line-too-long
421+
)
415422
success_condition = " AND ".join(
416423
[f"({c_name} IS NOT NULL AND {c_name})" for c_name in filter_column_names]
417424
)
@@ -456,6 +463,7 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
456463
altering the entities in-place.
457464
458465
"""
466+
self.logger.info("Applying business rules")
459467
rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
460468
if rule_metadata.templating_strategy == "upfront":
461469
rules_and_locals = []
@@ -472,6 +480,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
472480
rules_and_locals = rule_metadata
473481

474482
messages: Messages = []
483+
484+
self.logger.info("Applying pre-sync steps")
475485
for rule, local_variables in rules_and_locals:
476486
for step in rule.pre_sync_steps:
477487
if rule_metadata.templating_strategy == "runtime":
@@ -498,6 +508,8 @@ def apply_rules(self, entities: Entities, rule_metadata: RuleMetadata) -> Messag
498508
if not success:
499509
return messages
500510

511+
self.logger.info("Applying post-sync steps")
512+
501513
for rule, local_variables in rules_and_locals:
502514
for step in rule.post_sync_steps:
503515
if rule_metadata.templating_strategy == "runtime":

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ def apply_data_contract(
102102
self, entities: DuckDBEntities, contract_metadata: DataContractMetadata
103103
) -> tuple[DuckDBEntities, Messages, StageSuccessful]:
104104
"""Apply the data contract to the duckdb relations"""
105-
self.logger.info("Applying data contracts")
106105
all_messages: Messages = []
107106

108107
successful = True
@@ -131,6 +130,9 @@ def apply_data_contract(
131130
coerce_inferred_numpy_array_to_list(relation.df()).apply(
132131
application_helper, axis=1
133132
) # pandas uses eager evaluation so potential memory issue here?
133+
self.logger.info(
134+
f"Data contract found {len(application_helper.errors)} issues in {entity_name}"
135+
)
134136
all_messages.extend(application_helper.errors)
135137

136138
casting_statements = [

src/dve/core_engine/backends/implementations/spark/contract.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,10 @@ def apply_data_contract(
113113
# .persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
114114
)
115115
messages = validated.flatMap(lambda row: row[1]).filter(bool)
116+
messages.cache()
117+
self.logger.info(f"Data contract found {messages.count()} issues in {entity_name}")
116118
all_messages.extend(messages.collect())
119+
messages.unpersist()
117120

118121
try:
119122
record_df = record_df.select(

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class DDBDVEPipeline(BaseDVEPipeline):
2121
"""
2222
Modified Pipeline class for running a DVE Pipeline with Spark
2323
"""
24+
2425
# pylint: disable=R0913
2526
def __init__(
2627
self,

src/dve/pipeline/pipeline.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ def write_file_to_parquet(
190190
errors = []
191191

192192
for model_name, model in models.items():
193+
self._logger.info(f"Transforming {model_name} to stringified parquet")
193194
reader: BaseFileReader = load_reader(dataset, model_name, ext)
194195
try:
195196
if not entity_type:
@@ -230,6 +231,7 @@ def audit_received_file_step(
230231
self, pool: ThreadPoolExecutor, submitted_files: Iterable[tuple[FileURI, InfoURI]]
231232
) -> tuple[list[SubmissionInfo], list[SubmissionInfo]]:
232233
"""Set files as being received and mark them for file transformation"""
234+
self._logger.info("Starting audit received file service")
233235
audit_received_futures: list[tuple[str, FileURI, Future]] = []
234236
for submission_file in submitted_files:
235237
data_uri, metadata_uri = submission_file
@@ -291,7 +293,7 @@ def file_transformation(
291293
"""Transform a file from its original format into a 'stringified' parquet file"""
292294
if not self.processed_files_path:
293295
raise AttributeError("processed files path not provided")
294-
296+
self._logger.info(f"Applying file transformation to {submission_info.submission_id}")
295297
errors: list[FeedbackMessage] = []
296298
submission_status: SubmissionStatus = SubmissionStatus()
297299
submission_file_uri: URI = fh.joinuri(
@@ -326,6 +328,7 @@ def file_transformation_step(
326328
list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
327329
]:
328330
"""Step to transform files from their original format into parquet files"""
331+
self._logger.info("Starting file transformation service")
329332
file_transform_futures: list[tuple[SubmissionInfo, Future]] = []
330333

331334
for submission_info in submissions_to_process:
@@ -397,6 +400,7 @@ def apply_data_contract(
397400
self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
398401
) -> tuple[SubmissionInfo, SubmissionStatus]:
399402
"""Method for applying the data contract given a submission_info"""
403+
self._logger.info(f"Applying data contract to {submission_info.submission_id}")
400404
if not submission_status:
401405
submission_status = self.get_submission_status(
402406
"contract", submission_info.submission_id
@@ -450,6 +454,7 @@ def data_contract_step(
450454
list[tuple[SubmissionInfo, SubmissionStatus]], list[tuple[SubmissionInfo, SubmissionStatus]]
451455
]:
452456
"""Step to validate the types of an untyped (stringly typed) parquet file"""
457+
self._logger.info("Starting data contract service")
453458
processed_files: list[tuple[SubmissionInfo, SubmissionStatus]] = []
454459
failed_processing: list[tuple[SubmissionInfo, SubmissionStatus]] = []
455460
dc_futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
@@ -517,6 +522,7 @@ def apply_business_rules(
517522
"""Apply the business rules to a given submission, the submission may have failed at the
518523
data_contract step so this should be passed in as a bool
519524
"""
525+
self._logger.info(f"Applying business rules to {submission_info.submission_id}")
520526
if not submission_status:
521527
submission_status = self.get_submission_status(
522528
"business_rules", submission_info.submission_id
@@ -606,6 +612,7 @@ def business_rule_step(
606612
list[tuple[SubmissionInfo, SubmissionStatus]],
607613
]:
608614
"""Step to apply business rules (Step impl) to a typed parquet file"""
615+
self._logger.info("Starting business rules service")
609616
future_files: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
610617

611618
for submission_info, submission_status in files:
@@ -747,7 +754,7 @@ def error_report(
747754
SubmissionInfo, SubmissionStatus, Optional[SubmissionStatisticsRecord], Optional[URI]
748755
]:
749756
"""Creates the error reports given a submission info and submission status"""
750-
757+
self._logger.info(f"Generating error report for {submission_info.submission_id}")
751758
if not submission_status:
752759
submission_status = self.get_submission_status(
753760
"error_report", submission_info.submission_id
@@ -756,6 +763,7 @@ def error_report(
756763
if not self.processed_files_path:
757764
raise AttributeError("processed files path not provided")
758765

766+
self._logger.info("Reading error dataframes")
759767
errors_df, aggregates = self._get_error_dataframes(submission_info.submission_id)
760768

761769
if not submission_status.number_of_records:
@@ -794,9 +802,11 @@ def error_report(
794802
"error_reports",
795803
f"{submission_info.file_name}_{submission_info.file_extension.strip('.')}.xlsx",
796804
)
805+
self._logger.info("Writing error report")
797806
with fh.open_stream(report_uri, "wb") as stream:
798807
stream.write(er.ExcelFormat.convert_to_bytes(workbook))
799808

809+
self._logger.info("Publishing error aggregates")
800810
self._publish_error_aggregates(submission_info.submission_id, aggregates)
801811

802812
return submission_info, submission_status, sub_stats, report_uri
@@ -812,6 +822,7 @@ def error_report_step(
812822
"""Step to produce error reports
813823
takes processed files and files that failed file transformation
814824
"""
825+
self._logger.info("Starting error reports service")
815826
futures: list[tuple[SubmissionInfo, SubmissionStatus, Future]] = []
816827
reports: list[
817828
tuple[SubmissionInfo, SubmissionStatus, Union[None, SubmissionStatisticsRecord], URI]

src/dve/pipeline/spark_pipeline.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class SparkDVEPipeline(BaseDVEPipeline):
2323
"""
2424
Polymorphed Pipeline class for running a DVE Pipeline with Spark
2525
"""
26+
2627
# pylint: disable=R0913
2728
def __init__(
2829
self,

0 commit comments

Comments
 (0)