Skip to content

Commit eb0817b

Browse files
authored
MRG: Merge pull request #133 from octue/release/0.1.13
Release/0.1.13
2 parents 70b4ec3 + 47ccf05 commit eb0817b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1199
-527
lines changed

docs/source/deploying_services.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Automated deployment with Octue means:
1515

1616
All you need to enable automated deployments are the following files in your repository root:
1717

18-
* A ``requirements.txt`` file that includes ``octue>=0.1.12`` and the rest of your service's dependencies
18+
* A ``requirements.txt`` file that includes ``octue>=0.1.13`` and the rest of your service's dependencies
1919
* A ``twine.json`` file
2020
* A ``deployment_configuration.json`` file (optional)
2121

octue/__init__.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import logging
22

3-
from .cli import octue_cli
43
from .logging_handlers import LOG_FORMAT
54
from .runner import Runner
65

76

8-
__all__ = "LOG_FORMAT", "octue_cli", "Runner"
7+
__all__ = "LOG_FORMAT", "Runner"
98
package_logger = logging.getLogger(__name__)

octue/cli.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
import click
44
import pkg_resources
55

6+
from octue.cloud.pub_sub.service import Service
67
from octue.definitions import CHILDREN_FILENAME, FOLDER_DEFAULTS, MANIFEST_FILENAME, VALUES_FILENAME
78
from octue.logging_handlers import get_remote_handler
8-
from octue.resources.communication import Service, service_backends
9+
from octue.resources import service_backends
910
from octue.runner import Runner
1011
from twined import Twine
1112

@@ -119,7 +120,7 @@ def run(app_dir, data_dir, config_dir, input_dir, output_dir, twine):
119120
twine = Twine(source=twine)
120121

121122
(
122-
configruation_values,
123+
configuration_values,
123124
configuration_manifest,
124125
input_values,
125126
input_manifest,
@@ -138,7 +139,7 @@ def run(app_dir, data_dir, config_dir, input_dir, output_dir, twine):
138139
runner = Runner(
139140
app_src=app_dir,
140141
twine=twine,
141-
configuration_values=configruation_values,
142+
configuration_values=configuration_values,
142143
configuration_manifest=configuration_manifest,
143144
output_manifest_path=os.path.join(output_dir, MANIFEST_FILENAME),
144145
children=children,
File renamed without changes.

octue/cloud/credentials.py

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import json
2+
import logging
3+
import os
4+
import warnings
5+
from google.oauth2 import service_account
6+
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class GCPCredentialsManager:
12+
"""A credentials manager for Google Cloud Platform (GCP) that takes a path to a service account JSON file, or a
13+
JSON string of the contents of such a service account file, from the given environment variable and instantiates
14+
a Google Cloud credentials object.
15+
16+
:param str environment_variable_name:
17+
:return None:
18+
"""
19+
20+
def __init__(self, environment_variable_name="GOOGLE_APPLICATION_CREDENTIALS"):
21+
self.environment_variable_name = environment_variable_name
22+
23+
if self.environment_variable_name is None:
24+
self.service_account_json = None
25+
return
26+
27+
try:
28+
self.service_account_json = os.environ[self.environment_variable_name]
29+
except KeyError:
30+
warnings.warn(
31+
f"No environment variable called {self.environment_variable_name!r}; resorting to default Google Cloud "
32+
f"credentials."
33+
)
34+
self.service_account_json = None
35+
36+
def get_credentials(self):
37+
"""Get the Google OAUTH2 service account credentials.
38+
39+
:return google.auth.service_account.Credentials:
40+
"""
41+
if self.service_account_json is None:
42+
return None
43+
44+
# Check that the environment variable refers to a real file.
45+
if os.path.exists(self.service_account_json):
46+
return self._get_credentials_from_file()
47+
48+
# If it doesn't, assume that it's the credentials file as a JSON string.
49+
return self._get_credentials_from_string()
50+
51+
def _get_credentials_from_file(self):
52+
with open(self.service_account_json) as f:
53+
credentials = json.load(f)
54+
55+
logger.debug("GCP credentials read from file.")
56+
return service_account.Credentials.from_service_account_info(credentials)
57+
58+
def _get_credentials_from_string(self):
59+
credentials = json.loads(self.service_account_json)
60+
logger.debug("GCP credentials loaded from string.")
61+
return service_account.Credentials.from_service_account_info(credentials)
File renamed without changes.

octue/resources/communication/google_pub_sub/service.py octue/cloud/pub_sub/service.py

+48-12
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,17 @@
22
import logging
33
import uuid
44
from concurrent.futures import TimeoutError
5+
import google.api_core
6+
import google.api_core.exceptions
57
from google.api_core import retry
68
from google.cloud import pubsub_v1
79

810
from octue import exceptions
11+
from octue.cloud.credentials import GCPCredentialsManager
12+
from octue.cloud.pub_sub import Subscription, Topic
13+
from octue.exceptions import FileLocationError
914
from octue.mixins import CoolNameable
10-
from octue.resources.communication.google_pub_sub import Subscription, Topic
1115
from octue.resources.manifest import Manifest
12-
from octue.utils.cloud.credentials import GCPCredentialsManager
1316

1417

1518
logger = logging.getLogger(__name__)
@@ -24,6 +27,29 @@
2427
BATCH_SETTINGS = pubsub_v1.types.BatchSettings(max_bytes=10 * 1000 * 1000, max_latency=0.01, max_messages=1)
2528

2629

30+
def create_custom_retry(timeout):
31+
"""Create a custom `Retry` object specifying that the given Google Cloud request should retry for the given amount
32+
of time for the given exceptions.
33+
34+
:param float timeout:
35+
:return google.api_core.retry.Retry:
36+
"""
37+
return retry.Retry(
38+
maximum=timeout / 4,
39+
deadline=timeout,
40+
predicate=google.api_core.retry.if_exception_type(
41+
google.api_core.exceptions.NotFound,
42+
google.api_core.exceptions.Aborted,
43+
google.api_core.exceptions.DeadlineExceeded,
44+
google.api_core.exceptions.InternalServerError,
45+
google.api_core.exceptions.ResourceExhausted,
46+
google.api_core.exceptions.ServiceUnavailable,
47+
google.api_core.exceptions.Unknown,
48+
google.api_core.exceptions.Cancelled,
49+
),
50+
)
51+
52+
2753
class Service(CoolNameable):
2854
"""A Twined service that can be used in two modes:
2955
* As a server accepting questions (input values and manifests), running them through its app, and responding to the
@@ -39,11 +65,7 @@ def __init__(self, backend, id=None, run_function=None):
3965
self.backend = backend
4066
self.run_function = run_function
4167

42-
if backend.credentials_environment_variable is None:
43-
credentials = None
44-
else:
45-
credentials = GCPCredentialsManager(backend.credentials_environment_variable).get_credentials()
46-
68+
credentials = GCPCredentialsManager(backend.credentials_environment_variable).get_credentials()
4769
self.publisher = pubsub_v1.PublisherClient(credentials=credentials, batch_settings=BATCH_SETTINGS)
4870
self.subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
4971
super().__init__()
@@ -81,7 +103,7 @@ def receive_question_then_answer(self, question):
81103
question.ack()
82104
self.answer(data, question_uuid)
83105

84-
def answer(self, data, question_uuid):
106+
def answer(self, data, question_uuid, timeout=30):
85107
"""Answer a question (i.e. run the Service's app to analyse the given data, and return the output values to the
86108
asker). Answers are published to a topic whose name is generated from the UUID sent with the question, and are
87109
in the format specified in the Service's Twine file.
@@ -101,6 +123,7 @@ def answer(self, data, question_uuid):
101123
data=json.dumps(
102124
{"output_values": analysis.output_values, "output_manifest": serialised_output_manifest}
103125
).encode(),
126+
retry=create_custom_retry(timeout),
104127
)
105128
logger.info("%r responded on topic %r.", self, topic.path)
106129

@@ -110,6 +133,13 @@ def ask(self, service_id, input_values, input_manifest=None):
110133
before sending the question to the serving Service - the topic is the expected publishing place for the answer
111134
from the serving Service when it comes, and the subscription is set up to subscribe to this.
112135
"""
136+
if (input_manifest is not None) and (not input_manifest.all_datasets_are_in_cloud):
137+
raise FileLocationError(
138+
"All datasets of the input manifest and all files of the datasets must be uploaded to the cloud before "
139+
"asking a service to perform an analysis upon them. The manifest must then be updated with the new "
140+
"cloud locations."
141+
)
142+
113143
question_topic = Topic(name=service_id, namespace=OCTUE_NAMESPACE, service=self)
114144
if not question_topic.exists():
115145
raise exceptions.ServiceNotFound(f"Service with ID {service_id!r} cannot be found.")
@@ -141,14 +171,20 @@ def ask(self, service_id, input_values, input_manifest=None):
141171
logger.debug("%r asked question to %r service. Question UUID is %r.", self, service_id, question_uuid)
142172
return response_subscription, question_uuid
143173

144-
def wait_for_answer(self, subscription, timeout=20):
174+
def wait_for_answer(self, subscription, timeout=30):
145175
"""Wait for an answer to a question on the given subscription, deleting the subscription and its topic once
146176
the answer is received.
147177
"""
148-
answer = self.subscriber.pull(
178+
pull_response = self.subscriber.pull(
149179
request={"subscription": subscription.path, "max_messages": 1},
150-
retry=retry.Retry(deadline=timeout),
151-
).received_messages[0]
180+
timeout=timeout,
181+
retry=create_custom_retry(timeout),
182+
)
183+
184+
try:
185+
answer = pull_response.received_messages[0]
186+
except IndexError:
187+
raise TimeoutError("No answer received from topic %r", subscription.topic.path)
152188

153189
self.subscriber.acknowledge(request={"subscription": subscription.path, "ack_ids": [answer.ack_id]})
154190
logger.debug("%r received a response to question on topic %r", self, subscription.topic.path)

octue/cloud/storage/__init__.py

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from octue.cloud.storage import path
2+
from octue.cloud.storage.client import GoogleCloudStorageClient
3+
4+
5+
__all__ = ["path", "GoogleCloudStorageClient"]

octue/utils/cloud/storage/client.py octue/cloud/storage/client.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from google.cloud.storage.constants import _DEFAULT_TIMEOUT
66
from google_crc32c import Checksum
77

8-
from octue.utils.cloud.credentials import GCPCredentialsManager
8+
from octue.cloud.credentials import GCPCredentialsManager
99

1010

1111
logger = logging.getLogger(__name__)
@@ -29,6 +29,23 @@ def __init__(self, project_name, credentials=OCTUE_MANAGED_CREDENTIALS):
2929

3030
self.client = storage.Client(project=project_name, credentials=credentials)
3131

32+
def create_bucket(self, name, location=None, allow_existing=False, timeout=_DEFAULT_TIMEOUT):
33+
"""Create a new bucket. If the bucket already exists, and `allow_existing` is `True`, do nothing; if it is
34+
`False`, raise an error.
35+
36+
:param str name:
37+
:param str|None location: physical region of bucket; e.g. "europe-west6"; defaults to "US"
38+
:param bool allow_existing:
39+
:param float timeout:
40+
:raise google.cloud.exceptions.Conflict:
41+
:return None:
42+
"""
43+
if allow_existing:
44+
if self.client.lookup_bucket(bucket_name=name, timeout=timeout) is not None:
45+
return
46+
47+
self.client.create_bucket(bucket_or_name=name, location=location, timeout=timeout)
48+
3249
def upload_file(self, local_path, bucket_name, path_in_bucket, metadata=None, timeout=_DEFAULT_TIMEOUT):
3350
"""Upload a local file to a Google Cloud bucket at gs://<bucket_name>/<path_in_bucket>.
3451
File renamed without changes.

octue/deployment/google/Dockerfile

+11-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM python:slim-buster
1+
FROM python:3.9-slim-buster
22

33
# Allow statements and log messages to immediately appear in the Knative logs on Google Cloud.
44
ENV PYTHONUNBUFFERED True
@@ -9,20 +9,21 @@ WORKDIR $PROJECT_ROOT
99
RUN apt-get update -y && apt-get install -y --fix-missing build-essential && rm -rf /var/lib/apt/lists/*
1010

1111
# This will cache bust if any of the requirements change.
12-
COPY requirements.txt ./
13-
14-
# Gunicorn is required to run the Flask server that connected the Docker container to Cloud Run.
15-
RUN echo "gunicorn" >> requirements.txt
16-
17-
# Upgrade to latest pip and setuptools after the cache bust, then install requirements
18-
RUN pip install --upgrade pip && pip install -r requirements.txt
12+
COPY requirements*.txt .
13+
COPY setup.* .
1914

2015
COPY . .
2116

17+
# Install requirements (supports requirements.txt, requirements-dev.txt, and setup.py; all will be run if all are present.)
18+
RUN if [ ! -f "requirements.txt" ] && [ ! -f "requirements-dev.txt" ] && [ ! -f "setup.py" ]; then exit 1; fi
19+
RUN if [ -f "requirements.txt" ]; then pip install --upgrade pip && pip install -r requirements.txt; fi
20+
RUN if [ -f "requirements-dev.txt" ]; then pip install --upgrade pip && pip install -r requirements-dev.txt; fi
21+
RUN if [ -f "setup.py" ]; then pip install --upgrade pip && pip install -e .; fi
22+
2223
EXPOSE $PORT
2324

24-
ARG _TRIGGER_ID
25-
ENV SERVICE_ID=$_TRIGGER_ID
25+
ARG _SERVICE_ID
26+
ENV SERVICE_ID=$_SERVICE_ID
2627

2728
ARG _GUNICORN_WORKERS=1
2829
ENV _GUNICORN_WORKERS=$_GUNICORN_WORKERS

octue/deployment/google/cloud_run.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
import os
55
from flask import Flask, request
66

7+
from octue.cloud.pub_sub.service import Service
78
from octue.logging_handlers import apply_log_handler
8-
from octue.resources.communication.google_pub_sub.service import Service
9-
from octue.resources.communication.service_backends import GCPPubSubBackend
9+
from octue.resources.service_backends import GCPPubSubBackend
1010
from octue.runner import Runner
1111

1212

@@ -55,22 +55,27 @@ def _log_bad_request_and_return_400_response(message):
5555
return (f"Bad Request: {message}", 400)
5656

5757

58-
def answer_question(project_name, data, question_uuid, deployment_configuration_path=None):
58+
def answer_question(project_name, data, question_uuid):
5959
"""Answer a question from a service by running the deployed app with the deployment configuration. Either the
6060
`deployment_configuration_path` should be specified, or the `deployment_configuration`.
6161
6262
:param str project_name:
6363
:param dict data:
6464
:param str question_uuid:
65-
:param str|None deployment_configuration_path:
6665
:return None:
6766
"""
68-
deployment_configuration = {}
67+
deployment_configuration_path = "deployment_configuration.json"
6968

70-
if deployment_configuration_path is not None:
69+
try:
7170
with open(deployment_configuration_path) as f:
7271
deployment_configuration = json.load(f)
7372

73+
logger.info("Deployment configuration loaded from %r.", os.path.abspath(deployment_configuration_path))
74+
75+
except FileNotFoundError:
76+
deployment_configuration = {}
77+
logger.info("Default deployment configuration used.")
78+
7479
runner = Runner(
7580
app_src=deployment_configuration.get("app_dir", "."),
7681
twine=deployment_configuration.get("twine", "twine.json"),
@@ -82,6 +87,7 @@ def answer_question(project_name, data, question_uuid, deployment_configuration_
8287
log_level=deployment_configuration.get("log_level", "INFO"),
8388
handler=deployment_configuration.get("log_handler", None),
8489
show_twined_logs=deployment_configuration.get("show_twined_logs", False),
90+
project_name=project_name,
8591
)
8692

8793
service = Service(

octue/exceptions.py

+10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ class FileNotFoundException(InvalidInputException, FileNotFoundError):
1010
"""Raise when a required folder (e.g. <data_dir>/input) cannot be found"""
1111

1212

13+
class FileLocationError(Exception):
14+
"""Raise when a file exists in an unsupported location for a given operation."""
15+
16+
1317
class ProtectedAttributeException(OctueSDKException, KeyError):
1418
"""Raise when a user attempts to set an attribute whose value should be protected"""
1519

@@ -68,3 +72,9 @@ class BackendNotFound(OctueSDKException):
6872
"""Raise when details of a backend that doesn't exist in `octue.resources.service_backends` are given for use as a
6973
Service backend.
7074
"""
75+
76+
77+
class AttributeConflict(OctueSDKException):
78+
"""Raise if, when trying to set an attribute whose current value has a significantly higher confidence than the new
79+
value, the new value conflicts with the current value.
80+
"""

octue/mixins/identifiable.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ class MyResource(Identifiable):
2121
```
2222
"""
2323

24-
def __init__(self, *args, id=None, **kwargs):
24+
def __init__(self, *args, id=None, name=None, **kwargs):
2525
"""Constructor for Identifiable class"""
26+
self._name = name
2627
super().__init__(*args, **kwargs)
2728

2829
# Store a boolean record of whether this object was created with a previously-existing uuid or was created new.
@@ -55,3 +56,7 @@ def __repr__(self):
5556
@property
5657
def id(self):
5758
return self._id
59+
60+
@property
61+
def name(self):
62+
return self._name

0 commit comments

Comments
 (0)