Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ and start a new "In Progress" section above it.
- Include `job_options` as top-level properties in `GET /jobs/{job_id}` response ([#470](https://github.com/Open-EO/openeo-python-driver/issues/470))
- Bump STAC version from `0.9.0` to `1.0.0` in capabilities endpoint, collection metadata, job results and ML model metadata ([#363](https://github.com/Open-EO/openeo-python-driver/issues/363))
- `load_collection`: add check on malformed `spatial_extent`. ([#284](https://github.com/Open-EO/openeo-python-driver/issues/284))
- Support logging added value for synchronous requests ([Open-EO/openeo-geopyspark-driver#1436](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1436))


## 0.139.0
Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.140.0a1"
__version__ = "0.140.0a2"
3 changes: 3 additions & 0 deletions openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from typing import List, Union, NamedTuple, Dict, Optional, Callable, Iterable, Container, Any, Tuple

import flask
from openeo_driver.processgraph.definitions import ProcessGraphFlatDict

import openeo_driver.util.view_helpers
from openeo.utils.version import ComparableVersion
Expand Down Expand Up @@ -1072,6 +1073,8 @@ def request_costs(
job_options: Union[dict, None] = None,
request_id: str,
success: bool,
process_graph: Union[ProcessGraphFlatDict, None] = None,
tracer: Union[DryRunDataTracer, None] = None,
) -> Optional[float]:
"""
Report resource usage of (current) synchronous processing request and get associated cost.
Expand Down
8 changes: 6 additions & 2 deletions openeo_driver/processgraph/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from openeo_driver.util import UNSET
from openeo_driver.util.http import is_http_url
from openeo_driver.utils import EvalEnv, smart_bool
from openeo_driver.views import OPENEO_API_VERSION_DEFAULT
from openeo_driver.views import ENV_SYNC_DRY_RUN_TRACER, OPENEO_API_VERSION_DEFAULT

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -116,7 +116,11 @@ def evaluate(
env = env.push({ENV_FINAL_RESULT: [None], ENV_MAX_BUFFER: {}})

if do_dry_run:
dry_run_tracer = do_dry_run if isinstance(do_dry_run, DryRunDataTracer) else DryRunDataTracer()
dry_run_tracer = (
env.get(ENV_SYNC_DRY_RUN_TRACER) or do_dry_run
if isinstance(do_dry_run, DryRunDataTracer)
else DryRunDataTracer()
)
_log.info("Doing dry run")
dry_run_env = env.push(
{
Expand Down
26 changes: 19 additions & 7 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
LINK_REL,
)
from openeo_driver.datacube import DriverMlModel
from openeo_driver.dry_run import DryRunDataTracer
from openeo_driver.errors import (
FeatureUnsupportedException,
FilePathInvalidException,
Expand Down Expand Up @@ -105,6 +106,8 @@


STREAM_CHUNK_SIZE_DEFAULT = 10 * 1024
ENV_SYNC_DRY_RUN_TRACER = "sync_dry_run_tracer"


class OpenEoApiApp(Flask):

Expand Down Expand Up @@ -725,8 +728,21 @@ def result(user: User):
}
)

tracer = DryRunDataTracer()
env = env.push({ENV_SYNC_DRY_RUN_TRACER: tracer})

request_costs = functools.partial(
backend_implementation.request_costs,
user=user,
job_options=job_options,
request_id=request_id,
process_graph=process_graph,
tracer=tracer,
)

try:
result = backend_implementation.processing.evaluate(process_graph=process_graph, env=env)
result = backend_implementation.processing.evaluate(process_graph=copy.deepcopy(process_graph), env=env)

_log.info(f"`POST /result`: {type(result)}")

if result is None:
Expand All @@ -742,18 +758,14 @@ def result(user: User):
result = to_save_result(data=result)
response = result.create_flask_response()

costs = backend_implementation.request_costs(
success=True, user=user, request_id=request_id, job_options=job_options
)
costs = request_costs(success=True)
if costs:
# TODO not all costs are accounted for so don't expose in "OpenEO-Costs" yet
response.headers["OpenEO-Costs-experimental"] = costs

except Exception:
# TODO: also send "OpenEO-Costs" header on failure
backend_implementation.request_costs(
success=False, user=user, request_id=request_id, job_options=job_options
)
request_costs(success=False)
raise

# Add request id as "OpenEO-Identifier" like we do for batch jobs.
Expand Down
8 changes: 5 additions & 3 deletions tests/test_views_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from openeo_driver.datacube import DriverDataCube, DriverVectorCube
from openeo_driver.datastructs import ResolutionMergeArgs, SarBackscatterArgs
from openeo_driver.dry_run import ProcessType
from openeo_driver.dry_run import ProcessType, DryRunDataTracer
from openeo_driver.dummy import dummy_backend
from openeo_driver.dummy.dummy_backend import DummyVisitor
from openeo_driver.errors import (
Expand Down Expand Up @@ -4308,13 +4308,13 @@ def test_vector_buffer_returns_error_on_empty_result_geometry(api):
(None, None, None),
# request_costs override
(
lambda user, request_id, success, job_options: 1234 + isinstance(user, User),
lambda user, job_options, request_id, success, process_graph, tracer: 1234 + isinstance(user, User),
None,
"1235",
),
# Extra job options handling
(
lambda user, request_id, success, job_options: 1234 * job_options.get("extra", 0),
lambda user, job_options, request_id, success, process_graph, tracer: 1234 * job_options.get("extra", 0),
{"extra": 2},
"2468",
),
Expand Down Expand Up @@ -4363,6 +4363,8 @@ def test_synchronous_processing_request_costs(
job_options=job_options,
success=success,
request_id="r-abc123",
process_graph=pg,
tracer=dirty_equals.IsInstance(DryRunDataTracer),
)


Expand Down