Skip to content

Commit 15fd04e

Browse files
♻️🐛 Use celery task manager in function job task client service (#8352)
Co-authored-by: Werner Van Geit <[email protected]>
1 parent 9d30172 commit 15fd04e

File tree

16 files changed

+962
-670
lines changed

16 files changed

+962
-670
lines changed

services/api-server/docs/api-server.drawio.svg

Lines changed: 356 additions & 213 deletions
Loading

services/api-server/src/simcore_service_api_server/_meta.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
APP_NAME: Final[str] = info.app_name
1818
SUMMARY: Final[str] = info.get_summary()
1919

20-
2120
#
2221
# https://patorjk.com/software/taag/#p=display&f=JS%20Stick%20Letters&t=API-server%0A
2322
#

services/api-server/src/simcore_service_api_server/_service_function_jobs.py

Lines changed: 0 additions & 205 deletions
Original file line numberDiff line numberDiff line change
@@ -9,44 +9,32 @@
99
FunctionInputs,
1010
FunctionJobCollectionID,
1111
FunctionJobID,
12-
FunctionJobStatus,
13-
FunctionOutputs,
1412
FunctionSchemaClass,
1513
ProjectFunctionJob,
1614
RegisteredFunction,
1715
RegisteredFunctionJob,
1816
RegisteredFunctionJobPatch,
19-
RegisteredFunctionJobWithStatus,
2017
RegisteredProjectFunctionJobPatch,
2118
RegisteredSolverFunctionJobPatch,
2219
SolverFunctionJob,
2320
SolverJobID,
2421
TaskID,
2522
)
2623
from models_library.functions_errors import (
27-
FunctionExecuteAccessDeniedError,
2824
FunctionInputsValidationError,
29-
FunctionsExecuteApiAccessDeniedError,
3025
UnsupportedFunctionClassError,
31-
UnsupportedFunctionFunctionJobClassCombinationError,
3226
)
3327
from models_library.products import ProductName
3428
from models_library.projects import ProjectID
3529
from models_library.projects_nodes_io import NodeID
36-
from models_library.projects_state import RunningState
3730
from models_library.rest_pagination import PageMetaInfoLimitOffset, PageOffsetInt
3831
from models_library.rpc_pagination import PageLimitInt
3932
from models_library.users import UserID
4033
from pydantic import ValidationError
4134
from simcore_service_api_server._service_functions import FunctionService
4235
from simcore_service_api_server.services_rpc.storage import StorageService
43-
from sqlalchemy.ext.asyncio import AsyncEngine
4436

4537
from ._service_jobs import JobService
46-
from .exceptions.function_errors import (
47-
FunctionJobCacheNotFoundError,
48-
FunctionJobProjectMissingError,
49-
)
5038
from .models.api_resources import JobLinks
5139
from .models.domain.functions import PreRegisteredFunctionJobData
5240
from .models.schemas.jobs import JobInputs, JobPricingSpecification
@@ -102,33 +90,6 @@ async def list_function_jobs(
10290
**pagination_kwargs,
10391
)
10492

105-
async def list_function_jobs_with_status(
106-
self,
107-
*,
108-
filter_by_function_id: FunctionID | None = None,
109-
filter_by_function_job_ids: list[FunctionJobID] | None = None,
110-
filter_by_function_job_collection_id: FunctionJobCollectionID | None = None,
111-
pagination_offset: PageOffsetInt | None = None,
112-
pagination_limit: PageLimitInt | None = None,
113-
) -> tuple[
114-
list[RegisteredFunctionJobWithStatus],
115-
PageMetaInfoLimitOffset,
116-
]:
117-
"""Lists all function jobs for a user with pagination"""
118-
119-
pagination_kwargs = as_dict_exclude_none(
120-
pagination_offset=pagination_offset, pagination_limit=pagination_limit
121-
)
122-
123-
return await self._web_rpc_client.list_function_jobs_with_status(
124-
user_id=self.user_id,
125-
product_name=self.product_name,
126-
filter_by_function_id=filter_by_function_id,
127-
filter_by_function_job_ids=filter_by_function_job_ids,
128-
filter_by_function_job_collection_id=filter_by_function_job_collection_id,
129-
**pagination_kwargs,
130-
)
131-
13293
async def validate_function_inputs(
13394
self, *, function_id: FunctionID, inputs: FunctionInputs
13495
) -> tuple[bool, str]:
@@ -158,54 +119,6 @@ async def validate_function_inputs(
158119
f"Unsupported function schema class {function.input_schema.schema_class}",
159120
)
160121

161-
async def inspect_function_job(
162-
self, function: RegisteredFunction, function_job: RegisteredFunctionJob
163-
) -> FunctionJobStatus:
164-
"""Raises FunctionJobProjectNotRegisteredError if no project is associated with job"""
165-
stored_job_status = await self._web_rpc_client.get_function_job_status(
166-
function_job_id=function_job.uid,
167-
user_id=self.user_id,
168-
product_name=self.product_name,
169-
)
170-
171-
if stored_job_status.status in (RunningState.SUCCESS, RunningState.FAILED):
172-
return stored_job_status
173-
174-
if (
175-
function.function_class == FunctionClass.PROJECT
176-
and function_job.function_class == FunctionClass.PROJECT
177-
):
178-
if function_job.project_job_id is None:
179-
raise FunctionJobProjectMissingError
180-
job_status = await self._job_service.inspect_study_job(
181-
job_id=function_job.project_job_id,
182-
)
183-
elif (function.function_class == FunctionClass.SOLVER) and (
184-
function_job.function_class == FunctionClass.SOLVER
185-
):
186-
if function_job.solver_job_id is None:
187-
raise FunctionJobProjectMissingError
188-
job_status = await self._job_service.inspect_solver_job(
189-
solver_key=function.solver_key,
190-
version=function.solver_version,
191-
job_id=function_job.solver_job_id,
192-
)
193-
else:
194-
raise UnsupportedFunctionFunctionJobClassCombinationError(
195-
function_class=function.function_class,
196-
function_job_class=function_job.function_class,
197-
)
198-
199-
new_job_status = FunctionJobStatus(status=job_status.state)
200-
201-
return await self._web_rpc_client.update_function_job_status(
202-
function_job_id=function_job.uid,
203-
user_id=self.user_id,
204-
product_name=self.product_name,
205-
job_status=new_job_status,
206-
check_write_permissions=False,
207-
)
208-
209122
async def create_function_job_inputs( # pylint: disable=no-self-use
210123
self,
211124
*,
@@ -220,58 +133,6 @@ async def create_function_job_inputs( # pylint: disable=no-self-use
220133
values=joined_inputs or {},
221134
)
222135

223-
async def get_cached_function_job(
224-
self,
225-
*,
226-
function: RegisteredFunction,
227-
job_inputs: JobInputs,
228-
) -> RegisteredFunctionJob:
229-
"""
230-
N.B. this function checks access rights
231-
232-
raises FunctionsExecuteApiAccessDeniedError if user cannot execute functions
233-
raises FunctionJobCacheNotFoundError if no cached job is found
234-
235-
"""
236-
237-
user_api_access_rights = (
238-
await self._web_rpc_client.get_functions_user_api_access_rights(
239-
user_id=self.user_id, product_name=self.product_name
240-
)
241-
)
242-
if not user_api_access_rights.execute_functions:
243-
raise FunctionsExecuteApiAccessDeniedError(
244-
user_id=self.user_id,
245-
function_id=function.uid,
246-
)
247-
248-
user_permissions = await self._web_rpc_client.get_function_user_permissions(
249-
function_id=function.uid,
250-
user_id=self.user_id,
251-
product_name=self.product_name,
252-
)
253-
if not user_permissions.execute:
254-
raise FunctionExecuteAccessDeniedError(
255-
user_id=self.user_id,
256-
function_id=function.uid,
257-
)
258-
259-
if cached_function_jobs := await self._web_rpc_client.find_cached_function_jobs(
260-
function_id=function.uid,
261-
inputs=job_inputs.values,
262-
user_id=self.user_id,
263-
product_name=self.product_name,
264-
):
265-
for cached_function_job in cached_function_jobs:
266-
job_status = await self.inspect_function_job(
267-
function=function,
268-
function_job=cached_function_job,
269-
)
270-
if job_status.status == RunningState.SUCCESS:
271-
return cached_function_job
272-
273-
raise FunctionJobCacheNotFoundError
274-
275136
async def pre_register_function_job(
276137
self,
277138
*,
@@ -466,69 +327,3 @@ async def run_function(
466327
raise UnsupportedFunctionClassError(
467328
function_class=function.function_class,
468329
)
469-
470-
async def function_job_outputs(
471-
self,
472-
*,
473-
function: RegisteredFunction,
474-
function_job: RegisteredFunctionJob,
475-
user_id: UserID,
476-
product_name: ProductName,
477-
stored_job_outputs: FunctionOutputs | None,
478-
async_pg_engine: AsyncEngine,
479-
) -> FunctionOutputs:
480-
481-
if stored_job_outputs is not None:
482-
return stored_job_outputs
483-
484-
try:
485-
job_status = await self.inspect_function_job(
486-
function=function,
487-
function_job=function_job,
488-
)
489-
except FunctionJobProjectMissingError:
490-
return None
491-
492-
if job_status.status != RunningState.SUCCESS:
493-
return None
494-
495-
if (
496-
function.function_class == FunctionClass.PROJECT
497-
and function_job.function_class == FunctionClass.PROJECT
498-
):
499-
if function_job.project_job_id is None:
500-
return None
501-
new_outputs = dict(
502-
(
503-
await self._job_service.get_study_job_outputs(
504-
study_id=function.project_id,
505-
job_id=function_job.project_job_id,
506-
)
507-
).results
508-
)
509-
elif (
510-
function.function_class == FunctionClass.SOLVER
511-
and function_job.function_class == FunctionClass.SOLVER
512-
):
513-
if function_job.solver_job_id is None:
514-
return None
515-
new_outputs = dict(
516-
(
517-
await self._job_service.get_solver_job_outputs(
518-
solver_key=function.solver_key,
519-
version=function.solver_version,
520-
job_id=function_job.solver_job_id,
521-
async_pg_engine=async_pg_engine,
522-
)
523-
).results
524-
)
525-
else:
526-
raise UnsupportedFunctionClassError(function_class=function.function_class)
527-
528-
return await self._web_rpc_client.update_function_job_outputs(
529-
function_job_id=function_job.uid,
530-
user_id=user_id,
531-
product_name=product_name,
532-
outputs=new_outputs,
533-
check_write_permissions=False,
534-
)

0 commit comments

Comments
 (0)