From 68d22f4a0bab7c3d45794825c2f29fff9a83b495 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Pereira?= Date: Fri, 20 Dec 2024 11:03:56 +0000 Subject: [PATCH 1/4] Small codebase improvements, mostly typing hints --- movai_core_shared/common/time.py | 13 ++++---- movai_core_shared/core/message_client.py | 39 +++++++++++++++++------ movai_core_shared/core/zmq/zmq_client.py | 5 +-- movai_core_shared/core/zmq/zmq_helpers.py | 3 +- movai_core_shared/core/zmq/zmq_manager.py | 13 +++++--- movai_core_shared/core/zmq/zmq_server.py | 8 +++-- movai_core_shared/logger.py | 13 +++----- 7 files changed, 60 insertions(+), 34 deletions(-) diff --git a/movai_core_shared/common/time.py b/movai_core_shared/common/time.py index 4d5158e..e7217fc 100644 --- a/movai_core_shared/common/time.py +++ b/movai_core_shared/common/time.py @@ -1,4 +1,5 @@ -from datetime import datetime +from datetime import datetime, timedelta +from typing import Union from movai_core_shared.exceptions import TimeError @@ -30,11 +31,11 @@ def current_timestamp_int() -> int: return int(datetime.now().timestamp()) -def delta_time_int(delta: int) -> int: +def delta_time_int(delta: timedelta) -> int: """returns a future time in timestamp format. Args: - expiration_delta (int): the time delta from now. + expiration_delta (timedelta): the time delta from now. Returns: int: an int representing the time delta. @@ -42,7 +43,7 @@ def delta_time_int(delta: int) -> int: return int((datetime.now() + delta).timestamp()) -def delta_time_float(delta: int) -> float: +def delta_time_float(delta: timedelta) -> float: """returns a future time in timestamp format. Args: @@ -73,11 +74,11 @@ def validate_timestamp(timestamp: int) -> int: raise TimeError("The supplied time argument is not in timestamp format!") from exc -def validate_time(value: int) -> str: +def validate_time(value: Union[int, str]) -> int: """Validate if value is timestamp or datetime Args: - value (int): The datetime to validate + value (int|str): The datetime to validate Raises: ValueError: In case value isn't a time format. diff --git a/movai_core_shared/core/message_client.py b/movai_core_shared/core/message_client.py index 4ebff96..58012fe 100755 --- a/movai_core_shared/core/message_client.py +++ b/movai_core_shared/core/message_client.py @@ -10,12 +10,17 @@ - Ofer Katz (ofer@mov.ai) - 2022 - Erez Zomer (erez@mov.ai) - 2022 """ +from datetime import datetime import time +from typing import TYPE_CHECKING, Optional, cast -from movai_core_shared.core.zmq.zmq_manager import ZMQManager, ZMQType +from movai_core_shared.core.zmq.zmq_manager import ZMQManager, ZMQType, AsyncZMQClient from movai_core_shared.envvars import DEVICE_NAME, FLEET_NAME, SERVICE_NAME from movai_core_shared.exceptions import ArgumentError, MessageFormatError +if TYPE_CHECKING: + from movai_core_shared.core.zmq.zmq_client import ZMQClient + class MessageClient: """ @@ -23,6 +28,7 @@ class MessageClient: It wraps the data into the message structure and send it to the message-server using ZMQClient. """ + _zmq_client: "ZMQClient" def __init__(self, server_addr: str, robot_id: str = "") -> None: """ @@ -48,7 +54,6 @@ def __init__(self, server_addr: str, robot_id: str = "") -> None: "service": SERVICE_NAME, "id": robot_id, } - self._zmq_client = None self._init_zmq_client() def _init_zmq_client(self) -> None: @@ -58,26 +63,28 @@ def _init_zmq_client(self) -> None: self._zmq_client = ZMQManager.get_client(self._server_addr, ZMQType.CLIENT) def _build_request( - self, msg_type: str, data: dict, creation_time: str = None, response_required: bool = False + self, msg_type: str, data: dict, creation_time: Optional[datetime] = None, response_required: bool = False ) -> dict: """Build a request in the format accepted by the message server. Args: msg_type (str): The type of the message (logs, alerts, metrics....) data (dict): The data to include in the request. - creation_time (str, optional): The time the request was created. + creation_time (str, optional): The time the request was created. Defaults to now. response_required (bool, optional): Tells the message-server if the client is wainting for response. Returns: {dict}: The message request to send the message-server """ if creation_time is None: - creation_time = time.time_ns() + creation_time_ns = time.time_ns() + else: + creation_time_ns = creation_time.timestamp() * 1000000000 + creation_time.microsecond * 1000 request = { "request": { "req_type": msg_type, - "created": creation_time, + "created": creation_time_ns, "response_required": response_required, "req_data": data, "robot_info": self._robot_info, @@ -108,7 +115,11 @@ def _fetch_response(self, msg) -> dict: return response def send_request( - self, msg_type: str, data: dict, creation_time: str = None, response_required: bool = False + self, + msg_type: str, + data: dict, + creation_time: Optional[datetime] = None, + response_required: bool = False, ) -> dict: """ Wrap the data into a message request and sent it to the robot message server @@ -116,7 +127,7 @@ def send_request( Args: msg_type (str): the type of message. data (dict): The message data to be sent to the robot message server. - creation_time (str): The time where the request is created. + creation_time (datetime, optional): The time where the request is created. Defaults to now. response_required (bool): whether to wait for response, Default False. """ # Add tags to the request data @@ -167,14 +178,22 @@ def send_msg(self, data: dict, **kwargs) -> None: class AsyncMessageClient(MessageClient): + _zmq_client: AsyncZMQClient + def _init_zmq_client(self) -> None: """ Initializes the ZMQ attributute. """ - self._zmq_client = ZMQManager.get_client(self._server_addr, ZMQType.ASYNC_CLIENT) + self._zmq_client = cast( + AsyncZMQClient, ZMQManager.get_client(self._server_addr, ZMQType.ASYNC_CLIENT) + ) async def send_request( - self, msg_type: str, data: dict, creation_time: str = None, response_required: bool = False + self, + msg_type: str, + data: dict, + creation_time: Optional[datetime] = None, + response_required: bool = False, ) -> dict: """ Wrap the data into a message request and sent it asynchonously to the robot message server diff --git a/movai_core_shared/core/zmq/zmq_client.py b/movai_core_shared/core/zmq/zmq_client.py index f19d8c8..ec042c3 100644 --- a/movai_core_shared/core/zmq/zmq_client.py +++ b/movai_core_shared/core/zmq/zmq_client.py @@ -87,12 +87,12 @@ def handle_socket_errors(self, exc: zmq.error.ZMQError, reset_socket=True) -> No if exc.errno == errno.ENOTSOCK: self._logger.warning(f"ZMQ socket error: {self._addr} got exception: {exc}.") if reset_socket: - self._logger.warning("Resetting ZMQ {self._addr} with potential data loss.") + self._logger.warning(f"Resetting ZMQ {self._addr} with potential data loss.") self.reset(force=True) elif exc.errno == errno.EAGAIN: self._logger.warning(f"ZMQ socket error: {self._addr} got exception: {exc}.") if reset_socket: - self._logger.warning("Resetting ZMQ {self._addr}.") + self._logger.warning(f"Resetting ZMQ {self._addr}.") self.reset() else: self._logger.error( @@ -156,6 +156,7 @@ def receive(self, use_lock: bool = False) -> dict: class AsyncZMQClient(ZMQClient): """An Async implementation of ZMQ Client""" + _socket: zmq.asyncio.Socket _context = zmq.asyncio.Context() def init_lock(self) -> None: diff --git a/movai_core_shared/core/zmq/zmq_helpers.py b/movai_core_shared/core/zmq/zmq_helpers.py index fd465cd..d9a5d98 100644 --- a/movai_core_shared/core/zmq/zmq_helpers.py +++ b/movai_core_shared/core/zmq/zmq_helpers.py @@ -12,6 +12,7 @@ import json from logging import getLogger import random +from typing import List from movai_core_shared.envvars import DEVICE_NAME, SERVICE_NAME from movai_core_shared.exceptions import MessageError @@ -41,7 +42,7 @@ def create_msg(msg: dict): return None -def extract_reponse(buffer: bytes): +def extract_reponse(buffer: List[bytes]) -> dict: """Extracts the response from the buffer. Args: diff --git a/movai_core_shared/core/zmq/zmq_manager.py b/movai_core_shared/core/zmq/zmq_manager.py index 0bef2df..77b1084 100644 --- a/movai_core_shared/core/zmq/zmq_manager.py +++ b/movai_core_shared/core/zmq/zmq_manager.py @@ -11,10 +11,10 @@ """ from enum import Enum from logging import getLogger +from typing import Dict, Type, TypedDict from beartype import beartype -from movai_core_shared.core.zmq.zmq_base import ZMQBase from movai_core_shared.core.zmq.zmq_client import ZMQClient, AsyncZMQClient from movai_core_shared.core.zmq.zmq_subscriber import ZMQSubscriber, AsyncZMQSubscriber from movai_core_shared.core.zmq.zmq_publisher import ZMQPublisher, AsyncZMQPublisher @@ -31,7 +31,12 @@ class ZMQType(Enum): ASYNC_SUBSCRIBER = 6 -ZMQ_TYPES = { +class ZMQTypeValue(TypedDict): + type: Type[ZMQClient] + identity: str + + +ZMQ_TYPES: Dict[ZMQType, ZMQTypeValue] = { ZMQType.CLIENT: {"type": ZMQClient, "identity": "dealer"}, ZMQType.ASYNC_CLIENT: {"type": AsyncZMQClient, "identity": "dealer"}, ZMQType.PUBLISHER: {"type": ZMQPublisher, "identity": "pub"}, @@ -45,7 +50,7 @@ class ZMQManager: """This class will host ZMQ objects by their type and address.""" _logger = getLogger("ZMQManager") - _clients = { + _clients: Dict[ZMQType, Dict[str, ZMQClient]] = { ZMQType.CLIENT: {}, ZMQType.ASYNC_CLIENT: {}, ZMQType.PUBLISHER: {}, @@ -63,7 +68,7 @@ def validate_server_addr(cls, server_addr: str): @classmethod @beartype - def _get_or_create_zmq_object(cls, server_addr: str, zmq_type: ZMQType) -> ZMQBase: + def _get_or_create_zmq_object(cls, server_addr: str, zmq_type: ZMQType) -> ZMQClient: if zmq_type not in cls._clients: raise TypeError(f"{zmq_type} does not exist!") diff --git a/movai_core_shared/core/zmq/zmq_server.py b/movai_core_shared/core/zmq/zmq_server.py index 9df9e77..857e924 100644 --- a/movai_core_shared/core/zmq/zmq_server.py +++ b/movai_core_shared/core/zmq/zmq_server.py @@ -12,6 +12,7 @@ import asyncio import logging from abc import ABC, abstractmethod +from typing import List import zmq import zmq.asyncio @@ -54,11 +55,12 @@ async def spin(self) -> None: """accepts new connections requests to zmq.""" try: self.init_server() + assert self._socket if self._running: self._logger.warning("%s is already running", self._name) self._running = True - except Exception: - self._logger.error("Failed to start %s", self._name) + except Exception as e: + self._logger.error("Failed to start %s: %s", self._name, e) return await self.at_startup() @@ -120,7 +122,7 @@ def stop(self): self._running = False @abstractmethod - async def handle(self, buffer: bytes) -> None: + async def handle(self, buffer: List[bytes]) -> None: pass async def at_startup(self): diff --git a/movai_core_shared/logger.py b/movai_core_shared/logger.py index e2b9a80..e922cfa 100644 --- a/movai_core_shared/logger.py +++ b/movai_core_shared/logger.py @@ -536,13 +536,10 @@ async def get_logs( "count_field": "message", } - try: - query_response = await message_client.send_request( - LOGS_QUERY_HANDLER_MSG_TYPE, query_data, None, True - ) - if "response" in query_response: - response = query_response["response"] - except Exception as error: - raise error + query_response = await message_client.send_request( + LOGS_QUERY_HANDLER_MSG_TYPE, query_data, None, True + ) + if "response" in query_response: + response = query_response["response"] return response if pagination else response.get("data", []) From a81081ecde7e02ae8446ee282aeeb9ab39eac378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Pereira?= Date: Fri, 24 Jan 2025 11:29:36 +0000 Subject: [PATCH 2/4] Improvements based on review comments --- movai_core_shared/common/time.py | 2 +- movai_core_shared/core/zmq/zmq_client.py | 10 +++++----- movai_core_shared/core/zmq/zmq_helpers.py | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/movai_core_shared/common/time.py b/movai_core_shared/common/time.py index e7217fc..ef45545 100644 --- a/movai_core_shared/common/time.py +++ b/movai_core_shared/common/time.py @@ -47,7 +47,7 @@ def delta_time_float(delta: timedelta) -> float: """returns a future time in timestamp format. Args: - expiration_delta (int): the time delta from now. + expiration_delta (timedelta): the time delta from now. Returns: float: an float representing the time delta. diff --git a/movai_core_shared/core/zmq/zmq_client.py b/movai_core_shared/core/zmq/zmq_client.py index ec042c3..79ffb54 100644 --- a/movai_core_shared/core/zmq/zmq_client.py +++ b/movai_core_shared/core/zmq/zmq_client.py @@ -85,18 +85,18 @@ def handle_socket_errors(self, exc: zmq.error.ZMQError, reset_socket=True) -> No exc: the exception """ if exc.errno == errno.ENOTSOCK: - self._logger.warning(f"ZMQ socket error: {self._addr} got exception: {exc}.") + self._logger.warning("ZMQ socket error: %s got exception: %s.", self._addr, exc) if reset_socket: - self._logger.warning(f"Resetting ZMQ {self._addr} with potential data loss.") + self._logger.warning("Resetting ZMQ %s with potential data loss.", self._addr) self.reset(force=True) elif exc.errno == errno.EAGAIN: - self._logger.warning(f"ZMQ socket error: {self._addr} got exception: {exc}.") + self._logger.warning("ZMQ socket error: %s got exception: %s.", self._addr, exc) if reset_socket: - self._logger.warning(f"Resetting ZMQ {self._addr}.") + self._logger.warning("Resetting ZMQ %s.", self._addr) self.reset() else: self._logger.error( - f"ZMQ socket error: {self._addr} got unhandled ZMQ exception: {exc} " + "ZMQ socket error: %s got unhandled ZMQ exception: %s ", self._addr, exc ) def send(self, msg: dict, use_lock: bool = False) -> None: diff --git a/movai_core_shared/core/zmq/zmq_helpers.py b/movai_core_shared/core/zmq/zmq_helpers.py index d9a5d98..3b8a99c 100644 --- a/movai_core_shared/core/zmq/zmq_helpers.py +++ b/movai_core_shared/core/zmq/zmq_helpers.py @@ -46,7 +46,7 @@ def extract_reponse(buffer: List[bytes]) -> dict: """Extracts the response from the buffer. Args: - buffer (bytes): The memory buffer which contains the response msg. + buffer: List of memory buffers containing the message. Returns: (dict): A response from server. From ad7a6083eb88c404a4fba2e3ee831cf0f895441e Mon Sep 17 00:00:00 2001 From: OttoMation-Movai Date: Wed, 29 Jan 2025 09:54:05 +0000 Subject: [PATCH 3/4] [skip actions] Automatic Bump of build version to 3.0.1.1 --- CHANGELOG.md | 2 +- pyproject.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19c4ebc..12cbc40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# vTBD +# v3.0.1 - [BP-1340](https://movai.atlassian.net/browse/BP-1340): Migrate movai-core-shared to py-workflow@v2 # v3.0.0 (same as v2.5.0.20) diff --git a/pyproject.toml b/pyproject.toml index ee88df1..f4d09b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "movai-core-shared" -version = "3.0.1.0" +version = "3.0.1.1" authors = [ {name = "Backend team", email = "backend@mov.ai"}, ] @@ -36,7 +36,7 @@ exclude = ["movai_core_shared.tests*"] line-length = 100 [tool.bumpversion] -current_version = "3.0.1.0" +current_version = "3.0.1.1" parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)?(\\.(?P\\d+))?" serialize = ["{major}.{minor}.{patch}.{build}"] From 77fb5616d649c789de75c47bdcc76829ae036545 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Pereira?= Date: Thu, 30 Jan 2025 09:55:56 +0000 Subject: [PATCH 4/4] Updated version and changelog --- CHANGELOG.md | 3 +++ pyproject.toml | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12cbc40..0900bad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# v3.0.2 +- [BP-1262]((https://movai.atlassian.net/browse/BP-1262): Small codebase improvements + # v3.0.1 - [BP-1340](https://movai.atlassian.net/browse/BP-1340): Migrate movai-core-shared to py-workflow@v2 diff --git a/pyproject.toml b/pyproject.toml index f4d09b1..150caab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ build-backend = "setuptools.build_meta" [project] name = "movai-core-shared" -version = "3.0.1.1" +version = "3.0.2.0" authors = [ {name = "Backend team", email = "backend@mov.ai"}, ] @@ -36,7 +36,7 @@ exclude = ["movai_core_shared.tests*"] line-length = 100 [tool.bumpversion] -current_version = "3.0.1.1" +current_version = "3.0.2.0" parse = "(?P\\d+)\\.(?P\\d+)\\.(?P\\d+)?(\\.(?P\\d+))?" serialize = ["{major}.{minor}.{patch}.{build}"]