Commit 35c1c38

Issue 93 (#97)
This commit vastly simplifies the code of the individual delivery services while adding extra protection against service failure. The BaseWriter class accepts messages from the logical_timeseries RabbitMQ exchange and writes them to a service-specific database table. A thread reads from that table and invokes the service-specific on_message method to deliver each message to the destination platform; on_message may return ok, fail, or retry. The service-specific code is responsible for deciding how many times a message should be retried before being marked as a fail. The Ubidots client code was updated to include the broker correlation id in its log messages and to add the 0.3s delay before the calls that add data; the missing delay is probably what caused some of the error responses we're seeing.
1 parent 048788b commit 35c1c38
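
For context, a minimal self-contained sketch of the ok/fail/retry contract described above. This is not the BaseWriter API from this commit: the DeliveryResult enum, the send_to_destination stub and the MAX_RETRIES value are hypothetical stand-ins, used only to illustrate how a service-specific on_message chooses between the three outcomes.

from enum import Enum

class DeliveryResult(Enum):
    OK = 0      # delivered: the message can be removed from the delivery table
    RETRY = 1   # transient failure: leave it queued and bump retry_count
    FAIL = 2    # permanent failure: stop trying to deliver this message

MAX_RETRIES = 5  # service-specific policy, not a value defined by this commit

def send_to_destination(msg: dict) -> None:
    """Placeholder for the service-specific delivery call (e.g. an HTTP POST)."""
    ...

def on_message(msg: dict, retry_count: int) -> DeliveryResult:
    try:
        send_to_destination(msg)
        return DeliveryResult.OK
    except ConnectionError:
        # The service-specific code decides how many retries are allowed
        # before a message is marked as a fail.
        return DeliveryResult.RETRY if retry_count < MAX_RETRIES else DeliveryResult.FAIL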

8 files changed: +589 additions, -540 deletions

src/python/api/client/DAO.py

Lines changed: 115 additions & 6 deletions
@@ -7,15 +7,17 @@
 import psycopg2.errors
 from psycopg2.extensions import AsIs
 from psycopg2.extras import Json, register_uuid
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Tuple, Union
 import hashlib
 import os
 
 import BrokerConstants
 from pdmodels.Models import BaseDevice, DeviceNote, LogicalDevice, PhysicalDevice, PhysicalToLogicalMapping, User
+from threading import Lock
 
 logging.captureWarnings(True)
 
+_lock = Lock()
 
 class DAOException(Exception):
     def __init__(self, msg: str = None, wrapped: Exception = None):
@@ -50,10 +52,19 @@ class DAOUniqeConstraintException(DAOException):
     """
 
 
+_stopped = False
+
 def stop() -> None:
+    global _stopped
     logging.info('Closing connection pool.')
-    if conn_pool is not None:
-        conn_pool.closeall()
+    _lock.acquire()
+    try:
+        if not _stopped:
+            _stopped = True
+            if conn_pool is not None:
+                conn_pool.closeall()
+    finally:
+        _lock.release()
 
 
 @contextmanager
@@ -73,8 +84,8 @@ def _get_connection():
     # This throws an exception if the db hostname cannot be resolved, or
     # the database is not accepting connections.
     try:
-        # Try lazy initialisation the connection pool and Location/point
-        # converter to give the db as much time as possible to start.
+        # Try lazy initialisation the connection pool to give the db as
+        # much time as possible to start.
         if conn_pool is None:
             logging.info('Creating connection pool, registering type converters.')
             conn_pool = pool.ThreadedConnectionPool(1, 5)
@@ -792,7 +803,7 @@ def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, in
             _toggle_device_mapping(conn, is_active, pd, ld)
             return
     except Exception as err:
-        raise err if isinstance(err, DAOException) else DAOException('pause_current_device_mapping failed.', err)
+        raise err if isinstance(err, DAOException) else DAOException('toggle_device_mapping failed.', err)
     finally:
         if conn is not None:
             free_conn(conn)
@@ -1319,3 +1330,101 @@ def token_enable(uname)-> None:
         if conn is not None:
             free_conn(conn)
 
+
+# ===========================================================================
+# Delivery thread related functions
+# ===========================================================================
+
+def _get_delivery_table_id(name: str) -> str:
+    #return sql.Identifier(f'{name}_delivery_q')
+    return f'{name}_delivery_q'
+
+def create_delivery_table(name: str) -> None:
+    logging.info(f'Creating message delivery table for service {name}')
+
+    try:
+        qry = f"""create table if not exists {_get_delivery_table_id(name)} (
+                uid integer generated always as identity primary key,
+                json_msg jsonb not null,
+                retry_count integer not null default 0)"""
+
+        with _get_connection() as conn, conn.cursor() as cursor:
+            cursor.execute(qry)
+
+    except Exception as err:
+        raise err if isinstance(err, DAOException) else DAOException('create_delivery_table failed.', err)
+    finally:
+        if conn is not None:
+            conn.commit()
+            free_conn(conn)
+
+def get_delivery_msg_count(name: str) -> int:
+    try:
+        with _get_connection() as conn, conn.cursor() as cursor:
+            cursor.execute(f'select count(uid) from {_get_delivery_table_id(name)}')
+            return cursor.fetchone()[0]
+
+    except Exception as err:
+        raise err if isinstance(err, DAOException) else DAOException('get_delivery_msg_count failed.', err)
+    finally:
+        if conn is not None:
+            conn.commit()
+            free_conn(conn)
+
+def get_delivery_msg_batch(name: str, from_uid: int = 0, batch_size: int = 10) -> List[Tuple[int, list[dict[Any]]]]:
+    try:
+        with _get_connection() as conn, conn.cursor() as cursor:
+            # Using order by asc in case time series databases need values inserted in timestamp order.
+            cursor.execute(f'select uid, json_msg, retry_count from {_get_delivery_table_id(name)} where uid > %s order by uid asc limit %s', (from_uid, batch_size))
+            if cursor.rowcount < 1:
+                return 0, []
+
+            return cursor.fetchall()
+
+    except Exception as err:
+        raise err if isinstance(err, DAOException) else DAOException('get_delivery_msg_batch failed.', err)
+    finally:
+        if conn is not None:
+            conn.commit()
+            free_conn(conn)
+
+def add_delivery_msg(name: str, msg: dict[Any]) -> None:
+    try:
+        with _get_connection() as conn, conn.cursor() as cursor:
+            cursor.execute(f'insert into {_get_delivery_table_id(name)} (json_msg) values (%s)', (Json(msg), ))
+
+    except Exception as err:
+        raise err if isinstance(err, DAOException) else DAOException('add_delivery_msg failed.', err)
+    finally:
+        if conn is not None:
+            conn.commit()
+            free_conn(conn)
+
+def remove_delivery_msg(name: str, uid: int) -> None:
+    try:
+        with _get_connection() as conn, conn.cursor() as cursor:
+            cursor.execute(f'delete from {_get_delivery_table_id(name)} where uid = %s', (uid, ))
+
+    except Exception as err:
+        raise err if isinstance(err, DAOException) else DAOException('remove_delivery_msg failed.', err)
+    finally:
+        if conn is not None:
+            conn.commit()
+            free_conn(conn)
+
+def retry_delivery_msg(name: str, uid: int) -> None:
+    try:
+        with _get_connection() as conn, conn.cursor() as cursor:
+            cursor.execute(f'select retry_count from {_get_delivery_table_id(name)} where uid = %s', (uid, ))
+            if cursor.rowcount < 1:
+                return
+
+            retry_count = cursor.fetchone()[0] + 1
+            cursor.execute(f'update {_get_delivery_table_id(name)} set retry_count = %s where uid = %s', (retry_count, uid))
+
+    except Exception as err:
+        raise err if isinstance(err, DAOException) else DAOException('retry_delivery_msg failed.', err)
+    finally:
+        if conn is not None:
+            conn.commit()
+            free_conn(conn)

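To show how these new DAO helpers fit together, here is a rough polling-loop sketch in the style a delivery thread might use. It is not the actual BaseWriter loop: the import path, SERVICE_NAME, the try_deliver stub, the MAX_RETRIES policy and the 10-second wait are assumptions for illustration only; create_delivery_table, get_delivery_msg_batch, remove_delivery_msg and retry_delivery_msg are the functions added in this diff.

import time
from threading import Event

import api.client.DAO as dao   # import path assumed from this repo's src/python layout

SERVICE_NAME = 'example'       # would map to the example_delivery_q table
MAX_RETRIES = 5                # hypothetical service-specific retry policy

def try_deliver(msg: dict) -> bool:
    """Placeholder for the service-specific delivery call."""
    return True

def delivery_loop(stop_event: Event) -> None:
    dao.create_delivery_table(SERVICE_NAME)
    while not stop_event.is_set():
        batch = dao.get_delivery_msg_batch(SERVICE_NAME)
        # get_delivery_msg_batch returns (0, []) when the table is empty,
        # otherwise a list of (uid, json_msg, retry_count) rows.
        if not isinstance(batch, list) or len(batch) < 1:
            stop_event.wait(10)
            continue
        for uid, msg, retry_count in batch:
            if try_deliver(msg):
                dao.remove_delivery_msg(SERVICE_NAME, uid)
            elif retry_count >= MAX_RETRIES:
                dao.remove_delivery_msg(SERVICE_NAME, uid)   # give up: treat as failed
            else:
                dao.retry_delivery_msg(SERVICE_NAME, uid)    # bump retry_count for a later pass
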
src/python/api/client/Ubidots.py

Lines changed: 13 additions & 8 deletions
@@ -4,6 +4,8 @@
 
 from pdmodels.Models import Location, LogicalDevice
 
+import util.LoggingUtil as lu
+
 BASE_1_6 = "https://industrial.api.ubidots.com.au/api/v1.6"
 BASE_2_0 = "https://industrial.api.ubidots.com.au/api/v2.0"
 
@@ -100,25 +102,25 @@ def get_all_devices() -> List[LogicalDevice]:
 
         if response_obj['next'] is None:
             break
-
+
         page += 1
 
     return devices
 
 
-def get_device(label: str) -> LogicalDevice:
+def get_device(label: str, logging_ctx: dict) -> LogicalDevice:
     url = f'{BASE_2_0}/devices/~{label}'
     time.sleep(0.3)
     r = requests.get(url, headers=headers)
     if r.status_code != 200:
-        logging.warn(f'devices/~{label} received response: {r.status_code}: {r.reason}')
+        lu.cid_logger.error(f'devices/~{label} received response: {r.status_code}: {r.reason}', extra=logging_ctx)
         return None
 
     response_obj = json.loads(r.content)
     return _dict_to_logical_device(response_obj)
 
 
-def post_device_data(label: str, body) -> None:
+def post_device_data(label: str, body: dict, logging_ctx: dict) -> bool:
     """
     Post timeseries data to an Ubidots device.
 
@@ -130,22 +132,25 @@ def post_device_data(label: str, body) -> None:
         'temperature': {'value': 37.17, 'timestamp': 1643934748392}
     }
     """
+    time.sleep(0.3)
     url = f'{BASE_1_6}/devices/{label}'
     hdrs = headers
    hdrs['Content-Type'] = 'application/json'
     body_str = json.dumps(body)
     r = requests.post(url, headers=hdrs, data=body_str)
     if r.status_code != 200:
-        logging.info(f'POST {url}: {r.status_code}: {r.reason}')
-        logging.info(body_str)
+        lu.cid_logger.info(f'POST {url}: {r.status_code}: {r.reason}', extra=logging_ctx)
+        return False
+
+    return True
 
 
-def update_device(label: str, patch_obj) -> None:
+def update_device(label: str, patch_obj: dict, logging_ctx: dict) -> None:
     url = f'{BASE_2_0}/devices/~{label}'
     time.sleep(0.3)
     response = requests.patch(url, headers=headers, json=patch_obj)
     if response.status_code != 200:
-        logging.warning(f'PATCH response: {response.status_code}: {response.reason}')
+        lu.cid_logger.error(f'PATCH response: {response.status_code}: {response.reason}', extra=logging_ctx)
 
 
 def main():

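A small sketch of how a caller might use the reworked post_device_data: it now sleeps 0.3 s before the request and returns a bool instead of only logging failures, so delivery code can translate False into a retry. The 'correlation_id' key name in logging_ctx is an assumption; the actual key expected by util.LoggingUtil's cid_logger is not shown in this diff.

import api.client.Ubidots as ubidots   # import path assumed

def send_to_ubidots(label: str, body: dict, correlation_id: str) -> bool:
    logging_ctx = {'correlation_id': correlation_id}   # hypothetical key name
    # A False return means the POST got a non-200 response; the caller (e.g. a
    # BaseWriter subclass) can then choose to leave the message queued for retry.
    return ubidots.post_device_data(label, body, logging_ctx)
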
src/python/broker-cli.py

Lines changed: 7 additions & 6 deletions
@@ -19,6 +19,7 @@ def str_to_logical_device(val) -> LogicalDevice:
 
 
 def str_to_dict(val) -> Dict:
+    print(val, type(val))
     return json.loads(val)
 
 
@@ -207,7 +208,7 @@ def plain_pd_list(devs: List[PhysicalDevice]):
 
 
 def dict_from_file_or_string() -> dict:
-    if (hasattr(args, 'pd') or hasattr(args, 'ld')) and (hasattr(args, 'in_filename') and args.in_filename is not None):
+    if ((hasattr(args, 'pd') and args.pd is not None) or (hasattr(args, 'ld') and args.ld is not None)) and (hasattr(args, 'in_filename') and args.in_filename is not None):
         raise RuntimeError('error: --json and --file are mutually exclusive.')
 
     json_obj = None
@@ -283,7 +284,7 @@ def main() -> None:
 
             dev = PhysicalDevice.parse_obj(dev)
             print(pretty_print_json(dao.update_physical_device(dev)))
-
+
         elif args.cmd2 == 'rm':
             # Delete all physical_logical mappings to avoid foreign key violation
             mappings = dao.get_physical_device_mappings(pd=args.p_uid)
@@ -373,9 +374,9 @@ def main() -> None:
             current_mapping = dao.get_current_device_mapping(pd=args.p_uid, ld=args.l_uid)
             if current_mapping is None:
                 raise RuntimeError("No current mapping for the uid given")
-
+
             dao.toggle_device_mapping(args.enable, args.p_uid, args.l_uid)
-
+
     elif args.cmd1 == 'users':
         if args.cmd2 == 'add':
             dao.user_add(uname=args.uname, passwd=args.passwd, disabled=args.disabled)
@@ -391,13 +392,13 @@ def main() -> None:
 
             elif args.enable == True:
                 dao.token_enable(uname=args.uname)
-
+
             if args.refresh == True:
                 dao.token_refresh(uname=args.uname)
 
         elif args.cmd2 == 'chng':
             dao.user_change_password(args.uname, args.passwd)
-
+
         elif args.cmd2 == 'ls':
             print(dao.user_ls())
 
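The dict_from_file_or_string() change above works around the way argparse populates its namespace: the attribute exists even when the option was not given, so hasattr() alone cannot tell the two cases apart. A tiny standalone illustration (the option definition is a simplified stand-in, not broker-cli's actual parser):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--json', dest='pd')   # simplified stand-in for the real option

args = parser.parse_args([])               # user did not pass --json
print(hasattr(args, 'pd'))                 # True: the attribute exists with value None
print(args.pd is not None)                 # False: the extra check added in this commit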