|
6 | 6 |
|
7 | 7 | from jsonschema.validators import validator_for # type: ignore
|
8 | 8 |
|
9 |
| -from .. import types |
| 9 | +from .. import settings, types |
10 | 10 | from ..error import Error
|
11 | 11 | from ..errors.metadata import MetadataError
|
12 | 12 | from .data import load_data
|
13 | 13 | from .file import read_file
|
14 | 14 |
|
| 15 | +# TODO: implement additional user-side profile caching |
15 | 16 |
|
16 |
| -def select_profile(*, metadata_type: types.IMetadataType) -> str: |
17 |
| - if metadata_type == "package": |
18 |
| - return "data-package" |
19 |
| - elif metadata_type == "resource": |
20 |
| - return "data-resource" |
21 |
| - elif metadata_type == "dialect": |
22 |
| - return "table-dialect" |
23 |
| - elif metadata_type == "schema": |
24 |
| - return "table-schema" |
25 |
| - raise Error(f'Invalid metadata type "{metadata_type}"') |
| 17 | + |
| 18 | +def check_profile(*, metadata: types.IData, profile: str) -> List[MetadataError]: |
| 19 | + # Prepare validator |
| 20 | + jsonSchema = read_profile(profile=profile) |
| 21 | + Validator = validator_for(jsonSchema) # type: ignore |
| 22 | + validator = Validator(jsonSchema) # type: ignore |
| 23 | + |
| 24 | + # Validate metadata |
| 25 | + errors: List[MetadataError] = [] |
| 26 | + for validation_error in validator.iter_errors(metadata): # type: ignore |
| 27 | + errors.append(MetadataError(validation_error)) # type: ignore |
| 28 | + |
| 29 | + return errors |
26 | 30 |
|
27 | 31 |
|
28 | 32 | @lru_cache
|
29 |
| -def read_profile(*, metadata_type: types.IMetadataType) -> types.IDict: |
30 |
| - format = "json" |
31 |
| - name = select_profile(metadata_type=metadata_type) |
32 |
| - path = os.path.join(os.path.dirname(__file__), "..", "profiles", f"{name}.{format}") |
| 33 | +def read_profile(*, profile: str) -> types.IData: |
| 34 | + parts = parse_profile(profile) |
| 35 | + |
| 36 | + # Replace with builtin copy |
| 37 | + if parts: |
| 38 | + version, filename = parts |
| 39 | + profile = os.path.join(settings.PROFILE_BASEDIR, version, filename) |
| 40 | + |
| 41 | + # Read jsonSchema |
33 | 42 | try:
|
34 |
| - text = read_file(path) |
35 |
| - data = load_data(text, format=format) |
| 43 | + text = read_file(profile) |
| 44 | + data = load_data(text, format="json") |
36 | 45 | except Exception:
|
37 |
| - raise Error(f'Cannot read profile "{name}" at "{path}"') |
| 46 | + raise Error(f'Cannot read profile: "{profile}"') |
| 47 | + |
38 | 48 | return data
|
39 | 49 |
|
40 | 50 |
|
41 |
| -def check_metadata_against_jsonschema( |
42 |
| - metadata: types.IDict, jsonSchema: types.IDict |
43 |
| -) -> List[MetadataError]: |
44 |
| - Validator = validator_for(jsonSchema) # type: ignore |
45 |
| - validator = Validator(jsonSchema) # type: ignore |
46 |
| - errors: List[MetadataError] = [] |
47 |
| - for validation_error in validator.iter_errors(metadata): # type: ignore |
48 |
| - errors.append(MetadataError(validation_error)) # type: ignore |
49 |
| - return errors |
| 51 | +def parse_profile(profile: str): |
| 52 | + parts = profile.rsplit("/", 3) |
| 53 | + |
| 54 | + # Ensure builtin copy exists |
| 55 | + if len(parts) != 3: |
| 56 | + return None |
| 57 | + if parts[0] != settings.PROFILE_BASEURL: |
| 58 | + return None |
| 59 | + if parts[1] not in os.listdir(settings.PROFILE_BASEDIR): |
| 60 | + return None |
| 61 | + if parts[2] not in os.listdir(os.path.join(settings.PROFILE_BASEDIR, parts[1])): |
| 62 | + return None |
| 63 | + |
| 64 | + return parts[1], parts[2] |
0 commit comments