7
7
from multiprocessing .process import current_process
8
8
import os
9
9
from pathlib import Path
10
- from typing import cast
10
+ from typing import cast , Dict , List , Tuple , Optional
11
11
12
+ from mesido .exceptions import MesidoAssetIssueError
13
+ from omotes_sdk .internal .orchestrator_worker_events .esdl_messages import (
14
+ EsdlMessage ,
15
+ MessageSeverity ,
16
+ )
12
17
from omotes_sdk .internal .worker .worker import initialize_worker , UpdateProgressHandler
13
18
from omotes_sdk .types import ProtobufDict
14
19
from mesido .esdl .esdl_parser import ESDLStringParser
@@ -36,14 +41,14 @@ class EarlySystemExit(Exception):
36
41
...
37
42
38
43
39
- def run_mesido (input_esdl : str ) -> str :
44
+ def run_mesido (input_esdl : str ) -> Tuple [ Optional [ str ], List [ EsdlMessage ]] :
40
45
"""Run mesido using the specific workflow.
41
46
42
47
Note: This is run without a subprocess! Casadi does not yield the GIL and therefore
43
48
causes starved thread issues.
44
49
45
50
:param input_esdl: The input ESDL XML string.
46
- :return: GROW optimized or simulated ESDL
51
+ :return: GROW optimized or simulated ESDL and a list of ESDL feedback messages.
47
52
"""
48
53
mesido_func = get_problem_function (GROW_TASK_TYPE )
49
54
mesido_workflow = get_problem_type (GROW_TASK_TYPE )
@@ -75,10 +80,45 @@ def run_mesido(input_esdl: str) -> str:
75
80
update_progress_function = None ,
76
81
profile_reader = InfluxDBProfileReader ,
77
82
)
83
+ esdl_str = cast (str , solution .optimized_esdl_string )
84
+ # TODO get esdl_messages from solution after mesido update.
85
+ esdl_messages = []
86
+ except MesidoAssetIssueError as mesido_issues_error :
87
+ esdl_str = None
88
+ esdl_messages = parse_mesido_esdl_messages (
89
+ mesido_issues_error .general_issue , mesido_issues_error .message_per_asset_id
90
+ )
78
91
except SystemExit as e :
79
92
raise EarlySystemExit (e )
80
93
81
- return cast (str , solution .optimized_esdl_string )
94
+ return esdl_str , esdl_messages
95
+
96
+
97
+ def parse_mesido_esdl_messages (
98
+ general_message : str , object_messages : Dict [str , str ]
99
+ ) -> List [EsdlMessage ]:
100
+ """Convert mesido messages to a list of esdl messages in omotes format.
101
+
102
+ :param general_message: general message (not related to a specific ESDL object).
103
+ :param object_messages: esdl object messages per object id.
104
+ :return: list of EsdlMessage dataclass objects.
105
+ """
106
+ # TODO get severity from esdl message and add list of general messages after mesido update.
107
+ esdl_messages = []
108
+
109
+ if general_message :
110
+ esdl_messages .append (
111
+ EsdlMessage (technical_message = general_message , severity = MessageSeverity .ERROR )
112
+ )
113
+ for object_id , message in object_messages .items ():
114
+ esdl_messages .append (
115
+ EsdlMessage (
116
+ technical_message = message ,
117
+ severity = MessageSeverity .ERROR ,
118
+ esdl_object_id = object_id ,
119
+ )
120
+ )
121
+ return esdl_messages
82
122
83
123
84
124
def kill_pool (pool : multiprocessing .pool .Pool ) -> None :
@@ -114,7 +154,7 @@ def kill_pool(pool: multiprocessing.pool.Pool) -> None:
114
154
115
155
def grow_worker_task (
116
156
input_esdl : str , workflow_config : ProtobufDict , update_progress_handler : UpdateProgressHandler
117
- ) -> str :
157
+ ) -> Tuple [ Optional [ str ], List [ EsdlMessage ]] :
118
158
"""Run the grow worker task and run configured specific problem type for this worker instance.
119
159
120
160
Note: Be careful! This spawns within a subprocess and gains a copy of memory from parent
@@ -125,7 +165,7 @@ def grow_worker_task(
125
165
:param input_esdl: The input ESDL XML string.
126
166
:param workflow_config: Extra parameters to configure this run.
127
167
:param update_progress_handler: Handler to notify of any progress changes.
128
- :return: GROW optimized or simulated ESDL
168
+ :return: GROW optimized or simulated ESDL and a list of ESDL feedback messages.
129
169
"""
130
170
# TODO Very nasty hack. Celery unfortunately starts the worker subprocesses as 'daemons'
131
171
# which prevents this process from creating any other subprocesses. Therefore, we
@@ -137,13 +177,13 @@ def grow_worker_task(
137
177
138
178
with multiprocessing .Pool (1 ) as pool :
139
179
try :
140
- output_esdl = pool .map (run_mesido , [input_esdl ])[0 ]
180
+ output = pool .map (run_mesido , [input_esdl ])[0 ]
141
181
except SystemExit as e :
142
182
logger .warning ("During pool the worker was requested to quit: %s %s" , type (e ), e )
143
183
kill_pool (pool )
144
184
raise
145
185
146
- return output_esdl
186
+ return output
147
187
148
188
149
189
if __name__ == "__main__" :
0 commit comments