Skip to content

Commit 2b0df63

Browse files
author
bruvio
committed
refactoring to expose resolve_read and resolve_write like in R
1 parent 8bf8dc5 commit 2b0df63

File tree

3 files changed

+187
-66
lines changed

3 files changed

+187
-66
lines changed

data_pipeline_api/__init__.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
"initialise",
33
"link_read",
44
"link_write",
5+
"resolve_read",
6+
"resolve_write",
7+
"read_array",
8+
"write_array",
59
"finalise",
610
"raise_issue_by_data_product",
711
"raise_issue_by_index",
@@ -14,7 +18,14 @@
1418
]
1519

1620
from .fdp_utils import get_handle_index_from_path
17-
from .link import link_read, link_write
21+
from .link import (
22+
link_read,
23+
link_write,
24+
read_array,
25+
resolve_read,
26+
resolve_write,
27+
write_array,
28+
)
1829
from .pipeline import finalise, initialise
1930
from .raise_issue import (
2031
raise_issue_by_data_product,

data_pipeline_api/fdp_utils.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -494,18 +494,3 @@ def register_issues(token: str, handle: dict) -> dict: # sourcery no-metrics
494494
api_version=api_version,
495495
)
496496
return current_issue
497-
498-
499-
def resolve_read(handle: dict, data_product: str, component: str) -> None:
500-
handle_yaml = handle["yaml"]
501-
endpoint = handle_yaml["run_metadata"]["local_data_registry_url"]
502-
read = handle_yaml["read"]
503-
504-
if not isinstance(read, list):
505-
dummy = read
506-
read = []
507-
read.append(dummy)
508-
509-
510-
def resolve_write(handle: dict, data_product: str, file_type: str) -> None:
511-
pass

data_pipeline_api/link.py

Lines changed: 175 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,42 +1,31 @@
1+
# type: ignore
2+
13
import logging
24
import os
3-
from typing import Any
4-
5-
from data_pipeline_api import fdp_utils
5+
from typing import Any, Tuple
66

7+
import numpy as np
78

8-
def link_write(handle: dict, data_product: str) -> str:
9-
"""Reads write information in config file, updates handle with relevant
10-
metadata and returns path to write data product to.
11-
12-
Args:
13-
| data_product: Specified name of data product in config.
9+
from data_pipeline_api import fdp_utils
1410

15-
Returns:
16-
| path: Path to write data product to.
17-
"""
1811

12+
def resolve_write(
13+
handle: dict, data_product: str, file_type: str = None
14+
) -> Tuple[str, dict]:
15+
# If multiple write blocks exist, find corresponding index for given DP
1916
# Get metadata from handle
2017
run_metadata = handle["yaml"]["run_metadata"]
2118
datastore = run_metadata["write_data_store"]
22-
2319
index = 0
24-
25-
if "write" not in handle["yaml"].keys():
26-
raise ValueError(
27-
"Error: Write has not been specified in the given config file"
28-
)
29-
30-
# If multiple write blocks exist, find corresponding index for given DP
3120
for i in enumerate(handle["yaml"]["write"]):
3221
if i[1]["data_product"] == data_product:
3322
index = i[0]
34-
23+
if file_type is None:
24+
file_type = "netcdf"
3525
# Get metadata from config
3626
write = handle["yaml"]["write"][index]
3727
write_data_product = write["data_product"]
3828
write_version = write["use"]["version"]
39-
file_type = write["file_type"]
4029
description = write["description"]
4130
write_namespace = run_metadata["default_output_namespace"]
4231
write_public = run_metadata["public"]
@@ -56,7 +45,7 @@ def link_write(handle: dict, data_product: str) -> str:
5645
os.makedirs(directory)
5746

5847
# Create metadata dict
59-
output_dict = {
48+
return path, {
6049
"data_product": data_product,
6150
"use_data_product": write_data_product,
6251
"use_component": None,
@@ -68,19 +57,9 @@ def link_write(handle: dict, data_product: str) -> str:
6857
"public": write_public,
6958
}
7059

71-
# If output exists in handle, append new metadata, otherwise create dict
72-
if "output" in handle:
73-
key = "output_" + str(len(handle["output"]))
74-
handle["output"][key] = output_dict
75-
else:
76-
handle["output"] = {}
77-
handle["output"]["output_0"] = output_dict
78-
79-
return path
8060

81-
82-
def link_read(handle: dict, data_product: str) -> str:
83-
"""Reads 'read' information in config file, updates handle with relevant
61+
def link_write(handle: dict, data_product: str) -> str:
62+
"""Reads write information in config file, updates handle with relevant
8463
metadata and returns path to write data product to.
8564
8665
Args:
@@ -90,24 +69,25 @@ def link_read(handle: dict, data_product: str) -> str:
9069
| path: Path to write data product to.
9170
"""
9271

93-
# If data product is already in handle, return path
94-
if "input" in handle:
95-
for index in handle["input"].keys():
96-
if handle["input"][index]["data_product"] == data_product:
97-
return handle["input"][index]["path"]
98-
if "read" not in handle["yaml"].keys():
72+
if "write" not in handle["yaml"].keys():
9973
raise ValueError(
100-
"Error: Read has not been specified in the given config file"
74+
"Error: Write has not been specified in the given config file"
10175
)
10276

103-
# Check if data product is in config yaml
104-
read_list = [
105-
i[1]["data_product"] for i in enumerate(handle["yaml"]["read"])
106-
]
77+
path, output_dict = resolve_write(handle, data_product)
10778

108-
if data_product not in read_list:
109-
logging.info("Read information for data product not in config")
79+
# If output exists in handle, append new metadata, otherwise create dict
80+
if "output" in handle:
81+
key = "output_" + str(len(handle["output"]))
82+
handle["output"][key] = output_dict
83+
else:
84+
handle["output"] = {}
85+
handle["output"]["output_0"] = output_dict
11086

87+
return path
88+
89+
90+
def resolve_read(handle: dict, data_product: str) -> Tuple[str, dict]:
11191
index = 0
11292
# Get index for given data product
11393
for i in enumerate(handle["yaml"]["read"]):
@@ -197,7 +177,7 @@ def link_read(handle: dict, data_product: str) -> str:
197177
component = use["component"] if "component" in use else None
198178

199179
# Return path and metadata dict (the caller records it in the handle)
200-
input_dict = {
180+
return path, {
201181
"data_product": data_product,
202182
"use_data_product": data_product,
203183
"use_component": component,
@@ -207,6 +187,38 @@ def link_read(handle: dict, data_product: str) -> str:
207187
"component_url": component_url,
208188
}
209189

190+
191+
def link_read(handle: dict, data_product: str) -> str:
192+
"""Reads 'read' information in config file, updates handle with relevant
193+
metadata and returns path to read data product from.
194+
195+
Args:
196+
| data_product: Specified name of data product in config.
197+
198+
Returns:
199+
| path: Path to read data product from.
200+
"""
201+
202+
# If data product is already in handle, return path
203+
if "input" in handle:
204+
for index in handle["input"].keys():
205+
if handle["input"][index]["data_product"] == data_product:
206+
return handle["input"][index]["path"]
207+
if "read" not in handle["yaml"].keys():
208+
raise ValueError(
209+
"Error: Read has not been specified in the given config file"
210+
)
211+
212+
# Check if data product is in config yaml
213+
read_list = [
214+
i[1]["data_product"] for i in enumerate(handle["yaml"]["read"])
215+
]
216+
217+
if data_product not in read_list:
218+
logging.info("Read information for data product not in config")
219+
220+
path, input_dict = resolve_read(handle, data_product)
221+
210222
if "input" in handle:
211223
index = "input_" + str(len(handle["input"]))
212224
handle["input"][index] = input_dict
@@ -253,6 +265,38 @@ def read_array(handle: dict, data_product: str, component: str) -> Any:
253265
if data_product not in read_list:
254266
logging.info("Read information for data product not in config")
255267

268+
read_metadata = resolve_read(handle, data_product)
269+
# Get metadata ------------------------------------------------------------
270+
271+
write_data_product = read_metadata["data_product"] # noqa: F841
272+
write_version = read_metadata["version"] # noqa: F841
273+
write_namespace = read_metadata["namespace"] # noqa: F841
274+
write_public = read_metadata["public"] # noqa: F841
275+
data_product_decription = read_metadata["description"] # noqa: F841
276+
path = read_metadata["path"] # noqa: F841
277+
278+
if not os.path.exists(path):
279+
raise FileNotFoundError("File missing from data store")
280+
281+
# read netcdf file
282+
283+
284+
# Extract data object
285+
286+
287+
# Extract dimension names and make sure they're in the right order
288+
289+
290+
# Attach dimension names to the object
291+
292+
293+
# Attach remaining list elements as attributes
294+
295+
296+
# Write to handle ---------------------------------------------------------
297+
298+
# If data product is already recorded in handle return index
299+
256300

257301
def write_array(
258302
array: Any,
@@ -296,4 +340,85 @@ def write_array(
296340
which can be used to raise an issue if necessary
297341
"""
298342

299-
pass
343+
if "write" not in handle["yaml"].keys():
344+
raise ValueError(
345+
"Error: Write has not been specified in the given config file"
346+
)
347+
348+
write_metadata = resolve_write(handle, data_product, file_type="netcdf")
349+
# Get metadata ------------------------------------------------------------
350+
351+
write_data_product = write_metadata["data_product"] # noqa: F841
352+
write_version = write_metadata["version"] # noqa: F841
353+
write_namespace = write_metadata["namespace"] # noqa: F841
354+
write_public = write_metadata["public"] # noqa: F841
355+
data_product_decription = write_metadata["description"] # noqa: F841
356+
path = write_metadata["path"] # noqa: F841
357+
358+
if not isinstance(array, np.array):
359+
raise TypeError(f"{array} must be an array")
360+
# Check dimensions class
361+
if dimension_names:
362+
if not all(
363+
list(map(lambda x: isinstance(x, np.array), dimension_names))
364+
):
365+
raise TypeError("Elements of dimension_names must be arrays")
366+
# Check number of dimensions
367+
# if (length(dim(array)) != length(dimension_names))
368+
if len(array) != len(array):
369+
raise ValueError(
370+
"Length of dimension_names does not equal number of dimensions in array"
371+
)
372+
373+
# Check length of elements in each dimension
374+
if len(array) != len(array):
375+
# if (any(unname(unlist(lapply(dimension_names, length))) != dim(array)))
376+
raise ValueError(
377+
"Number of elements in dimension_names does not equal number of dimensions in array"
378+
)
379+
parentdir = os.path.dirname(os.path.abspath(path))
380+
381+
# Write hdf5 file ---------------------------------------------------------
382+
383+
# Generate directory structure
384+
if not os.path.dirname(parentdir):
385+
os.makedirs(parentdir, exist_ok=True)
386+
387+
388+
# Write hdf5 file
389+
390+
391+
# Generate internal structure
392+
393+
# This structure needs to be added
394+
395+
# If the structure doesn't exist make it
396+
397+
# Update current structure
398+
399+
# Attach data
400+
401+
# Dimension names and titles ----------------------------------------------
402+
403+
404+
# Attach dimension titles
405+
406+
407+
# Attach dimension names
408+
409+
410+
# Dimension values and units ----------------------------------------------
411+
412+
# Attach dimension values
413+
414+
415+
# Attach dimension units
416+
417+
418+
# Attach units
419+
420+
421+
# Write to handle ---------------------------------------------------------
422+
423+
424+
# Return handle index -----------------------------------------------------

0 commit comments

Comments
 (0)