
Commit 720483e

Issue #404/#481 introduce connection level auto_validate option
- connection level option will be more practical and user friendly
- per `execute` overriding is still possible
- generalize `_warn_if_process_graph_invalid` to `_preflight_validation` (to future proof it)
- additional finetuning
1 parent 0c6e3da commit 720483e
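In practice, the new option is used roughly like this (a minimal sketch; the backend URL and collection id are placeholders, not taken from this commit):

```python
import openeo

# Process graph validation is now configured once, at connection level,
# and is enabled by default (auto_validate=True).
connection = openeo.connect("https://openeo.example.com", auto_validate=True)  # placeholder URL

# An invalid process graph is only reported as a log warning before execution;
# the request itself still goes through.
cube = connection.load_collection("SENTINEL2_L2A")  # placeholder collection id

# The per-call `validate` argument still overrules the connection-level setting.
cube.download("result.tiff", validate=False)
```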

6 files changed: 109 additions, 70 deletions

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
@@ -9,9 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 
 ### Added
-- Automatically validate process graph before download/execute/execute_batch and log warning when invalid ([#404](https://github.com/Open-EO/openeo-python-client/issues/404))
 
 - Add `DataCube.reduce_spatial()`
+- Added option (enabled by default) to automatically validate a process graph before execution. Validation issues just trigger warnings for now. ([#404](https://github.com/Open-EO/openeo-python-client/issues/404))
 
 ### Changed

openeo/rest/connection.py

Lines changed: 74 additions & 52 deletions
@@ -260,6 +260,7 @@ def __init__(
         refresh_token_store: Optional[RefreshTokenStore] = None,
         slow_response_threshold: Optional[float] = None,
         oidc_auth_renewer: Optional[OidcAuthenticator] = None,
+        auto_validate: bool = True,
     ):
         """
         Constructor of Connection, authenticates user.
@@ -282,6 +283,7 @@ def __init__(
         self._auth_config = auth_config
         self._refresh_token_store = refresh_token_store
         self._oidc_auth_renewer = oidc_auth_renewer
+        self._auto_validate = auto_validate
 
     @classmethod
     def version_discovery(
@@ -1052,8 +1054,8 @@ def validate_process_graph(self, process_graph: dict) -> List[dict]:
         :param process_graph: (flat) dict representing process graph
         :return: list of errors (dictionaries with "code" and "message" fields)
         """
-        graph = self._build_request_with_process_graph(process_graph)["process"]
-        return self.post(path="/validation", json=graph, expected_status=200).json()["errors"]
+        pg_with_metadata = self._build_request_with_process_graph(process_graph)["process"]
+        return self.post(path="/validation", json=pg_with_metadata, expected_status=200).json()["errors"]
 
     @property
     def _api_version(self) -> ComparableVersion:
@@ -1393,8 +1395,9 @@ def load_url(self, url: str, format: str, options: Optional[dict] = None):
 
     def create_service(self, graph: dict, type: str, **kwargs) -> Service:
         # TODO: type hint for graph: is it a nested or a flat one?
-        req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
-        response = self.post(path="/services", json=req, expected_status=201)
+        pg_with_metadata = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
+        self._preflight_validation(pg_with_metadata=pg_with_metadata)
+        response = self.post(path="/services", json=pg_with_metadata, expected_status=201)
         service_id = response.headers.get("OpenEO-Identifier")
         return Service(service_id, self)
 
@@ -1463,43 +1466,55 @@ def upload_file(
     def _build_request_with_process_graph(self, process_graph: Union[dict, FlatGraphableMixin, Any], **kwargs) -> dict:
         """
         Prepare a json payload with a process graph to submit to /result, /services, /jobs, ...
-        :param process_graph: flat dict representing a process graph
+        :param process_graph: flat dict representing a "process graph with metadata" ({"process": {"process_graph": ...}, ...})
         """
         # TODO: make this a more general helper (like `as_flat_graph`)
         result = kwargs
         process_graph = as_flat_graph(process_graph)
         if "process_graph" not in process_graph:
             process_graph = {"process_graph": process_graph}
-        # TODO: also check if `process_graph` already has "process" key (i.e. is a "process graph with metadata already)
+        # TODO: also check if `process_graph` already has "process" key (i.e. is a "process graph with metadata" already)
         result["process"] = process_graph
         return result
 
-    def _warn_if_process_graph_invalid(self, process_graph: Union[dict, FlatGraphableMixin, str, Path]):
-        # At present, the intention is that a failed validation does not block
-        # the job from running, it is only reported as a warning.
-        # Therefor we also want to continue when something *else* goes wrong
-        # *during* the validation.
-        try:
-            if not self.capabilities().supports_endpoint("/validation", "POST"):
-                return
-
-            graph = self._build_request_with_process_graph(process_graph)["process"]
-            validation_errors = self.validate_process_graph(process_graph=graph)
-            if validation_errors:
-                _log.warning(
-                    "Process graph is not valid. Validation errors:\n"
-                    + "\n".join(e["message"] for e in validation_errors)
-                )
-        except Exception:
-            _log.warning("Could not validate the process graph", exc_info=True)
+    def _preflight_validation(self, pg_with_metadata: dict, *, validate: Optional[bool] = None):
+        """
+        Preflight validation of process graph to execute.
+
+        :param pg_with_metadata: flat dict representation of process graph with metadata,
+            e.g. as produced by `_build_request_with_process_graph`
+        :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
+            (overruling the connection's ``auto_validate`` setting).
+
+        :return:
+        """
+        if validate is None:
+            validate = self._auto_validate
+        if validate and self.capabilities().supports_endpoint("/validation", "POST"):
+            # At present, the intention is that a failed validation does not block
+            # the job from running, it is only reported as a warning.
+            # Therefor we also want to continue when something *else* goes wrong
+            # *during* the validation.
+            try:
+                resp = self.post(path="/validation", json=pg_with_metadata["process"], expected_status=200)
+                validation_errors = resp.json()["errors"]
+                if validation_errors:
+                    _log.warning(
+                        "Preflight process graph validation raised: "
+                        + (" ".join(f"[{e.get('code')}] {e.get('message')}" for e in validation_errors))
+                    )
+            except Exception as e:
+                _log.error(f"Preflight process graph validation failed: {e}", exc_info=True)
+
+        # TODO: additional validation and sanity checks: e.g. is there a result node, are all process_ids valid, ...?
 
     # TODO: unify `download` and `execute` better: e.g. `download` always writes to disk, `execute` returns result (raw or as JSON decoded dict)
     def download(
         self,
         graph: Union[dict, FlatGraphableMixin, str, Path],
         outputfile: Union[Path, str, None] = None,
         timeout: Optional[int] = None,
-        validate: bool = True,
+        validate: Optional[bool] = None,
     ) -> Union[None, bytes]:
         """
         Downloads the result of a process graph synchronously,
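For reference, the `/validation` response that `_preflight_validation` consumes only needs an `errors` list with `code`/`message` entries (as already documented on `validate_process_graph`); a sketch where the concrete values are made up:

```python
# Hypothetical /validation response body; only the "errors" structure matters here.
validation_response = {
    "errors": [
        {"code": "ProcessUnsupported", "message": "Process 'foo' is not supported."},
    ]
}

# With a non-empty "errors" list, the new helper logs a single warning along the lines of:
#   "Preflight process graph validation raised: [ProcessUnsupported] Process 'foo' is not supported."
# An empty list means the graph passed validation and nothing is logged.
```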
@@ -1510,14 +1525,14 @@ def download(
             or as local file path or URL
         :param outputfile: output file
         :param timeout: timeout to wait for response
+        :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
+            (overruling the connection's ``auto_validate`` setting).
         """
-        if validate:
-            self._warn_if_process_graph_invalid(process_graph=graph)
-
-        request = self._build_request_with_process_graph(process_graph=graph)
+        pg_with_metadata = self._build_request_with_process_graph(process_graph=graph)
+        self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
         response = self.post(
             path="/result",
-            json=request,
+            json=pg_with_metadata,
             expected_status=200,
             stream=True,
             timeout=timeout or DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE,
@@ -1534,22 +1549,23 @@ def execute(
         self,
         process_graph: Union[dict, str, Path],
         timeout: Optional[int] = None,
-        validate: bool = True,
+        validate: Optional[bool] = None,
     ):
         """
         Execute a process graph synchronously and return the result (assumed to be JSON).
 
         :param process_graph: (flat) dict representing a process graph, or process graph as raw JSON string,
             or as local file path or URL
+        :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
+            (overruling the connection's ``auto_validate`` setting).
+
         :return: parsed JSON response
         """
-        if validate:
-            self._warn_if_process_graph_invalid(process_graph=process_graph)
-
-        req = self._build_request_with_process_graph(process_graph=process_graph)
+        pg_with_metadata = self._build_request_with_process_graph(process_graph=process_graph)
+        self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
         return self.post(
             path="/result",
-            json=req,
+            json=pg_with_metadata,
             expected_status=200,
             timeout=timeout or DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE,
         ).json()
@@ -1563,7 +1579,7 @@ def create_job(
         plan: Optional[str] = None,
         budget: Optional[float] = None,
         additional: Optional[dict] = None,
-        validate: bool = True,
+        validate: Optional[bool] = None,
     ) -> BatchJob:
         """
         Create a new job from given process graph on the back-end.
@@ -1575,22 +1591,22 @@ def create_job(
         :param plan: billing plan
         :param budget: maximum cost the request is allowed to produce
         :param additional: additional job options to pass to the backend
+        :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
+            (overruling the connection's ``auto_validate`` setting).
         :return: Created job
         """
         # TODO move all this (BatchJob factory) logic to BatchJob?
 
-        if validate:
-            self._warn_if_process_graph_invalid(process_graph=process_graph)
-
-        req = self._build_request_with_process_graph(
+        pg_with_metadata = self._build_request_with_process_graph(
            process_graph=process_graph,
            **dict_no_none(title=title, description=description, plan=plan, budget=budget)
        )
         if additional:
             # TODO: get rid of this non-standard field? https://github.com/Open-EO/openeo-api/issues/276
-            req["job_options"] = additional
+            pg_with_metadata["job_options"] = additional
 
-        response = self.post("/jobs", json=req, expected_status=201)
+        self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
+        response = self.post("/jobs", json=pg_with_metadata, expected_status=201)
 
         job_id = None
         if "openeo-identifier" in response.headers:
@@ -1668,8 +1684,8 @@ def as_curl(
         cmd += ["-H", "Content-Type: application/json"]
         if isinstance(self.auth, BearerAuth):
             cmd += ["-H", f"Authorization: Bearer {'...' if obfuscate_auth else self.auth.bearer}"]
-        post_data = self._build_request_with_process_graph(data)
-        post_json = json.dumps(post_data, separators=(',', ':'))
+        pg_with_metadata = self._build_request_with_process_graph(data)
+        post_json = json.dumps(pg_with_metadata, separators=(",", ":"))
         cmd += ["--data", post_json]
         cmd += [self.build_url(path)]
         return " ".join(shlex.quote(c) for c in cmd)
@@ -1689,17 +1705,20 @@ def version_info(self):
 
 
 def connect(
-        url: Optional[str] = None,
-        auth_type: Optional[str] = None, auth_options: Optional[dict] = None,
-        session: Optional[requests.Session] = None,
-        default_timeout: Optional[int] = None,
+    url: Optional[str] = None,
+    *,
+    auth_type: Optional[str] = None,
+    auth_options: Optional[dict] = None,
+    session: Optional[requests.Session] = None,
+    default_timeout: Optional[int] = None,
+    auto_validate: bool = True,
 ) -> Connection:
     """
     This method is the entry point to OpenEO.
     You typically create one connection object in your script or application
     and re-use it for all calls to that backend.
 
-    If the backend requires authentication, you can pass authentication data directly to this function
+    If the backend requires authentication, you can pass authentication data directly to this function,
     but it could be easier to authenticate as follows:
 
     >>> # For basic authentication
@@ -1711,7 +1730,10 @@ def connect(
     :param auth_type: Which authentication to use: None, "basic" or "oidc" (for OpenID Connect)
     :param auth_options: Options/arguments specific to the authentication type
    :param default_timeout: default timeout (in seconds) for requests
-    :rtype: openeo.connections.Connection
+    :param auto_validate: toggle to automatically validate process graphs before execution
+
+    .. versionadded:: 0.24.0
+        added ``auto_validate`` argument
     """
 
     def _config_log(message):
@@ -1736,7 +1758,7 @@ def _config_log(message):
 
     if not url:
         raise OpenEoClientException("No openEO back-end URL given or known to connect to.")
-    connection = Connection(url, session=session, default_timeout=default_timeout)
+    connection = Connection(url, session=session, default_timeout=default_timeout, auto_validate=auto_validate)
 
     auth_type = auth_type.lower() if isinstance(auth_type, str) else auth_type
     if auth_type in {None, False, 'null', 'none'}:
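With the new `connect()` signature, the connection-level default and the per-call override combine roughly as follows (a sketch; the URL and process graph are illustrative):

```python
import openeo

# Opt out of preflight validation for the whole session:
connection = openeo.connect("https://openeo.example.com", auto_validate=False)  # placeholder URL

# A minimal flat process graph, just for illustration:
process_graph = {"add1": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}

# validate=None (the default) follows the connection's auto_validate setting,
# while an explicit True/False overrules it for this call only:
job = connection.create_job(process_graph, title="add demo", validate=True)
result = connection.execute(process_graph)  # no preflight validation (auto_validate=False)
```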

openeo/rest/datacube.py

Lines changed: 13 additions & 5 deletions
@@ -1981,7 +1981,8 @@ def download(
         outputfile: Optional[Union[str, pathlib.Path]] = None,
         format: Optional[str] = None,
         options: Optional[dict] = None,
-        validate: bool = True,
+        *,
+        validate: Optional[bool] = None,
     ) -> Union[None, bytes]:
         """
         Execute synchronously and download the raster data cube, e.g. as GeoTIFF.
@@ -1992,6 +1993,8 @@ def download(
         :param outputfile: Optional, an output file if the result needs to be stored on disk.
         :param format: Optional, an output format supported by the backend.
         :param options: Optional, file format options
+        :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
+            (overruling the connection's ``auto_validate`` setting).
         :return: None if the result is stored to disk, or a bytes object returned by the backend.
         """
         if format is None and outputfile:
@@ -2099,7 +2102,7 @@ def execute_batch(
         max_poll_interval: float = 60,
         connection_retry_interval: float = 30,
         job_options: Optional[dict] = None,
-        validate: bool = True,
+        validate: Optional[bool] = None,
         # TODO: avoid `format_options` as keyword arguments
         **format_options,
     ) -> BatchJob:
@@ -2112,6 +2115,8 @@ def execute_batch(
         :param outputfile: The path of a file to which a result can be written
         :param out_format: (optional) File format to use for the job result.
         :param job_options:
+        :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
+            (overruling the connection's ``auto_validate`` setting).
         """
         if "format" in format_options and not out_format:
             out_format = format_options["format"]  # align with 'download' call arg name
@@ -2134,7 +2139,7 @@ def create_job(
         plan: Optional[str] = None,
         budget: Optional[float] = None,
         job_options: Optional[dict] = None,
-        validate: bool = True,
+        validate: Optional[bool] = None,
         # TODO: avoid `format_options` as keyword arguments
         **format_options,
     ) -> BatchJob:
@@ -2152,6 +2157,9 @@ def create_job(
         :param plan: billing plan
         :param budget: maximum cost the request is allowed to produce
         :param job_options: custom job options.
+        :param validate: Optional toggle to enable/prevent validation of the process graphs before execution
+            (overruling the connection's ``auto_validate`` setting).
+
         :return: Created job.
         """
         # TODO: add option to also automatically start the job?
@@ -2200,8 +2208,8 @@ def save_user_defined_process(
             returns=returns, categories=categories, examples=examples, links=links,
         )
 
-    def execute(self, validate: bool = True) -> dict:
-        """Executes the process graph of the imagery. """
+    def execute(self, *, validate: Optional[bool] = None) -> dict:
+        """Executes the process graph."""
         return self._connection.execute(self.flat_graph(), validate=validate)
 
     @staticmethod
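At the `DataCube` level, `validate` is now keyword-only and defaults to `None`, i.e. "follow the connection's `auto_validate` setting"; a sketch (URL, collection id and filenames are placeholders):

```python
import openeo

connection = openeo.connect("https://openeo.example.com")  # placeholder URL
cube = connection.load_collection("SENTINEL2_L2A")          # placeholder collection id

cube.download("result.nc", format="netCDF", validate=False)  # skip the preflight check for this call
job = cube.execute_batch(outputfile="result.tif", out_format="GTiff", validate=True)  # force it
```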

openeo/rest/udp.py

Lines changed: 1 addition & 0 deletions
@@ -94,6 +94,7 @@ def store(
         # TODO: this "public" flag is not standardized yet EP-3609, https://github.com/Open-EO/openeo-api/issues/310
         process["public"] = public
 
+        self._connection._preflight_validation(pg_with_metadata=process)
         self._connection.put(
             path="/process_graphs/{}".format(self.user_defined_process_id), json=process, expected_status=200
         )
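Since `store()` now calls `_preflight_validation` as well, publishing a user-defined process gets the same warning-only check before the PUT request; a sketch assuming the usual `Connection.save_user_defined_process` entry point (id and graph are illustrative):

```python
import openeo

connection = openeo.connect("https://openeo.example.com")  # placeholder URL

# Storing a UDP now also triggers the warning-only preflight validation
# before the PUT to /process_graphs/{id}:
connection.save_user_defined_process(
    user_defined_process_id="my_add",
    process_graph={"add1": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
)
```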
