3
3
import inspect
4
4
import typing
5
5
import warnings
6
- from typing import Callable , Generic , Optional , TypeVar , overload
6
+ from typing import Callable , Generic , Optional , Sequence , TypeVar , Union , overload
7
7
8
8
import ntcore
9
- from ntcore import NetworkTableInstance , Value
9
+ from ntcore import NetworkTableInstance
10
10
from ntcore .types import ValueT
11
11
12
+
13
+ class StructSerializable (typing .Protocol ):
14
+ """Any type that is a wpiutil.wpistruct."""
15
+
16
+ WPIStruct : typing .ClassVar
17
+
18
+
12
19
T = TypeVar ("T" )
13
- V = TypeVar ("V" , bound = ValueT )
20
+ V = TypeVar ("V" , bound = Union [ ValueT , StructSerializable , Sequence [ StructSerializable ]] )
14
21
15
22
16
23
class tunable (Generic [V ]):
@@ -69,7 +76,7 @@ def execute(self):
69
76
"_ntsubtable" ,
70
77
"_ntwritedefault" ,
71
78
# "__doc__",
72
- "_mkv " ,
79
+ "_topic_type " ,
73
80
"_nt" ,
74
81
)
75
82
@@ -87,10 +94,15 @@ def __init__(
87
94
self ._ntdefault = default
88
95
self ._ntsubtable = subtable
89
96
self ._ntwritedefault = writeDefault
90
- d = Value .makeValue (default )
91
- self ._mkv = Value .getFactoryByType (d .type ())
92
97
# self.__doc__ = doc
93
98
99
+ self ._topic_type = _get_topic_type_for_value (self ._ntdefault )
100
+ if self ._topic_type is None :
101
+ checked_type : type = type (self ._ntdefault )
102
+ raise TypeError (
103
+ f"tunable is not publishable to NetworkTables, type: { checked_type .__name__ } "
104
+ )
105
+
94
106
@overload
95
107
def __get__ (self , instance : None , owner = None ) -> "tunable[V]" : ...
96
108
@@ -99,11 +111,23 @@ def __get__(self, instance, owner=None) -> V: ...
99
111
100
112
def __get__ (self , instance , owner = None ):
101
113
if instance is not None :
102
- return instance ._tunables [self ].value
114
+ return instance ._tunables [self ].get ()
103
115
return self
104
116
105
117
def __set__ (self , instance , value : V ) -> None :
106
- instance ._tunables [self ].setValue (self ._mkv (value ))
118
+ instance ._tunables [self ].set (value )
119
+
120
+
121
+ def _get_topic_type_for_value (value ) -> Optional [Callable [[ntcore .Topic ], typing .Any ]]:
122
+ topic_type = _get_topic_type (type (value ))
123
+ # bytes and str are Sequences. They must be checked before Sequence.
124
+ if topic_type is None and isinstance (value , collections .abc .Sequence ):
125
+ if not value :
126
+ raise ValueError (
127
+ f"tunable default cannot be an empty sequence, got { value } "
128
+ )
129
+ topic_type = _get_topic_type (Sequence [type (value [0 ])]) # type: ignore [misc]
130
+ return topic_type
107
131
108
132
109
133
def setup_tunables (component , cname : str , prefix : Optional [str ] = "components" ) -> None :
@@ -127,7 +151,7 @@ def setup_tunables(component, cname: str, prefix: Optional[str] = "components")
127
151
128
152
NetworkTables = NetworkTableInstance .getDefault ()
129
153
130
- tunables = {}
154
+ tunables : dict [ tunable , ntcore . Topic ] = {}
131
155
132
156
for n in dir (cls ):
133
157
if n .startswith ("_" ):
@@ -142,11 +166,12 @@ def setup_tunables(component, cname: str, prefix: Optional[str] = "components")
142
166
else :
143
167
key = "%s/%s" % (prefix , n )
144
168
145
- ntvalue = NetworkTables .getEntry (key )
169
+ topic = prop ._topic_type (NetworkTables .getTopic (key ))
170
+ ntvalue = topic .getEntry (prop ._ntdefault )
146
171
if prop ._ntwritedefault :
147
- ntvalue .setValue (prop ._ntdefault )
172
+ ntvalue .set (prop ._ntdefault )
148
173
else :
149
- ntvalue .setDefaultValue (prop ._ntdefault )
174
+ ntvalue .setDefault (prop ._ntdefault )
150
175
tunables [prop ] = ntvalue
151
176
152
177
component ._tunables = tunables
0 commit comments