Skip to content

Commit ee33dec

Browse files
authored
Merge pull request #42 from NHSDigital/develop_v06
Develop v06
2 parents 6f6d218 + 096a534 commit ee33dec

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+4466
-632
lines changed

.github/workflows/ci_linting.yml

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ name: CI Formatting & Linting
33
on:
44
pull_request:
55
types: [opened, reopened, synchronize]
6-
branches:
7-
- main
86

97

108
jobs:
@@ -30,17 +28,9 @@ jobs:
3028
- name: ensure poetry using desired python version
3129
run: poetry env use $(asdf which python)
3230

33-
- name: Cache Poetry virtualenv
34-
uses: actions/cache@v4
35-
with:
36-
path: ~/.cache/pypoetry
37-
key: ${{ runner.os }}-poetry-${{ hashFiles('**/poetry.lock') }}
38-
restore-keys: |
39-
${{ runner.os }}-poetry-
40-
4131
- name: Install lint dependencies
4232
run: |
43-
make install
33+
poetry install --sync --no-interaction --with lint
4434
4535
- name: Run black
4636
run: poetry run black src

.github/workflows/ci_testing.yml

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,10 @@ jobs:
3030

3131
- name: ensure poetry using desired python version
3232
run: poetry env use $(asdf which python)
33-
34-
- name: Cache Poetry virtualenv
35-
uses: actions/cache@v4
36-
with:
37-
path: ~/.cache/pypoetry
38-
key: ${{ runner.os }}-poetry-${{ hashFiles('**/poetry.lock') }}
39-
restore-keys: |
40-
${{ runner.os }}-poetry-
4133

4234
- name: Install test dependencies
4335
run: |
44-
make install
36+
poetry install --sync --no-interaction --with test
4537
4638
- name: Run pytest and coverage
4739
run: |

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ share/python-wheels/
3232
.installed.cfg
3333
*.egg
3434
MANIFEST
35-
poetry.lock
3635

3736
# PyInstaller
3837
# Usually these files are written by a python script from a template

CHANGELOG.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
## v0.6.0 (2026-02-16)
2+
3+
### Feat
4+
5+
- added reference data loading of arrow ipc files including enhanced test coverage for reference data loaders
6+
- Add read of arrow ipc files to reference data loaders
7+
- Change how error messages are generated (by writing in batches). DuckDB no longer relies on pandas — uses pyarrow, multiprocessing and background-thread batch writing to avoid memory pressure
8+
9+
### Refactor
10+
11+
- removal of processing pool for duckdb data contract
12+
- amended process pool to use executor supplied to duckdb pipeline/data contract rather than always instantiating new pool
13+
- address review comments
14+
- merging develop v06
15+
- merging logging additions from release branch
16+
- merge in main and resolve conflicts and linting issues
17+
- Modified business rule step to write feedback messages in batches to increase tolerance to large files with large numbers of validation errors
18+
119
## v0.5.2 (2026-02-02)
220

321
### Refactor

docs/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ for entity in data_contract_config.schemas:
165165

166166
# Data contract step here
167167
data_contract = SparkDataContract(spark_session=spark)
168-
entities, validation_messages, success = data_contract.apply_data_contract(
169-
entities, data_contract_config
168+
entities, feedback_errors_uri, success = data_contract.apply_data_contract(
169+
entities, None, data_contract_config
170170
)
171171
```
172172

poetry.lock

Lines changed: 3130 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "nhs_dve"
3-
version = "0.5.2"
3+
version = "0.6.0"
44
description = "`nhs data validation engine` is a framework used to validate data"
55
authors = ["NHS England <england.contactus@nhs.net>"]
66
readme = "README.md"
@@ -18,29 +18,19 @@ classifiers = [
1818

1919
[tool.poetry.dependencies]
2020
python = ">=3.10,<3.12"
21-
boto3 = "1.34.162"
22-
botocore = "1.34.162"
23-
delta-spark = "2.4.0"
24-
duckdb = "1.1.0" # mitigates security vuln in < 1.1.0
25-
formulas = "1.2.4"
26-
idna = "3.7" # Downstream dep of requests but has security vuln < 3.7
27-
Jinja2 = "3.1.6" # mitigates security vuln in < 3.1.6
28-
lxml = "4.9.1"
29-
openpyxl = "3.1.0"
30-
pandas = "2.2.2"
31-
polars = "0.20.14"
32-
pyarrow = "17.0.0"
33-
pydantic = "1.10.15" # Mitigates security vuln in < 1.10.13
34-
pymongo = "4.6.3"
35-
pyspark = "3.4.4"
36-
pytz = "2022.1"
37-
PyYAML = "6.0.3"
38-
requests = "2.32.4" # Mitigates security vuln in < 2.31.0
39-
schedula = "1.2.19"
40-
sqlalchemy = "2.0.19"
41-
typing_extensions = "4.6.2"
42-
urllib3 = "2.6.3" # Mitigates security vuln in < 2.6.0
43-
xmltodict = "0.13.0"
21+
boto3 = ">=1.34.162,<1.36" # breaking change beyond 1.36
22+
botocore = ">=1.34.162,<1.36" # breaking change beyond 1.36
23+
delta-spark = "2.4.*"
24+
duckdb = "1.1.*" # breaking changes beyond 1.1
25+
Jinja2 = "3.1.*"
26+
lxml = "^4.9.1"
27+
openpyxl = "^3.1"
28+
pandas = "^2.2.2"
29+
polars = "0.20.*"
30+
pyarrow = "^17.0.0"
31+
pydantic = "1.10.15"
32+
pyspark = "3.4.*"
33+
typing_extensions = "^4.6.2"
4434

4535
[tool.poetry.group.dev]
4636
optional = true
@@ -62,7 +52,6 @@ behave = "1.3.3"
6252
coverage = "7.11.0"
6353
moto = {extras = ["s3"], version = "4.0.13"}
6454
Werkzeug = "3.0.6" # Dependency of moto which needs 3.0.6 for security vuln mitigation
65-
mongomock = "4.1.2"
6655
pytest = "8.4.2"
6756
pytest-lazy-fixtures = "1.4.0" # switched from https://github.com/TvoroG/pytest-lazy-fixture as it's no longer supported
6857
xlsx2csv = "0.8.2"
@@ -98,10 +87,9 @@ ignore = ["B028", "D213", "D203", "D205", "D107", "D105"]
9887

9988
[tool.mypy]
10089
plugins = ["pydantic.mypy"]
101-
enable_recursive_aliases = true
10290

10391
[[tool.mypy.overrides]]
104-
module = "polars"
92+
module = "polars.*"
10593
follow_imports = "skip"
10694
# ^language server knows what's going on, but mypy can't find attributes on Self? type
10795

src/dve/common/__init__.py

Whitespace-only changes.

src/dve/common/error_utils.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
"""Utilities to support reporting"""
2+
3+
import datetime as dt
4+
import json
5+
import logging
6+
from collections.abc import Iterable
7+
from itertools import chain
8+
from multiprocessing import Queue
9+
from threading import Thread
10+
from typing import Optional, Union
11+
12+
import dve.parser.file_handling as fh
13+
from dve.core_engine.exceptions import CriticalProcessingError
14+
from dve.core_engine.loggers import get_logger
15+
from dve.core_engine.message import UserMessage
16+
from dve.core_engine.type_hints import URI, DVEStageName, Messages
17+
18+
19+
def get_feedback_errors_uri(working_folder: URI, step_name: DVEStageName) -> URI:
    """Return the URI of the json-lines file holding every error a step generated."""
    errors_filename = f"{step_name}_errors.jsonl"
    return fh.joinuri(working_folder, "errors", errors_filename)
22+
23+
24+
def get_processing_errors_uri(working_folder: URI) -> URI:
    """Return the URI of the json-lines file holding all processing errors
    generated by a DVE run."""
    errors_filename = "processing_errors.jsonl"
    return fh.joinuri(working_folder, "processing_errors", errors_filename)
28+
29+
30+
def dump_feedback_errors(
    working_folder: URI,
    step_name: DVEStageName,
    messages: Messages,
    key_fields: Optional[dict[str, list[str]]] = None,
) -> URI:
    """Write out captured feedback error messages as json lines.

    Args:
        working_folder: root working folder for the run; must be truthy.
        step_name: DVE stage that produced the messages.
        messages: iterable of feedback messages to persist.
        key_fields: optional mapping of entity name -> list of primary-key
            field names, used to render each message's "Key" value.

    Returns:
        URI of the jsonl error file the messages were appended to.

    Raises:
        AttributeError: if ``working_folder`` is not supplied.
    """
    if not working_folder:
        raise AttributeError("processed files path not passed")

    if not key_fields:
        key_fields = {}

    error_file = get_feedback_errors_uri(working_folder, step_name)
    processed = []

    for message in messages:
        # Prefer the original entity when resolving primary keys; fall back
        # to the (possibly renamed) entity, then to no keys at all.
        if message.original_entity is not None:
            primary_keys = key_fields.get(message.original_entity, [])
        elif message.entity is not None:
            primary_keys = key_fields.get(message.entity, [])
        else:
            primary_keys = []

        error = message.to_dict(
            key_field=primary_keys,
            value_separator=" -- ",
            max_number_of_values=10,
            record_converter=None,
        )
        error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
        processed.append(error)

    # BUGFIX: with no messages the original wrote '"\n".join([]) + "\n"' — a
    # bare newline — corrupting the jsonl file for readers that json-decode
    # every line. Only touch the file when there is something to append.
    if processed:
        with fh.open_stream(error_file, "a") as f:
            f.write("\n".join(json.dumps(rec, default=str) for rec in processed) + "\n")
    return error_file
66+
67+
68+
def dump_processing_errors(
    working_folder: URI, step_name: str, errors: list[CriticalProcessingError]
):
    """Write out critical processing errors"""
    # Fail loudly on missing inputs rather than writing a useless file.
    if not working_folder:
        raise AttributeError("processed files path not passed")
    if not step_name:
        raise AttributeError("step name not passed")
    if not errors:
        raise AttributeError("errors list not passed")

    error_file: URI = get_processing_errors_uri(working_folder)

    # One flat record per critical error, tagged with the originating step.
    records = [
        {
            "step_name": step_name,
            "error_location": "processing",
            "error_level": "integrity",
            "error_message": err.error_message,
            "error_traceback": err.messages,
        }
        for err in errors
    ]

    payload = "\n".join(json.dumps(record, default=str) for record in records)
    with fh.open_stream(error_file, "a") as stream:
        stream.write(payload + "\n")

    return error_file
97+
98+
99+
def load_feedback_messages(feedback_messages_uri: URI) -> Iterable[UserMessage]:
    """Load user messages from a jsonl file.

    Yields nothing when the file does not exist. Streams the file line by
    line instead of materialising it with ``readlines()``, and skips blank
    lines so a stray trailing newline cannot raise ``JSONDecodeError``.
    """
    if not fh.get_resource_exists(feedback_messages_uri):
        return
    with fh.open_stream(feedback_messages_uri) as errs:
        for line in errs:
            if line.strip():  # tolerate empty/whitespace-only lines
                yield UserMessage(**json.loads(line))
105+
106+
107+
def load_all_error_messages(error_directory_uri: URI) -> Iterable[UserMessage]:
    """Load user messages from all jsonl files"""
    # Lazily walk the directory and chain every jsonl file's messages together.
    jsonl_uris = (
        uri
        for uri, _ in fh.iter_prefix(error_directory_uri)
        if uri.endswith(".jsonl")
    )
    return chain.from_iterable(load_feedback_messages(uri) for uri in jsonl_uris)
116+
117+
118+
class BackgroundMessageWriter:
    """Controls batch writes to error jsonl files.

    Producers push batches of messages onto ``write_queue``; a single
    background thread drains the queue and appends each batch to the stage's
    jsonl error file via ``dump_feedback_errors``. Use as a context manager:
    entering starts the writer thread, exiting enqueues a ``None`` sentinel
    to stop it and joins the thread.
    """

    def __init__(
        self,
        working_directory: URI,
        dve_stage: DVEStageName,
        key_fields: Optional[dict[str, list[str]]] = None,
        logger: Optional[logging.Logger] = None,
    ):
        """Create a batch writer for one DVE stage.

        Args:
            working_directory: root working folder for the run.
            dve_stage: stage whose error file is written.
            key_fields: optional mapping of entity name -> primary-key field
                names, forwarded to ``dump_feedback_errors``.
            logger: logger for reporting failures during shutdown; a default
                is created when omitted.
        """
        self._working_directory = working_directory
        self._dve_stage = dve_stage
        self._feedback_message_uri = get_feedback_errors_uri(
            self._working_directory, self._dve_stage
        )
        self._key_fields = key_fields
        self.logger = logger or get_logger(type(self).__name__)
        self._write_thread: Optional[Thread] = None
        # multiprocessing.Queue (not queue.Queue) — presumably so worker
        # processes can also enqueue batches; confirm against callers.
        self._queue: Queue = Queue()

    @property
    def write_queue(self) -> Queue:  # type: ignore
        """Queue for storing batches of messages to be written"""
        return self._queue

    @property
    def write_thread(self) -> Thread:  # type: ignore
        """Thread to write batches of messages to jsonl file (created lazily)"""
        if not self._write_thread:
            self._write_thread = Thread(target=self._write_process_wrapper)
        return self._write_thread

    def _write_process_wrapper(self):
        """Wrapper for dump feedback errors to run in background process"""
        # writing thread will block if nothing in queue
        while True:
            if msgs := self.write_queue.get():
                dump_feedback_errors(
                    self._working_directory, self._dve_stage, msgs, self._key_fields
                )
            else:
                # A falsy item (the None sentinel) ends the thread.
                break

    def __enter__(self) -> "BackgroundMessageWriter":
        self.write_thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type:
            # BUGFIX: corrected misspelled log message ("occured").
            self.logger.exception(
                "Issue occurred during background write process:",
                exc_info=(exc_type, exc_value, traceback),
            )
        # None value in queue will trigger break in target
        self.write_queue.put(None)
        self.write_thread.join()
174+
175+
176+
def conditional_cast(value, primary_keys: list[str], value_separator: str) -> Union[list[str], str]:
    """Determines what to do with a value coming back from the error list"""
    if isinstance(value, list):
        # Stringify each element recursively, then label with its primary key.
        rendered = [
            conditional_cast(item, primary_keys, value_separator) for item in value
        ]  # type: ignore
        labelled = []
        for key_name, item_text in zip(primary_keys, rendered):
            labelled.append(f"{key_name}: {item_text}" if key_name else "")
        return value_separator.join(labelled)
    if isinstance(value, dt.date):
        return value.isoformat()
    if isinstance(value, dict):
        # Dicts carry no printable key value — render as empty.
        return ""
    return str(value)

0 commit comments

Comments
 (0)