Commit eb03aa8

PR #200 finetune VectorCube.apply_dimension
ref: #197, Open-EO/openeo-geopyspark-driver#437
1 parent acd2e7a commit eb03aa8

4 files changed: +217 −33 lines changed

openeo_driver/datacube.py

+40 −33
@@ -18,13 +18,14 @@
 
 from openeo.metadata import CollectionMetadata
 from openeo.util import ensure_dir, str_truncate
+import openeo.udf
 from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
 from openeo_driver.errors import FeatureUnsupportedException, InternalException
 from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
 from openeo_driver.util.ioformats import IOFORMATS
+from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph
 from openeo_driver.util.utm import area_in_square_meters
 from openeo_driver.utils import EvalEnv
-from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor
 
 log = logging.getLogger(__name__)
 
@@ -248,38 +249,6 @@ def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX
             geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
         )
 
-    def apply_dimension(
-        self,
-        process: dict,
-        *,
-        dimension: str,
-        target_dimension: Optional[str] = None,
-        context: Optional[dict] = None,
-        env: EvalEnv,
-    ) -> "DriverVectorCube":
-        if dimension == "bands" and target_dimension == None and len(process) == 1 and next(iter(process.values())).get('process_id') == 'run_udf':
-            visitor = SingleNodeUDFProcessGraphVisitor().accept_process_graph(process)
-            udf = visitor.udf_args.get('udf', None)
-
-            from openeo.udf import FeatureCollection, UdfData
-            collection = FeatureCollection(id='VectorCollection', data=self._as_geopandas_df())
-            data = UdfData(
-                proj={"EPSG": self._geometries.crs.to_epsg()}, feature_collection_list=[collection], user_context=context
-            )
-
-            log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}")
-            result_data = env.backend_implementation.processing.run_udf(udf, data)
-            log.info(f"[run_udf] UDF resulted in {result_data!r}")
-
-            if isinstance(result_data, UdfData):
-                if(result_data.get_feature_collection_list() is not None and len(result_data.get_feature_collection_list()) == 1):
-                    return DriverVectorCube(geometries=result_data.get_feature_collection_list()[0].data)
-
-                raise ValueError(f"Could not handle UDF result: {result_data}")
-
-        else:
-            raise FeatureUnsupportedException()
-
     @classmethod
     def from_fiona(
         cls,
@@ -537,6 +506,44 @@ def buffer_points(self, distance: float = 10) -> "DriverVectorCube":
             ]
         )
 
+    def apply_dimension(
+        self,
+        process: dict,
+        *,
+        dimension: str,
+        target_dimension: Optional[str] = None,
+        context: Optional[dict] = None,
+        env: EvalEnv,
+    ) -> "DriverVectorCube":
+        single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process)
+
+        if single_run_udf:
+            # Process with single "run_udf" node
+            if self._cube is None and dimension == self.DIM_GEOMETRIES and target_dimension is None:
+                log.warning(
+                    f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube"
+                )
+                # TODO: this is non-standard special case: vector cube with only geometries, but no "cube" data
+                gdf = self._as_geopandas_df()
+                feature_collection = openeo.udf.FeatureCollection(id="_", data=gdf)
+                udf_data = openeo.udf.UdfData(
+                    proj={"EPSG": self._geometries.crs.to_epsg()},
+                    feature_collection_list=[feature_collection],
+                    user_context=context,
+                )
+                log.info(f"[run_udf] Running UDF {str_truncate(single_run_udf.udf, width=256)!r} on {udf_data!r}")
+                result_data = env.backend_implementation.processing.run_udf(udf=single_run_udf.udf, data=udf_data)
+                log.info(f"[run_udf] UDF resulted in {result_data!r}")
+
+                if isinstance(result_data, openeo.udf.UdfData):
+                    result_features = result_data.get_feature_collection_list()
+                    if result_features and len(result_features) == 1:
+                        return DriverVectorCube(geometries=result_features[0].data)
+                raise ValueError(f"Could not handle UDF result: {result_data}")
+
+        raise FeatureUnsupportedException()
+
+
 
 class DriverMlModel:
     """Base class for driver-side 'ml-model' data structures"""

openeo_driver/util/pgparsing.py

+45
@@ -0,0 +1,45 @@
+import dataclasses
+from typing import Optional
+
+
+class NotASingleRunUDFProcessGraph(ValueError):
+    pass
+
+
+@dataclasses.dataclass(frozen=True)
+class SingleRunUDFProcessGraph:
+    """
+    Container (and parser) for a callback process graph containing only a single `run_udf` node.
+    """
+
+    data: dict
+    udf: str
+    runtime: str
+    version: Optional[str] = None
+    context: Optional[dict] = None
+
+    @classmethod
+    def parse(cls, process_graph: dict) -> "SingleRunUDFProcessGraph":
+        try:
+            (node,) = process_graph.values()
+            assert node["process_id"] == "run_udf"
+            assert node["result"] is True
+            arguments = node["arguments"]
+            assert {"data", "udf", "runtime"}.issubset(arguments.keys())
+
+            return cls(
+                data=arguments["data"],
+                udf=arguments["udf"],
+                runtime=arguments["runtime"],
+                version=arguments.get("version"),
+                context=arguments.get("context") or {},
+            )
+        except Exception as e:
+            raise NotASingleRunUDFProcessGraph(str(e)) from e
+
+    @classmethod
+    def parse_or_none(cls, process_graph: dict) -> Optional["SingleRunUDFProcessGraph"]:
+        try:
+            return cls.parse(process_graph=process_graph)
+        except NotASingleRunUDFProcessGraph:
+            return None
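A quick usage sketch (not part of the diff) of the helper added here, as exercised by `DriverVectorCube.apply_dimension` above and by tests/util/test_pgparsing.py below:

# Illustrative use of SingleRunUDFProcessGraph.parse_or_none on a callback process graph.
from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph

process_graph = {
    "runudf1": {
        "process_id": "run_udf",
        "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
        "result": True,
    }
}

single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process_graph)
if single_run_udf:
    # The run_udf arguments are available as attributes.
    print(single_run_udf.udf, single_run_udf.runtime)  # -> x = 4 Python
else:
    # Anything that is not a single run_udf node (extra nodes, missing arguments, ...) gives None.
    print("not a single run_udf callback")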

tests/test_vectorcube.py

+54
@@ -1,3 +1,5 @@
+import textwrap
+
 import geopandas as gpd
 import numpy.testing
 import pyproj
@@ -9,6 +11,7 @@
 from openeo_driver.datacube import DriverVectorCube
 from openeo_driver.testing import DictSubSet, ApproxGeometry
 from openeo_driver.util.geometry import as_geojson_feature_collection
+from openeo_driver.utils import EvalEnv
 
 from .data import get_path
 
@@ -22,6 +25,10 @@ def gdf(self) -> gpd.GeoDataFrame:
         df = gpd.read_file(path)
         return df
 
+    @pytest.fixture
+    def vc(self, gdf) -> DriverVectorCube:
+        return DriverVectorCube(geometries=gdf)
+
     def test_basic(self, gdf):
         vc = DriverVectorCube(gdf)
         assert vc.get_bounding_box() == (1, 1, 5, 4)
@@ -446,3 +453,50 @@ def test_buffer_points(self):
                 ],
             }
         )
+
+    def test_apply_dimension_run_udf(self, vc, backend_implementation):
+        udf = textwrap.dedent(
+            """
+            from openeo.udf import UdfData, FeatureCollection
+            def process_geometries(udf_data: UdfData) -> UdfData:
+                [feature_collection] = udf_data.get_feature_collection_list()
+                gdf = feature_collection.data
+                gdf["geometry"] = gdf["geometry"].buffer(distance=1, resolution=2)
+                udf_data.set_feature_collection_list([
+                    FeatureCollection(id="_", data=gdf),
+                ])
+            """
+        )
+        callback = {
+            "runudf1": {
+                "process_id": "run_udf",
+                "arguments": {"data": {"from_parameter": "data"}, "udf": udf, "runtime": "Python"},
+                "result": True,
+            }
+        }
+        env = EvalEnv({"backend_implementation": backend_implementation})
+        result = vc.apply_dimension(process=callback, dimension="geometries", env=env)
+        assert isinstance(result, DriverVectorCube)
+        feature_collection = result.to_geojson()
+        assert feature_collection == DictSubSet(
+            {
+                "type": "FeatureCollection",
+                "bbox": pytest.approx((0, 0, 6, 5), abs=0.1),
+                "features": [
+                    {
+                        "type": "Feature",
+                        "bbox": pytest.approx((0, 0, 4, 4), abs=0.1),
+                        "geometry": DictSubSet({"type": "Polygon"}),
+                        "id": "0",
+                        "properties": {"id": "first", "pop": 1234},
+                    },
+                    {
+                        "type": "Feature",
+                        "bbox": pytest.approx((2, 1, 6, 5), abs=0.1),
+                        "geometry": DictSubSet({"type": "Polygon"}),
+                        "id": "1",
+                        "properties": {"id": "second", "pop": 5678},
+                    },
+                ],
+            }
+        )

tests/util/test_pgparsing.py

+78
@@ -0,0 +1,78 @@
+import pytest
+
+from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph, NotASingleRunUDFProcessGraph
+
+
+class TestSingleRunUDFProcessGraph:
+    def test_parse_basic(self):
+        pg = {
+            "runudf1": {
+                "process_id": "run_udf",
+                "arguments": {
+                    "data": {"from_parameter": "data"},
+                    "udf": "print('Hello world')",
+                    "runtime": "Python",
+                },
+                "result": True,
+            }
+        }
+        run_udf = SingleRunUDFProcessGraph.parse(pg)
+        assert run_udf.data == {"from_parameter": "data"}
+        assert run_udf.udf == "print('Hello world')"
+        assert run_udf.runtime == "Python"
+        assert run_udf.version is None
+        assert run_udf.context == {}
+
+    @pytest.mark.parametrize(
+        "pg",
+        [
+            {
+                "runudf1": {
+                    "process_id": "run_udffffffffffffffff",
+                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
+                    "result": True,
+                }
+            },
+            {
+                "runudf1": {
+                    "process_id": "run_udf",
+                    "arguments": {"udf": "x = 4", "runtime": "Python"},
+                    "result": True,
+                }
+            },
+            {
+                "runudf1": {
+                    "process_id": "run_udf",
+                    "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python"},
+                    "result": True,
+                }
+            },
+            {
+                "runudf1": {
+                    "process_id": "run_udf",
+                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4"},
+                    "result": True,
+                }
+            },
+            {
+                "runudf1": {
+                    "process_id": "run_udf",
+                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
+                }
+            },
+            {
+                "runudf1": {
+                    "process_id": "run_udf",
+                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
+                    "result": True,
+                },
+                "runudf2": {
+                    "process_id": "run_udf",
+                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
+                },
+            },
+        ],
+    )
+    def test_parse_invalid(self, pg):
+        with pytest.raises(NotASingleRunUDFProcessGraph):
+            _ = SingleRunUDFProcessGraph.parse(pg)
