Skip to content

Commit 78e8e93

Browse files
committed
Merge branch 'issue404_automatically-validate-process-graph-before-download-or-execute'
2 parents 2622c8a + 54fa08b commit 78e8e93

12 files changed

+709
-133
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
### Added
1212

1313
- Add `DataCube.reduce_spatial()`
14+
- Added option (enabled by default) to automatically validate a process graph before execution. Validation issues just trigger warnings for now. ([#404](https://github.com/Open-EO/openeo-python-client/issues/404))
1415

1516
### Changed
1617

openeo/rest/_testing.py

+34-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import re
23
from typing import Optional, Union
34

@@ -11,16 +12,33 @@ class DummyBackend:
1112
and allows inspection of posted process graphs
1213
"""
1314

15+
__slots__ = (
16+
"connection",
17+
"sync_requests",
18+
"batch_jobs",
19+
"validation_requests",
20+
"next_result",
21+
"next_validation_errors",
22+
)
23+
1424
# Default result (can serve both as JSON or binary data)
1525
DEFAULT_RESULT = b'{"what?": "Result data"}'
1626

1727
def __init__(self, requests_mock, connection: Connection):
1828
self.connection = connection
1929
self.sync_requests = []
2030
self.batch_jobs = {}
31+
self.validation_requests = []
2132
self.next_result = self.DEFAULT_RESULT
22-
requests_mock.post(connection.build_url("/result"), content=self._handle_post_result)
23-
requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs)
33+
self.next_validation_errors = []
34+
requests_mock.post(
35+
connection.build_url("/result"),
36+
content=self._handle_post_result,
37+
)
38+
requests_mock.post(
39+
connection.build_url("/jobs"),
40+
content=self._handle_post_jobs,
41+
)
2442
requests_mock.post(
2543
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results
2644
)
@@ -32,12 +50,19 @@ def __init__(self, requests_mock, connection: Connection):
3250
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
3351
content=self._handle_get_job_result_asset,
3452
)
53+
requests_mock.post(connection.build_url("/validation"), json=self._handle_post_validation)
3554

3655
def _handle_post_result(self, request, context):
3756
"""handler of `POST /result` (synchronous execute)"""
3857
pg = request.json()["process"]["process_graph"]
3958
self.sync_requests.append(pg)
40-
return self.next_result
59+
result = self.next_result
60+
if isinstance(result, (dict, list)):
61+
result = json.dumps(result).encode("utf-8")
62+
elif isinstance(result, str):
63+
result = result.encode("utf-8")
64+
assert isinstance(result, bytes)
65+
return result
4166

4267
def _handle_post_jobs(self, request, context):
4368
"""handler of `POST /jobs` (create batch job)"""
@@ -83,6 +108,12 @@ def _handle_get_job_result_asset(self, request, context):
83108
assert self.batch_jobs[job_id]["status"] == "finished"
84109
return self.next_result
85110

111+
def _handle_post_validation(self, request, context):
112+
"""Handler of `POST /validation` (validate process graph)."""
113+
pg = request.json()["process_graph"]
114+
self.validation_requests.append(pg)
115+
return {"errors": self.next_validation_errors}
116+
86117
def get_sync_pg(self) -> dict:
87118
"""Get one and only synchronous process graph"""
88119
assert len(self.sync_requests) == 1

openeo/rest/connection.py

+77-23
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ def __init__(
260260
refresh_token_store: Optional[RefreshTokenStore] = None,
261261
slow_response_threshold: Optional[float] = None,
262262
oidc_auth_renewer: Optional[OidcAuthenticator] = None,
263+
auto_validate: bool = True,
263264
):
264265
"""
265266
Constructor of Connection, authenticates user.
@@ -282,6 +283,7 @@ def __init__(
282283
self._auth_config = auth_config
283284
self._refresh_token_store = refresh_token_store
284285
self._oidc_auth_renewer = oidc_auth_renewer
286+
self._auto_validate = auto_validate
285287

286288
@classmethod
287289
def version_discovery(
@@ -1052,8 +1054,8 @@ def validate_process_graph(self, process_graph: dict) -> List[dict]:
10521054
:param process_graph: (flat) dict representing process graph
10531055
:return: list of errors (dictionaries with "code" and "message" fields)
10541056
"""
1055-
request = {"process_graph": process_graph}
1056-
return self.post(path="/validation", json=request, expected_status=200).json()["errors"]
1057+
pg_with_metadata = self._build_request_with_process_graph(process_graph)["process"]
1058+
return self.post(path="/validation", json=pg_with_metadata, expected_status=200).json()["errors"]
10571059

10581060
@property
10591061
def _api_version(self) -> ComparableVersion:
@@ -1393,8 +1395,9 @@ def load_url(self, url: str, format: str, options: Optional[dict] = None):
13931395

13941396
def create_service(self, graph: dict, type: str, **kwargs) -> Service:
13951397
# TODO: type hint for graph: is it a nested or a flat one?
1396-
req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
1397-
response = self.post(path="/services", json=req, expected_status=201)
1398+
pg_with_metadata = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
1399+
self._preflight_validation(pg_with_metadata=pg_with_metadata)
1400+
response = self.post(path="/services", json=pg_with_metadata, expected_status=201)
13981401
service_id = response.headers.get("OpenEO-Identifier")
13991402
return Service(service_id, self)
14001403

@@ -1463,23 +1466,55 @@ def upload_file(
14631466
def _build_request_with_process_graph(self, process_graph: Union[dict, FlatGraphableMixin, Any], **kwargs) -> dict:
14641467
"""
14651468
Prepare a json payload with a process graph to submit to /result, /services, /jobs, ...
1466-
:param process_graph: flat dict representing a process graph
1469+
:param process_graph: flat dict representing a "process graph with metadata" ({"process": {"process_graph": ...}, ...})
14671470
"""
14681471
# TODO: make this a more general helper (like `as_flat_graph`)
14691472
result = kwargs
14701473
process_graph = as_flat_graph(process_graph)
14711474
if "process_graph" not in process_graph:
14721475
process_graph = {"process_graph": process_graph}
1473-
# TODO: also check if `process_graph` already has "process" key (i.e. is a "process graph with metadata already)
1476+
# TODO: also check if `process_graph` already has "process" key (i.e. is a "process graph with metadata" already)
14741477
result["process"] = process_graph
14751478
return result
14761479

1480+
def _preflight_validation(self, pg_with_metadata: dict, *, validate: Optional[bool] = None):
1481+
"""
1482+
Preflight validation of process graph to execute.
1483+
1484+
:param pg_with_metadata: flat dict representation of process graph with metadata,
1485+
e.g. as produced by `_build_request_with_process_graph`
1486+
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
1487+
(overruling the connection's ``auto_validate`` setting).
1488+
1489+
:return:
1490+
"""
1491+
if validate is None:
1492+
validate = self._auto_validate
1493+
if validate and self.capabilities().supports_endpoint("/validation", "POST"):
1494+
# At present, the intention is that a failed validation does not block
1495+
# the job from running, it is only reported as a warning.
1496+
# Therefor we also want to continue when something *else* goes wrong
1497+
# *during* the validation.
1498+
try:
1499+
resp = self.post(path="/validation", json=pg_with_metadata["process"], expected_status=200)
1500+
validation_errors = resp.json()["errors"]
1501+
if validation_errors:
1502+
_log.warning(
1503+
"Preflight process graph validation raised: "
1504+
+ (" ".join(f"[{e.get('code')}] {e.get('message')}" for e in validation_errors))
1505+
)
1506+
except Exception as e:
1507+
_log.error(f"Preflight process graph validation failed: {e}", exc_info=True)
1508+
1509+
# TODO: additional validation and sanity checks: e.g. is there a result node, are all process_ids valid, ...?
1510+
14771511
# TODO: unify `download` and `execute` better: e.g. `download` always writes to disk, `execute` returns result (raw or as JSON decoded dict)
14781512
def download(
14791513
self,
14801514
graph: Union[dict, FlatGraphableMixin, str, Path],
14811515
outputfile: Union[Path, str, None] = None,
14821516
timeout: Optional[int] = None,
1517+
validate: Optional[bool] = None,
14831518
) -> Union[None, bytes]:
14841519
"""
14851520
Downloads the result of a process graph synchronously,
@@ -1490,11 +1525,14 @@ def download(
14901525
or as local file path or URL
14911526
:param outputfile: output file
14921527
:param timeout: timeout to wait for response
1528+
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
1529+
(overruling the connection's ``auto_validate`` setting).
14931530
"""
1494-
request = self._build_request_with_process_graph(process_graph=graph)
1531+
pg_with_metadata = self._build_request_with_process_graph(process_graph=graph)
1532+
self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
14951533
response = self.post(
14961534
path="/result",
1497-
json=request,
1535+
json=pg_with_metadata,
14981536
expected_status=200,
14991537
stream=True,
15001538
timeout=timeout or DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE,
@@ -1511,21 +1549,26 @@ def execute(
15111549
self,
15121550
process_graph: Union[dict, str, Path],
15131551
timeout: Optional[int] = None,
1552+
validate: Optional[bool] = None,
15141553
):
15151554
"""
15161555
Execute a process graph synchronously and return the result (assumed to be JSON).
15171556
15181557
:param process_graph: (flat) dict representing a process graph, or process graph as raw JSON string,
15191558
or as local file path or URL
1559+
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
1560+
(overruling the connection's ``auto_validate`` setting).
1561+
15201562
:return: parsed JSON response
15211563
"""
1522-
req = self._build_request_with_process_graph(process_graph=process_graph)
1564+
pg_with_metadata = self._build_request_with_process_graph(process_graph=process_graph)
1565+
self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
15231566
return self.post(
15241567
path="/result",
1525-
json=req,
1568+
json=pg_with_metadata,
15261569
expected_status=200,
15271570
timeout=timeout or DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE,
1528-
).json()
1571+
).json() # TODO: only do JSON decoding when mimetype is actually JSON?
15291572

15301573
def create_job(
15311574
self,
@@ -1536,6 +1579,7 @@ def create_job(
15361579
plan: Optional[str] = None,
15371580
budget: Optional[float] = None,
15381581
additional: Optional[dict] = None,
1582+
validate: Optional[bool] = None,
15391583
) -> BatchJob:
15401584
"""
15411585
Create a new job from given process graph on the back-end.
@@ -1547,18 +1591,22 @@ def create_job(
15471591
:param plan: billing plan
15481592
:param budget: maximum cost the request is allowed to produce
15491593
:param additional: additional job options to pass to the backend
1594+
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
1595+
(overruling the connection's ``auto_validate`` setting).
15501596
:return: Created job
15511597
"""
15521598
# TODO move all this (BatchJob factory) logic to BatchJob?
1553-
req = self._build_request_with_process_graph(
1599+
1600+
pg_with_metadata = self._build_request_with_process_graph(
15541601
process_graph=process_graph,
15551602
**dict_no_none(title=title, description=description, plan=plan, budget=budget)
15561603
)
15571604
if additional:
15581605
# TODO: get rid of this non-standard field? https://github.com/Open-EO/openeo-api/issues/276
1559-
req["job_options"] = additional
1606+
pg_with_metadata["job_options"] = additional
15601607

1561-
response = self.post("/jobs", json=req, expected_status=201)
1608+
self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
1609+
response = self.post("/jobs", json=pg_with_metadata, expected_status=201)
15621610

15631611
job_id = None
15641612
if "openeo-identifier" in response.headers:
@@ -1636,8 +1684,8 @@ def as_curl(
16361684
cmd += ["-H", "Content-Type: application/json"]
16371685
if isinstance(self.auth, BearerAuth):
16381686
cmd += ["-H", f"Authorization: Bearer {'...' if obfuscate_auth else self.auth.bearer}"]
1639-
post_data = self._build_request_with_process_graph(data)
1640-
post_json = json.dumps(post_data, separators=(',', ':'))
1687+
pg_with_metadata = self._build_request_with_process_graph(data)
1688+
post_json = json.dumps(pg_with_metadata, separators=(",", ":"))
16411689
cmd += ["--data", post_json]
16421690
cmd += [self.build_url(path)]
16431691
return " ".join(shlex.quote(c) for c in cmd)
@@ -1657,17 +1705,20 @@ def version_info(self):
16571705

16581706

16591707
def connect(
1660-
url: Optional[str] = None,
1661-
auth_type: Optional[str] = None, auth_options: Optional[dict] = None,
1662-
session: Optional[requests.Session] = None,
1663-
default_timeout: Optional[int] = None,
1708+
url: Optional[str] = None,
1709+
*,
1710+
auth_type: Optional[str] = None,
1711+
auth_options: Optional[dict] = None,
1712+
session: Optional[requests.Session] = None,
1713+
default_timeout: Optional[int] = None,
1714+
auto_validate: bool = True,
16641715
) -> Connection:
16651716
"""
16661717
This method is the entry point to OpenEO.
16671718
You typically create one connection object in your script or application
16681719
and re-use it for all calls to that backend.
16691720
1670-
If the backend requires authentication, you can pass authentication data directly to this function
1721+
If the backend requires authentication, you can pass authentication data directly to this function,
16711722
but it could be easier to authenticate as follows:
16721723
16731724
>>> # For basic authentication
@@ -1679,7 +1730,10 @@ def connect(
16791730
:param auth_type: Which authentication to use: None, "basic" or "oidc" (for OpenID Connect)
16801731
:param auth_options: Options/arguments specific to the authentication type
16811732
:param default_timeout: default timeout (in seconds) for requests
1682-
:rtype: openeo.connections.Connection
1733+
:param auto_validate: toggle to automatically validate process graphs before execution
1734+
1735+
.. versionadded:: 0.24.0
1736+
added ``auto_validate`` argument
16831737
"""
16841738

16851739
def _config_log(message):
@@ -1704,7 +1758,7 @@ def _config_log(message):
17041758

17051759
if not url:
17061760
raise OpenEoClientException("No openEO back-end URL given or known to connect to.")
1707-
connection = Connection(url, session=session, default_timeout=default_timeout)
1761+
connection = Connection(url, session=session, default_timeout=default_timeout, auto_validate=auto_validate)
17081762

17091763
auth_type = auth_type.lower() if isinstance(auth_type, str) else auth_type
17101764
if auth_type in {None, False, 'null', 'none'}:

0 commit comments

Comments
 (0)