diff --git a/.github/release-drafter.yml b/.github/release-drafter.yml new file mode 100644 index 0000000..004002a --- /dev/null +++ b/.github/release-drafter.yml @@ -0,0 +1,22 @@ +name-template: '$NEXT_PATCH_VERSION Release.' +tag-template: '$NEXT_PATCH_VERSION' +categories: + - title: 'Breaking changes' + labels: + - 'breaking' + - title: '🚀 Features' + labels: + - 'feature' + - 'enhancement' + - title: '🐛 Bug Fixes' + labels: + - 'fix' + - 'bugfix' + - 'bug' + - title: '🧰 Maintenance' + label: 'chore' +change-template: '- $TITLE @$AUTHOR (#$NUMBER)' +template: | + ## Changes + + $CHANGES diff --git a/.github/workflows/release-management.yml b/.github/workflows/release-management.yml new file mode 100644 index 0000000..40e9702 --- /dev/null +++ b/.github/workflows/release-management.yml @@ -0,0 +1,16 @@ +name: Release Management + +on: + push: + # branches to consider in the event; optional, defaults to all + branches: + - master + +jobs: + update_draft_release: + runs-on: ubuntu-latest + steps: + # Drafts your next Release notes as Pull Requests are merged into "master" + - uses: release-drafter/release-drafter@v5 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..57ef587 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,45 @@ +name: Build & Tests + +on: + push: + branches: [ master, dev ] + pull_request: + branches: [ master, dev ] + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.7, 3.8, 3.9] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pytest pytest-asyncio mock + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install -e "." + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide + flake8 . --count --exit-zero --max-complexity=16 --max-line-length=127 --statistics + - name: Test with pytest + run: | + pytest + + markdownlint: + + runs-on: ubuntu-latest + name: Test Markdown + steps: + - name: Run mdl + uses: actionshub/markdownlint@master diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 333d47a..0000000 --- a/.travis.yml +++ /dev/null @@ -1,13 +0,0 @@ -language: python -matrix: - fast_finish: true - include: - - python: "3.7" - env: TOXENV=lint - - python: "3.7" - env: TOXENV=py37 - - python: "3.8" - env: TOXENV=py38 -install: pip install -U setuptools tox coveralls -script: tox -after_success: coveralls diff --git a/README.md b/README.md index 0177472..a4d5b71 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # zigpy-zigate -[![Build Status](https://travis-ci.com/zigpy/zigpy-zigate.svg?branch=master)](https://travis-ci.com/zigpy/zigpy-zigate) +![Build & Tests](https://github.com/zigpy/zigpy-zigate/workflows/Build%20&%20Tests/badge.svg?branch=master) [![Coverage](https://coveralls.io/repos/github/zigpy/zigpy-zigate/badge.svg?branch=master)](https://coveralls.io/github/zigpy/zigpy-zigate?branch=master) **WARNING: EXPERIMENTAL! This project is under development as WIP (work in progress). Developer’s work provided “AS IS”.** @@ -14,29 +14,57 @@ ZiGate is a open source ZigBee adapter hardware that was initially launched on K - https://www.zigate.fr - https://www.kickstarter.com/projects/1361563794/zigate-universal-zigbee-gateway-for-smarthome -## Compatible hardware +## Hardware and firmware compatibility The ZiGate USB adapter communicates via a PL-2303HX USB to Serial Bridge Controller module by Prolific. There's also a Wi-Fi adapter to communicate with ZiGate over network. -Note! ZiGate open source ZigBee adapter hardware requires ZiGate firmware 3.1a or later to work with this zigpy-zigate module. +Note! ZiGate open source ZigBee USB and GPIO adapter hardware requires ZiGate 3.1a firmware or later to work with this zigpy-zigate module, however ZiGate 3.1d firmware or later is recommended as it contains a specific bug-fix related to zigpy. ### Known working Zigbee radio modules - [ZiGate USB-TTL](https://zigate.fr/produit/zigate-ttl/) - [ZiGate USB-DIN](https://zigate.fr/produit/zigate-usb-din/) - [PiZiGate (ZiGate module for Raspberry Pi GPIO)](https://zigate.fr/produit/pizigate-v1-0/) +- [ZiGate Pack WiFi](https://zigate.fr/produit/zigate-pack-wifi-v1-3/) ### Experimental Zigbee radio modules -- [ZiGate Pack WiFi](https://zigate.fr/produit/zigate-pack-wifi-v1-3/) (work in progress) +- [Open Lumi Gateway](https://github.com/openlumi) - [DIY ZiGate WiFi bridge hacked from an Xiaomi Lumi Gateway with modded OpenWRT firmware](https://github.com/zigpy/zigpy-zigate/issues/59) ## Port configuration -- To configure __usb__ ZiGate port, just specify the port, example : `/dev/ttyUSB0` - - Alternatively you could set port to `auto` to enable automatic usb port discovery -- To configure __pizigate__ port, prefix the port with `pizigate:`, example : `pizigate:/dev/serial0` -- To configure __wifi__ ZiGate, specify IP address and port, example : `socket://192.168.1.10:9999` +- To configure __usb__ ZiGate (USB TTL or DIN) port, just specify the port, example : `/dev/ttyUSB0` + - Alternatively you could manually set port to `auto` to enable automatic usb port discovery +- To configure __pizigate__ port, specify the port, example : `/dev/serial0` or `/dev/ttyAMA0` +- To configure __wifi__ ZiGate, manually specify IP address and port, example : `socket://192.168.1.10:9999` -Note! Requires ZiGate firmware 3.1a and later -- https://zigate.fr/tag/firmware/ +__pizigate__ does require some additional adjustements on Raspberry Pi 3/Zero, and 4: +- [Raspberry Pi 3 and Raspberry Pi Zero configuration adjustements](https://zigate.fr/documentation/compatibilite-raspberry-pi-3-et-zero-w/) +- [Raspberry Pi 4 configuration adjustements](https://zigate.fr/documentation/compatibilite-raspberry-pi-4-b/) + +## Flasher (ZiGate Firmware Tool) + +zigpy-zigate has an integrated Python "flasher" tool to flash firmware updates on your ZiGate (NXP Jennic JN5168). + +Thanks to Sander Hoentjen (tjikkun) zigpy-zigate now has an integrated firmware flasher tool! +- [tjikkun original zigate-flasher repo](https://github.com/tjikkun/zigate-flasher) + +### Flasher Usage + +```bash +usage: python3 -m zigpy_zigate.tools.flasher [-h] -p {/dev/ttyUSB0} [-w WRITE] [-s SAVE] [-u] [-d] [--gpio] [--din] + +optional arguments: + -h, --help show this help message and exit + -p {/dev/ttyUSB0}, --serialport {/dev/ttyUSB0} + Serial port, e.g. /dev/ttyUSB0 + -w WRITE, --write WRITE + Firmware bin to flash onto the chip + -s SAVE, --save SAVE File to save the currently loaded firmware to + -u, --upgrade Download and flash the lastest available firmware + -d, --debug Set log level to DEBUG + --gpio Configure GPIO for PiZiGate flash + --din Configure USB for ZiGate DIN flash + +``` ## Testing new releases @@ -71,6 +99,9 @@ Tagged versions are also released via PyPI Documents that layout the serial protocol used for ZiGate serial interface communication can be found here: - https://github.com/fairecasoimeme/ZiGate/tree/master/Protocol +- https://github.com/doudz/zigate +- https://github.com/Neonox31/zigate +- https://github.com/nouknouk/node-zigate ## How to contribute @@ -82,17 +113,32 @@ Some developers might also be interested in receiving donations in the form of h ## Related projects -### Zigpy -[Zigpy](https://github.com/zigpy/zigpy)** is **[Zigbee protocol stack](https://en.wikipedia.org/wiki/Zigbee)** integration project to implement the **[Zigbee Home Automation](https://www.zigbee.org/)** standard as a Python 3 library. Zigbee Home Automation integration with zigpy allows you to connect one of many off-the-shelf Zigbee adapters using one of the available Zigbee radio library modules compatible with zigpy to control Zigbee based devices. There is currently support for controlling Zigbee device types such as binary sensors (e.g., motion and door sensors), sensors (e.g., temperature sensors), lightbulbs, switches, and fans. A working implementation of zigbe exist in **[Home Assistant](https://www.home-assistant.io)** (Python based open source home automation software) as part of its **[ZHA component](https://www.home-assistant.io/components/zha/)** +#### Zigpy +[Zigpy](https://github.com/zigpy/zigpy) is [Zigbee protocol stack](https://en.wikipedia.org/wiki/Zigbee) integration project to implement the [Zigbee Home Automation](https://www.zigbee.org/) standard as a Python 3 library. Zigbee Home Automation integration with zigpy allows you to connect one of many off-the-shelf Zigbee adapters using one of the available Zigbee radio library modules compatible with zigpy to control Zigbee based devices. There is currently support for controlling Zigbee device types such as binary sensors (e.g., motion and door sensors), sensors (e.g., temperature sensors), lightbulbs, switches, and fans. A working implementation of zigbe exist in [Home Assistant](https://www.home-assistant.io) (Python based open source home automation software) as part of its [ZHA component](https://www.home-assistant.io/components/zha/) + +#### ZHA Device Handlers +ZHA deviation handling in Home Assistant relies on the third-party [ZHA Device Handlers](https://github.com/zigpy/zha-device-handlers) project. Zigbee devices that deviate from or do not fully conform to the standard specifications set by the [Zigbee Alliance](https://www.zigbee.org) may require the development of custom [ZHA Device Handlers](https://github.com/zigpy/zha-device-handlers) (ZHA custom quirks handler implementation) to for all their functions to work properly with the ZHA component in Home Assistant. These ZHA Device Handlers for Home Assistant can thus be used to parse custom messages to and from non-compliant Zigbee devices. The custom quirks implementations for zigpy implemented as ZHA Device Handlers for Home Assistant are a similar concept to that of [Hub-connected Device Handlers for the SmartThings platform](https://docs.smartthings.com/en/latest/device-type-developers-guide/) as well as that of [zigbee-herdsman converters as used by Zigbee2mqtt](https://www.zigbee2mqtt.io/how_tos/how_to_support_new_devices.html), meaning they are each virtual representations of a physical device that expose additional functionality that is not provided out-of-the-box by the existing integration between these platforms. + +#### ZHA integration component for Home Assistant +[ZHA integration component for Home Assistant](https://www.home-assistant.io/integrations/zha/) is a reference implementation of the zigpy library as integrated into the core of [Home Assistant](https://www.home-assistant.io) (a Python based open source home automation software). There are also other GUI and non-GUI projects for Home Assistant's ZHA components which builds on or depends on its features and functions to enhance or improve its user-experience, some of those are listed and linked below. + +#### ZHA Custom Radios +[zha-custom-radios](https://github.com/zha-ng/zha-custom-radios) adds support for custom radio modules for zigpy to [[Home Assistant's ZHA (Zigbee Home Automation) integration component]](https://www.home-assistant.io/integrations/zha/). This custom component for Home Assistant allows users to test out new modules for zigpy in Home Assistant's ZHA integration component before they are integrated into zigpy ZHA and also helps developers new zigpy radio modules without having to modify the Home Assistant's source code. + +#### ZHA Custom +[zha_custom](https://github.com/Adminiuga/zha_custom) is a custom component package for Home Assistant (with its ZHA component for zigpy integration) that acts as zigpy commands service wrapper, when installed it allows you to enter custom commands via to zigy to example change advanced configuration and settings that are not available in the UI. + +#### ZHA Map +[zha-map](https://github.com/zha-ng/zha-map) for Home Assistant's ZHA component can build a Zigbee network topology map. -### ZHA Device Handlers -ZHA deviation handling in Home Assistant relies on on the third-party [ZHA Device Handlers](https://github.com/dmulcahey/zha-device-handlers) project. Zigbee devices that deviate from or do not fully conform to the standard specifications set by the [Zigbee Alliance](https://www.zigbee.org) may require the development of custom [ZHA Device Handlers](https://github.com/dmulcahey/zha-device-handlers) (ZHA custom quirks handler implementation) to for all their functions to work properly with the ZHA component in Home Assistant. These ZHA Device Handlers for Home Assistant can thus be used to parse custom messages to and from non-compliant Zigbee devices. The custom quirks implementations for zigpy implemented as ZHA Device Handlers for Home Assistant are a similar concept to that of [Hub-connected Device Handlers for the SmartThings Classics platform](https://docs.smartthings.com/en/latest/device-type-developers-guide/) as well as that of [Zigbee-Shepherd Converters as used by Zigbee2mqtt](https://www.zigbee2mqtt.io/how_tos/how_to_support_new_devices.html), meaning they are each virtual representations of a physical device that expose additional functionality that is not provided out-of-the-box by the existing integration between these platforms. +#### ZHA Network Visualization Card +[zha-network-visualization-card](https://github.com/dmulcahey/zha-network-visualization-card) is a custom Lovelace element for Home Assistant which visualize the Zigbee network for the ZHA component. -### ZHA Map -Home Assistant can build ZHA network topology map using the [zha-map](https://github.com/zha-ng/zha-map) project. +#### ZHA Network Card +[zha-network-card](https://github.com/dmulcahey/zha-network-card) is a custom Lovelace card for Home Assistant that displays ZHA component Zigbee network and device information in Home Assistant -### zha-network-visualization-card -[zha-network-visualization-card](https://github.com/dmulcahey/zha-network-visualization-card) is a custom Lovelace element for visualizing the ZHA Zigbee network in Home Assistant. +#### Zigzag +[Zigzag](https://github.com/Samantha-uk/zigzag) is an custom card/panel for [Home Assistant](https://www.home-assistant.io/) that displays a graphical layout of Zigbee devices and the connections between them. Zigzag can be installed as a panel or a custom card and relies on the data provided by the [zha-map](https://github.com/zha-ng/zha-map) integration commponent. -### ZHA Network Card -[zha-network-card](https://github.com/dmulcahey/zha-network-card) is a custom Lovelace card that displays ZHA network and device information in Home Assistant +#### ZHA Device Exporter +[zha-device-exporter](https://github.com/dmulcahey/zha-device-exporter) is a custom component for Home Assistant to allow the ZHA component to export lists of Zigbee devices. diff --git a/setup.py b/setup.py index 1ece6a4..5b426c2 100644 --- a/setup.py +++ b/setup.py @@ -50,6 +50,7 @@ def is_raspberry_pi(raise_on_errors=False): requires = [ 'pyserial-asyncio', + 'pyusb', 'zigpy>=0.22.2', ] @@ -66,7 +67,7 @@ def is_raspberry_pi(raise_on_errors=False): description="A library which communicates with ZiGate radios for zigpy", long_description=long_description, long_description_content_type="text/markdown", - url="http://github.com/doudz/zigpy-zigate", + url="http://github.com/zigpy/zigpy-zigate", author="SĂ©bastien RAMAGE", author_email="sebatien.ramage@gmail.com", license="GPL-3.0", @@ -74,5 +75,7 @@ def is_raspberry_pi(raise_on_errors=False): install_requires=requires, tests_require=[ 'pytest', + 'pytest-asyncio', + 'mock' ], ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/async_mock.py b/tests/async_mock.py new file mode 100644 index 0000000..7d6b494 --- /dev/null +++ b/tests/async_mock.py @@ -0,0 +1,7 @@ +"""Mock utilities that are async aware.""" +import sys + +if sys.version_info[:2] < (3, 8): + from mock import * # noqa +else: + from unittest.mock import * # noqa diff --git a/tests/test_api.py b/tests/test_api.py index 7cdeec6..9b65be5 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -3,7 +3,7 @@ import pytest import serial import serial_asyncio -from asynctest import CoroutineMock, mock +from .async_mock import AsyncMock, MagicMock, patch, sentinel import zigpy_zigate.config as config import zigpy_zigate.uart @@ -15,13 +15,13 @@ @pytest.fixture def api(): api = zigate_api.ZiGate(DEVICE_CONFIG) - api._uart = mock.MagicMock() + api._uart = MagicMock() return api def test_set_application(api): - api.set_application(mock.sentinel.app) - assert api._app == mock.sentinel.app + api.set_application(sentinel.app) + assert api._app == sentinel.app @pytest.mark.asyncio @@ -39,7 +39,7 @@ async def mock_conn(loop, protocol_factory, **kwargs): def test_close(api): - api._uart.close = mock.MagicMock() + api._uart.close = MagicMock() uart = api._uart api.close() assert uart.close.call_count == 1 @@ -47,18 +47,18 @@ def test_close(api): @pytest.mark.asyncio -@mock.patch.object(zigpy_zigate.uart, "connect") +@patch.object(zigpy_zigate.uart, "connect") async def test_api_new(conn_mck): """Test new class method.""" - api = await zigate_api.ZiGate.new(DEVICE_CONFIG, mock.sentinel.application) + api = await zigate_api.ZiGate.new(DEVICE_CONFIG, sentinel.application) assert isinstance(api, zigate_api.ZiGate) assert conn_mck.call_count == 1 assert conn_mck.await_count == 1 @pytest.mark.asyncio -@mock.patch.object(zigate_api.ZiGate, "set_raw_mode", new_callable=CoroutineMock) -@mock.patch.object(zigpy_zigate.uart, "connect") +@patch.object(zigate_api.ZiGate, "set_raw_mode", new_callable=AsyncMock) +@patch.object(zigpy_zigate.uart, "connect") async def test_probe_success(mock_connect, mock_raw_mode): """Test device probing.""" @@ -72,8 +72,8 @@ async def test_probe_success(mock_connect, mock_raw_mode): @pytest.mark.asyncio -@mock.patch.object(zigate_api.ZiGate, "set_raw_mode", side_effect=asyncio.TimeoutError) -@mock.patch.object(zigpy_zigate.uart, "connect") +@patch.object(zigate_api.ZiGate, "set_raw_mode", side_effect=asyncio.TimeoutError) +@patch.object(zigpy_zigate.uart, "connect") @pytest.mark.parametrize( "exception", (asyncio.TimeoutError, serial.SerialException, zigate_api.NoResponseError), @@ -91,3 +91,14 @@ async def test_probe_fail(mock_connect, mock_raw_mode, exception): assert mock_connect.call_args[0][0] == DEVICE_CONFIG assert mock_raw_mode.call_count == 1 assert mock_connect.return_value.close.call_count == 1 + + +@pytest.mark.asyncio +@patch.object(zigate_api.ZiGate, "_command", side_effect=asyncio.TimeoutError) +async def test_api_command(mock_command, api): + """Test command method.""" + try: + await api.set_raw_mode() + except zigate_api.NoResponseError: + pass + assert mock_command.call_count == 2 diff --git a/tests/test_application.py b/tests/test_application.py index 90ae337..bfb8c11 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -13,11 +13,13 @@ config.CONF_DATABASE: None, } ) - +FAKE_FIRMWARE_VERSION = '3.1z' @pytest.fixture def app(): - return zigpy_zigate.zigbee.application.ControllerApplication(APP_CONFIG) + a = zigpy_zigate.zigbee.application.ControllerApplication(APP_CONFIG) + a.version = FAKE_FIRMWARE_VERSION + return a def test_zigpy_ieee(app): @@ -30,3 +32,8 @@ def test_zigpy_ieee(app): dst_addr = app.get_dst_address(cluster) assert dst_addr.serialize() == b"\x03" + data[::-1] + b"\x01" + + +def test_model_detection(app): + device = zigpy_zigate.zigbee.application.ZiGateDevice(app, 0, 0) + assert device.model == 'ZiGate USB-TTL {}'.format(FAKE_FIRMWARE_VERSION) diff --git a/tox.ini b/tox.ini deleted file mode 100644 index 8c390df..0000000 --- a/tox.ini +++ /dev/null @@ -1,28 +0,0 @@ -# Tox (http://tox.testrun.org/) is a tool for running tests -# in multiple virtualenvs. This configuration file will run the -# test suite on all supported python versions. To use it, "pip install tox" -# and then run "tox" from this directory. - -[tox] -envlist = py37, py38, lint -skip_missing_interpreters = True - -[testenv] -setenv = PYTHONPATH = {toxinidir} -install_command = pip install {opts} {packages} -commands = py.test --cov --cov-report= -deps = - asynctest - coveralls - pytest - pytest-cov - pytest-asyncio - -[testenv:lint] -basepython = python3 -deps = flake8 -commands = flake8 - -[flake8] -ignore = E501 -max-complexity = 16 diff --git a/zigpy_zigate/__init__.py b/zigpy_zigate/__init__.py index b44f76a..509ed44 100644 --- a/zigpy_zigate/__init__.py +++ b/zigpy_zigate/__init__.py @@ -1,5 +1,5 @@ MAJOR_VERSION = 0 -MINOR_VERSION = 6 -PATCH_VERSION = '2' +MINOR_VERSION = 7 +PATCH_VERSION = '0' __short_version__ = '{}.{}'.format(MAJOR_VERSION, MINOR_VERSION) __version__ = '{}.{}'.format(__short_version__, PATCH_VERSION) diff --git a/zigpy_zigate/api.py b/zigpy_zigate/api.py index e092100..c2e04ce 100644 --- a/zigpy_zigate/api.py +++ b/zigpy_zigate/api.py @@ -2,6 +2,8 @@ import binascii import functools import logging +import enum +import datetime from typing import Any, Dict import serial @@ -17,28 +19,60 @@ COMMAND_TIMEOUT = 1.5 RESPONSES = { - 0x004D: (t.NWK, t.EUI64, t.uint8_t), + 0x004D: (t.NWK, t.EUI64, t.uint8_t, t.uint8_t), 0x8000: (t.uint8_t, t.uint8_t, t.uint16_t, t.Bytes), 0x8002: (t.uint8_t, t.uint16_t, t.uint16_t, t.uint8_t, t.uint8_t, t.Address, t.Address, t.Bytes), 0x8009: (t.NWK, t.EUI64, t.uint16_t, t.uint64_t, t.uint8_t), 0x8010: (t.uint16_t, t.uint16_t), + 0x8011: (t.uint8_t, t.NWK, t.uint8_t, t.uint16_t, t.uint8_t), + 0x8017: (t.uint32_t,), 0x8024: (t.uint8_t, t.NWK, t.EUI64, t.uint8_t), + 0x8035: (t.uint8_t, t.uint32_t), 0x8048: (t.EUI64, t.uint8_t), 0x8701: (t.uint8_t, t.uint8_t), 0x8702: (t.uint8_t, t.uint8_t, t.uint8_t, t.Address, t.uint8_t), + 0x8806: (t.uint8_t,), } COMMANDS = { 0x0002: (t.uint8_t,), + 0x0016: (t.uint32_t,), + 0x0018: (t.uint8_t,), + 0x0019: (t.uint8_t,), 0x0020: (t.uint64_t,), 0x0021: (t.uint32_t,), 0x0026: (t.EUI64, t.EUI64), 0x0049: (t.NWK, t.uint8_t, t.uint8_t), 0x0530: (t.uint8_t, t.NWK, t.uint8_t, t.uint8_t, t.uint16_t, t.uint16_t, t.uint8_t, t.uint8_t, t.LBytes), + 0x0806: (t.uint8_t,), } +class AutoEnum(enum.IntEnum): + def _generate_next_value_(name, start, count, last_values): + return count + + +class PDM_EVENT(AutoEnum): + E_PDM_SYSTEM_EVENT_WEAR_COUNT_TRIGGER_VALUE_REACHED = enum.auto() + E_PDM_SYSTEM_EVENT_DESCRIPTOR_SAVE_FAILED = enum.auto() + E_PDM_SYSTEM_EVENT_PDM_NOT_ENOUGH_SPACE = enum.auto() + E_PDM_SYSTEM_EVENT_LARGEST_RECORD_FULL_SAVE_NO_LONGER_POSSIBLE = enum.auto() + E_PDM_SYSTEM_EVENT_SEGMENT_DATA_CHECKSUM_FAIL = enum.auto() + E_PDM_SYSTEM_EVENT_SEGMENT_SAVE_OK = enum.auto() + E_PDM_SYSTEM_EVENT_EEPROM_SEGMENT_HEADER_REPAIRED = enum.auto() + E_PDM_SYSTEM_EVENT_SYSTEM_INTERNAL_BUFFER_WEAR_COUNT_SWAP = enum.auto() + E_PDM_SYSTEM_EVENT_SYSTEM_DUPLICATE_FILE_SEGMENT_DETECTED = enum.auto() + E_PDM_SYSTEM_EVENT_SYSTEM_ERROR = enum.auto() + E_PDM_SYSTEM_EVENT_SEGMENT_PREWRITE = enum.auto() + E_PDM_SYSTEM_EVENT_SEGMENT_POSTWRITE = enum.auto() + E_PDM_SYSTEM_EVENT_SEQUENCE_DUPLICATE_DETECTED = enum.auto() + E_PDM_SYSTEM_EVENT_SEQUENCE_VERIFY_FAIL = enum.auto() + E_PDM_SYSTEM_EVENT_PDM_SMART_SAVE = enum.auto() + E_PDM_SYSTEM_EVENT_PDM_FULL_SAVE = enum.auto() + + class NoResponseError(zigpy.exceptions.APIException): pass @@ -50,6 +84,7 @@ def __init__(self, device_config: Dict[str, Any]): self._uart = None self._awaiting = {} self._status_awaiting = {} + self._lock = asyncio.Lock() self.network_state = None @@ -76,7 +111,7 @@ def data_received(self, cmd, data, lqi): LOGGER.debug("data received %s %s LQI:%s", hex(cmd), binascii.hexlify(data), lqi) if cmd not in RESPONSES: - LOGGER.error('Received unhandled response %s', hex(cmd)) + LOGGER.error('Received unhandled response 0x%04x', cmd) return data, rest = t.deserialize(data, RESPONSES[cmd]) if cmd == 0x8000: @@ -89,19 +124,28 @@ def data_received(self, cmd, data, lqi): self.handle_callback(cmd, data, lqi) async def command(self, cmd, data=b'', wait_response=None, wait_status=True): - try: - return await asyncio.wait_for( - self._command(cmd, data, wait_response, wait_status), - timeout=COMMAND_TIMEOUT - ) - except asyncio.TimeoutError: - LOGGER.warning("No response to command 0x{:04x}".format(cmd)) - raise NoResponseError + tries = 2 + while tries > 0: + tries -= 1 + await self._lock.acquire() + try: + return await asyncio.wait_for( + self._command(cmd, data, wait_response, wait_status), + timeout=COMMAND_TIMEOUT + ) + except asyncio.TimeoutError: + LOGGER.warning("No response to command 0x%04x", cmd) + if tries > 0: + LOGGER.warning("Retry command 0x%04x", cmd) + else: + raise NoResponseError + finally: + self._lock.release() def _command(self, cmd, data=b'', wait_response=None, wait_status=True): self._uart.send(cmd, data) - fut = asyncio.Future() if wait_status: + fut = asyncio.Future() self._status_awaiting[cmd] = fut if wait_response: fut = asyncio.Future() @@ -112,6 +156,12 @@ def _command(self, cmd, data=b'', wait_response=None, wait_status=True): async def version(self): return await self.command(0x0010, wait_response=0x8010) + async def version_str(self): + version, lqi = await self.version() + version = '{:x}'.format(version[1]) + version = '{}.{}'.format(version[0], version[1:]) + return version + async def get_network_state(self): return await self.command(0x0009, wait_response=0x8009) @@ -122,6 +172,41 @@ async def set_raw_mode(self, enable=True): async def reset(self): self._command(0x0011, wait_status=False) + async def erase_persistent_data(self): + self._command(0x0012, wait_status=False) + + async def set_time(self, dt=None): + """ set internal time + if timestamp is None, now is used + """ + dt = dt or datetime.datetime.now() + timestamp = int((dt - datetime.datetime(2000, 1, 1)).total_seconds()) + data = t.serialize([timestamp], COMMANDS[0x0016]) + self._command(0x0016, data) + + async def get_time_server(self): + timestamp, lqi = await self._command(0x0017, wait_response=0x8017) + dt = datetime.datetime(2000, 1, 1) + datetime.timedelta(seconds=timestamp[0]) + return dt + + async def set_led(self, enable=True): + data = t.serialize([enable], COMMANDS[0x0018]) + await self.command(0x0018, data) + + async def set_certification(self, typ='CE'): + cert = {'CE': 1, 'FCC': 2}[typ] + data = t.serialize([cert], COMMANDS[0x0019]) + await self.command(0x0019, data) + + async def set_tx_power(self, power=63): + if power > 63: + power = 63 + if power < 0: + power = 0 + data = t.serialize([power], COMMANDS[0x0806]) + power, lqi = await self.command(0x0806, data, wait_response=0x8806) + return power[0] + async def set_channel(self, channels=None): channels = channels or [11, 14, 15, 19, 20, 24, 25, 26] if not isinstance(channels, list): @@ -188,5 +273,11 @@ async def probe(cls, device_config: Dict[str, Any]) -> bool: async def _probe(self) -> None: """Open port and try sending a command""" + try: + device = next(serial.tools.list_ports.grep(self._config[zigpy_zigate.config.CONF_DEVICE_PATH])) + if device.description == 'ZiGate': + return + except StopIteration: + pass await self.connect() await self.set_raw_mode() diff --git a/zigpy_zigate/common.py b/zigpy_zigate/common.py new file mode 100644 index 0000000..30c3703 --- /dev/null +++ b/zigpy_zigate/common.py @@ -0,0 +1,135 @@ +import re +import serial.tools.list_ports +import serial +import usb +import logging +import asyncio + +LOGGER = logging.getLogger(__name__) + + +def discover_port(): + """ discover zigate port """ + devices = list(serial.tools.list_ports.grep('ZiGate')) + if devices: + port = devices[0].device + LOGGER.info('ZiGate found at %s', port) + else: + devices = list(serial.tools.list_ports.grep('067b:2303|CP2102')) + if devices: + port = devices[0].device + LOGGER.info('ZiGate probably found at %s', port) + else: + LOGGER.error('Unable to find ZiGate using auto mode') + raise serial.SerialException("Unable to find Zigate using auto mode") + return port + + +def is_pizigate(port): + """ detect pizigate """ + # Suppose pizigate on /dev/ttyAMAx or /dev/serialx + return re.match(r"/dev/(tty(S|AMA)|serial)\d+", port) is not None + + +def is_zigate_din(port): + """ detect zigate din """ + if re.match(r"/dev/ttyUSB\d+", port): + try: + device = next(serial.tools.list_ports.grep(port)) + # Suppose zigate din /dev/ttyUSBx + return device.description == 'ZiGate' and device.manufacturer == 'FTDI' + except StopIteration: + pass + return False + + +def is_zigate_wifi(port): + """ detect zigate din """ + return port.startswith('socket://') + + +async def set_pizigate_running_mode(): + try: + import RPi.GPIO as GPIO + LOGGER.info('Put PiZiGate in running mode') + GPIO.setmode(GPIO.BCM) + GPIO.setup(17, GPIO.OUT) # GPIO0 + GPIO.setup(27, GPIO.OUT) # GPIO2 + GPIO.output(27, GPIO.HIGH) + await asyncio.sleep(0.5) + GPIO.output(17, GPIO.LOW) + await asyncio.sleep(0.5) + GPIO.output(17, GPIO.HIGH) + await asyncio.sleep(0.5) + except Exception as e: + LOGGER.error('Unable to set PiZiGate GPIO, please check configuration') + LOGGER.error(str(e)) + + +async def set_pizigate_flashing_mode(): + try: + import RPi.GPIO as GPIO + LOGGER.info('Put PiZiGate in flashing mode') + GPIO.setmode(GPIO.BCM) + GPIO.setup(17, GPIO.OUT) # GPIO0 + GPIO.setup(27, GPIO.OUT) # GPIO2 + GPIO.output(27, GPIO.LOW) + await asyncio.sleep(0.5) + GPIO.output(17, GPIO.LOW) + await asyncio.sleep(0.5) + GPIO.output(17, GPIO.HIGH) + await asyncio.sleep(0.5) + except Exception as e: + LOGGER.error('Unable to set PiZiGate GPIO, please check configuration') + LOGGER.error(str(e)) + + +def ftdi_set_bitmode(dev, bitmask): + ''' + Set mode for ZiGate DIN module + ''' + BITMODE_CBUS = 0x20 + SIO_SET_BITMODE_REQUEST = 0x0b + bmRequestType = usb.util.build_request_type(usb.util.CTRL_OUT, + usb.util.CTRL_TYPE_VENDOR, + usb.util.CTRL_RECIPIENT_DEVICE) + wValue = bitmask | (BITMODE_CBUS << BITMODE_CBUS) + dev.ctrl_transfer(bmRequestType, SIO_SET_BITMODE_REQUEST, wValue) + + +async def set_zigatedin_running_mode(): + try: + dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) + if not dev: + LOGGER.error('ZiGate DIN not found.') + return + LOGGER.info('Put ZiGate DIN in running mode') + ftdi_set_bitmode(dev, 0xC8) + await asyncio.sleep(0.5) + ftdi_set_bitmode(dev, 0xCC) + await asyncio.sleep(0.5) + except Exception as e: + LOGGER.error('Unable to set FTDI bitmode, please check configuration') + LOGGER.error(str(e)) + + +async def set_zigatedin_flashing_mode(): + try: + dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) + if not dev: + LOGGER.error('ZiGate DIN not found.') + return + LOGGER.info('Put ZiGate DIN in flashing mode') + ftdi_set_bitmode(dev, 0x00) + await asyncio.sleep(0.5) + ftdi_set_bitmode(dev, 0xCC) + await asyncio.sleep(0.5) + ftdi_set_bitmode(dev, 0xC0) + await asyncio.sleep(0.5) + ftdi_set_bitmode(dev, 0xC4) + await asyncio.sleep(0.5) + ftdi_set_bitmode(dev, 0xCC) + await asyncio.sleep(0.5) + except Exception as e: + LOGGER.error('Unable to set FTDI bitmode, please check configuration') + LOGGER.error(str(e)) diff --git a/zigpy_zigate/tools/__init__.py b/zigpy_zigate/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/zigpy_zigate/tools/cli.py b/zigpy_zigate/tools/cli.py new file mode 100644 index 0000000..d1ec18a --- /dev/null +++ b/zigpy_zigate/tools/cli.py @@ -0,0 +1,56 @@ +""" + Simple CLI ZiGate tool +""" + +import argparse +import asyncio +import logging +from zigpy_zigate.api import ZiGate +import zigpy_zigate.config + + +async def main(): + logging.basicConfig(level=logging.INFO) + parser = argparse.ArgumentParser() + parser.add_argument("command", help="Command to start", + choices=["version", "reset", "erase_persistent", + "set_time", "get_time", "set_led", "set_certification", "set_tx_power"]) + parser.add_argument("-p", "--port", help="Port", default='auto') + parser.add_argument("-d", "--debug", help="Debug log", action='store_true') + parser.add_argument("-v", "--value", help="Set command's value") + args = parser.parse_args() + print('Port set to', args.port) + if args.debug: + logging.root.setLevel(logging.DEBUG) + device_config = {zigpy_zigate.config.CONF_DEVICE_PATH: args.port} + api = ZiGate(device_config) + await api.connect() + if args.command == 'version': + print('Firmware version', await api.version_str()) + elif args.command == 'reset': + await api.reset() + print('ZiGate reseted') + elif args.command == 'erase_persistent': + await api.erase_persistent_data() + print('ZiGate pesistent date erased') + elif args.command == 'set_time': + await api.set_time() + print('ZiGate internal time server set to current time') + elif args.command == 'get_time': + print('ZiGate internal time server is', await api.get_time_server()) + elif args.command == 'set_led': + enable = int(args.value or 1) + print('Set ZiGate led to', enable) + await api.set_led(enable) + elif args.command == 'set_certification': + _type = args.value or 'CE' + print('Set ZiGate Certification to', _type) + await api.set_certification(_type) + elif args.command == 'set_tx_power': + power = int(args.value or 63) + print('Set ZiGate TX Power to', power) + print('Tx power set to', await api.set_tx_power(power)) + api.close() + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/zigpy_zigate/tools/firmware.py b/zigpy_zigate/tools/firmware.py new file mode 100644 index 0000000..e03d445 --- /dev/null +++ b/zigpy_zigate/tools/firmware.py @@ -0,0 +1,64 @@ +import urllib.request +import logging +import os +import argparse +import json +from collections import OrderedDict + +URL = 'https://api.github.com/repos/fairecasoimeme/ZiGate/releases' +LOGGER = logging.getLogger("ZiGate Firmware") + + +def get_releases(): + LOGGER.info('Searching for ZiGate firmware') + releases = OrderedDict() + r = urllib.request.urlopen(URL) + if r.status == 200: + for release in json.loads(r.read()): + for asset in release['assets']: + if asset['name'].endswith('.bin'): + LOGGER.info('Found %s', asset['name']) + releases[asset['name']] = asset['browser_download_url'] + return releases + + +def download(url, dest='/tmp'): + filename = url.rsplit('/', 1)[1] + LOGGER.info('Downloading %s to %s', url, dest) + r = urllib.request.urlopen(url) + if r.status == 200: + filename = os.path.join(dest, filename) + with open(filename, 'wb') as fp: + fp.write(r.read()) + LOGGER.info('Done') + return filename + else: + LOGGER.error('Error downloading %s %s', r.status, r.reason) + + +def download_latest(dest='/tmp'): + LOGGER.info('Download latest firmware') + releases = get_releases() + if releases: + latest = list(releases.keys())[0] + LOGGER.info('Latest is %s', latest) + return download(releases[latest], dest) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + parser = argparse.ArgumentParser() + parser.add_argument("command", help="Command to start", choices=["list", "download", "download_latest"]) + parser.add_argument('--url', help="Download URL") + parser.add_argument('--dest', help="Download folder, default to /tmp", default='/tmp') + args = parser.parse_args() + if args.command == 'list': + for k, v in get_releases().items(): + print(k, v) + elif args.command == 'download': + if not args.url: + LOGGER.error('You have to give a URL to download using --url') + else: + download(args.url, args.dest) + elif args.command == 'download_latest': + download_latest(args.dest) diff --git a/zigpy_zigate/tools/flasher.py b/zigpy_zigate/tools/flasher.py new file mode 100644 index 0000000..1569fe1 --- /dev/null +++ b/zigpy_zigate/tools/flasher.py @@ -0,0 +1,527 @@ +#!/usr/bin/python3 +# +# Copyright (c) 2018 SĂ©bastien RAMAGE +# +# For the full copyright and license information, please view the LICENSE +# file that was distributed with this source code. +# +# Thanks to Sander Hoentjen (tjikkun) we now have a flasher ! +# https://github.com/tjikkun/zigate-flasher + +import argparse +import functools +import itertools +import logging +import struct +from operator import xor +import datetime +from .firmware import download_latest +from zigpy_zigate import common as c +import time +import serial +from serial.tools.list_ports import comports +try: + import RPi.GPIO as GPIO +except Exception: + # Fake GPIO + class GPIO: + def fake(self, *args, **kwargs): + pass + + def __getattr__(self, *args, **kwargs): + return self.fake + GPIO = GPIO() +import usb + + +LOGGER = logging.getLogger(__name__) +_responses = {} + +ZIGATE_CHIP_ID = 0x10408686 +ZIGATE_BINARY_VERSION = bytes.fromhex('07030008') +ZIGATE_FLASH_START = 0x00000000 +ZIGATE_FLASH_END = 0x00040000 + + +class Command: + + def __init__(self, type_, fmt=None, raw=False): + assert not (raw and fmt), 'Raw commands cannot use built-in struct formatting' + LOGGER.debug('Command {} {} {}'.format(type_, fmt, raw)) + self.type = type_ + self.raw = raw + if fmt: + self.struct = struct.Struct(fmt) + else: + self.struct = None + + def __call__(self, func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + rv = func(*args, **kwargs) + + if self.struct: + try: + data = self.struct.pack(*rv) + except TypeError: + data = self.struct.pack(rv) + elif self.raw: + data = rv + else: + data = bytearray() + + return prepare(self.type, data) + + return wrapper + + +class Response: + + def __init__(self, type_, data, chksum): + LOGGER.debug('Response {} {} {}'.format(type_, data, chksum)) + self.type = type_ + self.data = data[1:] + self.chksum = chksum + self.status = data[0] + + @property + def ok(self): + return self.status == 0 + + def __str__(self): + return 'Response(type=0x%02x, data=0x%s, checksum=0x%02x)' % (self.type, + self.data.hex(), + self.chksum) + + +def register(type_): + assert type_ not in _responses, 'Duplicate response type 0x%02x' % type_ + + def decorator(func): + _responses[type_] = func + return func + + return decorator + + +def prepare(type_, data): + length = len(data) + 2 + + checksum = functools.reduce(xor, + itertools.chain(type_.to_bytes(2, 'big'), + length.to_bytes(2, 'big'), + data), 0) + + message = struct.pack('!BB%dsB' % len(data), length, type_, data, checksum) + # print('Prepared command 0x%s' % message.hex()) + return message + + +def read_response(ser): + length = ser.read() + length = int.from_bytes(length, 'big') + LOGGER.debug('read_response length {}'.format(length)) + answer = ser.read(length) + LOGGER.debug('read_response answer {}'.format(answer)) + return _unpack_raw_message(length, answer) + # type_, data, chksum = struct.unpack('!B%dsB' % (length - 2), answer) + # return {'type': type_, 'data': data, 'chksum': chksum} + + +def _unpack_raw_message(length, decoded): + LOGGER.debug('unpack raw message {} {}'.format(length, decoded)) + if len(decoded) != length or length < 2: + LOGGER.exception("Unpack failed, length: %d, msg %s" % (length, decoded.hex())) + return + type_, data, chksum = \ + struct.unpack('!B%dsB' % (length - 2), decoded) + return _responses.get(type_, Response)(type_, data, chksum) + + +@Command(0x07) +def req_flash_erase(): + pass + + +@Command(0x09, raw=True) +def req_flash_write(addr, data): + msg = struct.pack(' flash_end: + read_bytes = flash_end - cur + ser.write(req_flash_read(cur, read_bytes)) + res = read_response(ser) + if not res or not res.ok: + print('Reading flash failed') + raise SystemExit(1) + if cur == 0: + (flash_end,) = struct.unpack('>L', res.data[0x20:0x24]) + fd.write(res.data) + printProgressBar(cur, flash_end, 'Reading') + cur += read_bytes + printProgressBar(flash_end, flash_end, 'Reading') + LOGGER.info('Backup firmware done') + + +def write_file_to_flash(ser, filename): + LOGGER.info('Writing new firmware from %s', filename) + with open(filename, 'rb') as fd: + ser.write(req_flash_erase()) + res = read_response(ser) + if not res or not res.ok: + print('Erasing flash failed') + raise SystemExit(1) + + # flash_start = cur = ZIGATE_FLASH_START + cur = ZIGATE_FLASH_START + flash_end = ZIGATE_FLASH_END + + bin_ver = fd.read(4) + if bin_ver != ZIGATE_BINARY_VERSION: + print('Not a valid image for Zigate') + raise SystemExit(1) + read_bytes = 128 + while cur < flash_end: + data = fd.read(read_bytes) + if not data: + break + ser.write(req_flash_write(cur, data)) + res = read_response(ser) + if not res.ok: + print('writing failed at 0x%08x, status: 0x%x, data: %s' % (cur, res.status, data.hex())) + raise SystemExit(1) + printProgressBar(cur, flash_end, 'Writing') + cur += read_bytes + printProgressBar(flash_end, flash_end, 'Writing') + LOGGER.info('Writing new firmware done') + + +def erase_EEPROM(ser, pdm_only=False): + ser.timeout = 10 # increase timeout because official NXP programmer do it + ser.write(req_eeprom_erase(pdm_only)) + res = read_response(ser) + if not res or not res.ok: + print('Erasing EEPROM failed') + raise SystemExit(1) + + +def flash(serialport='auto', write=None, save=None, erase=False, pdm_only=False): + """ + Read or write firmware + """ + if serialport == 'auto': + serialport = c.discover_port() + try: + ser = serial.Serial(serialport, 38400, timeout=5) + except serial.SerialException: + LOGGER.exception("Could not open serial device %s", serialport) + return + + change_baudrate(ser, 115200) + check_chip_id(ser) + flash_type = get_flash_type(ser) + mac_address = get_mac(ser) + LOGGER.info('Found MAC-address: %s', mac_address) + if write or save or erase: + select_flash(ser, flash_type) + + if save: + write_flash_to_file(ser, save) + + if write: + write_file_to_flash(ser, write) + + if erase: + erase_EEPROM(ser, pdm_only) + change_baudrate(ser, 38400) + ser.close() + + +def upgrade_firmware(port): + backup_filename = 'zigate_backup_{:%Y%m%d%H%M%S}.bin'.format(datetime.datetime.now()) + flash(port, save=backup_filename) + print('ZiGate backup created {}'.format(backup_filename)) + firmware_path = download_latest() + print('Firmware downloaded', firmware_path) + flash(port, write=firmware_path) + print('ZiGate flashed with {}'.format(firmware_path)) + + +def ftdi_set_bitmode(dev, bitmask): + ''' + Set mode for ZiGate DIN module + ''' + BITMODE_CBUS = 0x20 + SIO_SET_BITMODE_REQUEST = 0x0b + bmRequestType = usb.util.build_request_type(usb.util.CTRL_OUT, + usb.util.CTRL_TYPE_VENDOR, + usb.util.CTRL_RECIPIENT_DEVICE) + wValue = bitmask | (BITMODE_CBUS << BITMODE_CBUS) + dev.ctrl_transfer(bmRequestType, SIO_SET_BITMODE_REQUEST, wValue) + + +def main(): + ports_available = [port for (port, _, _) in sorted(comports())] + parser = argparse.ArgumentParser() + parser.add_argument('-p', '--serialport', choices=ports_available, + help='Serial port, e.g. /dev/ttyUSB0', required=True) + parser.add_argument('-w', '--write', help='Firmware bin to flash onto the chip') + parser.add_argument('-s', '--save', help='File to save the currently loaded firmware to') + parser.add_argument('-u', '--upgrade', help='Download and flash the lastest available firmware', + action='store_true', default=False) +# parser.add_argument('-e', '--erase', help='Erase EEPROM', action='store_true') +# parser.add_argument('--pdm-only', help='Erase PDM only, use it with --erase', action='store_true') + parser.add_argument('-d', '--debug', help='Set log level to DEBUG', action='store_true') + parser.add_argument('--gpio', help='Configure GPIO for PiZiGate flash', action='store_true', default=False) + parser.add_argument('--din', help='Configure USB for ZiGate DIN flash', action='store_true', default=False) + args = parser.parse_args() + LOGGER.setLevel(logging.INFO) + if args.debug: + LOGGER.setLevel(logging.DEBUG) + + if args.gpio: + LOGGER.info('Put PiZiGate in flash mode') + GPIO.setmode(GPIO.BCM) + GPIO.setup(27, GPIO.OUT) # GPIO2 + GPIO.output(27, GPIO.LOW) # GPIO2 + GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIO0 + time.sleep(0.5) + GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_UP) # GPIO0 + time.sleep(0.5) + elif args.din: + LOGGER.info('Put ZiGate DIN in flash mode') + dev = usb.core.find(idVendor=0x0403, idProduct=0x6001) + if not dev: + LOGGER.error('ZiGate DIN not found.') + return + ftdi_set_bitmode(dev, 0x00) + time.sleep(0.5) + # Set CBUS2/3 high... + ftdi_set_bitmode(dev, 0xCC) + time.sleep(0.5) + # Set CBUS2/3 low... + ftdi_set_bitmode(dev, 0xC0) + time.sleep(0.5) + ftdi_set_bitmode(dev, 0xC4) + time.sleep(0.5) + # Set CBUS2/3 back to tristate + ftdi_set_bitmode(dev, 0xCC) + time.sleep(0.5) + + if args.upgrade: + upgrade_firmware(args.serialport) + + else: + try: + ser = serial.Serial(args.serialport, 38400, timeout=5) + except serial.SerialException: + LOGGER.exception("Could not open serial device %s", args.serialport) + raise SystemExit(1) + + change_baudrate(ser, 115200) + check_chip_id(ser) + flash_type = get_flash_type(ser) + mac_address = get_mac(ser) + LOGGER.info('Found MAC-address: %s', mac_address) + if args.write or args.save: # or args.erase: + select_flash(ser, flash_type) + + if args.save: + write_flash_to_file(ser, args.save) + + if args.write: + write_file_to_flash(ser, args.write) + +# if args.erase: +# erase_EEPROM(ser, args.pdm_only) + + if args.gpio: + LOGGER.info('Put PiZiGate in running mode') + GPIO.output(27, GPIO.HIGH) # GPIO2 + GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIO0 + time.sleep(0.5) + GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_UP) # GPIO0 + time.sleep(0.5) + elif args.din: + LOGGER.info('Put ZiGate DIN in running mode') + ftdi_set_bitmode(dev, 0xC8) + time.sleep(0.5) + ftdi_set_bitmode(dev, 0xCC) + time.sleep(0.5) + + +if __name__ == "__main__": + logging.basicConfig() + main() diff --git a/zigpy_zigate/uart.py b/zigpy_zigate/uart.py index 5ee9970..f2e58dc 100644 --- a/zigpy_zigate/uart.py +++ b/zigpy_zigate/uart.py @@ -2,13 +2,15 @@ import binascii import logging import struct +import os.path from typing import Any, Dict import serial # noqa import serial.tools.list_ports import serial_asyncio -from zigpy_zigate.config import CONF_DEVICE_PATH +from .config import CONF_DEVICE_PATH +from . import common as c LOGGER = logging.getLogger(__name__) ZIGATE_BAUDRATE = 115200 @@ -35,7 +37,7 @@ def close(self): def send(self, cmd, data=b''): """Send data, taking care of escaping and framing""" - LOGGER.debug("Send: %s %s", hex(cmd), binascii.hexlify(data)) + LOGGER.debug("Send: 0x%04x %s", cmd, binascii.hexlify(data)) length = len(data) byte_head = struct.pack('!HH', cmd, length) checksum = self._checksum(byte_head, data) @@ -61,12 +63,14 @@ def data_received(self, data): length, len(frame) - 6) self._buffer = self._buffer[endpos + 1:] + endpos = self._buffer.find(self.END) continue if self._checksum(frame[:4], lqi, f_data) != checksum: LOGGER.warning("Invalid checksum: %s, data: 0x%s", checksum, binascii.hexlify(frame).decode()) self._buffer = self._buffer[endpos + 1:] + endpos = self._buffer.find(self.END) continue LOGGER.debug("Frame received: %s", binascii.hexlify(frame).decode()) self._api.data_received(cmd, f_data, lqi) @@ -120,48 +124,40 @@ async def connect(device_config: Dict[str, Any], api, loop=None): protocol = Gateway(api, connected_future) port = device_config[CONF_DEVICE_PATH] - if port.startswith('pizigate:'): - await set_pizigate_running_mode() - port = port.split(':', 1)[1] - elif port == 'auto': - devices = list(serial.tools.list_ports.grep('ZiGate')) - if devices: - port = devices[0].device - LOGGER.info('ZiGate found at %s', port) + if port == 'auto': + port = c.discover_port() + + if c.is_zigate_wifi(port): + LOGGER.debug('ZiGate WiFi detected') + port = port.split('socket://', 1)[1] + if ':' in port: + host, port = port.split(':', 1) # 192.168.x.y:9999 + port = int(port) else: - devices = list(serial.tools.list_ports.grep('067b:2303|CP2102')) - if devices: - port = devices[0].device - LOGGER.info('ZiGate probably found at %s', port) - else: - LOGGER.error('Unable to find ZiGate using auto mode') - raise serial.SerialException("Unable to find Zigate using auto mode") - - _, protocol = await serial_asyncio.create_serial_connection( - loop, - lambda: protocol, - url=port, - baudrate=ZIGATE_BAUDRATE, - parity=serial.PARITY_NONE, - stopbits=serial.STOPBITS_ONE, - xonxoff=False, - ) + host = port + port = 9999 + _, protocol = await loop.create_connection( + lambda: protocol, + host, port) + else: + port = os.path.realpath(port) + if c.is_pizigate(port): + LOGGER.debug('PiZiGate detected') + await c.set_pizigate_running_mode() + elif c.is_zigate_din: + LOGGER.debug('ZiGate USB DIN detected') + await c.set_zigatedin_running_mode() + + _, protocol = await serial_asyncio.create_serial_connection( + loop, + lambda: protocol, + url=port, + baudrate=ZIGATE_BAUDRATE, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + xonxoff=False, + ) await connected_future return protocol - - -async def set_pizigate_running_mode(): - try: - import RPi.GPIO as GPIO - GPIO.setmode(GPIO.BCM) - GPIO.setup(27, GPIO.OUT) # GPIO2 - GPIO.output(27, GPIO.HIGH) # GPIO2 - GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) # GPIO0 - await asyncio.sleep(0.5) - GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_UP) # GPIO0 - await asyncio.sleep(0.5) - except Exception as e: - LOGGER.error('Unable to set PiZiGate GPIO, please check configuration') - LOGGER.error(str(e)) diff --git a/zigpy_zigate/zigbee/application.py b/zigpy_zigate/zigbee/application.py index 09fed65..5a61398 100644 --- a/zigpy_zigate/zigbee/application.py +++ b/zigpy_zigate/zigbee/application.py @@ -9,8 +9,9 @@ import zigpy.util from zigpy_zigate import types as t -from zigpy_zigate.api import NoResponseError, ZiGate -from zigpy_zigate.config import CONF_DEVICE, CONFIG_SCHEMA, SCHEMA_DEVICE +from zigpy_zigate import common as c +from zigpy_zigate.api import NoResponseError, ZiGate, PDM_EVENT +from zigpy_zigate.config import CONF_DEVICE, CONF_DEVICE_PATH, CONFIG_SCHEMA, SCHEMA_DEVICE LOGGER = logging.getLogger(__name__) @@ -26,6 +27,7 @@ def __init__(self, config: Dict[str, Any]): self._api: Optional[ZiGate] = None self._pending = {} + self._pending_join = [] self._nwk = 0 self._ieee = 0 @@ -35,10 +37,13 @@ async def startup(self, auto_form=False): """Perform a complete application startup""" self._api = await ZiGate.new(self._config[CONF_DEVICE], self) await self._api.set_raw_mode() + await self._api.set_time() version, lqi = await self._api.version() version = '{:x}'.format(version[1]) version = '{}.{}'.format(version[0], version[1:]) self.version = version + if version < '3.1d': + LOGGER.warning('Old ZiGate firmware detected, you should upgrade to 3.1d or newer') network_state, lqi = await self._api.get_network_state() should_form = not network_state or network_state[0] == 0xffff or network_state[3] == 0 @@ -100,6 +105,16 @@ def zigate_callback_handler(self, msg, response, lqi): ieee = zigpy.types.EUI64(response[1]) parent_nwk = 0 self.handle_join(nwk, ieee, parent_nwk) + # Temporary disable two stages pairing due to firmware bug + # rejoin = response[3] + # if nwk in self._pending_join or rejoin: + # LOGGER.debug('Finish pairing {} (2nd device announce)'.format(nwk)) + # if nwk in self._pending_join: + # self._pending_join.remove(nwk) + # self.handle_join(nwk, ieee, parent_nwk) + # else: + # LOGGER.debug('Start pairing {} (1st device announce)'.format(nwk)) + # self._pending_join.append(nwk) elif msg == 0x8002: try: if response[5].address_mode == t.ADDRESS_MODE.NWK: @@ -117,7 +132,18 @@ def zigate_callback_handler(self, msg, response, lqi): self.handle_message(device, response[1], response[2], response[3], response[4], response[-1]) + elif msg == 0x8011: # ACK Data + LOGGER.debug('ACK Data received %s %s', response[4], response[0]) + # disabled because of https://github.com/fairecasoimeme/ZiGate/issues/324 + # self._handle_frame_failure(response[4], response[0]) + elif msg == 0x8035: # PDM Event + try: + event = PDM_EVENT(response[0]).name + except ValueError: + event = 'Unknown event' + LOGGER.debug('PDM Event %s %s, record %s', response[0], event, response[1]) elif msg == 0x8702: # APS Data confirm Fail + LOGGER.debug('APS Data confirm Fail %s %s', response[4], response[0]) self._handle_frame_failure(response[4], response[0]) def _handle_frame_failure(self, message_tag, status): @@ -135,28 +161,26 @@ async def request(self, device, profile, cluster, src_ep, dst_ep, sequence, data src_ep = 1 if dst_ep else 0 # ZiGate only support endpoint 1 LOGGER.debug('request %s', (device.nwk, profile, cluster, src_ep, dst_ep, sequence, data, expect_reply, use_ieee)) - req_id = self.get_sequence() - send_fut = asyncio.Future() - self._pending[req_id] = send_fut try: v, lqi = await self._api.raw_aps_data_request(device.nwk, src_ep, dst_ep, profile, cluster, data) except NoResponseError: return 1, "ZiGate doesn't answer to command" + req_id = v[1] + send_fut = asyncio.Future() + self._pending[req_id] = send_fut if v[0] != 0: self._pending.pop(req_id) return v[0], "Message send failure {}".format(v[0]) - # Commented out for now - # Currently (Firmware 3.1a) only send APS Data confirm in case of failure - # https://github.com/fairecasoimeme/ZiGate/issues/239 -# try: -# v = await asyncio.wait_for(send_fut, 120) -# except asyncio.TimeoutError: -# return 1, "timeout waiting for message %s send ACK" % (sequence, ) -# finally: -# self._pending.pop(req_id) -# return v, "Message sent" + # disabled because of https://github.com/fairecasoimeme/ZiGate/issues/324 + # try: + # v = await asyncio.wait_for(send_fut, 120) + # except asyncio.TimeoutError: + # return 1, "timeout waiting for message %s send ACK" % (sequence, ) + # finally: + # self._pending.pop(req_id) + # return v, "Message sent" return 0, "Message sent" async def permit_ncp(self, time_s=60): @@ -171,10 +195,24 @@ async def broadcast(self, profile, cluster, src_ep, dst_ep, grpid, radius, class ZiGateDevice(zigpy.device.Device): + def __init__(self, application, ieee, nwk): + """Initialize instance.""" + + super().__init__(application, ieee, nwk) + port = application._config[CONF_DEVICE][CONF_DEVICE_PATH] + model = 'ZiGate USB-TTL' + if c.is_zigate_wifi(port): + model = 'ZiGate WiFi' + elif c.is_pizigate(port): + model = 'PiZiGate' + elif c.is_zigate_din(port): + model = 'ZiGate USB-DIN' + self._model = '{} {}'.format(model, application.version) + @property def manufacturer(self): return "ZiGate" @property def model(self): - return 'ZiGate' + return self._model