Skip to content

Commit 9a8c547

Browse files
committed
WIP: Putting the structure in place and basic testing.
1 parent 048788b commit 9a8c547

File tree

3 files changed

+319
-267
lines changed

3 files changed

+319
-267
lines changed

src/python/api/client/Ubidots.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def get_all_devices() -> List[LogicalDevice]:
100100

101101
if response_obj['next'] is None:
102102
break
103-
103+
104104
page += 1
105105

106106
return devices
@@ -118,7 +118,7 @@ def get_device(label: str) -> LogicalDevice:
118118
return _dict_to_logical_device(response_obj)
119119

120120

121-
def post_device_data(label: str, body) -> None:
121+
def post_device_data(label: str, body) -> bool:
122122
"""
123123
Post timeseries data to an Ubidots device.
124124
@@ -130,6 +130,7 @@ def post_device_data(label: str, body) -> None:
130130
'temperature': {'value': 37.17, 'timestamp': 1643934748392}
131131
}
132132
"""
133+
time.sleep(0.3)
133134
url = f'{BASE_1_6}/devices/{label}'
134135
hdrs = headers
135136
hdrs['Content-Type'] = 'application/json'
@@ -138,6 +139,9 @@ def post_device_data(label: str, body) -> None:
138139
if r.status_code != 200:
139140
logging.info(f'POST {url}: {r.status_code}: {r.reason}')
140141
logging.info(body_str)
142+
return False
143+
144+
return True
141145

142146

143147
def update_device(label: str, patch_obj) -> None:

src/python/delivery/BaseWriter.py

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
from typing import Any
2+
3+
from pdmodels.Models import LogicalDevice, PhysicalDevice
4+
5+
import json, logging, os, signal, time
6+
7+
import BrokerConstants
8+
9+
import pika, pika.channel, pika.spec
10+
from pika.exchange_type import ExchangeType
11+
12+
import api.client.DAO as dao
13+
import util.LoggingUtil as lu
14+
15+
_user = os.environ['RABBITMQ_DEFAULT_USER']
16+
_passwd = os.environ['RABBITMQ_DEFAULT_PASS']
17+
_host = os.environ['RABBITMQ_HOST']
18+
_port = os.environ['RABBITMQ_PORT']
19+
20+
_amqp_url_str = f'amqp://{_user}:{_passwd}@{_host}:{_port}/%2F'
21+
22+
23+
class BaseWriter:
24+
MSG_OK = 0
25+
MSG_RETRY = 1
26+
MSG_FAIL = 2
27+
28+
def __init__(self, name) -> None:
29+
#super().__init__()
30+
self.name: str = name
31+
self.connection = None
32+
self.channel = None
33+
self.keep_running = True
34+
signal.signal(signal.SIGTERM, self.sigterm_handler)
35+
36+
def run(self) -> None:
37+
logging.info('===============================================================')
38+
logging.info(f' STARTING {self.name.upper()} WRITER')
39+
logging.info('===============================================================')
40+
41+
while self.keep_running:
42+
try:
43+
logging.info('Opening connection')
44+
self.connection = None
45+
connection = pika.BlockingConnection(pika.URLParameters(_amqp_url_str))
46+
47+
logging.info('Opening channel')
48+
self.channel = connection.channel()
49+
self.channel.basic_qos(prefetch_count=1)
50+
logging.info('Declaring exchange')
51+
self.channel.exchange_declare(
52+
exchange=BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME,
53+
exchange_type=ExchangeType.fanout,
54+
durable=True)
55+
logging.info('Declaring queue')
56+
self.channel.queue_declare(queue=f'{self.name}_logical_msg_queue', durable=True)
57+
self.channel.queue_bind(f'{self.name}_logical_msg_queue', BrokerConstants.LOGICAL_TIMESERIES_EXCHANGE_NAME, 'logical_timeseries')
58+
59+
logging.info('Waiting for messages.')
60+
# This loops until _channel.cancel is called in the signal handler.
61+
for method, properties, body in self.channel.consume('ubidots_logical_msg_queue'):
62+
delivery_tag = method.delivery_tag
63+
logging.info(method)
64+
logging.info(properties)
65+
66+
# If the finish flag is set, reject the message so RabbitMQ will re-queue it
67+
# and return early.
68+
if not self.keep_running:
69+
lu.cid_logger.info(f'NACK delivery tag {delivery_tag}, keep_running is False', extra=msg)
70+
self.channel.basic_reject(delivery_tag)
71+
continue # This will break from loop without running all the logic within the loop below here.
72+
73+
msg = json.loads(body)
74+
p_uid = msg[BrokerConstants.PHYSICAL_DEVICE_UID_KEY]
75+
l_uid = msg[BrokerConstants.LOGICAL_DEVICE_UID_KEY]
76+
lu.cid_logger.info(f'Accepted message from physical / logical device ids {p_uid} / {l_uid}', extra=msg)
77+
78+
pd = dao.get_physical_device(p_uid)
79+
if pd is None:
80+
lu.cid_logger.error(f'Could not find physical device, dropping message: {body}', extra=msg)
81+
return BaseWriter.MSG_FAIL
82+
83+
ld = dao.get_logical_device(l_uid)
84+
if ld is None:
85+
lu.cid_logger.error(f'Could not find logical device, dropping message: {body}', extra=msg)
86+
return BaseWriter.MSG_FAIL
87+
88+
rc = self.on_message(pd, ld, msg)
89+
if rc == BaseWriter.MSG_OK:
90+
lu.cid_logger.info('Message processed ok.', extra=msg)
91+
self.channel.basic_ack(delivery_tag)
92+
elif rc == BaseWriter.MSG_RETRY:
93+
# This is where the message should be published to a different exchange,
94+
# private to the delivery service in question, so it can be retried later
95+
# but not stuck at the head of the queue and immediately redelivered to
96+
# here, possibly causing an endless loop.
97+
lu.cid_logger.warning('Message processing failed, retrying message.', extra=msg)
98+
self.channel.basic_nack(delivery_tag, requeue=True)
99+
elif rc == BaseWriter.MSG_FAIL:
100+
lu.cid_logger.error('Message processing failed, dropping message.', extra=msg)
101+
self.channel.basic_nack(delivery_tag, requeue=False)
102+
else:
103+
logging.error(f'Invalid message processing return value: {rc}')
104+
105+
except pika.exceptions.ConnectionClosedByBroker:
106+
logging.info('Connection closed by server.')
107+
break
108+
109+
except pika.exceptions.AMQPChannelError as err:
110+
logging.exception(err)
111+
break
112+
113+
except pika.exceptions.AMQPConnectionError:
114+
logging.exception(err)
115+
logging.warning('Connection was closed, retrying after a pause.')
116+
time.sleep(10)
117+
continue
118+
119+
if self.connection is not None:
120+
logging.info('Closing connection')
121+
self.connection.close()
122+
123+
logging.info('Waiting forever')
124+
while True:
125+
time.sleep(60)
126+
127+
def on_message(self, pd: PhysicalDevice, ld: LogicalDevice, msg: dict[Any]) -> int:
128+
logging.info(f'{pd.name} / {ld.name}: {msg}')
129+
return BaseWriter.MSG_OK
130+
131+
def sigterm_handler(self, sig_no, stack_frame) -> None:
132+
"""
133+
Handle SIGTERM from docker by closing the mq and db connections and setting a
134+
flag to tell the main loop to exit.
135+
"""
136+
logging.info(f'{signal.strsignal(sig_no)}, setting keep_running to False')
137+
self.keep_running = False
138+
dao.stop()
139+
140+
# This breaks the endless loop in main.
141+
self.channel.cancel()
142+
143+
144+
if __name__ == '__main__':
145+
# Does not return until SIGTERM is received.
146+
deliverer = BaseWriter('test')
147+
deliverer.run()
148+
149+
logging.info('Exiting')

0 commit comments

Comments
 (0)