@@ -12,9 +12,13 @@ class DecodePDU:
12
12
13
13
_pdu_class_table : set [tuple [type [ModbusPDU ], type [ModbusPDU ]]] = set ()
14
14
_pdu_sub_class_table : set [tuple [type [ModbusPDU ], type [ModbusPDU ]]] = set ()
15
+ pdu_table : dict [int , tuple [type [ModbusPDU ], type [ModbusPDU ]]] = {}
16
+ pdu_sub_table : dict [int , dict [int , tuple [type [ModbusPDU ], type [ModbusPDU ]]]] = {}
17
+
15
18
16
19
def __init__ (self , is_server : bool ) -> None :
17
20
"""Initialize function_tables."""
21
+ self .pdu_inx = 0 if is_server else 1
18
22
inx = 0 if is_server else 1
19
23
self .lookup : dict [int , type [ModbusPDU ]] = {cl [inx ].function_code : cl [inx ] for cl in self ._pdu_class_table }
20
24
self .sub_lookup : dict [int , dict [int , type [ModbusPDU ]]] = {}
@@ -26,29 +30,35 @@ def __init__(self, is_server: bool) -> None:
26
30
27
31
def lookupPduClass (self , data : bytes ) -> type [ModbusPDU ] | None :
28
32
"""Use `function_code` to determine the class of the PDU."""
29
- func_code = int (data [1 ])
30
- if func_code & 0x80 :
33
+ if (func_code := int (data [1 ])) & 0x80 :
31
34
return ExceptionResponse
35
+ if not (pdu := self .pdu_table .get (func_code , (None , None ))[self .pdu_inx ]):
36
+ return None
37
+
32
38
if func_code == 0x2B : # mei message, sub_function_code is 1 byte
33
39
sub_func_code = int (data [2 ])
34
- return self .sub_lookup [func_code ].get (sub_func_code , None )
35
- if func_code == 0x08 : # diag message, sub_function_code is 2 bytes
40
+ elif func_code == 0x08 : # diag message, sub_function_code is 2 bytes
36
41
sub_func_code = int .from_bytes (data [2 :4 ], "big" )
37
- return self .sub_lookup [func_code ].get (sub_func_code , None )
38
- return self .lookup .get (func_code , None )
42
+ else :
43
+ return pdu
44
+ return self .pdu_sub_table [func_code ].get (sub_func_code , (None , None ))[self .pdu_inx ]
39
45
40
46
def list_function_codes (self ):
41
47
"""Return list of function codes."""
42
- return list (self .lookup )
48
+ return list (self .pdu_table )
43
49
44
50
@classmethod
45
51
def add_pdu (cls , req : type [ModbusPDU ], resp : type [ModbusPDU ]):
46
52
"""Register request/response."""
53
+ cls .pdu_table [req .function_code ] = (req , resp )
47
54
cls ._pdu_class_table .add ((req , resp ))
48
55
49
56
@classmethod
50
57
def add_sub_pdu (cls , req : type [ModbusPDU ], resp : type [ModbusPDU ]):
51
58
"""Register request/response."""
59
+ if req .function_code not in cls .pdu_sub_table :
60
+ cls .pdu_sub_table [req .function_code ] = {}
61
+ cls .pdu_sub_table [req .function_code ][req .sub_function_code ] = (req , resp )
52
62
cls ._pdu_sub_class_table .add ((req , resp ))
53
63
54
64
def register (self , custom_class : type [ModbusPDU ]) -> None :
@@ -60,12 +70,7 @@ def register(self, custom_class: type[ModbusPDU]) -> None:
60
70
"`pymodbus.pdu.ModbusPDU` "
61
71
)
62
72
self .lookup [custom_class .function_code ] = custom_class
63
- if custom_class .sub_function_code >= 0 :
64
- if custom_class .function_code not in self .sub_lookup :
65
- self .sub_lookup [custom_class .function_code ] = {}
66
- self .sub_lookup [custom_class .function_code ][
67
- custom_class .sub_function_code
68
- ] = custom_class
73
+ self .pdu_table [custom_class .function_code ] = (custom_class , custom_class )
69
74
70
75
def decode (self , frame : bytes ) -> ModbusPDU | None :
71
76
"""Decode a frame."""
0 commit comments