24
24
25
25
class Queue (Enum ):
26
26
StartWorkflowOptimizer = "start_work_flow.optimizer"
27
+ StartWorkflowGrowSimulator = "start_work_flow.grow_simulator"
27
28
28
29
@staticmethod
29
30
def from_workflow_type (workflow_type : WorkFlowType ) -> "Queue" :
30
- if workflow_type == WorkFlowType .GROWTH_OPTIMIZER :
31
+ if workflow_type == WorkFlowType .GROW_OPTIMIZER :
31
32
return Queue .StartWorkflowOptimizer
33
+ elif workflow_type == WorkFlowType .GROW_SIMULATOR :
34
+ return Queue .StartWorkflowGrowSimulator
32
35
else :
33
36
raise RuntimeError (f"Unimplemented workflow type { workflow_type } . Please implement." )
34
37
@@ -71,8 +74,9 @@ def _connect_rabbitmq(self):
71
74
self .channel = self .connection .channel ()
72
75
self .channel .basic_qos (prefetch_size = 0 , prefetch_count = 1 )
73
76
self .channel .exchange_declare (exchange = self .rabbitmq_exchange , exchange_type = "topic" )
74
- self .queue = self .channel .queue_declare (Queue .StartWorkflowOptimizer .value , exclusive = False ).method .queue
75
- self .channel .queue_bind (self .queue , self .rabbitmq_exchange , routing_key = Queue .StartWorkflowOptimizer .value )
77
+ for queue_item in Queue :
78
+ queue = self .channel .queue_declare (queue_item .value , exclusive = False ).method .queue
79
+ self .channel .queue_bind (queue , self .rabbitmq_exchange , routing_key = queue_item .value )
76
80
LOGGER .info ("Connected to RabbitMQ" )
77
81
78
82
def _start_rabbitmq (self ):
@@ -81,9 +85,9 @@ def _start_rabbitmq(self):
81
85
82
86
def set_callbacks (self , callbacks : Dict [Queue , PikaCallback ]):
83
87
for queue , callback in callbacks .items ():
84
- self .connection .add_callback_threadsafe (lambda : self . channel . basic_consume ( queue = queue . value ,
85
- on_message_callback = callback ,
86
- auto_ack = False ) )
88
+ self .connection .add_callback_threadsafe (
89
+ lambda : self . channel . basic_consume ( queue = queue . value , on_message_callback = callback , auto_ack = False )
90
+ )
87
91
88
92
def run (self ):
89
93
self .rabbitmq_is_running = True
@@ -107,9 +111,9 @@ def _send_start_work_flow(self, job_id: uuid4, work_flow_type: WorkFlowType):
107
111
108
112
def _send_output (self , queue : Queue , message : str ):
109
113
body : bytes = message .encode ("utf-8" )
110
- self .connection .add_callback_threadsafe (lambda : self . channel . basic_publish ( exchange = self . rabbitmq_exchange ,
111
- routing_key = queue .value ,
112
- body = body ) )
114
+ self .connection .add_callback_threadsafe (
115
+ lambda : self . channel . basic_publish ( exchange = self . rabbitmq_exchange , routing_key = queue .value , body = body )
116
+ )
113
117
114
118
def _stop_rabbitmq (self ):
115
119
self .rabbitmq_is_running = False
0 commit comments