"""Base implementation of the data contract."""
import logging
from abc import ABC, abstractmethod
from collections.abc import Iterable, Iterator
from inspect import ismethod
from typing import Any, ClassVar, Generic, Optional, TypeVar
from pydantic import BaseModel
from typing_extensions import Protocol
from dve.common.error_utils import (
dump_processing_errors,
get_feedback_errors_uri,
get_processing_errors_uri,
)
from dve.core_engine.backends.base.core import get_entity_type
from dve.core_engine.backends.base.reader import BaseFileReader
from dve.core_engine.backends.exceptions import ReaderLacksEntityTypeSupport, render_error
from dve.core_engine.backends.metadata.contract import DataContractMetadata
from dve.core_engine.backends.readers import get_reader
from dve.core_engine.backends.types import Entities, EntityType, StageSuccessful
from dve.core_engine.backends.utilities import dedup_messages, stringify_model
from dve.core_engine.exceptions import CriticalProcessingError
from dve.core_engine.loggers import get_logger
from dve.core_engine.message import FeedbackMessage
from dve.core_engine.type_hints import (
URI,
ArbitraryFunction,
DVEStageName,
EntityLocations,
EntityName,
JSONDict,
Messages,
WrapDecorator,
)
from dve.parser.file_handling import get_file_suffix, get_resource_exists
from dve.parser.type_hints import Extension

T = TypeVar("T")
ExtensionConfig = dict[Extension, "ReaderConfig"]
"""Configuration options for file extensions."""

_READER_OVERRIDE_ATTR_NAME = "_implements_reader_for"
"""The attribute name used to mark a method as a reader override."""


class ReaderConfig(BaseModel):
"""Configuration options for a given reader."""
reader: str
"""The name of the reader to be used."""
parameters: JSONDict
"""The parameters the reader should use."""
class _UnboundReaderOverride(Protocol[T]): # pylint: disable=too-few-public-methods
"""The protocol required to implement an override for a specific file reader."""
@staticmethod
def __call__( # pylint: disable=bad-staticmethod-argument
self: "BaseDataContract[T]", # This is the protocol for an _unbound_ method.
reader: BaseFileReader,
resource: URI,
entity_name: EntityName,
schema: type[BaseModel],
) -> T: ...


def reader_override(reader_type: type[BaseFileReader]) -> WrapDecorator:
    """A decorator factory which marks a data contract method as the override
    implementation for a specific reader type.
    """
def reader_impl_decorator(func: ArbitraryFunction) -> ArbitraryFunction:
"""Wrap a reader function to indicate the reader type it implements an override
for.
"""
setattr(func, _READER_OVERRIDE_ATTR_NAME, reader_type)
return func
return reader_impl_decorator
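
# Example usage (illustrative; `ArrowCSVReader` and the pyarrow `pa.Table`
# entity type are hypothetical, not part of this module):
#
#     class ArrowDataContract(BaseDataContract[pa.Table]):
#         @reader_override(ArrowCSVReader)
#         def _read_csv_to_table(self, reader, resource, entity_name, schema):
#             # Backend-optimised CSV read returning a `pa.Table` directly.
#             ...
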
class BaseDataContract(Generic[EntityType], ABC):
"""The base implementation of a data contract."""
__entity_type__: ClassVar[type[EntityType]] # type: ignore
"""
    The entity type to request from a reader when no more specific
    implementation is available.
    This is populated from the generic annotation at class creation time.
"""
__reader_overrides__: ClassVar[dict[type[BaseFileReader], _UnboundReaderOverride[EntityType]]] = {} # type: ignore # pylint: disable=line-too-long
"""
A dictionary mapping implemented reader types to override functions which provide
a 'local' implementation of the reader. These can provide a more optimised version
of a specific reader for the implemented backend.
This is set and populated in `__init_subclass__` by identifying methods
    decorated with the `@reader_override` decorator, and is used in `read_entity`.
"""
__stage_name__: DVEStageName = "data_contract"
"""
    The name of the data contract DVE stage, for use in auditing and logging.
"""

    def __init_subclass__(cls, *_, **__) -> None:
"""When this class is subclassed, create and populate the `__reader_overrides__`
and `__entity_type__` class variables for this subclass.
"""
# Set entity type from parent class subscript.
if cls is not BaseDataContract:
cls.__entity_type__ = get_entity_type(cls, "BaseDataContract")
# Identify provided reader overrides.
cls.__reader_overrides__ = {}
for method_name in dir(cls):
method = getattr(cls, method_name, None)
if not (ismethod(method) or callable(method)):
continue
reader_type = getattr(method, _READER_OVERRIDE_ATTR_NAME, None)
if reader_type is None:
continue
if not (isinstance(reader_type, type) and issubclass(reader_type, BaseFileReader)):
continue
cls.__reader_overrides__[reader_type] = method # type: ignore
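
    # After subclass creation these class variables are populated
    # automatically. For the hypothetical `ArrowDataContract` sketched above:
    #
    #     ArrowDataContract.__entity_type__       # pa.Table
    #     ArrowDataContract.__reader_overrides__  # {ArrowCSVReader: ...}
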
def __init__( # pylint: disable=unused-argument
self,
logger: Optional[logging.Logger] = None,
**kwargs: Any,
):
self.logger = logger or get_logger(type(self).__name__)
"""The `logging.Logger instance for the data contract config."""

    @abstractmethod
def create_entity_from_py_iterator(
self, entity_name: EntityName, records: Iterator[dict[str, Any]], schema: type[BaseModel]
) -> EntityType:
"""A fallback function to be used where no entity type specific
reader implemenattions are available.
"""

    def read_entity_from_py_iterator(
self,
reader: BaseFileReader,
resource: URI,
entity_name: EntityName,
schema: type[BaseModel],
) -> EntityType:
"""A fallback function for readers that should read records with the
'read_to_py_iterator' implementation and create an entity of the correct
type.
This will be used where there are not more specific implementations for a
given reader type (either as a reader-specific override, or through direct
support for the contract's entity type in the reader).
"""
py_iterator = reader.read_to_py_iterator(resource, entity_name, schema)
return self.create_entity_from_py_iterator(entity_name, py_iterator, schema)

    def read_entity(
self,
reader: BaseFileReader,
resource: URI,
entity_name: EntityName,
schema: type[BaseModel],
) -> EntityType:
"""Read an entity using the provided reader class.
NOTE: In the reader, simple types will either be returned as strings (if present)
or `None`. Format validation, casting, and parsing should be done when the
contract is applied.
NOTE 2: The default implementation will stringify schemas before passing them
to the reader and `create_entity_from_py_iterator`.
"""
schema = stringify_model(schema)
try:
# Try fetching an overridden implementation for the given reader type.
impl = self.__reader_overrides__[type(reader)]
except KeyError:
try:
# If there is no override, try having the reader read directly to
# the contract's entity type.
self.logger.debug("Attempting to read directly to contract entity type...")
entity = reader.read_to_entity_type(
self.__entity_type__, resource, entity_name, schema
)
return entity
except ReaderLacksEntityTypeSupport:
pass
else:
self.logger.debug(f"Using contract-specific override for {type(reader).__name__}...")
return impl(self, reader, resource, entity_name, schema)
# Finally, fall back to using the pure Python reader and creating an entity.
self.logger.debug("Reading via Python iterator...")
return self.read_entity_from_py_iterator(reader, resource, entity_name, schema)
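
    # Resolution order sketch (illustrative; the reader name "parquet", the
    # entity name, and `PatientSchema` are hypothetical):
    #
    #     reader = get_reader("parquet")()
    #     entity = contract.read_entity(
    #         reader, "s3://bucket/patients.parquet", "patients", PatientSchema
    #     )
    #
    # Tried in turn: 1. a contract-specific override; 2. the reader's native
    # support for the contract's entity type; 3. the pure-Python fallback.
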
def _create_critical_error(
self, entity_name: EntityName, error_message: str
) -> FeedbackMessage:
"""Create a critical data contract error."""
return FeedbackMessage(
record=None,
entity=entity_name,
failure_type="integrity",
error_message=error_message,
error_location="Whole file",
category="Bad file",
)

    def _ensure_all_entities_provided(
self, entity_names: Iterable[str], contract_metadata: DataContractMetadata
) -> Messages:
"""Ensure all entities are provided, with no extras."""
provided_entities = set(entity_names)
expected_entities = set(contract_metadata.schemas.keys())
        # Missing = expected but not provided; extra = provided but not expected.
        missing_entities = sorted(expected_entities - provided_entities)
        extra_entities = sorted(provided_entities - expected_entities)
messages: Messages = []
for entity_name in missing_entities:
self.logger.error(f"No location specified for {entity_name!r}")
message = self._create_critical_error(entity_name, "Entity was not provided")
messages.append(message)
for entity_name in extra_entities:
self.logger.error(f"Unrecognised entity provided ({entity_name!r})")
message = self._create_critical_error(entity_name, "Unrecognised entity name provided")
messages.append(message)
return messages
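
    # Illustrative outcome (hypothetical names): if the contract expects
    # {"patients", "visits"} but receives {"patients", "vistis"}, two
    # critical messages result:
    #
    #     msgs = contract._ensure_all_entities_provided(
    #         ["patients", "vistis"], contract_metadata
    #     )
    #     # -> "Entity was not provided" for "visits", and
    #     #    "Unrecognised entity name provided" for "vistis"
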
def _ensure_entity_locations_appropriate(
self, entity_locations: EntityLocations, contract_metadata: DataContractMetadata
) -> Messages:
"""Ensure the provided entity locations really exist."""
messages: Messages = []
for entity_name in contract_metadata.schemas:
try:
entity_location = entity_locations[entity_name]
except KeyError:
continue
try:
if not get_resource_exists(entity_location):
self.logger.error(
f"Resource does not exist for {entity_name!r} (location: "
+ f"{entity_location!r})"
)
message = self._create_critical_error(
entity_name, "The provided location does not exist"
)
messages.append(message)
except Exception as err: # pylint: disable=broad-except
self.logger.error(
f"Error checking location exists for {entity_name!r} (location: "
+ f"{entity_location!r})"
)
self.logger.exception(err)
error_message = (
f"Unable to ensure entity location exists ({type(err).__name__}: {err})"
)
message = self._create_critical_error(entity_name, error_message)
messages.append(message)
return messages

    def _ensure_entity_locations_have_read_support(
self, entity_locations: EntityLocations, contract_metadata: DataContractMetadata
) -> Messages:
"""Ensure that provided entity locations have supported readers."""
messages: Messages = []
for entity_name in contract_metadata.schemas:
try:
entity_location = entity_locations[entity_name]
except KeyError:
continue
            suffix = get_file_suffix(entity_location) or ""
            if not suffix:
                self.logger.error(
                    f"{entity_name!r} (location: {entity_location!r}) missing file extension"
                )
                message = self._create_critical_error(entity_name, "Missing file extension")
                messages.append(message)
                # No extension means the reader lookup below cannot succeed.
                continue
            extension = f".{suffix}"
            if extension not in contract_metadata.reader_metadata[entity_name]:
                self.logger.error(
                    f"{entity_name!r} (location: {entity_location!r}) does not have a "
                    + f"configured reader for {extension} files"
                )
                error_message = f"Does not implement support for {extension!r} types"
                message = self._create_critical_error(entity_name, error_message)
                messages.append(message)
        return messages

    def read_raw_entities(
self, entity_locations: EntityLocations, contract_metadata: DataContractMetadata
) -> tuple[Entities, Messages, StageSuccessful]:
"""Read the raw entities from the entity locations using the configured readers.
These will not yet have had the data contracts applied.
"""
messages: Messages = []
messages.extend(self._ensure_all_entities_provided(entity_locations, contract_metadata))
messages.extend(
self._ensure_entity_locations_appropriate(entity_locations, contract_metadata)
)
messages.extend(
self._ensure_entity_locations_have_read_support(entity_locations, contract_metadata)
)
if any(message.is_critical for message in messages):
return {}, messages, False
entities: Entities = {}
successful = True
for entity_name, resource in entity_locations.items():
reader_metadata = contract_metadata.reader_metadata[entity_name]
            # Extension support was already verified above.
            extension = "." + (get_file_suffix(resource) or "").lower()
reader_config = reader_metadata[extension]
reader_type = get_reader(reader_config.reader)
reader = reader_type(**reader_config.parameters)
self.logger.info(f"Reading entity {entity_name!r} using {reader_config.reader!r}")
try:
schema = contract_metadata.schemas[entity_name]
entities[entity_name] = self.read_entity(
reader,
resource,
entity_name,
schema, # type: ignore
)
except Exception as err: # pylint: disable=broad-except
successful = False
location = f"data contract (reading entity {entity_name!r} from {resource!r})"
new_messages = render_error(
err,
location,
self.logger,
entity_name=entity_name,
error_location="Whole file",
error_category="Bad file",
)
messages.extend(new_messages)
return entities, dedup_messages(messages), successful
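
    # Illustrative call (entity names, locations, and metadata objects are
    # hypothetical):
    #
    #     entities, messages, ok = contract.read_raw_entities(
    #         {"patients": "s3://bucket/patients.csv"}, contract_metadata
    #     )
    #     if not ok:
    #         ...  # Critical messages explain why reading failed.
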

    def add_record_index(self, entity: EntityType, **kwargs) -> EntityType:
        """Add a record index to the entity."""
        raise NotImplementedError(f"add_record_index not implemented in {self.__class__}")

    def drop_record_index(self, entity: EntityType, **kwargs) -> EntityType:
        """Drop a record index from the entity."""
        raise NotImplementedError(f"drop_record_index not implemented in {self.__class__}")

    @abstractmethod
def apply_data_contract(
self,
working_dir: URI,
entities: Entities,
entity_locations: EntityLocations,
contract_metadata: DataContractMetadata,
key_fields: Optional[dict[str, list[str]]] = None,
) -> tuple[Entities, URI, StageSuccessful]:
"""Apply the data contract to the raw entities, returning the validated entities
and any messages.
Record-level identifiers should be added at this point.
"""
raise NotImplementedError()

    def apply(
self,
working_dir: URI,
entity_locations: EntityLocations,
contract_metadata: DataContractMetadata,
key_fields: Optional[dict[str, list[str]]] = None,
) -> tuple[Entities, URI, StageSuccessful, URI]:
"""Read the entities from the provided locations according to the data contract,
and return the validated entities and any messages.
"""
feedback_errors_uri = get_feedback_errors_uri(working_dir, self.__stage_name__)
processing_errors_uri = get_processing_errors_uri(working_dir)
entities, messages, successful = self.read_raw_entities(entity_locations, contract_metadata)
if not successful:
dump_processing_errors(
working_dir,
self.__stage_name__,
[
CriticalProcessingError(
"Issue occurred while reading raw entities",
[msg.error_message for msg in messages],
)
],
)
return {}, feedback_errors_uri, successful, processing_errors_uri
try:
entities, feedback_errors_uri, successful = self.apply_data_contract(
working_dir, entities, entity_locations, contract_metadata, key_fields
)
except Exception as err: # pylint: disable=broad-except
successful = False
new_messages = render_error(
err,
self.__stage_name__,
self.logger,
)
dump_processing_errors(
working_dir,
self.__stage_name__,
[
CriticalProcessingError(
f"Issue occurred while applying {self.__stage_name__}",
[msg.error_message for msg in new_messages],
)
],
)
if contract_metadata.cache_originals:
for entity_name in list(entities):
entities[f"Original{entity_name}"] = entities[entity_name]
return entities, feedback_errors_uri, successful, processing_errors_uri

    def read_parquet(self, path: URI, **kwargs) -> EntityType:
        """Read parquet files containing the stringified output of the
        file transformation phase.
        """
raise NotImplementedError()

    def write_parquet(self, entity: EntityType, target_location: URI, **kwargs) -> URI:
        """Write type-cast entities to parquet following data contract
        application.
        """
raise NotImplementedError()
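

# Illustrative sketch (not part of the original module): a minimal concrete
# contract whose entity type is a plain list of dicts. `apply_data_contract`
# is a no-op pass-through here; a real backend would validate records, cast
# types, and write feedback errors. Everything below is an assumption-laden
# example, not the library's own implementation.
class _ExampleListContract(BaseDataContract[list]):
    def create_entity_from_py_iterator(
        self,
        entity_name: EntityName,
        records: Iterator[dict[str, Any]],
        schema: type[BaseModel],
    ) -> list:
        # Materialise the iterator into the contract's entity type.
        return list(records)

    def apply_data_contract(
        self,
        working_dir: URI,
        entities: Entities,
        entity_locations: EntityLocations,
        contract_metadata: DataContractMetadata,
        key_fields: Optional[dict[str, list[str]]] = None,
    ) -> tuple[Entities, URI, StageSuccessful]:
        # Assumption: no validation performed in this sketch; report success.
        return entities, get_feedback_errors_uri(working_dir, self.__stage_name__), True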