1
1
import argparse
2
2
from sys import argv
3
3
from typing import NamedTuple , List , Type , Iterable
4
+ import logging
4
5
5
6
from pymodbus .constants import Defaults
7
+ from pymodbus .exceptions import ModbusIOException , ConnectionException
6
8
7
9
from mate3 import mate3_connection
8
10
import time
9
11
10
12
from mate3 .api import AnyBlock , Device
11
13
14
+
15
+ logger = logging .getLogger ('mate3.mate3_pg' )
16
+
17
+
12
18
try :
13
19
from yaml import load , FullLoader
14
20
except ImportError :
@@ -37,6 +43,7 @@ class Definition(NamedTuple):
37
43
38
44
39
45
def read_definitions (f ) -> List [Table ]:
46
+ logger .info (f"Reading field definitions from { f .name } " )
40
47
in_yaml = load (f , Loader = FullLoader )
41
48
tables = []
42
49
@@ -55,10 +62,12 @@ def read_definitions(f) -> List[Table]:
55
62
Table (table_name , definitions )
56
63
)
57
64
65
+ logger .debug (f"Found definitions: { tables } " )
58
66
return tables
59
67
60
68
61
69
def create_tables (conn , tables : List [Table ], hypertables : bool ):
70
+ logger .info ("Creating tables (if needed)" )
62
71
with conn .cursor () as curs :
63
72
for table in tables :
64
73
sql = (
@@ -70,13 +79,17 @@ def create_tables(conn, tables: List[Table], hypertables: bool):
70
79
sql = sql .rstrip (',' )
71
80
sql += '\n )'
72
81
82
+ logger .debug (f"Executing: { sql } " )
73
83
curs .execute (sql )
84
+
74
85
if hypertables :
75
86
try :
76
- curs .execute ("SELECT create_hypertable(%s, 'timestamp')" , [table .name ])
87
+ sql = f"SELECT create_hypertable('{ table .name } ', 'timestamp')"
88
+ logger .debug (f"Executing: { sql } " )
89
+ curs .execute (sql , [table .name ])
77
90
except psycopg2 .DatabaseError as e :
78
91
if 'already a hypertable' in str (e ):
79
- pass
92
+ logger . debug ( "Table is already a hypertable" )
80
93
else :
81
94
raise
82
95
@@ -107,7 +120,9 @@ def insert(conn, tables: List[Table], blocks: List[AnyBlock]):
107
120
f"(timestamp, { ', ' .join (column_names )} ) "
108
121
f"VALUES (NOW(), { ', ' .join (placeholders )} )"
109
122
)
110
- curs .execute (sql , list (insert_kv .values ()))
123
+ values = list (insert_kv .values ())
124
+ logger .debug (f"Executing: { sql } ; With values { values } " )
125
+ curs .execute (sql , values )
111
126
112
127
113
128
def main ():
@@ -152,24 +167,56 @@ def main():
152
167
help = "Should we create tables as hypertables? Use only if you are using TimescaleDB" ,
153
168
action = 'store_true' ,
154
169
)
170
+ parser .add_argument (
171
+ "--quiet" , "-q" ,
172
+ dest = "quiet" ,
173
+ help = "Hide status output. Only errors will be shown" ,
174
+ action = 'store_true' ,
175
+ )
176
+ parser .add_argument (
177
+ "--debug" ,
178
+ dest = "debug" ,
179
+ help = "Show debug logging" ,
180
+ action = 'store_true' ,
181
+ )
155
182
156
183
args = parser .parse_args (argv [1 :])
184
+
185
+ logging .basicConfig (format = "%(asctime)s - %(levelname)s - %(name)s - %(message)s" , level = logging .ERROR )
186
+ root_logger = logging .getLogger ()
187
+ mate3_logger = logging .getLogger ('mate3' )
188
+
189
+ if args .debug :
190
+ root_logger .setLevel (logging .DEBUG )
191
+ elif args .quiet :
192
+ mate3_logger .setLevel (logging .ERROR )
193
+ else :
194
+ mate3_logger .setLevel (logging .INFO )
195
+
157
196
tables = read_definitions (args .definitions )
158
197
198
+ logger .info (f"Connecting to Postgres at { args .database_url } " )
159
199
with psycopg2 .connect (args .database_url ) as conn :
160
200
conn .autocommit = True
161
-
201
+ logger . debug ( f"Connected to Postgres" )
162
202
create_tables (conn , tables , hypertables = args .hypertables )
163
- with mate3_connection (args .host , args .port ) as client :
164
- while True :
165
- start = time .time ()
166
-
167
- insert (conn , tables , list (client .all_blocks ()))
168
203
169
- total = time .time () - start
170
- sleep_time = args .interval - total
171
- if sleep_time > 0 :
172
- time .sleep (args .interval - total )
204
+ while True : # Reconnect loop
205
+ try :
206
+ logger .info (f"Connecting to mate3 at { args .host } :{ args .port } " )
207
+ with mate3_connection (args .host , args .port ) as client :
208
+ while True :
209
+ start = time .time ()
210
+
211
+ insert (conn , tables , list (client .all_blocks ()))
212
+
213
+ total = time .time () - start
214
+ sleep_time = args .interval - total
215
+ if sleep_time > 0 :
216
+ time .sleep (args .interval - total )
217
+ except (ModbusIOException , ConnectionException ) as e :
218
+ logger .error (f"Communication error: { e } . Will try to reconnect in { args .interval } seconds" )
219
+ time .sleep (args .interval )
173
220
174
221
175
222
if __name__ == '__main__' :
0 commit comments