Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

312 #1672

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open

312 #1672

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.7', '3.11', 'pypy-3.8']
python-version: ['3.7', '3.12', 'pypy-3.10']

steps:
- uses: actions/checkout@v4
Expand Down
42 changes: 24 additions & 18 deletions asyncua/common/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,35 +81,41 @@ def status_change_notification(self, status: ua.StatusChangeNotification) -> Non
...


SubscriptionHandler = Union[DataChangeNotificationHandler, EventNotificationHandler, StatusChangeNotificationHandler]
"""
Protocol class representing subscription handlers to receive events from server.
"""


class SubHandler:
"""
Subscription Handler. To receive events from server for a subscription
This class is just a sample class. Whatever class having these methods can be used
"""

def datachange_notification(self, node: Node, val: Any, data: DataChangeNotif) -> None:
class DataChangeNotificationHandlerAsync(Protocol):
async def datachange_notification(self, node: Node, val: Any, data: DataChangeNotif) -> None:
"""
called for every datachange notification from server
"""
pass
...

def event_notification(self, event: ua.EventNotificationList) -> None:

class EventNotificationHandlerAsync(Protocol):
async def event_notification(self, event: ua.EventNotificationList) -> None:
"""
called for every event notification from server
"""
pass
...

def status_change_notification(self, status: ua.StatusChangeNotification) -> None:

class StatusChangeNotificationHandlerAsync(Protocol):
async def status_change_notification(self, status: ua.StatusChangeNotification) -> None:
"""
called for every status change notification from server
"""
pass
...


SubscriptionHandler = Union[
DataChangeNotificationHandler,
EventNotificationHandler,
StatusChangeNotificationHandler,
DataChangeNotificationHandlerAsync,
EventNotificationHandlerAsync,
StatusChangeNotificationHandlerAsync,
]
"""
Protocol class representing subscription handlers to receive events from server.
"""


class Subscription:
Expand Down
3 changes: 1 addition & 2 deletions asyncua/server/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from datetime import timezone

from asyncua import ua
from asyncua.common import subscription
from asyncua.common.subscription import Subscription, SubscriptionHandler
from ..common.utils import Buffer

Expand Down Expand Up @@ -217,7 +216,7 @@ async def stop(self):
pass


class SubHandler(subscription.SubHandler):
class SubHandler:
def __init__(self, storage: HistoryStorageInterface):
self.storage = storage

Expand Down
2 changes: 1 addition & 1 deletion asyncua/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def set_security_string(self, string: str) -> None:
def load_enums(self) -> Dict[str, Type]: # type: ignore[empty-body]
pass

def create_subscription(self, period: Union[ua.CreateSubscriptionParameters, float], handler: subscription.SubHandler, publishing: bool = True) -> "Subscription":
def create_subscription(self, period: Union[ua.CreateSubscriptionParameters, float], handler: subscription.SubscriptionHandler, publishing: bool = True) -> "Subscription":
coro = self.aio_obj.create_subscription(period, _SubHandler(self.tloop, handler), publishing)
aio_sub = self.tloop.post(coro)
return Subscription(self.tloop, aio_sub)
Expand Down
Loading