diff --git a/examples/run.py b/examples/run.py index 273ec77..43ad8c0 100644 --- a/examples/run.py +++ b/examples/run.py @@ -9,8 +9,7 @@ _LOGGER = logging.getLogger(__name__) -ADDRESS = "D0291B39-3A1B-7FF2-787B-4E743FED5B25" -ADDRESS = "D0291B39-3A1B-7FF2-787B-4E743FED5B25" +ADDRESS = "BE:27:E1:00:10:63" # Hello Fairy-1063PPPP async def run() -> None: @@ -34,20 +33,36 @@ def on_state_changed(state: LEDBLEState) -> None: device = await future led = LEDBLE(device) cancel_callback = led.register_callback(on_state_changed) + _LOGGER.info("update...") await led.update() + _LOGGER.info("turn_on...") await led.turn_on() + _LOGGER.info("set_rgb(red)...") await led.set_rgb((255, 0, 0), 255) await asyncio.sleep(1) + _LOGGER.info("set_rgb(green)...") await led.set_rgb((0, 255, 0), 128) await asyncio.sleep(1) + _LOGGER.info("set_rgb(blue)...") await led.set_rgb((0, 0, 255), 255) await asyncio.sleep(1) + _LOGGER.info("set_rgbw(white)...") await led.set_rgbw((255, 255, 255, 128), 255) await asyncio.sleep(1) + _LOGGER.info("set_preset_pattern(1)...") + await led.async_set_preset_pattern(1, 100, 100) + await asyncio.sleep(2) + _LOGGER.info("set_preset_pattern(59)...") + await led.async_set_preset_pattern(59, 100, 100) + await asyncio.sleep(2) + _LOGGER.info("turn_off...") await led.turn_off() + _LOGGER.info("update...") await led.update() + _LOGGER.info("finish...") cancel_callback() await scanner.stop() + _LOGGER.info("done") logging.basicConfig(level=logging.INFO) diff --git a/src/led_ble/const.py b/src/led_ble/const.py index cc904af..0ca7485 100644 --- a/src/led_ble/const.py +++ b/src/led_ble/const.py @@ -11,11 +11,16 @@ class CharacteristicMissingError(Exception): """Raised when a characteristic is missing.""" +HELLO_FAIRY_WRITE_CHARACTERISTIC = "49535343-8841-43f4-a8d4-ecbe34729bb3" +HELLO_FAIRY_READ_CHARACTERISTIC = "49535343-1e4d-4bd9-ba61-23c647249616" + POSSIBLE_WRITE_CHARACTERISTIC_UUIDS = [ BASE_UUID_FORMAT.format(part) for part in ["ff01", "ffd5", "ffd9", "ffe5", "ffe9"] -] +] + [HELLO_FAIRY_WRITE_CHARACTERISTIC] + POSSIBLE_READ_CHARACTERISTIC_UUIDS = [ BASE_UUID_FORMAT.format(part) for part in ["ff02", "ffd0", "ffd4", "ffe0", "ffe4"] -] +] + [HELLO_FAIRY_READ_CHARACTERISTIC] + QUERY_STATE_BYTES = bytearray([0xEF, 0x01, 0x77]) diff --git a/src/led_ble/led_ble.py b/src/led_ble/led_ble.py index c7c05f6..91164eb 100644 --- a/src/led_ble/led_ble.py +++ b/src/led_ble/led_ble.py @@ -25,8 +25,10 @@ from flux_led.utils import rgbw_brightness from led_ble.model_db import LEDBLEModel +from led_ble.protocol import ProtocolFairy from .const import ( + HELLO_FAIRY_READ_CHARACTERISTIC, POSSIBLE_READ_CHARACTERISTIC_UUIDS, POSSIBLE_WRITE_CHARACTERISTIC_UUIDS, STATE_COMMAND, @@ -279,10 +281,14 @@ def _generate_preset_pattern( brightness = int(brightness * 255 / 100) speed = int(speed * 255 / 100) return bytearray([0x9E, 0x00, pattern, speed, brightness, 0x00, 0xE9]) - PresetPattern.valid_or_raise(pattern) + if not self._is_hello_fairy(): + PresetPattern.valid_or_raise(pattern) if not (1 <= brightness <= 100): raise ValueError("Brightness must be between 1 and 100") assert self._protocol is not None # nosec + if self._is_hello_fairy() and pattern > 58: + rgb = [[255, 0, 0], [0, 255, 0], [0, 0, 255]] * 8 + [[255, 0, 0]] + return self._protocol.construct_custom_effect(rgb, speed, "") return self._protocol.construct_preset_pattern(pattern, speed, brightness) async def async_set_preset_pattern( @@ -431,29 +437,64 @@ def _named_effect(self) -> str | None: """Returns the named effect.""" return EFFECT_ID_NAME.get(self.preset_pattern_num) + # ideally replace with classes to encapsulate the differences between device makes + def _is_hello_fairy(self) -> bool: + if self._read_char is None: + return False + d = self._read_char.descriptors + c = d[0].characteristic_uuid if (len(d) > 0) else None + return c == HELLO_FAIRY_READ_CHARACTERISTIC + def _notification_handler(self, _sender: int, data: bytearray) -> None: """Handle notification responses.""" _LOGGER.debug("%s: Notification received: %s", self.name, data.hex()) - if len(data) == 4 and data[0] == 0xCC: - on = data[1] == 0x23 - self._state = replace(self._state, power=on) - return - if len(data) < 11: - return - model_num = data[1] - on = data[2] == 0x23 - preset_pattern = data[3] - mode = data[4] - speed = data[5] - r = data[6] - g = data[7] - b = data[8] - w = data[9] - version = data[10] - self._state = LEDBLEState( - on, (r, g, b), w, model_num, preset_pattern, mode, speed, version - ) + model_num = 0 + if self._is_hello_fairy(): + if data[0] == 0xAA: + if data[1] == 0x00: # hw info + if len(data) > 7: + version_string = data[3:8].decode("ascii") + _LOGGER.debug("version %s", version_string) + self._state = replace( + self._state, + version_num=(data[3] - 48) * 100 + + (data[5] - 48) * 10 + + (data[7] - 48), + ) + if len(data) > 12: + model = data[8:13].decode("ascii") + _LOGGER.debug("model %s", model) + if len(data) > 24: + lights = data[24] # guessing + _LOGGER.debug("lights %d", lights) + if len(data) > 33: + effects = data[33] # guessing + _LOGGER.debug("effects %d", effects) + + if data[1] == 0x01: # state info + if len(data) > 6: + self._state = replace(self._state, power=data[6] > 0) + else: + if len(data) == 4 and data[0] == 0xCC: + on = data[1] == 0x23 + self._state = replace(self._state, power=on) + return + if len(data) < 11: + return + model_num = data[1] + on = data[2] == 0x23 + preset_pattern = data[3] + mode = data[4] + speed = data[5] + r = data[6] + g = data[7] + b = data[8] + w = data[9] + version = data[10] + self._state = LEDBLEState( + on, (r, g, b), w, model_num, preset_pattern, mode, speed, version + ) _LOGGER.debug( "%s: Notification received; RSSI: %s: %s %s", @@ -466,8 +507,10 @@ def _notification_handler(self, _sender: int, data: bytearray) -> None: if not self._resolve_protocol_event.is_set(): self._resolve_protocol_event.set() self._model_data = get_model(model_num) - self._set_protocol(self._model_data.protocol_for_version_num(version)) - + if self._is_hello_fairy(): + self._protocol = ProtocolFairy() + else: + self._set_protocol(self._model_data.protocol_for_version_num(version)) self._fire_callbacks() def _reset_disconnect_timer(self) -> None: @@ -622,13 +665,23 @@ def _resolve_characteristics(self, services: BleakGATTServiceCollection) -> bool if char := services.get_characteristic(characteristic): self._write_char = char break + _LOGGER.debug( + "using characteristic %s for read, characteristic %s for write", + self._read_char, + self._write_char, + ) return bool(self._read_char and self._write_char) async def _resolve_protocol(self) -> None: """Resolve protocol.""" if self._resolve_protocol_event.is_set(): return - await self._send_command_while_connected([STATE_COMMAND]) + if self._is_hello_fairy(): + await self._send_command_while_connected( + [b"\xaa\x00\x00\xaa"] + ) # get version and capabilities + else: + await self._send_command_while_connected([STATE_COMMAND]) async with asyncio_timeout(10): await self._resolve_protocol_event.wait() diff --git a/src/led_ble/model_db.py b/src/led_ble/model_db.py index 70cb25b..ec455ed 100644 --- a/src/led_ble/model_db.py +++ b/src/led_ble/model_db.py @@ -31,6 +31,15 @@ def protocol_for_version_num(self, version_num: int) -> str: MODELS = [ + LEDBLEModel( + model_num=0x00, + models=["Hello Fairy:BMSL6"], + description="Controller RGB", + protocols=[ + MinVersionProtocol(0, "Fairy"), + ], + color_modes=COLOR_MODES_RGB_W, # Formerly rgbwcapable + ), LEDBLEModel( model_num=0x04, models=["Triones:C10511000166"], diff --git a/src/led_ble/protocol.py b/src/led_ble/protocol.py new file mode 100644 index 0000000..fa3ba40 --- /dev/null +++ b/src/led_ble/protocol.py @@ -0,0 +1,119 @@ +import colorsys +from math import floor +from flux_led.protocol import ProtocolBase +from led_ble.led_ble import LevelWriteMode + + +class ProtocolFairy(ProtocolBase): + """Protocol for Hello Fairy devices.""" + + @property + def name(self) -> str: + """The name of the protocol.""" + return "Fairy" + + def construct_state_query(self) -> bytearray: + """The bytes to send for a query request.""" + return self.construct_message(bytearray([0xAA, 0x01, 0x00])) + + def construct_state_change(self, turn_on: int) -> bytearray: + """The bytes to send for a state change request.""" + return self.construct_message( + bytearray([0xAA, 0x02, 0x01, 1 if turn_on else 0]) + ) + + def construct_message(self, raw_bytes: bytearray) -> bytearray: + """Calculate checksum of byte array and add to end.""" + csum = sum(raw_bytes) & 0xFF + raw_bytes.append(csum) + return raw_bytes + + def construct_levels_change( + self, + persist: int, + red: int | None, + green: int | None, + blue: int | None, + warm_white: int | None, + cool_white: int | None, + write_mode: LevelWriteMode, + ) -> list[bytearray]: + """The bytes to send for a level change request.""" + h, s, v = colorsys.rgb_to_hsv( + (red or 0) / 255, (green or 0) / 255, (blue or 0) / 255 + ) + h_scaled = min(359, floor(h * 360)) + s_scaled = round(s * 1000) + v_scaled = round(v * 1000) + return [ + self.construct_message( + bytearray( + [ + 0xAA, + 0x03, + 0x07, + 0x01, + h_scaled >> 8, + h_scaled & 0xFF, + s_scaled >> 8, + s_scaled & 0xFF, + v_scaled >> 8, + v_scaled & 0xFF, + ] + ) + ) + ] + + def construct_preset_pattern( + self, pattern: int, speed: int, brightness: int + ) -> list[bytearray]: + """The bytes to send for a preset pattern.""" + return [ + self.construct_message( + bytearray( + [ + 0xAA, + 0x03, + 0x04, + 0x02, + pattern & 0xFF, + (brightness >> 8) & 0xFF, + brightness & 0xFF, + ] + ) + ), + self.construct_message(bytearray([0xAA, 0x0C, 0x01, min(speed, 100)])), + ] + + def construct_custom_effect( + self, rgb_list: list[tuple[int, int, int]], speed: int, transition_type: str + ) -> list[bytearray]: + """The bytes to send for a custom effect.""" + data_bytes = len(rgb_list) * 3 + 1 + hue_message = bytearray(data_bytes + 3) + hue_message[0:4] = [0xAA, 0xDA, data_bytes, 0x01] + for [i, [r, g, b]] in enumerate(rgb_list): + h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255) + if v < 0.25: + h = 0xFE # black + elif s < 0.25: + h = 0xFF # white + else: + h = floor(h * 0xAF) + # necessary to satisfy both flake and ruff: + a = i * 3 + 4 + b = a + 3 + hue_message[a:b] = [i >> 8, i & 0xFF, h] + return [ + *self.construct_motion(speed, 0), + self.construct_message(hue_message), + *self.construct_motion(speed, 2), + ] + + def construct_motion(self, speed: int, transition: int) -> list[bytearray]: + """The bytes to send for motion speed and transition.""" + return [ + self.construct_message( + bytearray([0xAA, 0xD0, 0x04, transition, 0x64, speed, 0x01]) + ) + ]