Skip to content

Commit 2e2f236

Browse files
committed
refactor: address review comments
1 parent a3b0ace commit 2e2f236

File tree

8 files changed

+48
-33
lines changed

8 files changed

+48
-33
lines changed

src/dve/common/error_utils.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,23 @@
1313
from dve.core_engine.exceptions import CriticalProcessingError
1414
from dve.core_engine.loggers import get_logger
1515
from dve.core_engine.message import UserMessage
16-
from dve.core_engine.type_hints import URI, DVEStage, Messages
16+
from dve.core_engine.type_hints import URI, DVEStageName, Messages
1717

1818

19-
def get_feedback_errors_uri(working_folder: URI, step_name: DVEStage) -> URI:
19+
def get_feedback_errors_uri(working_folder: URI, step_name: DVEStageName) -> URI:
2020
"""Determine the location of json lines file containing all errors generated in a step"""
2121
return fh.joinuri(working_folder, "errors", f"{step_name}_errors.jsonl")
2222

2323

2424
def get_processing_errors_uri(working_folder: URI) -> URI:
2525
"""Determine the location of json lines file containing all processing
2626
errors generated from DVE run"""
27-
return fh.joinuri(working_folder, "errors", "processing_errors", "processing_errors.jsonl")
27+
return fh.joinuri(working_folder, "processing_errors", "processing_errors.jsonl")
2828

2929

3030
def dump_feedback_errors(
3131
working_folder: URI,
32-
step_name: DVEStage,
32+
step_name: DVEStageName,
3333
messages: Messages,
3434
key_fields: Optional[dict[str, list[str]]] = None,
3535
) -> URI:
@@ -76,7 +76,7 @@ def dump_processing_errors(
7676
if not errors:
7777
raise AttributeError("errors list not passed")
7878

79-
error_file: URI = fh.joinuri(working_folder, "processing_errors", "processing_errors.json")
79+
error_file: URI = get_processing_errors_uri(working_folder)
8080
processed = []
8181

8282
for error in errors:
@@ -121,7 +121,7 @@ class BackgroundMessageWriter:
121121
def __init__(
122122
self,
123123
working_directory: URI,
124-
dve_stage: DVEStage,
124+
dve_stage: DVEStageName,
125125
key_fields: Optional[dict[str, list[str]]] = None,
126126
logger: Optional[logging.Logger] = None,
127127
):
@@ -149,6 +149,7 @@ def write_thread(self) -> Thread: # type: ignore
149149

150150
def _write_process_wrapper(self):
151151
"""Wrapper for dump feedback errors to run in background process"""
152+
# writing thread will block if nothing in queue
152153
while True:
153154
if msgs := self.write_queue.get():
154155
dump_feedback_errors(
@@ -167,6 +168,7 @@ def __exit__(self, exc_type, exc_value, traceback):
167168
"Issue occured during background write process:",
168169
exc_info=(exc_type, exc_value, traceback),
169170
)
171+
# None value in queue will trigger break in target
170172
self.write_queue.put(None)
171173
self.write_thread.join()
172174

src/dve/core_engine/backends/base/contract.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from dve.core_engine.type_hints import (
2828
URI,
2929
ArbitraryFunction,
30+
DVEStageName,
3031
EntityLocations,
3132
EntityName,
3233
JSONDict,
@@ -102,6 +103,10 @@ class BaseDataContract(Generic[EntityType], ABC):
102103
This is set and populated in `__init_subclass__` by identifying methods
103104
decorated with the '@reader_override' decorator, and is used in `read_entity_type`.
104105
106+
"""
107+
__stage_name__: DVEStageName = "data_contract"
108+
"""
109+
The name of the data contract DVE stage for use in auditing and logging
105110
"""
106111

107112
def __init_subclass__(cls, *_, **__) -> None:
@@ -392,13 +397,13 @@ def apply(
392397
and return the validated entities and any messages.
393398
394399
"""
395-
feedback_errors_uri = get_feedback_errors_uri(working_dir, "data_contract")
400+
feedback_errors_uri = get_feedback_errors_uri(working_dir, self.__stage_name__)
396401
processing_errors_uri = get_processing_errors_uri(working_dir)
397402
entities, messages, successful = self.read_raw_entities(entity_locations, contract_metadata)
398403
if not successful:
399404
dump_processing_errors(
400405
working_dir,
401-
"data_contract",
406+
self.__stage_name__,
402407
[
403408
CriticalProcessingError(
404409
"Issue occurred while reading raw entities",
@@ -416,15 +421,15 @@ def apply(
416421
successful = False
417422
new_messages = render_error(
418423
err,
419-
"data contract",
424+
self.__stage_name__,
420425
self.logger,
421426
)
422427
dump_processing_errors(
423428
working_dir,
424-
"data_contract",
429+
self.__stage_name__,
425430
[
426431
CriticalProcessingError(
427-
"Issue occurred while applying data_contract",
432+
f"Issue occurred while applying {self.__stage_name__}",
428433
[msg.error_message for msg in new_messages],
429434
)
430435
],

src/dve/core_engine/backends/base/rules.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
from dve.core_engine.backends.types import Entities, EntityType, StageSuccessful
4646
from dve.core_engine.exceptions import CriticalProcessingError
4747
from dve.core_engine.loggers import get_logger
48-
from dve.core_engine.type_hints import URI, EntityName, Messages, TemplateVariables
48+
from dve.core_engine.type_hints import URI, DVEStageName, EntityName, Messages, TemplateVariables
4949

5050
T_contra = TypeVar("T_contra", bound=AbstractStep, contravariant=True)
5151
T = TypeVar("T", bound=AbstractStep)
@@ -88,6 +88,10 @@ class BaseStepImplementations(Generic[EntityType], ABC): # pylint: disable=too-
8888
8989
This will be populated from the generic annotation at class creation time.
9090
91+
"""
92+
__stage_name__: DVEStageName = "business_rules"
93+
"""
94+
The name of the business rules DVE stage for use in auditing and logging
9195
"""
9296

9397
def __init_subclass__(cls, *_, **__) -> None:
@@ -367,13 +371,13 @@ def apply_sync_filters(
367371
368372
"""
369373
filters_by_entity: dict[EntityName, list[DeferredFilter]] = defaultdict(list)
370-
feedback_errors_uri = get_feedback_errors_uri(working_directory, "business_rules")
374+
feedback_errors_uri = get_feedback_errors_uri(working_directory, self.__stage_name__)
371375
for rule in filters:
372376
filters_by_entity[rule.entity_name].append(rule)
373377

374378
with BackgroundMessageWriter(
375379
working_directory=working_directory,
376-
dve_stage="business_rules",
380+
dve_stage=self.__stage_name__,
377381
key_fields=key_fields,
378382
logger=self.logger,
379383
) as msg_writer:
@@ -402,7 +406,7 @@ def apply_sync_filters(
402406
if not success:
403407
processing_errors_uri = dump_processing_errors(
404408
working_directory,
405-
"business_rules",
409+
self.__stage_name__,
406410
[
407411
CriticalProcessingError(
408412
"Issue occurred while applying filter logic",
@@ -431,7 +435,7 @@ def apply_sync_filters(
431435
if not success:
432436
processing_errors_uri = dump_processing_errors(
433437
working_directory,
434-
"business_rules",
438+
self.__stage_name__,
435439
[
436440
CriticalProcessingError(
437441
"Issue occurred while generating FeedbackMessages",
@@ -459,7 +463,7 @@ def apply_sync_filters(
459463
if not success:
460464
processing_errors_uri = dump_processing_errors(
461465
working_directory,
462-
"business_rules",
466+
self.__stage_name__,
463467
[
464468
CriticalProcessingError(
465469
"Issue occurred while generating FeedbackMessages",
@@ -497,7 +501,7 @@ def apply_sync_filters(
497501
if not success:
498502
processing_errors_uri = dump_processing_errors(
499503
working_directory,
500-
"business_rules",
504+
self.__stage_name__,
501505
[
502506
CriticalProcessingError(
503507
"Issue occurred while filtering error records",
@@ -525,7 +529,7 @@ def apply_sync_filters(
525529
if not success:
526530
processing_errors_uri = dump_processing_errors(
527531
working_directory,
528-
"business_rules",
532+
self.__stage_name__,
529533
[
530534
CriticalProcessingError(
531535
"Issue occurred while generating FeedbackMessages",
@@ -555,7 +559,7 @@ def apply_rules(
555559
"""
556560
self.logger.info("Applying business rules")
557561
rules_and_locals: Iterable[tuple[Rule, TemplateVariables]]
558-
errors_uri = get_feedback_errors_uri(working_directory, "business_rules")
562+
errors_uri = get_feedback_errors_uri(working_directory, self.__stage_name__)
559563
if rule_metadata.templating_strategy == "upfront":
560564
rules_and_locals = []
561565
for rule, local_variables in rule_metadata:
@@ -584,7 +588,7 @@ def apply_rules(
584588
if not success:
585589
processing_errors_uri = dump_processing_errors(
586590
working_directory,
587-
"business_rules",
591+
self.__stage_name__,
588592
[
589593
CriticalProcessingError(
590594
"Issue occurred while applying pre filter steps",
@@ -593,14 +597,16 @@ def apply_rules(
593597
],
594598
)
595599
if pre_sync_messages:
596-
dump_feedback_errors(working_directory, "business_rules", pre_sync_messages)
600+
dump_feedback_errors(
601+
working_directory, self.__stage_name__, pre_sync_messages
602+
)
597603

598604
return processing_errors_uri, False
599605
# if not a failure, ensure we keep track of any informational messages
600606
pre_sync_messages.extend(stage_messages)
601607
# if all successful, ensure we write out all informational messages
602608
if pre_sync_messages:
603-
dump_feedback_errors(working_directory, "business_rules", pre_sync_messages)
609+
dump_feedback_errors(working_directory, self.__stage_name__, pre_sync_messages)
604610

605611
sync_steps = []
606612
for rule, local_variables in rules_and_locals:
@@ -634,7 +640,7 @@ def apply_rules(
634640
if not success:
635641
processing_errors_uri = dump_processing_errors(
636642
working_directory,
637-
"business_rules",
643+
self.__stage_name__,
638644
[
639645
CriticalProcessingError(
640646
"Issue occurred while applying post filter steps",
@@ -644,15 +650,15 @@ def apply_rules(
644650
)
645651
if post_sync_messages:
646652
dump_feedback_errors(
647-
working_directory, "business_rules", post_sync_messages
653+
working_directory, self.__stage_name__, post_sync_messages
648654
)
649655

650656
return processing_errors_uri, False
651657
# if not a failure, ensure we keep track of any informational messages
652658
post_sync_messages.extend(stage_messages)
653659
# if all successful, ensure we write out all informational messages
654660
if post_sync_messages:
655-
dump_feedback_errors(working_directory, "business_rules", post_sync_messages)
661+
dump_feedback_errors(working_directory, self.__stage_name__, post_sync_messages)
656662
return errors_uri, True
657663

658664
def read_parquet(self, path: URI, **kwargs) -> EntityType:

src/dve/core_engine/backends/implementations/duckdb/contract.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import logging
55
from collections.abc import Iterator
66
from functools import partial
7-
from multiprocessing import Pool
7+
from multiprocessing import Pool, cpu_count
88
from typing import Any, Optional
99
from uuid import uuid4
1010

@@ -164,7 +164,7 @@ def apply_data_contract(
164164

165165
batches = pq.ParquetFile(entity_locations[entity_name]).iter_batches(10000)
166166
msg_count = 0
167-
with Pool(8) as pool:
167+
with Pool(cpu_count() - 1) as pool:
168168
for msgs in pool.imap_unordered(row_validator_helper, batches):
169169
if msgs:
170170
msg_writer.write_queue.put(msgs)

src/dve/core_engine/type_hints.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@
245245
BinaryComparator = Callable[[Any, Any], bool]
246246
"""Type hint for operator functions"""
247247

248-
DVEStage = Literal[
248+
DVEStageName = Literal[
249249
"audit_received",
250250
"file_transformation",
251251
"data_contract",

src/dve/pipeline/pipeline.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from dve.core_engine.loggers import get_logger
3636
from dve.core_engine.message import FeedbackMessage
3737
from dve.core_engine.models import SubmissionInfo, SubmissionStatisticsRecord
38-
from dve.core_engine.type_hints import URI, DVEStage, FileURI, InfoURI
38+
from dve.core_engine.type_hints import URI, DVEStageName, FileURI, InfoURI
3939
from dve.parser import file_handling as fh
4040
from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
4141
from dve.parser.file_handling.service import _get_implementation
@@ -113,7 +113,9 @@ def get_entity_count(entity: EntityType) -> int:
113113
"""Get a row count of an entity stored as parquet"""
114114
raise NotImplementedError()
115115

116-
def get_submission_status(self, step_name: DVEStage, submission_id: str) -> SubmissionStatus:
116+
def get_submission_status(
117+
self, step_name: DVEStageName, submission_id: str
118+
) -> SubmissionStatus:
117119
"""Determine submission status of a submission if not explicitly given"""
118120
if not (submission_status := self._audit_tables.get_submission_status(submission_id)):
119121
self._logger.warning(

tests/test_pipeline/test_foundry_ddb_pipeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def test_foundry_runner_error(planet_test_files, temp_ddb_conn):
121121
processing_folder,
122122
sub_info.submission_id,
123123
"processing_errors",
124-
"processing_errors.json"
124+
"processing_errors.jsonl"
125125
)
126126
assert perror_path.exists()
127127
perror_schema = {

tests/test_reporting/test_error_utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def test_dump_processing_errors():
4444
perror_schema
4545
)
4646
error_df = pl.read_json(
47-
Path(output_path, "processing_errors.json")
47+
Path(output_path, "processing_errors.jsonl")
4848
)
4949
cols_to_check = ["step_name", "error_location", "error_level", "error_message"]
5050

0 commit comments

Comments
 (0)