1
1
#!/usr/bin/env python
2
2
3
3
import logging
4
+ import threading
4
5
from enum import Enum
5
6
from typing import Callable , Dict
6
7
from uuid import uuid4
@@ -32,7 +33,7 @@ def from_workflow_type(workflow_type: WorkFlowType) -> "Queue":
32
33
raise RuntimeError (f"Unimplemented workflow type { workflow_type } . Please implement." )
33
34
34
35
35
- class RabbitmqClient :
36
+ class RabbitmqClient ( threading . Thread ) :
36
37
rabbitmq_is_running : bool
37
38
rabbitmq_config : RabbitmqConfig
38
39
rabbitmq_exchange : str
@@ -41,6 +42,7 @@ class RabbitmqClient:
41
42
queue : str
42
43
43
44
def __init__ (self , config : RabbitmqConfig ):
45
+ super ().__init__ ()
44
46
self .rabbitmq_is_running = False
45
47
self .rabbitmq_config = config
46
48
self .rabbitmq_exchange = config .exchange_name
@@ -59,7 +61,7 @@ def _connect_rabbitmq(self):
59
61
self .rabbitmq_config .port ,
60
62
"/" ,
61
63
credentials ,
62
- heartbeat = 3600 ,
64
+ heartbeat = 60 ,
63
65
blocked_connection_timeout = 3600 ,
64
66
connection_attempts = 10 ,
65
67
)
@@ -73,15 +75,24 @@ def _connect_rabbitmq(self):
73
75
self .channel .queue_bind (self .queue , self .rabbitmq_exchange , routing_key = Queue .StartWorkflowOptimizer .value )
74
76
LOGGER .info ("Connected to RabbitMQ" )
75
77
76
- def wait_for_work (self , callbacks : Dict [Queue , PikaCallback ]):
78
+ def _start_rabbitmq (self ):
79
+ self ._connect_rabbitmq ()
80
+ self .start ()
81
+
82
+ def set_callbacks (self , callbacks : Dict [Queue , PikaCallback ]):
83
+ for queue , callback in callbacks .items ():
84
+ self .connection .add_callback_threadsafe (lambda : self .channel .basic_consume (queue = queue .value ,
85
+ on_message_callback = callback ,
86
+ auto_ack = False ))
87
+
88
+ def run (self ):
77
89
self .rabbitmq_is_running = True
78
90
79
91
while self .rabbitmq_is_running :
80
92
try :
81
- for queue , callback in callbacks .items ():
82
- self .channel .basic_consume (queue = queue .value , on_message_callback = callback , auto_ack = False )
83
93
LOGGER .info ("Waiting for input..." )
84
- self .channel .start_consuming ()
94
+ while self .rabbitmq_is_running :
95
+ self .connection .process_data_events (time_limit = 1 )
85
96
except pika .exceptions .ConnectionClosedByBroker as exc :
86
97
LOGGER .info ('Connection was closed by broker. Reason: "%s". Shutting down...' , exc .reply_text )
87
98
except pika .exceptions .AMQPConnectionError :
@@ -96,11 +107,11 @@ def _send_start_work_flow(self, job_id: uuid4, work_flow_type: WorkFlowType):
96
107
97
108
def _send_output (self , queue : Queue , message : str ):
98
109
body : bytes = message .encode ("utf-8" )
99
- self .channel .basic_publish (exchange = self .rabbitmq_exchange , routing_key = queue .value , body = body )
110
+ self .connection .add_callback_threadsafe (lambda : self .channel .basic_publish (exchange = self .rabbitmq_exchange ,
111
+ routing_key = queue .value ,
112
+ body = body ))
100
113
101
114
def _stop_rabbitmq (self ):
102
115
self .rabbitmq_is_running = False
103
- if self .channel :
104
- self .channel .stop_consuming ()
105
116
if self .connection :
106
- self .connection .close ()
117
+ self .connection .add_callback_threadsafe ( lambda : self . connection . close () )
0 commit comments