1
+ import collections .abc
1
2
import functools
2
3
import inspect
4
+ import typing
3
5
import warnings
4
- from typing import Callable , Generic , Optional , TypeVar , overload
6
+ from typing import Callable , Generic , Optional , Sequence , TypeVar , Union , overload
5
7
6
- from ntcore import NetworkTableInstance , Value
8
+ import ntcore
9
+ from ntcore import NetworkTableInstance
7
10
from ntcore .types import ValueT
8
11
12
+
13
+ class StructSerializable (typing .Protocol ):
14
+ """Any type that is a wpiutil.wpistruct."""
15
+
16
+ WPIStruct : typing .ClassVar
17
+
18
+
9
19
T = TypeVar ("T" )
10
- V = TypeVar ("V" , bound = ValueT )
20
+ V = TypeVar ("V" , bound = Union [ ValueT , StructSerializable , Sequence [ StructSerializable ]] )
11
21
12
22
13
23
class tunable (Generic [V ]):
@@ -50,6 +60,10 @@ def execute(self):
50
60
you will want to use setup_tunables to set the object up.
51
61
In normal usage, MagicRobot does this for you, so you don't
52
62
have to do anything special.
63
+
64
+ .. versionchanged:: 2024.1.0
65
+ Added support for WPILib Struct serializable types.
66
+ Integer defaults now create integer topics instead of double topics.
53
67
"""
54
68
55
69
# the way this works is we use a special class to indicate that it
@@ -66,7 +80,7 @@ def execute(self):
66
80
"_ntsubtable" ,
67
81
"_ntwritedefault" ,
68
82
# "__doc__",
69
- "_mkv " ,
83
+ "_topic_type " ,
70
84
"_nt" ,
71
85
)
72
86
@@ -84,10 +98,15 @@ def __init__(
84
98
self ._ntdefault = default
85
99
self ._ntsubtable = subtable
86
100
self ._ntwritedefault = writeDefault
87
- d = Value .makeValue (default )
88
- self ._mkv = Value .getFactoryByType (d .type ())
89
101
# self.__doc__ = doc
90
102
103
+ self ._topic_type = _get_topic_type_for_value (self ._ntdefault )
104
+ if self ._topic_type is None :
105
+ checked_type : type = type (self ._ntdefault )
106
+ raise TypeError (
107
+ f"tunable is not publishable to NetworkTables, type: { checked_type .__name__ } "
108
+ )
109
+
91
110
@overload
92
111
def __get__ (self , instance : None , owner = None ) -> "tunable[V]" : ...
93
112
@@ -96,11 +115,23 @@ def __get__(self, instance, owner=None) -> V: ...
96
115
97
116
def __get__ (self , instance , owner = None ):
98
117
if instance is not None :
99
- return instance ._tunables [self ].value
118
+ return instance ._tunables [self ].get ()
100
119
return self
101
120
102
121
def __set__ (self , instance , value : V ) -> None :
103
- instance ._tunables [self ].setValue (self ._mkv (value ))
122
+ instance ._tunables [self ].set (value )
123
+
124
+
125
+ def _get_topic_type_for_value (value ) -> Optional [Callable [[ntcore .Topic ], typing .Any ]]:
126
+ topic_type = _get_topic_type (type (value ))
127
+ # bytes and str are Sequences. They must be checked before Sequence.
128
+ if topic_type is None and isinstance (value , collections .abc .Sequence ):
129
+ if not value :
130
+ raise ValueError (
131
+ f"tunable default cannot be an empty sequence, got { value } "
132
+ )
133
+ topic_type = _get_topic_type (Sequence [type (value [0 ])]) # type: ignore [misc]
134
+ return topic_type
104
135
105
136
106
137
def setup_tunables (component , cname : str , prefix : Optional [str ] = "components" ) -> None :
@@ -124,7 +155,7 @@ def setup_tunables(component, cname: str, prefix: Optional[str] = "components")
124
155
125
156
NetworkTables = NetworkTableInstance .getDefault ()
126
157
127
- tunables = {}
158
+ tunables : dict [ tunable , ntcore . Topic ] = {}
128
159
129
160
for n in dir (cls ):
130
161
if n .startswith ("_" ):
@@ -139,11 +170,12 @@ def setup_tunables(component, cname: str, prefix: Optional[str] = "components")
139
170
else :
140
171
key = "%s/%s" % (prefix , n )
141
172
142
- ntvalue = NetworkTables .getEntry (key )
173
+ topic = prop ._topic_type (NetworkTables .getTopic (key ))
174
+ ntvalue = topic .getEntry (prop ._ntdefault )
143
175
if prop ._ntwritedefault :
144
- ntvalue .setValue (prop ._ntdefault )
176
+ ntvalue .set (prop ._ntdefault )
145
177
else :
146
- ntvalue .setDefaultValue (prop ._ntdefault )
178
+ ntvalue .setDefault (prop ._ntdefault )
147
179
tunables [prop ] = ntvalue
148
180
149
181
component ._tunables = tunables
@@ -201,6 +233,10 @@ class MyRobot(magicbot.MagicRobot):
201
233
especially if you wish to monitor WPILib objects.
202
234
203
235
.. versionadded:: 2018.1.0
236
+
237
+ .. versionchanged:: 2024.1.0
238
+ WPILib Struct serializable types are supported when the return type is type hinted.
239
+ An ``int`` return type hint now creates an integer topic.
204
240
"""
205
241
if f is None :
206
242
return functools .partial (feedback , key = key )
@@ -222,10 +258,50 @@ class MyRobot(magicbot.MagicRobot):
222
258
return f
223
259
224
260
261
+ _topic_types = {
262
+ bool : ntcore .BooleanTopic ,
263
+ int : ntcore .IntegerTopic ,
264
+ float : ntcore .DoubleTopic ,
265
+ str : ntcore .StringTopic ,
266
+ bytes : ntcore .RawTopic ,
267
+ }
268
+ _array_topic_types = {
269
+ bool : ntcore .BooleanArrayTopic ,
270
+ int : ntcore .IntegerArrayTopic ,
271
+ float : ntcore .DoubleArrayTopic ,
272
+ str : ntcore .StringArrayTopic ,
273
+ }
274
+
275
+
276
+ def _get_topic_type (
277
+ return_annotation ,
278
+ ) -> Optional [Callable [[ntcore .Topic ], typing .Any ]]:
279
+ if return_annotation in _topic_types :
280
+ return _topic_types [return_annotation ]
281
+ if hasattr (return_annotation , "WPIStruct" ):
282
+ return lambda topic : ntcore .StructTopic (topic , return_annotation )
283
+
284
+ # Check for PEP 484 generic types
285
+ origin = getattr (return_annotation , "__origin__" , None )
286
+ args = typing .get_args (return_annotation )
287
+ if origin in (list , tuple , collections .abc .Sequence ) and args :
288
+ # Ensure tuples are tuple[T, ...] or homogenous
289
+ if origin is tuple and not (
290
+ (len (args ) == 2 and args [1 ] is Ellipsis ) or len (set (args )) == 1
291
+ ):
292
+ return None
293
+
294
+ inner_type = args [0 ]
295
+ if inner_type in _array_topic_types :
296
+ return _array_topic_types [inner_type ]
297
+ if hasattr (inner_type , "WPIStruct" ):
298
+ return lambda topic : ntcore .StructArrayTopic (topic , inner_type )
299
+
300
+
225
301
def collect_feedbacks (component , cname : str , prefix : Optional [str ] = "components" ):
226
302
"""
227
303
Finds all methods decorated with :func:`feedback` on an object
228
- and returns a list of 2-tuples (method, NetworkTables entry).
304
+ and returns a list of 2-tuples (method, NetworkTables entry setter ).
229
305
230
306
.. note:: This isn't useful for normal use.
231
307
"""
@@ -246,7 +322,19 @@ def collect_feedbacks(component, cname: str, prefix: Optional[str] = "components
246
322
else :
247
323
key = name
248
324
249
- entry = nt .getEntry (key )
250
- feedbacks .append ((method , entry ))
325
+ return_annotation = typing .get_type_hints (method ).get ("return" , None )
326
+ if return_annotation is not None :
327
+ topic_type = _get_topic_type (return_annotation )
328
+ else :
329
+ topic_type = None
330
+
331
+ if topic_type is None :
332
+ entry = nt .getEntry (key )
333
+ setter = entry .setValue
334
+ else :
335
+ publisher = topic_type (nt .getTopic (key )).publish ()
336
+ setter = publisher .set
337
+
338
+ feedbacks .append ((method , setter ))
251
339
252
340
return feedbacks
0 commit comments