Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and start a new "In Progress" section above it.
## In progress: 0.137.0

- Add `ephemeral_flask_server` testing utility (`openeo_driver.testing`) for request mocking based on a Flask app. Allows to do request/response mocking independently from actual request library (`requests`, `urllib`, `urllib3`, etc.) through a well-documented API (Flask).
- Support exposing auxiliary (non-asset) files as links ([Open-EO/openeo-geopyspark-driver#1278](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1278))


## 0.136.0
Expand Down
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.137.0a1"
__version__ = "0.137.0a2"
4 changes: 4 additions & 0 deletions openeo_driver/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ class JOB_STATUS:
DEFAULT_LOG_LEVEL_PROCESSING = "info"
# Default value for `level in `GET /jobs/{job_id}/logs`, `GET /services/{service_id}/logs` requests
DEFAULT_LOG_LEVEL_RETRIEVAL = "debug"


class ITEM_LINK_PROPERTY:
EXPOSE_AUXILIARY = "_expose_auxiliary"
77 changes: 76 additions & 1 deletion openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import typing
from collections import defaultdict, namedtuple
from typing import Callable, List, Optional, Tuple, Union
from urllib.parse import urlparse

import flask
import flask_cors
Expand Down Expand Up @@ -48,6 +49,7 @@
from openeo_driver.constants import (
DEFAULT_LOG_LEVEL_PROCESSING,
DEFAULT_LOG_LEVEL_RETRIEVAL,
ITEM_LINK_PROPERTY,
JOB_STATUS,
STAC_EXTENSION,
)
Expand Down Expand Up @@ -1442,6 +1444,42 @@ def _get_job_result_item11(job_id, item_id, user_id):
geometry = BoundingBox.from_wsen_tuple(job_info.proj_bbox, job_info.epsg).as_polygon()
geometry = mapping(reproject_geometry(geometry, CRS.from_epsg(job_info.epsg), CRS.from_epsg(4326)))

exposable_links = [
link for link in item_metadata.get("links", []) if link.get(ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY, False)
]
for link in exposable_links:
link.pop(ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY)
auxiliary_filename = urlparse(link["href"]).path.split("/")[-1] # TODO: assumes file is not nested

if link["href"].startswith("s3://"):
link["href"] = backend_implementation.config.asset_url.build_url(
asset_metadata={"href": link["href"]}, # TODO: clean up this hack to support s3proxy
asset_name=auxiliary_filename,
job_id=job_id,
user_id=user_id,
)
else:
signer = get_backend_config().url_signer
if signer:
expires = signer.get_expires()
secure_key = signer.sign_job_asset(
job_id=job_id, user_id=user_id, filename=auxiliary_filename, expires=expires
)
user_base64 = user_id_b64_encode(user_id)
link["href"] = flask.url_for(
".download_job_auxiliary_file_signed",
job_id=job_id,
user_base64=user_base64,
filename=auxiliary_filename,
expires=expires,
secure_key=secure_key,
_external=True,
)
else:
link["href"] = flask.url_for(
".download_job_auxiliary_file", job_id=job_id, filename=auxiliary_filename, _external=True
)

stac_item = {
"type": "Feature",
"stac_version": "1.1.0",
Expand All @@ -1466,7 +1504,8 @@ def _get_job_result_item11(job_id, item_id, user_id):
"href": url_for(".list_job_results", job_id=job_id, _external=True), # SHOULD be absolute
"type": "application/json",
},
],
]
+ exposable_links,
"assets": assets,
"collection": job_id,
}
Expand All @@ -1483,6 +1522,42 @@ def _get_job_result_item11(job_id, item_id, user_id):
resp.mimetype = stac_item_media_type
return resp

@blueprint.route("/jobs/<job_id>/results/aux/<user_base64>/<secure_key>/<filename>", methods=["GET"])
def download_job_auxiliary_file_signed(job_id, user_base64, secure_key, filename):
expires = request.args.get("expires")
signer = get_backend_config().url_signer
user_id = user_id_b64_decode(user_base64)
signer.verify_job_asset(
signature=secure_key, job_id=job_id, user_id=user_id, filename=filename, expires=expires
)
return _download_job_auxiliary_file(job_id=job_id, filename=filename, user_id=user_id)

@blueprint.route("/jobs/<job_id>/results/aux/<filename>", methods=["GET"])
@auth_handler.requires_bearer_auth
def download_job_auxiliary_file(job_id, filename, user: User):
return _download_job_auxiliary_file(job_id, filename, user.user_id)

def _download_job_auxiliary_file(job_id, filename, user_id):
metadata = backend_implementation.batch_jobs.get_result_metadata(job_id=job_id, user_id=user_id)

auxiliary_links = [
link
for item in metadata.items.values()
for link in item.get("links", [])
if link.get(ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY, False) and link["href"].endswith(f"/{filename}")
]

if not auxiliary_links:
raise FilePathInvalidException(f"invalid file {filename!r}")

auxiliary_link = auxiliary_links[0]
uri_parts = urlparse(auxiliary_link["href"])

# S3 URIs are handled by s3proxy
assert uri_parts.scheme in ["", "file"], f"unexpected scheme {uri_parts.scheme}"

auxiliary_file = pathlib.Path(uri_parts.path)
return send_from_directory(auxiliary_file.parent, auxiliary_file.name, mimetype=auxiliary_link.get("type"))

def _get_job_result_item(job_id, item_id, user_id):
if item_id == DriverMlModel.METADATA_FILE_NAME:
Expand Down
116 changes: 112 additions & 4 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
not_implemented,
)
from openeo_driver.config import OpenEoBackendConfig
from openeo_driver.constants import ITEM_LINK_PROPERTY
from openeo_driver.datacube import DriverVectorCube
from openeo_driver.dummy import dummy_backend, dummy_config
from openeo_driver.dummy.dummy_backend import DummyBackendImplementation, DummyProcessing, DummyProcessRegistry
Expand Down Expand Up @@ -3004,6 +3005,14 @@ def test_get_stac_1_1_item(self, api110, backend_implementation, backend_config_
"id": "5d2db643-5cc3-4b27-8ef3-11f7d203b221_2023-12-31T21:41:00Z",
"properties": {"datetime": "2023-12-31T21:41:00Z"},
"bbox": [3.359808992021044, 51.08284561357965, 4.690166134878123, 51.88641704215104],
"links": [
{
"rel": "custom",
"href": "/data/projects/OpenEO/07024ee9-7847-4b8a-b260-6c879a2b3cdc/07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json",
"type": "application/json",
ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY: True,
},
],
}
}
),
Expand Down Expand Up @@ -3042,10 +3051,16 @@ def test_get_stac_1_1_item(self, api110, backend_implementation, backend_config_
'type': 'application/geo+json'
},
{
'href': 'http://oeo.net/openeo/1.1.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results',
'rel': 'collection',
'type': 'application/json'
}
"href": "http://oeo.net/openeo/1.1.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results",
"rel": "collection",
"type": "application/json",
},
{
"rel": "custom",
# TODO: what does the URL look like? Currently /aux instead of /assets; should /items be in there?
"href": "http://oeo.net/openeo/1.1.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/aux/TXIuVGVzdA==/a0274432f627ca9cf9b4ff79d57c61bd/07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json",
"type": "application/json",
},
],
'properties': {'datetime': '2023-12-31T21:41:00Z'},
'stac_extensions': ['https://stac-extensions.github.io/eo/v1.1.0/schema.json',
Expand All @@ -3055,6 +3070,99 @@ def test_get_stac_1_1_item(self, api110, backend_implementation, backend_config_
'type': 'Feature'
}

@mock.patch("time.time", mock.MagicMock(return_value=1234))
@pytest.mark.parametrize("backend_config_overrides", [{"url_signer": UrlSigner(secret="123&@#", expiration=1000)}])
def test_download_job_auxiliary_file_signed_with_expiration(self, api110, tmp_path, backend_config_overrides):
job_id = "07024ee9-7847-4b8a-b260-6c879a2b3cdc"
job_dir = tmp_path
auxiliary_file = job_dir / "07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json"

with open(auxiliary_file, "w") as f:
f.write("aux")

with self._fresh_job_registry():
dummy_backend.DummyBatchJobs.set_result_metadata(
job_id=job_id,
user_id=TEST_USER,
metadata=BatchJobResultMetadata(
items={
"5d2db643-5cc3-4b27-8ef3-11f7d203b221_2023-12-31T21:41:00Z": {
"geometry": {
"coordinates": [
[
[3.359808992021044, 51.08284561357965],
[3.359808992021044, 51.88641704215104],
[4.690166134878123, 51.88641704215104],
[4.690166134878123, 51.08284561357965],
[3.359808992021044, 51.08284561357965],
]
],
"type": "Polygon",
},
"assets": {
"openEO": {
"datetime": "2023-12-31T21:41:00Z",
"roles": ["data"],
"bbox": [
3.359808992021044,
51.08284561357965,
4.690166134878123,
51.88641704215104,
],
"geometry": {
"coordinates": [
[
[3.359808992021044, 51.08284561357965],
[3.359808992021044, 51.88641704215104],
[4.690166134878123, 51.88641704215104],
[4.690166134878123, 51.08284561357965],
[3.359808992021044, 51.08284561357965],
]
],
"type": "Polygon",
},
"href": "s3://openeo-data-staging-waw4-1/batch_jobs/j-250605095828442799fdde3c29b5b047/openEO_20231231T214100Z.tif",
"nodata": "nan",
"type": "image/tiff; application=geotiff",
"bands": [
{"name": "LST", "common_name": "surface_temperature", "aliases": ["LST_in:LST"]}
],
"raster:bands": [
{
"name": "LST",
"statistics": {
"valid_percent": 66.88,
"maximum": 281.04800415039,
"stddev": 19.598456945276,
"minimum": 224.46798706055,
"mean": 259.57087672984,
},
}
],
}
},
"id": "5d2db643-5cc3-4b27-8ef3-11f7d203b221_2023-12-31T21:41:00Z",
"properties": {"datetime": "2023-12-31T21:41:00Z"},
"bbox": [3.359808992021044, 51.08284561357965, 4.690166134878123, 51.88641704215104],
"links": [
{
"rel": "custom",
"href": str(auxiliary_file),
"type": "application/json",
ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY: True,
},
],
}
}
),
)

resp = api110.get(
"/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/aux/TXIuVGVzdA==/5b3d0f30d2ad8ef3146dc0785821aac3/07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json?expires=2234",
)

assert resp.text == "aux"

def test_get_job_results_invalid_job(self, api):
api.get("/jobs/deadbeef-f00/results", headers=self.AUTH_HEADER).assert_error(404, "JobNotFound")

Expand Down