improve dry run #313

Status: Draft. Wants to merge 1 commit into base: master.
33 changes: 12 additions & 21 deletions openeo_driver/ProcessGraphDeserializer.py
@@ -1222,9 +1222,6 @@ def aggregate_spatial(args: ProcessArgs, env: EvalEnv) -> DriverDataCube:
     # TODO #114: convert all cases to DriverVectorCube first and just work with that
     if isinstance(geoms, DriverVectorCube):
         pass
-    elif isinstance(geoms, DryRunDataCube):
-        # TODO: properly support DriverVectorCube in dry run
-        geoms = DriverVectorCube(geometries=gpd.GeoDataFrame(geometry=[]), cube=None)
     elif isinstance(geoms, dict):
         try:
             # Automatically convert inline GeoJSON to a vector cube #114/#141
@@ -1397,7 +1394,6 @@ def filter_spatial(args: Dict, env: EvalEnv) -> DriverDataCube:
     elif isinstance(geometries, DriverVectorCube):
         pass
     else:
-        # TODO #114: support DriverVectorCube
         raise NotImplementedError(
             "filter_spatial does not support {g!r}".format(g=geometries)
         )
@@ -1510,7 +1506,6 @@ def run_udf(args: dict, env: EvalEnv):
         _log.info(f"run_udf: data of type {type(data)} has direct run_udf support")
         return data.run_udf(udf=udf, runtime=runtime, context=context, env=env)

-    # TODO #114 add support for DriverVectorCube
     if isinstance(data, AggregatePolygonResult):
         pass
     if isinstance(data, DriverVectorCube):
@@ -1860,11 +1855,13 @@ def get_geometries(args: Dict, env: EvalEnv) -> Union[DelayedVector, dict]:
 )
 def raster_to_vector(args: Dict, env: EvalEnv):
     image_collection = extract_arg(args, 'data')
+
     if not isinstance(image_collection, DriverDataCube):
         raise ProcessParameterInvalidException(
             parameter="data", process="raster_to_vector",
             reason=f"Invalid data type {type(image_collection)!r} expected raster-cube."
         )
+
     return image_collection.raster_to_vector()

@@ -1876,34 +1873,28 @@ def raster_to_vector(args: Dict, env: EvalEnv):
 )
 def vector_to_raster(args: dict, env: EvalEnv) -> DriverDataCube:
     input_vector_cube = extract_arg(args, "data")
-    dry_run_tracer: DryRunDataTracer = env.get(ENV_DRY_RUN_TRACER)
-    if dry_run_tracer:
-        if not isinstance(input_vector_cube, DryRunDataCube):
-            raise ProcessParameterInvalidException(
-                parameter="data",
-                process="vector_to_raster",
-                reason=f"Invalid data type {type(input_vector_cube)!r} expected vector-cube.",
-            )
-        return input_vector_cube

     if "target_data_cube" in args:
         target = extract_arg(args, "target_data_cube")  # TODO: remove after full migration to use of 'target'
     else:
         target = extract_arg(args, "target")
     # TODO: to_driver_vector_cube is temporary. Remove it when vector cube is fully supported.
-    if not isinstance(input_vector_cube, DriverVectorCube) and not hasattr(input_vector_cube, "to_driver_vector_cube"):
-        raise ProcessParameterInvalidException(
-            parameter="data",
-            process="vector_to_raster",
-            reason=f"Invalid data type {type(input_vector_cube)!r} expected vector-cube.",
-        )
+    if not isinstance(input_vector_cube, DriverVectorCube):
+        if not hasattr(input_vector_cube, "to_driver_vector_cube"):
+            raise ProcessParameterInvalidException(
+                parameter="data",
+                process="vector_to_raster",
+                reason=f"Invalid data type {type(input_vector_cube)!r} expected vector-cube.",
+            )
+        input_vector_cube = input_vector_cube.to_driver_vector_cube()
     if not isinstance(target, DriverDataCube):
         raise ProcessParameterInvalidException(
             parameter="target",
             process="vector_to_raster",
             reason=f"Invalid data type {type(target)!r} expected raster-cube.",
         )
-    return env.backend_implementation.vector_to_raster(input_vector_cube, target)
+
+    return input_vector_cube.to_raster(target, env)


 def _get_udf(args, env: EvalEnv) -> Tuple[str, str]:
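Note on this file's changes: vector_to_raster no longer special-cases the dry run via ENV_DRY_RUN_TRACER. It validates and normalizes its inputs, then delegates to the vector cube itself, so dry-run behaviour now comes from ordinary method dispatch instead of an early return. A minimal sketch of the dispatch pattern, using class names from this PR (the DriverVectorCube body is verbatim from datacube.py below; the DryRunVectorCube body is summarized in a comment):

class DriverVectorCube:
    def to_raster(self, target, env):
        # normal path: ask the backend implementation to rasterize this cube
        return env.backend_implementation.vector_to_raster(self, target)


class DryRunVectorCube(DriverVectorCube):
    def to_raster(self, target, env):
        # dry-run path: record a trace and return a DryRunDataCube with
        # reconstructed raster metadata (see openeo_driver/dry_run.py below)
        ...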
6 changes: 6 additions & 0 deletions openeo_driver/datacube.py
@@ -236,6 +236,9 @@ def to_scl_dilation_mask(
         # Note: this is a non-standard process
         self._not_implemented()

+    def raster_to_vector(self) -> "DriverVectorCube":
+        return self._not_implemented()
+

 class VectorCubeError(InternalException):
     code = "VectorCubeError"
@@ -799,6 +802,9 @@ def apply_dimension(
             message=f"DriverVectorCube.apply_dimension with {dimension=} and {bool(single_run_udf)=}"
         )

+    def to_raster(self, target: DriverDataCube, env: EvalEnv):
+        return env.backend_implementation.vector_to_raster(self, target)
+
     def __repr__(self):
         bbox = repr(self.get_bounding_box()) if self.geometry_count() > 0 else "(none)"
         return (
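Both additions here follow datacube.py's existing conventions: DriverDataCube.raster_to_vector joins the other stub processes that signal unsupported functionality via _not_implemented() until a concrete backend overrides them, and DriverVectorCube.to_raster gives ProcessGraphDeserializer.py a single polymorphic entry point. A hedged sketch of how a backend would plug in (MyBackendCube is hypothetical, not part of this PR):

from openeo_driver.datacube import DriverDataCube, DriverVectorCube


class MyBackendCube(DriverDataCube):  # hypothetical backend raster cube
    def raster_to_vector(self) -> "DriverVectorCube":
        # a real backend would vectorize its pixel data here
        ...

A cube class that does not override the stub keeps raising through _not_implemented(), which is the behaviour the raster_to_vector process relies on.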
55 changes: 47 additions & 8 deletions openeo_driver/dry_run.py
@@ -37,6 +37,7 @@
 from enum import Enum
 from typing import List, Union, Tuple, Any, Optional

+import geopandas as gpd
 import numpy
 import shapely.geometry.base
 from shapely.geometry import Point, Polygon, MultiPolygon, GeometryCollection
@@ -563,14 +564,18 @@ def aggregate_spatial(
         geometries: Union[BaseGeometry, str, DriverVectorCube],
         reducer: dict,
         target_dimension: str = "result",
-    ) -> "DryRunDataCube":
+    ) -> "DryRunVectorCube":
         # TODO #71 #114 EP-3981 normalize to vector cube instead of GeometryCollection
         geoms_is_empty = isinstance(geometries, DriverVectorCube) and len(geometries.get_geometries()) == 0
         cube = self
         if not geoms_is_empty:
             geometries, bbox = self._normalize_geometry(geometries)
             cube = self.filter_bbox(**bbox, operation="weak_spatial_extent")
-        return cube._process(operation="aggregate_spatial", arguments={"geometries": geometries})
+
+        traces = cube._data_tracer.process_traces(
+            traces=cube._traces, operation="aggregate_spatial", arguments={"geometries": geometries}
+        )
+        return DryRunVectorCube(traces=traces, data_tracer=cube._data_tracer, metadata=cube.metadata)

     def _normalize_geometry(self, geometries) -> Tuple[Union[DriverVectorCube, DelayedVector, BaseGeometry], dict]:
         """
@@ -618,15 +623,19 @@ def _normalize_geometry(self, geometries) -> Tuple[Union[DriverVectorCube, Delay
         bbox = dict(west=bbox[0], south=bbox[1], east=bbox[2], north=bbox[3], crs=crs)
         return geometries, bbox

-    def raster_to_vector(self):
-        dimensions = [SpatialDimension(name=DriverVectorCube.DIM_GEOMETRY,extent=self.metadata.extent)]
-        if(self.metadata.has_temporal_dimension()):
+    def raster_to_vector(self) -> DriverVectorCube:
+        dimensions = [SpatialDimension(name=DriverVectorCube.DIM_GEOMETRY, extent=self.metadata.extent)]
+        if self.metadata.has_temporal_dimension():
             dimensions.append(self.metadata.temporal_dimension)
-        if(self.metadata.has_band_dimension()):
+        if self.metadata.has_band_dimension():
             dimensions.append(self.metadata.band_dimension)

-        return self._process(operation="raster_to_vector", arguments={},metadata=CollectionMetadata(metadata={}, dimensions=dimensions))
-
+        traces = self._data_tracer.process_traces(traces=self._traces, operation="raster_to_vector", arguments={})
+        return DryRunVectorCube(
+            traces=traces,
+            data_tracer=self._data_tracer,
+            metadata=CollectionMetadata(metadata={}, dimensions=dimensions),
+        )

     def resample_cube_spatial(self, target: 'DryRunDataCube', method: str = 'near') -> 'DryRunDataCube':
         cube = self._process("process_type", [ProcessType.FOCAL_SPACE])
@@ -809,3 +818,33 @@ def _nop(self, *args, **kwargs) -> 'DryRunDataCube':
     water_vapor = _nop
     linear_scale_range = _nop
     dimension_labels = _nop
+
+
+class DryRunVectorCube(DriverVectorCube):
+    def __init__(
+        self,
+        traces: List[DataTraceBase],
+        data_tracer: DryRunDataTracer,
+        metadata: CollectionMetadata,
+    ):
+        super().__init__(geometries=gpd.GeoDataFrame(geometry=[]), cube=None)
+        self._traces = traces or []
+        self._data_tracer = data_tracer
+        self.metadata = metadata
+
+    def to_raster(self, target: DriverDataCube, env: EvalEnv):
+        dimensions = [SpatialDimension(name="x", extent=[]), SpatialDimension(name="y", extent=[])]
+
+        if self.metadata.has_temporal_dimension():
+            dimensions.append(self.metadata.temporal_dimension)
+        if self.metadata.has_band_dimension():
+            dimensions.append(self.metadata.band_dimension)
+
+        traces = self._data_tracer.process_traces(traces=self._traces, operation="vector_to_raster", arguments={})
+        return DryRunDataCube(
+            traces=traces,
+            data_tracer=self._data_tracer,
+            metadata=CollectionMetadata(metadata={}, dimensions=dimensions),
+        )
+
+    # TODO: support apply_dimension, filter_bands
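Taken together, the dry-run conversions are now symmetric: DryRunDataCube.raster_to_vector() swaps the spatial x/y dimensions for a geometry dimension and returns a DryRunVectorCube, and DryRunVectorCube.to_raster() swaps back, with temporal and band dimensions surviving in both directions. A standalone restatement of that dimension bookkeeping (illustrative only; "geometries" stands in for the literal value of DriverVectorCube.DIM_GEOMETRY):

from typing import List


def convert_dimensions(dims: List[str], to_vector: bool) -> List[str]:
    # model the metadata swap performed by raster_to_vector / to_raster above
    spatial = ["geometries"] if to_vector else ["x", "y"]
    passthrough = [d for d in dims if d in ("t", "bands")]  # temporal/bands survive
    return spatial + passthrough


assert convert_dimensions(["x", "y", "t", "bands"], to_vector=True) == ["geometries", "t", "bands"]
assert convert_dimensions(["geometries", "t", "bands"], to_vector=False) == ["x", "y", "t", "bands"]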