Skip to content

Commit a8e42b0

Browse files
committed
wip
1 parent f43bedb commit a8e42b0

File tree

9 files changed

+550
-246
lines changed

9 files changed

+550
-246
lines changed

can/cli.py

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
import argparse
2+
import re
3+
from collections.abc import Sequence
4+
from typing import Any, Optional, Union
5+
6+
import can
7+
from can.typechecking import CanFilter, TAdditionalCliArgs
8+
from can.util import _dict2timing, cast_from_string
9+
10+
11+
def add_bus_arguments(
12+
parser: argparse.ArgumentParser,
13+
*,
14+
filter_arg: bool = False,
15+
prefix: Optional[str] = None,
16+
group_title: str = "bus arguments",
17+
) -> None:
18+
"""Adds CAN bus configuration options to an argument parser.
19+
20+
:param parser:
21+
The argument parser to which the options will be added.
22+
:param filter_arg:
23+
Whether to include the filter argument.
24+
:param prefix:
25+
An optional prefix for the argument names, allowing configuration of multiple buses.
26+
:param group_title:
27+
The title of the argument group.
28+
"""
29+
if prefix:
30+
group_title += f" ({prefix})"
31+
group = parser.add_argument_group(group_title)
32+
33+
flags = [f"--{prefix}-channel"] if prefix else ["-c", "--channel"]
34+
dest = f"{prefix}_channel" if prefix else "channel"
35+
group.add_argument(
36+
*flags,
37+
dest=dest,
38+
default=argparse.SUPPRESS,
39+
help=r"Most backend interfaces require some sort of channel. For "
40+
r"example with the serial interface the channel might be a rfcomm"
41+
r' device: "/dev/rfcomm0". With the socketcan interface valid '
42+
r'channel examples include: "can0", "vcan0".',
43+
)
44+
45+
flags = [f"--{prefix}-interface"] if prefix else ["-i", "--interface"]
46+
dest = f"{prefix}_interface" if prefix else "interface"
47+
group.add_argument(
48+
*flags,
49+
dest=dest,
50+
default=argparse.SUPPRESS,
51+
choices=sorted(can.VALID_INTERFACES),
52+
metavar="INTERFACE",
53+
help="""Specify the backend CAN interface to use. If left blank,
54+
fall back to reading from configuration files.""",
55+
)
56+
57+
flags = [f"--{prefix}-bitrate"] if prefix else ["-b", "--bitrate"]
58+
dest = f"{prefix}_bitrate" if prefix else "bitrate"
59+
group.add_argument(
60+
*flags,
61+
dest=dest,
62+
type=int,
63+
default=argparse.SUPPRESS,
64+
help="Bitrate to use for the CAN bus.",
65+
)
66+
67+
flags = [f"--{prefix}-fd"] if prefix else ["--fd"]
68+
dest = f"{prefix}_fd" if prefix else "fd"
69+
group.add_argument(
70+
*flags,
71+
dest=dest,
72+
default=argparse.SUPPRESS,
73+
action="store_true",
74+
help="Activate CAN-FD support",
75+
)
76+
77+
flags = [f"--{prefix}-data-bitrate"] if prefix else ["--data-bitrate"]
78+
dest = f"{prefix}_data_bitrate" if prefix else "data_bitrate"
79+
group.add_argument(
80+
*flags,
81+
dest=dest,
82+
type=int,
83+
default=argparse.SUPPRESS,
84+
help="Bitrate to use for the data phase in case of CAN-FD.",
85+
)
86+
87+
flags = [f"--{prefix}-timing"] if prefix else ["--timing"]
88+
dest = f"{prefix}_timing" if prefix else "timing"
89+
group.add_argument(
90+
*flags,
91+
dest=dest,
92+
action=_BitTimingAction,
93+
nargs=argparse.ONE_OR_MORE,
94+
default=argparse.SUPPRESS,
95+
metavar="TIMING_ARG",
96+
help="Configure bit rate and bit timing. For example, use "
97+
"`--timing f_clock=8_000_000 tseg1=5 tseg2=2 sjw=2 brp=2 nof_samples=1` for classical CAN "
98+
"or `--timing f_clock=80_000_000 nom_tseg1=119 nom_tseg2=40 nom_sjw=40 nom_brp=1 "
99+
"data_tseg1=29 data_tseg2=10 data_sjw=10 data_brp=1` for CAN FD. "
100+
"Check the python-can documentation to verify whether your "
101+
"CAN interface supports the `timing` argument.",
102+
)
103+
104+
if filter_arg:
105+
flags = [f"--{prefix}-filter"] if prefix else ["--filter"]
106+
dest = f"{prefix}_can_filters" if prefix else "can_filters"
107+
group.add_argument(
108+
*flags,
109+
dest=dest,
110+
nargs=argparse.ONE_OR_MORE,
111+
action=_CanFilterAction,
112+
default=argparse.SUPPRESS,
113+
help="R|Space separated CAN filters for the given CAN interface:"
114+
"\n <can_id>:<can_mask> (matches when <received_can_id> & mask =="
115+
" can_id & mask)"
116+
"\n <can_id>~<can_mask> (matches when <received_can_id> & mask !="
117+
" can_id & mask)"
118+
"\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:"
119+
"\n python -m can.viewer --filter 100:7FC 200:7F0"
120+
"\nNote that the ID and mask are always interpreted as hex values",
121+
metavar="{<can_id>:<can_mask>,<can_id>~<can_mask>}",
122+
)
123+
124+
flags = [f"--{prefix}-bus-kwargs"] if prefix else ["--bus-kwargs"]
125+
dest = f"{prefix}_bus_kwargs" if prefix else "bus_kwargs"
126+
group.add_argument(
127+
*flags,
128+
dest=dest,
129+
action=_BusKwargsAction,
130+
nargs=argparse.ONE_OR_MORE,
131+
default=argparse.SUPPRESS,
132+
metavar="BUS_KWARG",
133+
help="Pass keyword arguments down to the instantiation of the bus class. "
134+
"For example, `-i vector -c 1 --bus-kwargs app_name=MyCanApp serial=1234` is equivalent "
135+
"to opening the bus with `can.Bus('vector', channel=1, app_name='MyCanApp', serial=1234)",
136+
)
137+
138+
139+
def create_bus_from_namespace(
140+
namespace: argparse.Namespace,
141+
*,
142+
prefix: Optional[str] = None,
143+
**kwargs: Any,
144+
) -> can.BusABC:
145+
"""Creates and returns a CAN bus instance based on the provided namespace and arguments.
146+
147+
:param namespace:
148+
The namespace containing parsed arguments.
149+
:param prefix:
150+
An optional prefix for the argument names, enabling support for multiple buses.
151+
:param kwargs:
152+
Additional keyword arguments to configure the bus.
153+
:return:
154+
A CAN bus instance.
155+
"""
156+
config: dict[str, Any] = {"single_handle": True, **kwargs}
157+
158+
for keyword in (
159+
"channel",
160+
"interface",
161+
"bitrate",
162+
"fd",
163+
"data_bitrate",
164+
"can_filters",
165+
"timing",
166+
"bus_kwargs",
167+
):
168+
prefixed_keyword = f"{prefix}_{keyword}" if prefix else keyword
169+
170+
if prefixed_keyword in namespace:
171+
value = getattr(namespace, prefixed_keyword)
172+
173+
if keyword == "bus_kwargs":
174+
config.update(value)
175+
else:
176+
config[keyword] = value
177+
178+
return can.Bus(**config)
179+
180+
181+
class _CanFilterAction(argparse.Action):
182+
def __call__(
183+
self,
184+
parser: argparse.ArgumentParser,
185+
namespace: argparse.Namespace,
186+
values: Union[str, Sequence[Any], None],
187+
option_string: Optional[str] = None,
188+
) -> None:
189+
if not isinstance(values, list):
190+
raise argparse.ArgumentError(self, "Invalid filter argument")
191+
192+
print(f"Adding filter(s): {values}")
193+
can_filters: list[CanFilter] = []
194+
195+
for filt in values:
196+
if ":" in filt:
197+
parts = filt.split(":")
198+
can_id = int(parts[0], base=16)
199+
can_mask = int(parts[1], base=16)
200+
elif "~" in filt:
201+
parts = filt.split("~")
202+
can_id = int(parts[0], base=16) | 0x20000000 # CAN_INV_FILTER
203+
can_mask = int(parts[1], base=16) & 0x20000000 # socket.CAN_ERR_FLAG
204+
else:
205+
raise argparse.ArgumentError(self, "Invalid filter argument")
206+
can_filters.append({"can_id": can_id, "can_mask": can_mask})
207+
208+
setattr(namespace, self.dest, can_filters)
209+
210+
211+
class _BitTimingAction(argparse.Action):
212+
def __call__(
213+
self,
214+
parser: argparse.ArgumentParser,
215+
namespace: argparse.Namespace,
216+
values: Union[str, Sequence[Any], None],
217+
option_string: Optional[str] = None,
218+
) -> None:
219+
if not isinstance(values, list):
220+
raise argparse.ArgumentError(self, "Invalid --timing argument")
221+
222+
timing_dict: dict[str, int] = {}
223+
for arg in values:
224+
try:
225+
key, value_string = arg.split("=")
226+
value = int(value_string)
227+
timing_dict[key] = value
228+
except ValueError:
229+
raise argparse.ArgumentError(
230+
self, f"Invalid timing argument: {arg}"
231+
) from None
232+
233+
if not (timing := _dict2timing(timing_dict)):
234+
err_msg = "Invalid --timing argument. Incomplete parameters."
235+
raise argparse.ArgumentError(self, err_msg)
236+
237+
setattr(namespace, self.dest, timing)
238+
print(timing)
239+
240+
241+
class _BusKwargsAction(argparse.Action):
242+
def __call__(
243+
self,
244+
parser: argparse.ArgumentParser,
245+
namespace: argparse.Namespace,
246+
values: Union[str, Sequence[Any], None],
247+
option_string: Optional[str] = None,
248+
) -> None:
249+
if not isinstance(values, list):
250+
raise argparse.ArgumentError(self, "Invalid --bus-kwargs argument")
251+
252+
bus_kwargs: dict[str, Union[str, int, float, bool]] = {}
253+
254+
for arg in values:
255+
try:
256+
match = re.match(
257+
r"^(?P<name>[_a-zA-Z][_a-zA-Z0-9]*)=(?P<value>\S*?)$",
258+
arg,
259+
)
260+
if not match:
261+
raise ValueError
262+
key = match["name"].replace("-", "_")
263+
string_val = match["value"]
264+
bus_kwargs[key] = cast_from_string(string_val)
265+
except ValueError:
266+
raise argparse.ArgumentError(
267+
self,
268+
f"Unable to parse bus keyword argument '{arg}'",
269+
) from None
270+
271+
setattr(namespace, self.dest, bus_kwargs)
272+
273+
274+
def _add_extra_args(
275+
parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup],
276+
) -> None:
277+
parser.add_argument(
278+
"extra_args",
279+
nargs=argparse.REMAINDER,
280+
help="The remaining arguments will be used for logger/player initialisation. "
281+
"For example, `can_logger -i virtual -c test -f logfile.blf --compression-level=9` "
282+
"passes the keyword argument `compression_level=9` to the BlfWriter.",
283+
)
284+
285+
286+
def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs:
287+
for arg in unknown_args:
288+
if not re.match(r"^--[a-zA-Z][a-zA-Z0-9\-]*=\S*?$", arg):
289+
raise ValueError(f"Parsing argument {arg} failed")
290+
291+
def _split_arg(_arg: str) -> tuple[str, str]:
292+
left, right = _arg.split("=", 1)
293+
return left.lstrip("-").replace("-", "_"), right
294+
295+
args: dict[str, Union[str, int, float, bool]] = {}
296+
for key, string_val in map(_split_arg, unknown_args):
297+
args[key] = cast_from_string(string_val)
298+
return args
299+
300+
301+
def _set_logging_level_from_namespace(namespace: argparse.Namespace) -> None:
302+
if "verbosity" in namespace:
303+
logging_level_names = [
304+
"critical",
305+
"error",
306+
"warning",
307+
"info",
308+
"debug",
309+
"subdebug",
310+
]
311+
can.set_logging_level(logging_level_names[min(5, namespace.verbosity)])

0 commit comments

Comments
 (0)