Skip to content

Improve can.io.* type hierarchy and annotations #1951

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.10"
python-version: "3.13"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
Expand Down
2 changes: 1 addition & 1 deletion can/_entry_points.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,5 @@ def read_entry_points(group: str) -> list[_EntryPoint]:
def read_entry_points(group: str) -> list[_EntryPoint]:
return [
_EntryPoint(ep.name, *ep.value.split(":", maxsplit=1))
for ep in entry_points().get(group, [])
for ep in entry_points().get(group, []) # pylint: disable=no-member
]
4 changes: 0 additions & 4 deletions can/io/asc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ class ASCReader(TextIOMessageReader):
bus statistics, J1939 Transport Protocol messages) is ignored.
"""

file: TextIO

def __init__(
self,
file: Union[StringPathLike, TextIO],
Expand Down Expand Up @@ -322,8 +320,6 @@ class ASCWriter(TextIOMessageWriter):
It the first message does not have a timestamp, it is set to zero.
"""

file: TextIO

FORMAT_MESSAGE = "{channel} {id:<15} {dir:<4} {dtype} {data}"
FORMAT_MESSAGE_FD = " ".join(
[
Expand Down
46 changes: 21 additions & 25 deletions can/io/blf.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import struct
import time
import zlib
from collections.abc import Generator
from collections.abc import Generator, Iterator
from decimal import Decimal
from typing import Any, BinaryIO, Optional, Union, cast

from ..message import Message
from ..typechecking import StringPathLike
from ..util import channel2int, dlc2len, len2dlc
from .generic import BinaryIOMessageReader, FileIOMessageWriter
from .generic import BinaryIOMessageReader, BinaryIOMessageWriter

TSystemTime = tuple[int, int, int, int, int, int, int, int]

Expand Down Expand Up @@ -104,7 +104,7 @@ class BLFParseError(Exception):
TIME_ONE_NANS_FACTOR = Decimal("1e-9")


def timestamp_to_systemtime(timestamp: float) -> TSystemTime:
def timestamp_to_systemtime(timestamp: Optional[float]) -> TSystemTime:
if timestamp is None or timestamp < 631152000:
# Probably not a Unix timestamp
return 0, 0, 0, 0, 0, 0, 0, 0
Expand Down Expand Up @@ -146,8 +146,6 @@ class BLFReader(BinaryIOMessageReader):
silently ignored.
"""

file: BinaryIO

def __init__(
self,
file: Union[StringPathLike, BinaryIO],
Expand Down Expand Up @@ -206,7 +204,7 @@ def __iter__(self) -> Generator[Message, None, None]:
yield from self._parse_container(data)
self.stop()

def _parse_container(self, data):
def _parse_container(self, data: bytes) -> Iterator[Message]:
if self._tail:
data = b"".join((self._tail, data))
try:
Expand All @@ -217,7 +215,7 @@ def _parse_container(self, data):
# Save the remaining data that could not be processed
self._tail = data[self._pos :]

def _parse_data(self, data):
def _parse_data(self, data: bytes) -> Iterator[Message]:
"""Optimized inner loop by making local copies of global variables
and class members and hardcoding some values."""
unpack_obj_header_base = OBJ_HEADER_BASE_STRUCT.unpack_from
Expand Down Expand Up @@ -375,13 +373,11 @@ def _parse_data(self, data):
pos = next_pos


class BLFWriter(FileIOMessageWriter):
class BLFWriter(BinaryIOMessageWriter):
"""
Logs CAN data to a Binary Logging File compatible with Vector's tools.
"""

file: BinaryIO

#: Max log container size of uncompressed data
max_container_size = 128 * 1024

Expand Down Expand Up @@ -412,14 +408,12 @@ def __init__(
Z_DEFAULT_COMPRESSION represents a default compromise between
speed and compression (currently equivalent to level 6).
"""
mode = "rb+" if append else "wb"
try:
super().__init__(file, mode=mode)
super().__init__(file, mode="rb+" if append else "wb")
except FileNotFoundError:
# Trying to append to a non-existing file, create a new one
append = False
mode = "wb"
super().__init__(file, mode=mode)
super().__init__(file, mode="wb")
assert self.file is not None
self.channel = channel
self.compression_level = compression_level
Expand Down Expand Up @@ -452,7 +446,7 @@ def __init__(
# Write a default header which will be updated when stopped
self._write_header(FILE_HEADER_SIZE)

def _write_header(self, filesize):
def _write_header(self, filesize: int) -> None:
header = [b"LOGG", FILE_HEADER_SIZE, self.application_id, 0, 0, 0, 2, 6, 8, 1]
# The meaning of "count of objects read" is unknown
header.extend([filesize, self.uncompressed_size, self.object_count, 0])
Expand All @@ -462,7 +456,7 @@ def _write_header(self, filesize):
# Pad to header size
self.file.write(b"\x00" * (FILE_HEADER_SIZE - FILE_HEADER_STRUCT.size))

def on_message_received(self, msg):
def on_message_received(self, msg: Message) -> None:
channel = channel2int(msg.channel)
if channel is None:
channel = self.channel
Expand Down Expand Up @@ -514,7 +508,7 @@ def on_message_received(self, msg):
data = CAN_MSG_STRUCT.pack(channel, flags, msg.dlc, arb_id, can_data)
self._add_object(CAN_MESSAGE, data, msg.timestamp)

def log_event(self, text, timestamp=None):
def log_event(self, text: str, timestamp: Optional[float] = None) -> None:
"""Add an arbitrary message to the log file as a global marker.

:param str text:
Expand All @@ -525,17 +519,19 @@ def log_event(self, text, timestamp=None):
"""
try:
# Only works on Windows
text = text.encode("mbcs")
encoded = text.encode("mbcs")
except LookupError:
text = text.encode("ascii")
encoded = text.encode("ascii")
comment = b"Added by python-can"
marker = b"python-can"
data = GLOBAL_MARKER_STRUCT.pack(
0, 0xFFFFFF, 0xFF3300, 0, len(text), len(marker), len(comment)
0, 0xFFFFFF, 0xFF3300, 0, len(encoded), len(marker), len(comment)
)
self._add_object(GLOBAL_MARKER, data + text + marker + comment, timestamp)
self._add_object(GLOBAL_MARKER, data + encoded + marker + comment, timestamp)

def _add_object(self, obj_type, data, timestamp=None):
def _add_object(
self, obj_type: int, data: bytes, timestamp: Optional[float] = None
) -> None:
if timestamp is None:
timestamp = self.stop_timestamp or time.time()
if self.start_timestamp is None:
Expand Down Expand Up @@ -564,7 +560,7 @@ def _add_object(self, obj_type, data, timestamp=None):
if self._buffer_size >= self.max_container_size:
self._flush()

def _flush(self):
def _flush(self) -> None:
"""Compresses and writes data in the buffer to file."""
if self.file.closed:
return
Expand All @@ -578,7 +574,7 @@ def _flush(self):
self._buffer = [tail]
self._buffer_size = len(tail)
if not self.compression_level:
data = uncompressed_data
data: "Union[bytes, memoryview[int]]" = uncompressed_data # noqa: UP037
method = NO_COMPRESSION
else:
data = zlib.compress(uncompressed_data, self.compression_level)
Expand All @@ -601,7 +597,7 @@ def file_size(self) -> int:
"""Return an estimate of the current file size in bytes."""
return self.file.tell() + self._buffer_size

def stop(self):
def stop(self) -> None:
"""Stops logging and closes the file."""
self._flush()
if self.file.seekable():
Expand Down
11 changes: 4 additions & 7 deletions can/io/canutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import logging
from collections.abc import Generator
from typing import Any, TextIO, Union
from typing import Any, Optional, TextIO, Union

from can.message import Message

Expand Down Expand Up @@ -34,8 +34,6 @@ class CanutilsLogReader(TextIOMessageReader):
``(0.0) vcan0 001#8d00100100820100``
"""

file: TextIO

def __init__(
self,
file: Union[StringPathLike, TextIO],
Expand Down Expand Up @@ -148,13 +146,12 @@ def __init__(
:param bool append: if set to `True` messages are appended to
the file, else the file is truncated
"""
mode = "a" if append else "w"
super().__init__(file, mode=mode)
super().__init__(file, mode="a" if append else "w")

self.channel = channel
self.last_timestamp = None
self.last_timestamp: Optional[float] = None

def on_message_received(self, msg):
def on_message_received(self, msg: Message) -> None:
# this is the case for the very first message:
if self.last_timestamp is None:
self.last_timestamp = msg.timestamp or 0.0
Expand Down
7 changes: 1 addition & 6 deletions can/io/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class CSVReader(TextIOMessageReader):
Any line separator is accepted.
"""

file: TextIO

def __init__(
self,
file: Union[StringPathLike, TextIO],
Expand Down Expand Up @@ -89,8 +87,6 @@ class CSVWriter(TextIOMessageWriter):
Each line is terminated with a platform specific line separator.
"""

file: TextIO

def __init__(
self,
file: Union[StringPathLike, TextIO],
Expand All @@ -106,8 +102,7 @@ def __init__(
the file is truncated and starts with a newly
written header line
"""
mode = "a" if append else "w"
super().__init__(file, mode=mode)
super().__init__(file, mode="a" if append else "w")

# Write a header row
if not append:
Expand Down
Loading