1
1
"""Modbus Request/Response Decoders."""
2
2
from __future__ import annotations
3
3
4
+ import copy
5
+
4
6
from pymodbus .exceptions import MessageRegisterException , ModbusException
5
7
from pymodbus .logging import Log
6
8
10
12
class DecodePDU :
11
13
"""Decode pdu requests/responses (server/client)."""
12
14
13
- _pdu_class_table : set [tuple [type [ModbusPDU ], type [ModbusPDU ]]] = set ()
14
- _pdu_sub_class_table : set [tuple [type [ModbusPDU ], type [ModbusPDU ]]] = set ()
15
+ pdu_table : dict [int , tuple [type [ModbusPDU ], type [ModbusPDU ]]] = {}
16
+ pdu_sub_table : dict [int , dict [int , tuple [type [ModbusPDU ], type [ModbusPDU ]]]] = {}
17
+
15
18
16
19
def __init__ (self , is_server : bool ) -> None :
17
20
"""Initialize function_tables."""
18
- inx = 0 if is_server else 1
19
- self .lookup : dict [int , type [ModbusPDU ]] = {cl [inx ].function_code : cl [inx ] for cl in self ._pdu_class_table }
20
- self .sub_lookup : dict [int , dict [int , type [ModbusPDU ]]] = {}
21
- for f in self ._pdu_sub_class_table :
22
- if (function_code := f [inx ].function_code ) not in self .sub_lookup :
23
- self .sub_lookup [function_code ] = {f [inx ].sub_function_code : f [inx ]}
24
- else :
25
- self .sub_lookup [function_code ][f [inx ].sub_function_code ] = f [inx ]
21
+ self .pdu_inx = 0 if is_server else 1
26
22
27
23
def lookupPduClass (self , data : bytes ) -> type [ModbusPDU ] | None :
28
24
"""Use `function_code` to determine the class of the PDU."""
29
- func_code = int (data [1 ])
30
- if func_code & 0x80 :
25
+ if (func_code := int (data [1 ])) & 0x80 :
31
26
return ExceptionResponse
27
+ if not (pdu := self .pdu_table .get (func_code , (None , None ))[self .pdu_inx ]):
28
+ return None
29
+
32
30
if func_code == 0x2B : # mei message, sub_function_code is 1 byte
33
31
sub_func_code = int (data [2 ])
34
- return self .sub_lookup [func_code ].get (sub_func_code , None )
35
- if func_code == 0x08 : # diag message, sub_function_code is 2 bytes
32
+ elif func_code == 0x08 : # diag message, sub_function_code is 2 bytes
36
33
sub_func_code = int .from_bytes (data [2 :4 ], "big" )
37
- return self .sub_lookup [func_code ].get (sub_func_code , None )
38
- return self .lookup .get (func_code , None )
34
+ else :
35
+ return pdu
36
+ return self .pdu_sub_table [func_code ].get (sub_func_code , (None , None ))[self .pdu_inx ]
39
37
40
38
def list_function_codes (self ):
41
39
"""Return list of function codes."""
42
- return list (self .lookup )
40
+ return list (self .pdu_table )
43
41
44
42
@classmethod
45
43
def add_pdu (cls , req : type [ModbusPDU ], resp : type [ModbusPDU ]):
46
44
"""Register request/response."""
47
- cls ._pdu_class_table . add (( req , resp ) )
45
+ cls .pdu_table [ req . function_code ] = ( req , resp )
48
46
49
47
@classmethod
50
48
def add_sub_pdu (cls , req : type [ModbusPDU ], resp : type [ModbusPDU ]):
51
49
"""Register request/response."""
52
- cls ._pdu_sub_class_table .add ((req , resp ))
50
+ if req .function_code not in cls .pdu_sub_table :
51
+ cls .pdu_sub_table [req .function_code ] = {}
52
+ cls .pdu_sub_table [req .function_code ][req .sub_function_code ] = (req , resp )
53
53
54
54
def register (self , custom_class : type [ModbusPDU ]) -> None :
55
55
"""Register a function and sub function class with the decoder."""
@@ -59,13 +59,9 @@ def register(self, custom_class: type[ModbusPDU]) -> None:
59
59
". Class needs to be derived from "
60
60
"`pymodbus.pdu.ModbusPDU` "
61
61
)
62
- self .lookup [custom_class .function_code ] = custom_class
63
- if custom_class .sub_function_code >= 0 :
64
- if custom_class .function_code not in self .sub_lookup :
65
- self .sub_lookup [custom_class .function_code ] = {}
66
- self .sub_lookup [custom_class .function_code ][
67
- custom_class .sub_function_code
68
- ] = custom_class
62
+ if "pdu_table" not in self .__dict__ :
63
+ self .pdu_table = copy .deepcopy (DecodePDU .pdu_table )
64
+ self .pdu_table [custom_class .function_code ] = (custom_class , custom_class )
69
65
70
66
def decode (self , frame : bytes ) -> ModbusPDU | None :
71
67
"""Decode a frame."""
@@ -74,14 +70,14 @@ def decode(self, frame: bytes) -> ModbusPDU | None:
74
70
pdu_exp = ExceptionResponse (function_code & 0x7F )
75
71
pdu_exp .decode (frame [1 :])
76
72
return pdu_exp
77
- if not (pdu_class := self .lookup .get (function_code , None ) ):
73
+ if not (pdu_class := self .pdu_table .get (function_code , ( None , None ))[ self . pdu_inx ] ):
78
74
Log .debug ("decode PDU failed for function code {}" , function_code )
79
75
raise ModbusException (f"Unknown response { function_code } " )
80
76
pdu = pdu_class ()
81
77
pdu .decode (frame [1 :])
82
78
if pdu .sub_function_code >= 0 :
83
- lookup = self .sub_lookup .get (pdu .function_code , {})
84
- if sub_class := lookup .get (pdu .sub_function_code , None ) :
79
+ lookup = self .pdu_sub_table .get (pdu .function_code , {})
80
+ if sub_class := lookup .get (pdu .sub_function_code , ( None , None ))[ self . pdu_inx ] :
85
81
pdu = sub_class ()
86
82
pdu .decode (frame [1 :])
87
83
Log .debug ("decoded PDU function_code({} sub {}) -> {} " , pdu .function_code , pdu .sub_function_code , str (pdu ))
0 commit comments