-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodbus.py
151 lines (126 loc) · 4.94 KB
/
modbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from xmlrpc.client import boolean
import serial, time
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
import serial.tools.list_ports as prtlst
# Debug switch; it is not referenced anywhere else in the visible code.
DEBUG = True
# Coil address that controlPump.enable_rs485 writes 1 to
# (presumably hands pump control over to RS-485 - confirm against the pump manual).
MB_ADR_ENABLE485 = 4100
# Coil address written by controlPump.start_pump (value 1) and stop_pump (value 0).
MB_ADR_SET_CTR = 4097
# Register address where controlPump.set_speed_pump writes the speed,
# packed with the struct format '>f' (big-endian float).
MB_ADR_SET_RPM = 12289
class ModbusPull():
    """Modbus RTU helper bound to the first matching serial port.

    On construction the class scans the serial ports, opens the first one
    whose description matches (9600 baud, 8 data bits, no parity, 1 stop
    bit) and wraps it in a ``modbus_tk`` RTU master. Reads and writes go
    through a retry decorator that also enforces a minimum gap between
    consecutive frames.

    Attributes:
        msgError: ``'ok'`` when a port was found, ``'NO COM'`` otherwise
            (empty string before the scan has run).
        master: the ``modbus_rtu.RtuMaster`` instance, or ``None`` when no
            port was found.
        last_error: the exception raised by the most recent failed
            attempt of ``get_data``/``send_data``; ``None`` after a
            successful call.
    """

    def __init__(self):
        self.msgError = ''
        self.last_error = None
        self.__timerun = 0      # time.time() of the last bus transaction
        self.__timeout = 0.1    # last timeout accepted by the setter
        self.__init_modbus(timeout=0.2)

    def __init_modbus(self, timeout=0.5):
        """Open the serial port (if any) and apply *timeout* to the master."""
        port = self.__get_COMs()
        if self.msgError == 'ok':
            self.master = modbus_rtu.RtuMaster(
                serial.Serial(port=port, baudrate=9600, bytesize=8,
                              parity='N', stopbits=1, xonxoff=0)
            )
        else:
            self.master = None
        # `set_timeout` is a property, so it must be assigned to. Calling
        # it would call the float returned by the getter and raise
        # TypeError on every construction.
        self.set_timeout = timeout

    def __get_COMs(self):
        """Return the first candidate serial port and record the outcome.

        A port is a candidate when its description (``pt[1]``) contains
        ``'USB'``, ``'ttyS'`` or ``'COM4'``. Sets ``msgError`` to ``'ok'``
        and returns the port name (``pt[0]``) when one is found; otherwise
        sets ``msgError`` to ``'NO COM'`` and returns ``0``.
        """
        markers = ('USB', 'ttyS', 'COM4')
        candidates = [
            pt[0] for pt in prtlst.comports()
            if any(marker in pt[1] for marker in markers)
        ]
        if candidates:
            self.msgError = 'ok'
            return candidates[0]
        self.msgError = 'NO COM'
        return 0

    @property
    def msg_error(self):
        """Status string of the port scan (see ``msgError``)."""
        return self.msgError

    @msg_error.setter
    def msg_error(self, msg):
        self.msgError = msg

    @property
    def set_timeout(self):
        """Timeout in seconds last applied to the master.

        Assigning a value stores it and forwards it to
        ``master.set_timeout``. The assignment is ignored while
        ``msgError`` is not ``'ok'``, since there is no master to
        configure in that state.
        """
        return self.__timeout

    @set_timeout.setter
    def set_timeout(self, set_timeout):
        if self.msgError != 'ok':
            return
        self.__timeout = set_timeout
        self.master.set_timeout(self.__timeout)

    def retries(func):
        """Decorate a bus operation with retries and frame pacing.

        The wrapped method returns ``(True, result)`` on the first
        successful attempt, or ``(False, 0)`` once every attempt has
        raised. The number of attempts is taken from the call's ``tries``
        argument (default 3). Before each attempt the wrapper waits until
        at least 25 ms have passed since the previous transaction. The
        last exception is kept in ``self.last_error`` instead of being
        dropped silently.
        """
        # Imported here so the decorator does not depend on module-level
        # imports being extended.
        import functools
        import inspect

        default_attempts = 3
        signature = inspect.signature(func)

        def requested_attempts(self, args, kwarg):
            # Resolve `tries` whether it was passed positionally, by
            # keyword, or left at its default.
            try:
                bound = signature.bind(self, *args, **kwarg)
            except TypeError:
                # Malformed call: keep the default so the wrapped call
                # fails inside the retry loop and is reported as
                # (False, 0).
                return default_attempts
            bound.apply_defaults()
            tries = bound.arguments.get('tries', default_attempts)
            if isinstance(tries, int) and tries > 0:
                return tries
            return default_attempts

        @functools.wraps(func)
        def check(self, *args, **kwarg):
            for _ in range(requested_attempts(self, args, kwarg)):
                self.__wait_to_next_time(timeout=0.025)
                try:
                    dataGet = func(self, *args, **kwarg)
                except Exception as exc:
                    # Best effort by design: remember the failure and try
                    # again.
                    self.__save_time_run()
                    self.last_error = exc
                    continue
                self.__save_time_run()
                self.last_error = None
                return True, dataGet
            return False, 0
        return check

    @retries
    def get_data(self, id, function, address, value, format="", tries=3):
        """Run a read request through ``master.execute``.

        Args:
            id: slave address.
            function: Modbus function code.
            address: starting address.
            value: passed as the fourth positional argument of
                ``master.execute`` (presumably the quantity to read -
                confirm against modbus_tk).
            format: ``data_format`` forwarded to ``master.execute``.
            tries: maximum number of attempts.

        Returns:
            ``(True, data)`` on success, ``(False, 0)`` when every attempt
            failed (the tuple is produced by the ``retries`` decorator).
        """
        return self.master.execute(id, function, address, value,
                                   data_format=format)

    @retries
    def send_data(self, id, function, address, value, format="", tries=3):
        """Run a write request through ``master.execute``.

        Args:
            id: slave address.
            function: Modbus function code.
            address: starting address.
            value: forwarded as ``output_value``.
            format: ``data_format`` forwarded to ``master.execute``.
            tries: maximum number of attempts.

        Returns:
            ``(True, response)`` on success, ``(False, 0)`` when every
            attempt failed (the tuple is produced by the ``retries``
            decorator).
        """
        return self.master.execute(id, function, address,
                                   output_value=value, data_format=format)

    def __save_time_run(self):
        """Remember when the last bus transaction finished."""
        self.__timerun = time.time()

    def __wait_to_next_time(self, timeout=0.035):
        """Sleep until *timeout* seconds have passed since the last transaction."""
        elapsed = time.time() - self.__timerun
        if elapsed < timeout:
            time.sleep(timeout - elapsed)
class controlPump(ModbusPull):
    """Pump controller on top of ModbusPull.

    Every write helper returns the ``(ok, response)`` tuple produced by
    ``send_data`` so callers can tell whether the frame was acknowledged.

    Attributes:
        time_start_pump: ``time.time()`` of the last ``start_pump`` call
            (0 before the first start).
        stateModbus: initialised to 0; not used by the methods of this
            class.
    """

    def __init__(self, pump_ids=range(192, 206), default_rpm=400.0):
        """Open the bus and preset the speed of every pump.

        Args:
            pump_ids: slave addresses to preset; defaults to 192..205.
            default_rpm: speed written to each of those pumps.
        """
        super().__init__()
        self.time_start_pump = 0
        self.stateModbus = 0
        for pump_id in pump_ids:
            self.set_speed_pump(pump_id, rpm=default_rpm)

    def check_finish(self, id, total_time=0) -> bool:
        """Stop pump *id* once it has run longer than *total_time* seconds.

        The elapsed time is measured from the last ``start_pump`` call.

        Returns:
            True when the time was exceeded (a stop command was sent),
            False otherwise.
        """
        if time.time() - self.time_start_pump > total_time:
            self.stop_pump(id)
            return True
        return False

    def enable_rs485(self, id):
        """Write 1 to the MB_ADR_ENABLE485 coil of slave *id*.

        Returns:
            The ``(ok, response)`` tuple from ``send_data``.
        """
        return self.send_data(id, function=cst.WRITE_SINGLE_COIL,
                              address=MB_ADR_ENABLE485, value=1)

    def set_speed_pump(self, id, rpm):
        """Write *rpm* as a big-endian float to the MB_ADR_SET_RPM register.

        The RS-485 enable coil is written first. The speed is sent even
        if that write failed.

        Returns:
            The ``(ok, response)`` tuple of the speed write.
        """
        self.enable_rs485(id)
        return self.send_data(id, function=cst.WRITE_MULTIPLE_REGISTERS,
                              address=MB_ADR_SET_RPM, value=[rpm],
                              format='>f')

    def stop_pump(self, id):
        """Write 0 to the MB_ADR_SET_CTR coil of slave *id*.

        Returns:
            The ``(ok, response)`` tuple of the stop write.
        """
        self.enable_rs485(id)
        return self.send_data(id, function=cst.WRITE_SINGLE_COIL,
                              address=MB_ADR_SET_CTR, value=0)

    def start_pump(self, id):
        """Write 1 to the MB_ADR_SET_CTR coil and record the start time.

        Returns:
            The ``(ok, response)`` tuple of the start write. This is a
            tuple, not a plain bool.
        """
        self.enable_rs485(id)
        self.time_start_pump = time.time()
        return self.send_data(id, function=cst.WRITE_SINGLE_COIL,
                              address=MB_ADR_SET_CTR, value=1)

    def check_state(self, id):
        """Report whether slave *id* answers a single-coil read.

        Slaves with an id above 192 are probed at coil 4100, all others
        at coil 0.

        Returns:
            True when the read succeeded, False when every attempt failed.
        """
        # NOTE(review): 4100 equals MB_ADR_ENABLE485 - presumably the same
        # coil; confirm before replacing the literal with the constant.
        start_address = 4100 if id > 192 else 0
        status, _ = self.get_data(id, cst.READ_COILS, start_address, 1)
        return status

    def get_water_sensor(self, id, start_address=0, num_holding=1):
        """Read *num_holding* holding registers of slave *id*.

        Returns:
            The ``(ok, data)`` tuple from ``get_data``.
        """
        return self.get_data(id, cst.READ_HOLDING_REGISTERS,
                             start_address, num_holding)
# Manual bench loop: repeatedly starts pump 193 and uses check_finish to stop
# it once more than 5 seconds have passed since the start.
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of the statements after the second `if` (the status print and the 5 s sleep)
# cannot be determined from here - confirm against the original before editing.
if __name__ == "__main__":
# NOTE(review): this rebinds the name `controlPump` from the class to an
# instance, so the class can no longer be instantiated by name afterwards.
controlPump = controlPump()
# True means "the pump has to be (re)started on the next pass".
status = True
while(True):
time.sleep(1)
if status == True:
# start_pump returns the (ok, response) tuple from send_data; it is printed as-is.
print("start pupm", controlPump.start_pump(193), flush=True)
status = False
# check_finish sends the stop command itself when the run time is exceeded.
if controlPump.check_finish(193,total_time = 5):
status = True
# check_state returns the success flag of a single-coil read on the slave.
print("status",controlPump.check_state(193), flush=True)
time.sleep(5)