Skip to content

Commit

Permalink
Chore/refactor wes models (#26)
Browse files Browse the repository at this point in the history
Co-authored-by: salihuDickson <[email protected]>
  • Loading branch information
SalihuDickson and SalihuDickson authored Sep 10, 2024
1 parent 0efb216 commit 71b1143
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 65 deletions.
20 changes: 10 additions & 10 deletions crategen/converters/wes_converter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from pydantic import ValidationError

from ..models.wes_models import WESData, WESOutputs, WESRequest, WESRunLog
from ..models.wes_models import Log, RunRequest, WESData, WESOutputs
from ..models.wrroc_models import WRROCDataWES
from ..utils import convert_to_iso8601
from ..validators import validate_wrroc_wes
from .abstract_converter import AbstractConverter


Expand All @@ -27,6 +26,7 @@ def convert_to_wrroc(self, data: dict) -> dict:
except ValidationError as e:
raise ValueError(f"Invalid WES data: {e.errors()}") from e

# create the object using the model
wrroc_data = {
"@id": data_wes.run_id,
"name": data_wes.run_log.name,
Expand All @@ -39,8 +39,6 @@ def convert_to_wrroc(self, data: dict) -> dict:
],
}

# Validate WRROC data before returning
validate_wrroc_wes(wrroc_data)
return wrroc_data

def convert_from_wrroc(self, data: dict) -> dict:
Expand All @@ -64,16 +62,18 @@ def convert_from_wrroc(self, data: dict) -> dict:
f"Invalid WRROC data for WES conversion: {e.errors()}"
) from e

wes_outputs = [WESOutputs(location=res.id, name=res.name) for res in data_wrroc.result]
wes_run_log = WESRunLog(
wes_outputs = [
WESOutputs(location=res.id, name=res.name) for res in data_wrroc.result
]
wes_run_log = Log(
name=data_wrroc.name,
start_time=data_wrroc.startTime,
end_time=data_wrroc.endTime
end_time=data_wrroc.endTime,
)
wes_request = WESRequest(
wes_request = RunRequest(
workflow_params={}, # Adjust as necessary
workflow_type="CWL", # Example type, adjust as necessary
workflow_type_version="v1.0" # Example version, adjust as necessary
workflow_type_version="v1.0", # Example version, adjust as necessary
)

wes_data = WESData(
Expand All @@ -82,7 +82,7 @@ def convert_from_wrroc(self, data: dict) -> dict:
state=data_wrroc.status,
run_log=wes_run_log,
task_logs=None, # Provide appropriate value
outputs=wes_outputs
outputs=wes_outputs,
)

# Validate WES data before returning
Expand Down
Empty file removed crategen/models.py
Empty file.
12 changes: 8 additions & 4 deletions crategen/models/tes_models.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
Each model in this module conforms to the corresponding TES model names as specified by the GA4GH schema (https://ga4gh.github.io/task-execution-schemas/docs/).
"""

import os
from datetime import datetime
from enum import Enum
Expand Down Expand Up @@ -31,9 +35,9 @@ class TESOutputFileLog(BaseModel):
**Attributes:**
- **url** (str): URL of the file in storage.
- **path** (str): Path of the file inside the container. Must be an absolute path.
- **size_bytes** (str): Size of the file in bytes. Note, this is currently coded as a string because official JSON doesn't support int64 numbers.
- **url** (`str`): URL of the file in storage.
- **path** (`str`): Path of the file inside the container. Must be an absolute path.
- **size_bytes** (`str`): Size of the file in bytes. Note, this is currently coded as a string because official JSON doesn't support int64 numbers.
**Reference:** https://ga4gh.github.io/task-execution-schemas/docs/#operation/GetTask
"""
Expand Down Expand Up @@ -94,7 +98,7 @@ class TESExecutor(BaseModel):
env: Optional[dict[str, str]] = None

@validator("stdin", "stdout")
def validate_stdin_stdin(cls, value):
def validate_stdin_stdin(value):
if not os.path.isabs(value):
raise ValueError(f"The '${value}' attribute must contain an absolute path.")
return value
Expand Down
175 changes: 124 additions & 51 deletions crategen/models/wes_models.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,160 @@
"""
Each model in this module conforms to the corresponding WES model names as specified by the GA4GH schema (https://ga4gh.github.io/workflow-execution-service-schemas/docs/).
"""

from datetime import datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field, root_validator
from pydantic import BaseModel, Field, root_validator, validator

from ..utils import convert_to_rfc3339_format


class State(str, Enum):
    """
    Enumerates the states of a workflow run in the Workflow Execution Service (WES).

    Members also subclass `str`, so each member compares equal to its string value.

    **Note:** The GA4GH WES schema spells the cancelled state `CANCELED` (single "L").
    `CANCELLED` is kept as an alias of `CANCELED`, and the legacy string `"CANCELLED"`
    is still accepted on lookup, so existing callers and stored data keep working.

    **Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/
    """

    UNKNOWN = "UNKNOWN"
    QUEUED = "QUEUED"
    INITIALIZING = "INITIALIZING"
    RUNNING = "RUNNING"
    PAUSED = "PAUSED"
    COMPLETE = "COMPLETE"
    EXECUTOR_ERROR = "EXECUTOR_ERROR"
    SYSTEM_ERROR = "SYSTEM_ERROR"
    CANCELED = "CANCELED"
    # Backward-compatible alias: same value as CANCELED, so it is the same member.
    CANCELLED = "CANCELED"
    CANCELING = "CANCELING"
    PREEMPTED = "PREEMPTED"

    @classmethod
    def _missing_(cls, value):
        """Map the legacy spelling "CANCELLED" onto `CANCELED`; reject anything else."""
        if value == "CANCELLED":
            return cls.CANCELED
        return None

class WESRunLog(BaseModel):

class WESOutputs(BaseModel):
    """
    Represents an output file of a WES run.

    **Attributes:**
    - **location** (`str`): The URL of the output file.
    - **name** (`str`): The name of the output file.
    """

    location: str
    name: str


class Log(BaseModel):
"""
Represents a run log in the Workflow Execution Service (WES).
Attributes:
name (Optional[str]): The name of the run.
start_time (Optional[str]): The start time of the run.
end_time (Optional[str]): The end time of the run.
cmd (Optional[list[str]]): The command executed in the run.
stdout (Optional[str]): The path to the stdout log.
stderr (Optional[str]): The path to the stderr log.
exit_code (Optional[int]): The exit code of the run.
tes_logs_url (Optional[str]): The URL of the TES logs.
**Attributes:**
- **name** (`Optional[str]`): The task or workflow name.
- **cmd** (`Optional[list[str]]`): The command line that was executed.
- **start_time** (`Optional[str]`): When the command started executing, in ISO 8601 format.
- **end_time** (`Optional[str]`): When the command stopped executing, in ISO 8601 format.
- **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task.
- **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task.
- **exit_code** (`Optional[int]`): The exit code of the program.
- **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow.
**Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model
"""

name: Optional[str] = None
start_time: Optional[str] = None
end_time: Optional[str] = None
cmd: Optional[list[str]] = None
stdout: Optional[str] = None
stderr: Optional[str] = None
exit_code: Optional[int] = None
tes_logs_url: Optional[str] = None
name: Optional[str]
start_time: Optional[datetime]
end_time: Optional[datetime]
cmd: Optional[list[str]]
stdout: Optional[str]
stderr: Optional[str]
exit_code: Optional[int]
system_logs: Optional[list[str]]

@validator("start_time", "end_time")
def validate_datetime(value):
    """
    Validate the `start_time` and `end_time` fields.

    Passes the field value to `convert_to_rfc3339_format` and returns its result.
    """
    return convert_to_rfc3339_format(value)

class WESOutputs(BaseModel):
"""
Represents output files in WES.

Attributes:
location (str): The URL of the output file.
name (str): The name of the output file.
class TaskLog(Log):
"""
Represents a task log in the Workflow Execution Service (WES).
**Attributes:**
- **name** (`str`): The task or workflow name.
- **cmd** (`Optional[list[str]]`): The command line that was executed.
- **start_time** (`Optional[str]`): When the command started executing, in ISO 8601 format.
- **end_time** (`Optional[str]`): When the command stopped executing, in ISO 8601 format.
- **stdout** (`Optional[str]`): A URL to retrieve standard output logs of the workflow run or task.
- **stderr** (`Optional[str]`): A URL to retrieve standard error logs of the workflow run or task.
- **exit_code** (`Optional[int]`): The exit code of the program.
- **system_logs** (`Optional[list[str]]`): Any logs the system decides are relevant, which are not tied directly to a workflow.
- **id** (`str`): A unique identifier which may be used to reference the task.
- **tes_uri** (`Optional[str]`): An optional URL pointing to an extended task definition defined by a TES API.
**Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model
"""

location: str
name: str
id: str
tes_uri: Optional[str]
name: str = Field(
...
) # test if adding Field makes a diff, gemini says no on specific questioning.


class WESRequest(BaseModel):
class RunRequest(BaseModel):
"""
Represents a workflow request in WES.
Attributes:
workflow_params (dict[str, str]): The parameters for the workflow.
workflow_type (str): The type of the workflow (e.g., CWL).
workflow_type_version (str): The version of the workflow type.
tags (Optional[dict[str, str]]): Additional tags associated with the workflow.
**Attributes:**
- **workflow_params** (`Optional[dict[str, str]]`): The workflow run parameterizations (JSON encoded), including input and output file locations.
- **workflow_type** (`str`): The workflow descriptor type.
- **workflow_type_version** (`str`): The workflow descriptor type version.
- **tags** (`Optional[dict[str, str]]`): Additional tags associated with the workflow.
- **workflow_engine_parameters** (`Optional[dict[str, str]]`): Input values specific to the workflow engine.
- **workflow_engine** (`Optional[str]`): The workflow engine.
- **workflow_engine_version** (`Optional[str]`): The workflow engine version.
- **workflow_url** (`str`): The workflow URL.
**Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model
"""

workflow_params: dict[str, str]
workflow_type: str
workflow_type_version: str
tags: Optional[dict[str, str]] = None
tags: Optional[dict[str, str]] = {}
workflow_engine_parameters: Optional[dict[str, str]]
workflow_engine: Optional[str]
workflow_engine_version: Optional[str]
workflow_url: str

@root_validator()
def validate_workflow_engine(cls, values):
    """
    Ensure that `workflow_engine` is set whenever `workflow_engine_version` is set.

    **Args:**
    - **values** (`dict`): The field values collected for the model.

    **Returns:** The `values` mapping, unchanged.

    **Raises:** `ValueError` if `workflow_engine_version` is set but `workflow_engine` is not.
    """
    engine_version = values.get("workflow_engine_version")
    engine = values.get("workflow_engine")

    # A version without an engine is meaningless, so reject that combination.
    if engine_version is not None and engine is None:
        raise ValueError(
            "The 'workflow_engine' attribute is required when the 'workflow_engine_version' attribute is set"
        )
    return values


class WESData(BaseModel):
"""
Represents a WES run.
Attributes:
run_id (str): The unique identifier for the WES run.
request (WESRequest): The request associated with the WES run.
state (str): The state of the WES run.
run_log (WESRunLog): The log of the WES run.
task_logs (Optional[list[WESRunLog]]): The logs of individual tasks within the run.
outputs (list[WESOutputs]): The outputs of the WES run.
**Attributes:**
- **run_id** (`str`): The unique identifier for the WES run.
- **request** (`Optional[RunRequest]`): The request associated with the WES run.
- **state** (`Optional[State]`): The state of the WES run.
- **run_log** (`Optional[Log]`): The log of the WES run.
- **task_logs_url** (`Optional[str]`): A reference to the complete URL which may be used to obtain a paginated list of task logs for this workflow.
- **task_logs** (`Optional[list[Log | TaskLog] | None]`): The logs of individual tasks within the run. This attribute is deprecated.
- **outputs** (`dict[str, str]`): The outputs of the WES run.
**Reference:** https://ga4gh.github.io/workflow-execution-service-schemas/docs/#tag/runlog_model
"""

run_id: str
request: WESRequest
state: str
run_log: WESRunLog
task_logs: Optional[list[WESRunLog]] = Field(
None, description="This field is deprecated. Use tes_logs_url instead."
)
outputs: list[WESOutputs]

class Config:
extra = "allow"
request: Optional[RunRequest]
state: Optional[State]
run_log: Optional[Log]
task_logs_url: Optional[str]
task_logs: Optional[list[Log | TaskLog] | None]
outputs: dict[str, str]

@root_validator
def check_deprecated_fields(cls, values):
Expand Down

0 comments on commit 71b1143

Please sign in to comment.