Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

basic pubsub #826

Draft
wants to merge 33 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
14901f0
re commit
schroeder- May 8, 2022
153cfa7
Fix merge errors
schroeder- May 8, 2022
95342f8
Make package version available at runtime
jrast Apr 22, 2022
5f9d3e6
remove usless print
schroeder- Apr 22, 2022
b5dc5b5
allow instantiate methods
schroeder- Apr 22, 2022
9cb5dfe
enable mypy
vruge Mar 6, 2022
b1572aa
ignore mypy no-redef
vruge Mar 6, 2022
a6ab13a
ignore mypy Name 'Union' already defined
vruge Mar 6, 2022
b4f5a1d
try to avoid code scanning error
vruge Mar 17, 2022
303ba8d
fix: read_attribute can optional not raise an exception
Falital Apr 27, 2022
45cf1d6
reject not supported identity tokens
schroeder- Apr 26, 2022
c2fe377
Remove getter
schroeder- Apr 27, 2022
6aa812c
MinimumSamplingInterval parse as float
schroeder- Apr 27, 2022
a182325
feat: improve run-tests.sh
Falital Apr 27, 2022
8b8662d
Fixed code generator for structures and enums
jrast Apr 26, 2022
62c0ce9
Adjusted dataclass serializer and deserializer to handle forward anno…
jrast Apr 26, 2022
785be82
allow setting StatusCode
schroeder- Apr 22, 2022
3f53422
expand test
schroeder- Apr 22, 2022
2d10030
use dataclasses replace
schroeder- Apr 27, 2022
e898dec
Fix test
schroeder- Apr 27, 2022
c5ee3a8
handle locale correct in schema
schroeder- Apr 29, 2022
268c94b
fix model comparison
schroeder- Apr 30, 2022
34666cd
support fields with "AllowSubtypes"
schroeder- Apr 30, 2022
250210f
update nodetset to current nodeset
schroeder- Apr 30, 2022
9c9522e
Fix merge error
schroeder- Apr 30, 2022
073ca47
add mypy ignore
schroeder- May 1, 2022
c7b7b59
fix merge error
schroeder- May 1, 2022
c28c43e
typo
schroeder- May 1, 2022
cee436a
Handle DataSetFieldFlags
schroeder- May 1, 2022
3aab791
test, enable check override
vruge May 7, 2022
a72db11
re commit
schroeder- May 8, 2022
0e6bd7a
Fix merge errors
schroeder- May 8, 2022
c5f9c09
add binary config
schroeder- May 9, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest
python -m pip install flake8 pytest mypy
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
if [ -f dev_requirements.txt ]; then pip install -r dev_requirements.txt; fi
- name: Lint with flake8
Expand All @@ -35,6 +35,10 @@ jobs:
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --statistics
- name: Lint with mypy
run: |
python -m pip install types-aiofiles types-python-dateutil types-pytz
python -m mypy --show-error-code asyncua/
- name: Test with pytest
run: |
pytest -v -s
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ examples/history.db
/.settings/
/venv/
.eggs*
.coverage
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ A set of command line tools also available: https://github.com/FreeOpcUa/opcua-a
* `uacall `(call method of a node)
* `uasubscribe `(subscribe to a node and print datachange events)
* `uaclient `(connect to server and start python shell)
* `uaserver `(starts a demo OPC UA server)
* `uaserver `(starts a demo OPC UA server)
`tools/uaserver --populate --certificate cert.pem --private_key pk.pem`

How to generate certificate: https://github.com/FreeOpcUa/opcua-asyncio/tree/master/examples/generate_certificate.sh
Expand Down Expand Up @@ -196,11 +196,21 @@ All protocol code is under opcua directory
## Running tests:

```
python -m pip install -r requirements.txt
python -m pip install -r dev_requirements.txt
pytest -v -s
```
Or
```
./run-test.sh -v -s
```

## Coverage

```
pytest -v -s --cov asyncua --cov-report=html
```
Or
```
./run-test.sh -v -s --cov asyncua --cov-report=html
```
7 changes: 7 additions & 0 deletions asyncua/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
"""
Pure Python OPC-UA library
"""
import sys
if sys.version_info >= (3, 8):
from importlib import metadata
else:
import importlib_metadata as metadata

__version__ = metadata.version("asyncua")

from .common import Node, uamethod
from .client import Client
Expand Down
Binary file modified asyncua/binary_address_space.pickle
Binary file not shown.
8 changes: 4 additions & 4 deletions asyncua/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def _set_security(
private_key.password,
private_key.extension,
)
self.security_policy = policy(server_cert, cert, pk, mode)
self.security_policy = policy(server_cert, cert, pk, mode) # type: ignore
self.uaclient.set_security(self.security_policy)

async def load_client_certificate(self, path: str, extension: Optional[str] = None):
Expand Down Expand Up @@ -566,17 +566,17 @@ async def create_subscription(
_logger.info(f"Result from subscription update: {results}")
return subscription

def get_subscription_revised_params(
def get_subscription_revised_params( # type: ignore
self,
params: ua.CreateSubscriptionParameters,
results: ua.CreateSubscriptionResult,
) -> None:
) -> Optional[ua.ModifySubscriptionParameters]:
if (
results.RevisedPublishingInterval == params.RequestedPublishingInterval
and results.RevisedLifetimeCount == params.RequestedLifetimeCount
and results.RevisedMaxKeepAliveCount == params.RequestedMaxKeepAliveCount
):
return
return # type: ignore
_logger.warning(
f"Revised values returned differ from subscription values: {results}"
)
Expand Down
2 changes: 1 addition & 1 deletion asyncua/client/ha/ha_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from enum import IntEnum
from functools import partial
from itertools import chain
from sortedcontainers import SortedDict
from sortedcontainers import SortedDict # type: ignore
from asyncua import Node, ua, Client
from asyncua.client.ua_client import UASocketProtocol
from asyncua.ua.uaerrors import BadSessionClosed, BadSessionNotActivated
Expand Down
18 changes: 9 additions & 9 deletions asyncua/client/ha/reconciliator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from enum import Enum
from functools import partial
from typing import TYPE_CHECKING, Dict, Set, Union, List, Optional
from sortedcontainers import SortedDict
from sortedcontainers import SortedDict # type: ignore
from asyncua import ua, Client
from pickle import PicklingError

Expand Down Expand Up @@ -179,8 +179,8 @@ async def update_subscriptions(

def _subs_to_del(
self, url: str, real_map: SubMap, ideal_map: SubMap
) -> List[Optional[asyncio.Task]]:
to_del = []
) -> List[asyncio.Task]:
to_del: List[asyncio.Task] = []
sub_to_del = set(real_map[url]) - set(ideal_map[url])
if sub_to_del:
_logger.info(f"Removing {len(sub_to_del)} subscriptions")
Expand All @@ -195,8 +195,8 @@ def _subs_to_del(

def _subs_to_add(
self, url: str, real_map: SubMap, ideal_map: SubMap
) -> List[Optional[asyncio.Task]]:
to_add = []
) -> List[asyncio.Task]:
to_add: List[asyncio.Task] = []
sub_to_add = set(ideal_map[url]) - set(real_map[url])
if sub_to_add:
_logger.info(f"Adding {len(sub_to_add)} subscriptions")
Expand Down Expand Up @@ -254,8 +254,8 @@ def _nodes_to_add(
client: Client,
vs_real: VirtualSubscription,
vs_ideal: VirtualSubscription,
) -> List[Optional[asyncio.Task]]:
tasks = []
) -> List[asyncio.Task]:
tasks: List[asyncio.Task] = []
real_sub = self.name_to_subscription[url].get(sub_name)
monitoring = vs_real.monitoring
node_to_add = set(vs_ideal.nodes) - set(vs_real.nodes)
Expand Down Expand Up @@ -301,8 +301,8 @@ def _nodes_to_del(
sub_name: str,
vs_real: VirtualSubscription,
vs_ideal: VirtualSubscription,
) -> List[Optional[asyncio.Task]]:
to_del = []
) -> List[asyncio.Task]:
to_del: List[asyncio.Task] = []
node_to_del = set(vs_real.nodes) - set(vs_ideal.nodes)
real_sub = self.name_to_subscription[url].get(sub_name)
if node_to_del:
Expand Down
2 changes: 1 addition & 1 deletion asyncua/client/ha/virtual_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any, Iterable, Optional, Set

from asyncua import ua
from sortedcontainers import SortedDict
from sortedcontainers import SortedDict # type: ignore


TypeSubHandler = Any
Expand Down
4 changes: 2 additions & 2 deletions asyncua/client/ua_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ def __init__(self, timeout: float = 1, security_policy: ua.SecurityPolicy = ua.S
# passing back the processed response to the request so that it can return it.
self._open_secure_channel_exchange = None

def connection_made(self, transport: asyncio.Transport):
def connection_made(self, transport: asyncio.Transport): # type: ignore
self.state = self.OPEN
self.transport = transport

def connection_lost(self, exc: Exception):
def connection_lost(self, exc: Optional[Exception]):
self.logger.info("Socket has closed connection")
self.state = self.CLOSED
self.transport = None
Expand Down
2 changes: 1 addition & 1 deletion asyncua/common/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
try:
from ..crypto.uacrypto import InvalidSignature
except ImportError:
class InvalidSignature(Exception):
class InvalidSignature(Exception): # type: ignore
pass

logger = logging.getLogger('asyncua.uaprotocol')
Expand Down
8 changes: 4 additions & 4 deletions asyncua/common/event_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
Autogenerated code from xml"

Model Uri:http://opcfoundation.org/UA/"
Version:1.04.9"
Publication date:2021-01-21T00:00:00Z"
Version:1.04.10"
Publication date:2021-09-15T00:00:00Z"

File creation Date:2021-04-06 17:23:55.234980"
File creation Date:2022-04-30 13:47:49.823843"
"""
from asyncua import ua
from .events import Event
Expand Down Expand Up @@ -537,7 +537,7 @@ def __init__(self, sourcenode=None, message=None, severity=1):
self.add_property('SuppressedOrShelved', None, ua.VariantType.Boolean)
self.add_property('MaxTimeShelved', None, ua.NodeId(ua.ObjectIds.Duration))
self.add_property('AudibleEnabled', None, ua.VariantType.Boolean)
self.add_property('AudibleSound', None, ua.NodeId(ua.ObjectIds.AudioDataType))
self.add_variable('AudibleSound', None, ua.NodeId(ua.ObjectIds.AudioDataType))
self.add_property('SilenceState/Id', None, ua.VariantType.Boolean)
self.add_property('SilenceState/TransitionTime', None, ua.NodeId(ua.ObjectIds.UtcTime))
self.add_property('SilenceState/TrueState', None, ua.VariantType.LocalizedText)
Expand Down
10 changes: 5 additions & 5 deletions asyncua/common/instantiate_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ async def instantiate(parent, node_type, nodeid: ua.NodeId=None, bname: Union[st
If they exists children of the node type, such as components, variables and
properties are also instantiated
"""
abstract = await is_abstract(node_type)
if abstract:
raise ua.UaError(f"InstantiationError NodeId: {node_type.nodeid} is abstract and cant be instantiated!")

rdesc = await _rdesc_from_node(parent, node_type)
rdesc.TypeDefinition = node_type.nodeid

if rdesc.NodeClass in (ua.NodeClass.DataType, ua.NodeClass.ReferenceType, ua.NodeClass.ObjectType, ua.NodeClass.ReferenceType):
# Only some nodes can be abstract
abstract = await is_abstract(node_type)
if abstract:
raise ua.UaError(f"InstantiationError NodeId: {node_type.nodeid} is abstract and cant be instantiated!")
if nodeid is None:
nodeid = ua.NodeId(NamespaceIndex=idx) # will trigger automatic node generation in namespace idx
if bname is None:
Expand Down
9 changes: 5 additions & 4 deletions asyncua/common/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,13 @@ async def read_value(self):

get_value = read_value # legacy compatibility

async def read_data_value(self):
async def read_data_value(self, raise_on_bad_status=True):
"""
Get value of a node as a DataValue object. Only variables (and properties) have values.
An exception will be generated for other node types.
DataValue contain a variable value as a variant as well as server and source timestamps
"""
return await self.read_attribute(ua.AttributeIds.Value)
return await self.read_attribute(ua.AttributeIds.Value, None, raise_on_bad_status)

async def write_array_dimensions(self, value):
"""
Expand Down Expand Up @@ -286,7 +286,7 @@ async def write_params(self, params):
result = await self.server.write(params)
return result

async def read_attribute(self, attr, indexrange=None):
async def read_attribute(self, attr, indexrange=None, raise_on_bad_status=True):
"""
Read one attribute of a node
attributeid is a member of ua.AttributeIds
Expand All @@ -300,7 +300,8 @@ async def read_attribute(self, attr, indexrange=None):
params = ua.ReadParameters()
params.NodesToRead.append(rv)
result = await self.server.read(params)
result[0].StatusCode.check()
if raise_on_bad_status:
result[0].StatusCode.check()
return result[0]

async def read_attributes(self, attrs):
Expand Down
11 changes: 8 additions & 3 deletions asyncua/common/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ class {self.name}:

"""
for sfield in self.fields:
uatype = f"ua.{sfield.uatype}"
uatype = f"'ua.{sfield.uatype}'"
if sfield.array:
uatype = f"List[{uatype}]"
if uatype == 'List[ua.Char]':
uatype = 'String'
code += f" {sfield.name}:{uatype} = {sfield.value}\n"
print(code)
uavalue = sfield.value
if isinstance(uavalue, str) and uavalue.startswith("ua."):
uavalue = f"field(default_factory=lambda: {uavalue})"
code += f" {sfield.name}:{uatype} = {uavalue}\n"
return code


Expand Down Expand Up @@ -169,6 +171,8 @@ def save_to_file(self, path, register=False):
def _make_registration(self):
code = "\n\n"
for struct in self.model:
if isinstance(struct, EnumType):
continue # No registration required for enums
code += f"ua.register_extension_object('{struct.name}'," \
f" ua.NodeId.from_string('{struct.typeid}'), {struct.name})\n"
return code
Expand All @@ -186,6 +190,7 @@ def _make_header(self, _file):
import uuid
from dataclasses import dataclass, field
from typing import List, Union
from enum import IntEnum

from asyncua import ua
""")
Expand Down
10 changes: 5 additions & 5 deletions asyncua/common/structures104.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import re
import keyword
from typing import Union, List, TYPE_CHECKING, Tuple, Optional, Any
from typing import Union, List, TYPE_CHECKING, Tuple, Optional, Any, Dict
from dataclasses import dataclass, field

from asyncua import ua
Expand Down Expand Up @@ -44,7 +44,7 @@ def new_struct_field(
raise ValueError(f"DataType of a field must be a NodeId, not {dtype} of type {type(dtype)}")
if array:
field.ValueRank = ua.ValueRank.OneOrMoreDimensions
field.ArrayDimensions = [1]
field.ArrayDimensions = [1] # type: ignore
else:
field.ValueRank = ua.ValueRank.Scalar
field.ArrayDimensions = None
Expand Down Expand Up @@ -339,7 +339,7 @@ async def load_custom_struct(node: Node) -> Any:
return env[name]


async def load_data_type_definitions(server: Union["Server", "Client"], base_node: Node = None, overwrite_existing=False) -> None:
async def load_data_type_definitions(server: Union["Server", "Client"], base_node: Node = None, overwrite_existing=False) -> Dict:
"""
Read DataTypeDefition attribute on all Structure and Enumeration defined
on server and generate Python objects in ua namespace to be used to talk with server
Expand All @@ -355,7 +355,7 @@ async def load_data_type_definitions(server: Union["Server", "Client"], base_nod
try:
env = await _generate_object(dts.name, dts.sdef, data_type=dts.data_type)
ua.register_extension_object(dts.name, dts.encoding_id, env[dts.name], dts.data_type)
new_objects[dts.name] = env[dts.name]
new_objects[dts.name] = env[dts.name] # type: ignore
except NotImplementedError:
logger.exception("Structure type %s not implemented", dts.sdef)
return new_objects
Expand Down Expand Up @@ -403,7 +403,7 @@ class {name}({enum_type}):
return code


async def load_enums(server: Union["Server", "Client"], base_node: Node = None, option_set: bool = False) -> None:
async def load_enums(server: Union["Server", "Client"], base_node: Node = None, option_set: bool = False) -> Dict:
if base_node is None:
base_node = server.nodes.enum_data_type
new_enums = {}
Expand Down
8 changes: 4 additions & 4 deletions asyncua/common/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async def subscribe_events(self,
sourcenode = Node(self.server, sourcenode)
if evfilter is None:
evfilter = await self._create_eventfilter(evtypes)
return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize) # type: ignore

async def subscribe_alarms_and_conditions(self,
sourcenode: Node = ua.ObjectIds.Server,
Expand Down Expand Up @@ -257,7 +257,7 @@ async def subscribe_alarms_and_conditions(self,
conditionIdOperand.TypeDefinitionId = ua.NodeId(ua.ObjectIds.ConditionType)
conditionIdOperand.AttributeId = ua.AttributeIds.NodeId
evfilter.SelectClauses.append(conditionIdOperand)
return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize)
return await self._subscribe(sourcenode, ua.AttributeIds.EventNotifier, evfilter, queuesize=queuesize) # type: ignore

async def _subscribe(self,
nodes: Union[Node, Iterable[Node]],
Expand Down Expand Up @@ -296,7 +296,7 @@ async def _subscribe(self,
# Check and return result for single node (raise `UaStatusCodeError` if subscription failed)
if type(mids[0]) == ua.StatusCode:
mids[0].check()
return mids[0]
return mids[0] # type: ignore

def _make_monitored_item_request(self,
node: Node,
Expand Down Expand Up @@ -468,7 +468,7 @@ async def set_publishing_mode(self, publishing: bool) -> ua.uatypes.StatusCode:
"""
self.logger.info("set_publishing_mode")
params = ua.SetPublishingModeParameters()
params.SubscriptionIds = [self.subscription_id]
params.SubscriptionIds = [self.subscription_id] # type: ignore
params.PublishingEnabled = publishing
result = await self.server.set_publishing_mode(params)
if result[0].is_good():
Expand Down
Loading