@@ -78,23 +78,25 @@ def _handle_workflow_request(self, properties, method, request_message):
78
78
self ._send_acknowledgement (method .delivery_tag )
79
79
return
80
80
81
- workflow_id = str (uuid .uuid4 ())
82
- output_dir = f"/ spleen-output/{ workflow_id } "
81
+ workflow_instance_id = str (uuid .uuid4 ())
82
+ output_dir = f"spleen-output/{ workflow_instance_id } "
83
83
task_dispatch = {
84
- "workflow_id " : workflow_id ,
84
+ "workflow_instance_id " : workflow_instance_id ,
85
85
"task_id" : str (uuid .uuid4 ()),
86
86
"execution_id" : str (uuid .uuid4 ()),
87
87
"correlation_id" : correlation_id ,
88
88
"type" : "argo" ,
89
+ "payload_id" : request_message ['payload_id' ],
89
90
"task_plugin_arguments" : {
90
- "baseUrl" : self ._config ['argo' ]['baseUrl' ],
91
- "workflowTemplateName" : request_message ['workflows' ][0 ],
92
- "workflowTemplateEntrypoint" : request_message ['workflows' ][1 ],
93
- "messagingEndpoint" : f"{ self ._config ['messaging' ]['host' ]} /{ self ._config ['messaging' ]['virtual_host' ]} " ,
94
- "messagingUsername" : self ._config ['messaging' ]['username' ],
95
- "messagingPassword" : self ._config ['messaging' ]['password' ],
96
- "messagingExchange" : self ._config ['messaging' ]['exchange' ],
97
- "messagingTopic" : "md.tasks.callback" ,
91
+ "server_url" : self ._config ['argo' ]['baseUrl' ],
92
+ "workflow_id" : workflow_instance_id ,
93
+ "workflow_template_name" : request_message ['workflows' ][0 ],
94
+ "messaging_endpoint" : f"{ self ._config ['messaging' ]['host' ]} " ,
95
+ "messaging_username" : self ._config ['messaging' ]['username' ],
96
+ "messaging_password" : self ._config ['messaging' ]['password' ],
97
+ "messaging_exchange" : self ._config ['messaging' ]['exchange' ],
98
+ "messaging_topic" : "md.tasks.callback" ,
99
+ "messaging_vhost" : self ._config ['messaging' ]['virtual_host' ],
98
100
},
99
101
"inputs" : [],
100
102
"outputs" : []
@@ -106,16 +108,15 @@ def _handle_workflow_request(self, properties, method, request_message):
106
108
"endpoint" : self ._config ['storage' ]['endpoint' ],
107
109
"bucket" : self ._config ['storage' ]['bucket' ],
108
110
"secured_connection" : False ,
109
- "relative_root_path" : f"{ request_message ['payload_id' ]} / "
111
+ "relative_root_path" : f"{ request_message ['payload_id' ]} "
110
112
})
111
- task_dispatch ['outputs' ].append (
112
- {
113
+ task_dispatch ['intermediate_storage' ] = {
113
114
"name" : "tempStorage" ,
114
115
"endpoint" : self ._config ['storage' ]['endpoint' ],
115
116
"bucket" : self ._config ['storage' ]['bucket' ],
116
117
"secured_connection" : False ,
117
- "relative_root_path" : "/ rabbit"
118
- })
118
+ "relative_root_path" : "rabbit"
119
+ }
119
120
task_dispatch ['outputs' ].append (
120
121
{
121
122
"name" : "output" ,
@@ -125,22 +126,22 @@ def _handle_workflow_request(self, properties, method, request_message):
125
126
"relative_root_path" : output_dir
126
127
})
127
128
self ._publish (task_dispatch , 'md.tasks.dispatch' )
128
- self ._output_directories [workflow_id ] = output_dir
129
+ self ._output_directories [workflow_instance_id ] = output_dir
129
130
self ._send_acknowledgement (method .delivery_tag )
130
131
131
132
def _handle_task_update (self , properties , method , message ):
132
133
133
134
if message ['status' ] == 'Succeeded' :
134
- if message ['workflow_id ' ] not in self ._output_directories :
135
+ if message ['workflow_instance_id ' ] not in self ._output_directories :
135
136
self ._logger .warn ('Unable to send an export request due to missing workflow' )
136
137
return
137
138
138
139
export_request = {
139
- "workflow_id " : message ['workflow_id ' ],
140
+ "workflow_instance_id " : message ['workflow_instance_id ' ],
140
141
"export_task_id" : str (uuid .uuid4 ()),
141
142
"destination" : 'ORTHANC' ,
142
143
"correlation_id" : message ['correlation_id' ],
143
- "files" : self ._list_files (message ['workflow_id ' ])
144
+ "files" : self ._list_files (message ['workflow_instance_id ' ])
144
145
}
145
146
self ._publish (export_request , 'md.export.request.monaiscu' )
146
147
self ._logger .info ("==> Export request sent." )
@@ -151,10 +152,10 @@ def _handle_task_update(self, properties, method, message):
151
152
152
153
self ._send_acknowledgement (method .delivery_tag )
153
154
154
- def _list_files (self , workflow_id ):
155
+ def _list_files (self , workflow_instance_id ):
155
156
objects = self ._storage_client .list_objects (
156
157
self ._config ['storage' ]['bucket' ],
157
- prefix = f"/spleen-output/{ workflow_id } " ,
158
+ prefix = f"/spleen-output/{ workflow_instance_id } " ,
158
159
recursive = True )
159
160
160
161
files = []
0 commit comments