7
7
import psycopg2 .errors
8
8
from psycopg2 .extensions import AsIs
9
9
from psycopg2 .extras import Json , register_uuid
10
- from typing import Any , Dict , List , Optional , Union
10
+ from typing import Any , Dict , List , Optional , Tuple , Union
11
11
import hashlib
12
12
import os
13
13
14
14
import BrokerConstants
15
15
from pdmodels .Models import BaseDevice , DeviceNote , LogicalDevice , PhysicalDevice , PhysicalToLogicalMapping , User
16
+ from threading import Lock
16
17
17
18
logging .captureWarnings (True )
18
19
20
+ _lock = Lock ()
19
21
20
22
class DAOException (Exception ):
21
23
def __init__ (self , msg : str = None , wrapped : Exception = None ):
@@ -50,10 +52,19 @@ class DAOUniqeConstraintException(DAOException):
50
52
"""
51
53
52
54
55
+ _stopped = False
56
+
53
57
def stop () -> None :
58
+ global _stopped
54
59
logging .info ('Closing connection pool.' )
55
- if conn_pool is not None :
56
- conn_pool .closeall ()
60
+ _lock .acquire ()
61
+ try :
62
+ if not _stopped :
63
+ _stopped = True
64
+ if conn_pool is not None :
65
+ conn_pool .closeall ()
66
+ finally :
67
+ _lock .release ()
57
68
58
69
59
70
@contextmanager
@@ -73,8 +84,8 @@ def _get_connection():
73
84
# This throws an exception if the db hostname cannot be resolved, or
74
85
# the database is not accepting connections.
75
86
try :
76
- # Try lazy initialisation the connection pool and Location/point
77
- # converter to give the db as much time as possible to start.
87
+ # Try lazy initialisation the connection pool to give the db as
88
+ # much time as possible to start.
78
89
if conn_pool is None :
79
90
logging .info ('Creating connection pool, registering type converters.' )
80
91
conn_pool = pool .ThreadedConnectionPool (1 , 5 )
@@ -792,7 +803,7 @@ def toggle_device_mapping(is_active: bool, pd: Optional[Union[PhysicalDevice, in
792
803
_toggle_device_mapping (conn , is_active , pd , ld )
793
804
return
794
805
except Exception as err :
795
- raise err if isinstance (err , DAOException ) else DAOException ('pause_current_device_mapping failed.' , err )
806
+ raise err if isinstance (err , DAOException ) else DAOException ('toggle_device_mapping failed.' , err )
796
807
finally :
797
808
if conn is not None :
798
809
free_conn (conn )
@@ -1319,3 +1330,101 @@ def token_enable(uname)-> None:
1319
1330
if conn is not None :
1320
1331
free_conn (conn )
1321
1332
1333
+
1334
+ # ===========================================================================
1335
+ # Delivery thread related functions
1336
+ # ===========================================================================
1337
+
1338
+ def _get_delivery_table_id (name : str ) -> str :
1339
+ #return sql.Identifier(f'{name}_delivery_q')
1340
+ return f'{ name } _delivery_q'
1341
+
1342
+ def create_delivery_table (name : str ) -> None :
1343
+ logging .info (f'Creating message delivery table for service { name } ' )
1344
+
1345
+ try :
1346
+ qry = f"""create table if not exists { _get_delivery_table_id (name )} (
1347
+ uid integer generated always as identity primary key,
1348
+ json_msg jsonb not null,
1349
+ retry_count integer not null default 0)"""
1350
+
1351
+ with _get_connection () as conn , conn .cursor () as cursor :
1352
+ cursor .execute (qry )
1353
+
1354
+ except Exception as err :
1355
+ raise err if isinstance (err , DAOException ) else DAOException ('create_delivery_table failed.' , err )
1356
+ finally :
1357
+ if conn is not None :
1358
+ conn .commit ()
1359
+ free_conn (conn )
1360
+
1361
+ def get_delivery_msg_count (name : str ) -> int :
1362
+ try :
1363
+ with _get_connection () as conn , conn .cursor () as cursor :
1364
+ cursor .execute (f'select count(uid) from { _get_delivery_table_id (name )} ' )
1365
+ return cursor .fetchone ()[0 ]
1366
+
1367
+ except Exception as err :
1368
+ raise err if isinstance (err , DAOException ) else DAOException ('get_delivery_msg_count failed.' , err )
1369
+ finally :
1370
+ if conn is not None :
1371
+ conn .commit ()
1372
+ free_conn (conn )
1373
+
1374
+ def get_delivery_msg_batch (name : str , from_uid : int = 0 , batch_size : int = 10 ) -> List [Tuple [int , list [dict [Any ]]]]:
1375
+ try :
1376
+ with _get_connection () as conn , conn .cursor () as cursor :
1377
+ # Using order by asc in case time series databases need values inserted in timestamp order.
1378
+ cursor .execute (f'select uid, json_msg, retry_count from { _get_delivery_table_id (name )} where uid > %s order by uid asc limit %s' , (from_uid , batch_size ))
1379
+ if cursor .rowcount < 1 :
1380
+ return 0 , []
1381
+
1382
+ return cursor .fetchall ()
1383
+
1384
+ except Exception as err :
1385
+ raise err if isinstance (err , DAOException ) else DAOException ('get_delivery_msg_batch failed.' , err )
1386
+ finally :
1387
+ if conn is not None :
1388
+ conn .commit ()
1389
+ free_conn (conn )
1390
+
1391
+ def add_delivery_msg (name : str , msg : dict [Any ]) -> None :
1392
+ try :
1393
+ with _get_connection () as conn , conn .cursor () as cursor :
1394
+ cursor .execute (f'insert into { _get_delivery_table_id (name )} (json_msg) values (%s)' , (Json (msg ), ))
1395
+
1396
+ except Exception as err :
1397
+ raise err if isinstance (err , DAOException ) else DAOException ('add_delivery_msg failed.' , err )
1398
+ finally :
1399
+ if conn is not None :
1400
+ conn .commit ()
1401
+ free_conn (conn )
1402
+
1403
+ def remove_delivery_msg (name : str , uid : int ) -> None :
1404
+ try :
1405
+ with _get_connection () as conn , conn .cursor () as cursor :
1406
+ cursor .execute (f'delete from { _get_delivery_table_id (name )} where uid = %s' , (uid , ))
1407
+
1408
+ except Exception as err :
1409
+ raise err if isinstance (err , DAOException ) else DAOException ('remove_delivery_msg failed.' , err )
1410
+ finally :
1411
+ if conn is not None :
1412
+ conn .commit ()
1413
+ free_conn (conn )
1414
+
1415
+ def retry_delivery_msg (name : str , uid : int ) -> None :
1416
+ try :
1417
+ with _get_connection () as conn , conn .cursor () as cursor :
1418
+ cursor .execute (f'select retry_count from { _get_delivery_table_id (name )} where uid = %s' , (uid , ))
1419
+ if cursor .rowcount < 1 :
1420
+ return
1421
+
1422
+ retry_count = cursor .fetchone ()[0 ] + 1
1423
+ cursor .execute (f'update { _get_delivery_table_id (name )} set retry_count = %s where uid = %s' , (retry_count , uid ))
1424
+
1425
+ except Exception as err :
1426
+ raise err if isinstance (err , DAOException ) else DAOException ('retry_delivery_msg failed.' , err )
1427
+ finally :
1428
+ if conn is not None :
1429
+ conn .commit ()
1430
+ free_conn (conn )
0 commit comments