diff --git a/custom_components/lock_code_manager/manifest.json b/custom_components/lock_code_manager/manifest.json index 1a196858..477930aa 100644 --- a/custom_components/lock_code_manager/manifest.json +++ b/custom_components/lock_code_manager/manifest.json @@ -9,6 +9,7 @@ "schedule", "template", "virtual", + "zha", "zwave_js" ], "codeowners": [ diff --git a/custom_components/lock_code_manager/providers/__init__.py b/custom_components/lock_code_manager/providers/__init__.py index 6973cd41..93b2d4df 100644 --- a/custom_components/lock_code_manager/providers/__init__.py +++ b/custom_components/lock_code_manager/providers/__init__.py @@ -8,9 +8,11 @@ from ._base import BaseLock from .virtual import VirtualLock +from .zha import ZHALock from .zwave_js import ZWaveJSLock INTEGRATIONS_CLASS_MAP: dict[str, type[BaseLock]] = { "virtual": VirtualLock, + "zha": ZHALock, "zwave_js": ZWaveJSLock, } diff --git a/custom_components/lock_code_manager/providers/zha/__init__.py b/custom_components/lock_code_manager/providers/zha/__init__.py new file mode 100644 index 00000000..6beb9f9c --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha/__init__.py @@ -0,0 +1,7 @@ +"""ZHA (Zigbee Home Automation) lock provider package.""" + +from __future__ import annotations + +from .provider import ZHALock + +__all__ = ["ZHALock"] diff --git a/custom_components/lock_code_manager/providers/zha/const.py b/custom_components/lock_code_manager/providers/zha/const.py new file mode 100644 index 00000000..fec0f615 --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha/const.py @@ -0,0 +1,31 @@ +"""Constants for ZHA (Zigbee Home Automation) lock provider. + +Custom mappings specific to this provider. ZCL types come directly from zigpy. +""" + +from __future__ import annotations + +from zigpy.zcl.clusters.closures import DoorLock + +# Map operation events to locked state (True = locked, False = unlocked) +OPERATION_TO_LOCKED: dict[DoorLock.OperationEvent, bool] = { + DoorLock.OperationEvent.Lock: True, + DoorLock.OperationEvent.KeyLock: True, + DoorLock.OperationEvent.AutoLock: True, + DoorLock.OperationEvent.Manual_Lock: True, + DoorLock.OperationEvent.ScheduleLock: True, + DoorLock.OperationEvent.OnTouchLock: True, + DoorLock.OperationEvent.Unlock: False, + DoorLock.OperationEvent.KeyUnlock: False, + DoorLock.OperationEvent.Manual_Unlock: False, + DoorLock.OperationEvent.ScheduleUnlock: False, +} + +# Map operation source to human-readable name +OPERATION_SOURCE_NAMES: dict[DoorLock.OperationEventSource, str] = { + DoorLock.OperationEventSource.Keypad: "Keypad", + DoorLock.OperationEventSource.RF: "RF", + DoorLock.OperationEventSource.Manual: "Manual", + DoorLock.OperationEventSource.RFID: "RFID", + DoorLock.OperationEventSource.Indeterminate: "Unknown", +} diff --git a/custom_components/lock_code_manager/providers/zha/helpers.py b/custom_components/lock_code_manager/providers/zha/helpers.py new file mode 100644 index 00000000..2cc61b66 --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha/helpers.py @@ -0,0 +1,22 @@ +"""Helper functions for ZHA lock provider.""" + +from __future__ import annotations + +from typing import Any + +from homeassistant.components.zha.helpers import ( + get_zha_gateway_proxy as _get_zha_gateway_proxy, +) +from homeassistant.core import HomeAssistant + + +def get_zha_gateway(hass: HomeAssistant) -> Any | None: + """Get the ZHA gateway proxy. + + Returns the gateway proxy from the ZHA integration's runtime data, + or None if ZHA is not loaded or has no gateway. + """ + try: + return _get_zha_gateway_proxy(hass) + except (KeyError, ValueError): + return None diff --git a/custom_components/lock_code_manager/providers/zha/provider.py b/custom_components/lock_code_manager/providers/zha/provider.py new file mode 100644 index 00000000..08b8e65b --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha/provider.py @@ -0,0 +1,523 @@ +"""ZHA lock provider implementation.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass, field +from datetime import timedelta +import logging +from typing import Any + +from zigpy.zcl.clusters.closures import DoorLock + +from homeassistant.components.zha.const import DOMAIN as ZHA_DOMAIN +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import callback + +from ...const import CONF_LOCKS, CONF_SLOTS, DOMAIN +from ...data import get_entry_data +from ...exceptions import LockDisconnected +from .._base import BaseLock +from .const import OPERATION_SOURCE_NAMES, OPERATION_TO_LOCKED +from .helpers import get_zha_gateway + +_LOGGER = logging.getLogger(__name__) + + +@dataclass(repr=False, eq=False) +class ZHALock(BaseLock): + """Class to represent ZHA lock. + + Supports push updates via zigpy cluster listeners for: + - programming_event_notification (0x21): PIN code added/deleted/changed + - operation_event_notification (0x20): lock/unlock operations with user ID + + If the lock doesn't support programming event notifications (detected via + event mask attributes), falls back to drift detection polling. + """ + + lock_config_entry: ConfigEntry = field(repr=False) + _door_lock_cluster: DoorLock | None = field(init=False, default=None) + _endpoint_id: int | None = field(init=False, default=None) + _cluster_listener_unsub: Callable[[], None] | None = field(init=False, default=None) + _supports_programming_events: bool | None = field(init=False, default=None) + + @property + def domain(self) -> str: + """Return integration domain.""" + return ZHA_DOMAIN + + @property + def supports_push(self) -> bool: + """Return whether this lock supports push-based updates. + + Always True - we subscribe to cluster events for operation notifications + (lock/unlock with user ID). Programming event support is checked separately + to determine if we need drift detection fallback. + """ + return True + + @property + def hard_refresh_interval(self) -> timedelta | None: + """Return interval for hard refresh. + + Returns 1 hour if the lock doesn't support programming event notifications + (detected during setup), otherwise None to disable drift detection. + """ + if self._supports_programming_events is False: + return timedelta(hours=1) + return None + + @property + def connection_check_interval(self) -> timedelta | None: + """Return interval for connection checks.""" + return timedelta(seconds=30) + + def _get_door_lock_cluster(self) -> DoorLock | None: + """Get the Door Lock cluster for this device.""" + if self._door_lock_cluster is not None: + return self._door_lock_cluster + + gateway = get_zha_gateway(self.hass) + if not gateway: + _LOGGER.debug("ZHA gateway not available") + return None + + # Get device from entity + entity_ref = gateway.get_entity_reference(self.lock.entity_id) + if not entity_ref: + _LOGGER.debug("Could not find entity reference for %s", self.lock.entity_id) + return None + + device_proxy = entity_ref.entity_data.device_proxy + if not device_proxy: + _LOGGER.debug("Could not find device proxy for %s", self.lock.entity_id) + return None + + # Get the underlying zigpy device (device_proxy.device is ZHA Device, + # device_proxy.device.device is the zigpy device) + zha_device = device_proxy.device + if not zha_device: + _LOGGER.debug("Could not find ZHA device for %s", self.lock.entity_id) + return None + + zigpy_device = zha_device.device + if not zigpy_device: + _LOGGER.debug("Could not find zigpy device for %s", self.lock.entity_id) + return None + + # Find the Door Lock cluster + for endpoint_id, endpoint in zigpy_device.endpoints.items(): + if endpoint_id == 0: # Skip ZDO endpoint + continue + for cluster in endpoint.in_clusters.values(): + if cluster.cluster_id == DoorLock.cluster_id: + self._door_lock_cluster = cluster + self._endpoint_id = endpoint_id + _LOGGER.debug( + "Found Door Lock cluster on endpoint %s for %s", + endpoint_id, + self.lock.entity_id, + ) + return cluster + + _LOGGER.warning("Could not find Door Lock cluster for %s", self.lock.entity_id) + return None + + async def _get_connected_cluster(self) -> DoorLock: + """Get the Door Lock cluster, ensuring connection is up. + + Raises LockDisconnected if cluster is unavailable or device is disconnected. + """ + cluster = self._get_door_lock_cluster() + if not cluster: + raise LockDisconnected("Door Lock cluster not available") + + if not await self.async_is_connection_up(): + raise LockDisconnected("Lock not connected") + + return cluster + + async def async_is_connection_up(self) -> bool: + """Return whether connection to lock is up.""" + gateway = get_zha_gateway(self.hass) + if not gateway: + return False + + entity_ref = gateway.get_entity_reference(self.lock.entity_id) + if not entity_ref: + return False + + device_proxy = entity_ref.entity_data.device_proxy + if not device_proxy: + return False + + # Check if device is available + return device_proxy.device.available + + async def async_get_usercodes(self) -> dict[int, int | str]: + """Get dictionary of code slots and usercodes.""" + cluster = await self._get_connected_cluster() + + # Get configured code slots for this lock + code_slots = { + int(code_slot) + for entry in self.hass.config_entries.async_entries(DOMAIN) + for code_slot in get_entry_data(entry, CONF_SLOTS, {}) + if self.lock.entity_id in get_entry_data(entry, CONF_LOCKS, []) + } + + data: dict[int, int | str] = {} + + for slot_num in code_slots: + try: + # Call get_pin_code cluster command (0x06) + result = await cluster.get_pin_code(slot_num) + _LOGGER.debug( + "Lock %s slot %s get_pin_code result: %s", + self.lock.entity_id, + slot_num, + result, + ) + + # Parse result - format depends on zigpy version + # Typically returns a foundation.Status and response fields + if hasattr(result, "user_status"): + user_status = result.user_status + pin_code = getattr(result, "code", "") or "" + elif isinstance(result, (list, tuple)) and len(result) >= 4: + # Result format: [user_id, user_status, user_type, code] + user_status = result[1] + pin_code = result[3] + else: + _LOGGER.warning( + "Unexpected get_pin_code response format for %s slot %s: %s", + self.lock.entity_id, + slot_num, + result, + ) + data[slot_num] = "" + continue + + # Check if slot is in use + if user_status == DoorLock.UserStatus.Enabled: + # Convert bytes to string if needed + if isinstance(pin_code, bytes): + pin_code = pin_code.decode("utf-8", errors="ignore") + data[slot_num] = str(pin_code) if pin_code else "" + else: + data[slot_num] = "" + + except Exception as err: + _LOGGER.debug( + "Failed to get PIN for %s slot %s: %s", + self.lock.entity_id, + slot_num, + err, + ) + # Fall back to assuming empty if we can't read + data[slot_num] = "" + + return data + + async def async_set_usercode( + self, code_slot: int, usercode: int | str, name: str | None = None + ) -> bool: + """Set a usercode on a code slot.""" + cluster = await self._get_connected_cluster() + + try: + # Call set_pin_code cluster command (0x05) + # Parameters: user_id, user_status, user_type, pin_code + result = await cluster.set_pin_code( + code_slot, + DoorLock.UserStatus.Enabled, + DoorLock.UserType.Unrestricted, + str(usercode), + ) + _LOGGER.debug( + "Lock %s slot %s set_pin_code result: %s", + self.lock.entity_id, + code_slot, + result, + ) + + # Check result status + if hasattr(result, "status"): + if result.status != 0: + _LOGGER.warning( + "set_pin_code failed for %s slot %s: status %s", + self.lock.entity_id, + code_slot, + result.status, + ) + raise LockDisconnected( + f"set_pin_code failed: status {result.status}" + ) + + return True + + except LockDisconnected: + raise + except Exception as err: + _LOGGER.error( + "Failed to set PIN for %s slot %s: %s", + self.lock.entity_id, + code_slot, + err, + ) + raise LockDisconnected(f"Failed to set PIN: {err}") from err + + async def async_clear_usercode(self, code_slot: int) -> bool: + """Clear a usercode on a code slot.""" + cluster = await self._get_connected_cluster() + + try: + # Call clear_pin_code cluster command (0x07) + result = await cluster.clear_pin_code(code_slot) + _LOGGER.debug( + "Lock %s slot %s clear_pin_code result: %s", + self.lock.entity_id, + code_slot, + result, + ) + + # Check result status + if hasattr(result, "status"): + if result.status != 0: + _LOGGER.warning( + "clear_pin_code failed for %s slot %s: status %s", + self.lock.entity_id, + code_slot, + result.status, + ) + raise LockDisconnected( + f"clear_pin_code failed: status {result.status}" + ) + + return True + + except LockDisconnected: + raise + except Exception as err: + _LOGGER.error( + "Failed to clear PIN for %s slot %s: %s", + self.lock.entity_id, + code_slot, + err, + ) + raise LockDisconnected(f"Failed to clear PIN: {err}") from err + + async def async_hard_refresh_codes(self) -> dict[int, int | str]: + """Perform hard refresh and return all codes. + + For ZHA, we just re-read all codes from the lock. + """ + return await self.async_get_usercodes() + + # Push update support via zigpy cluster listeners + + async def _async_check_programming_event_support(self) -> bool: + """Check if the lock supports programming event notifications. + + Reads the programming event mask attributes to determine if the lock + will send programming_event_notification when codes change. + """ + cluster = self._get_door_lock_cluster() + if not cluster: + return False + + # Check if any programming event mask attribute has a non-zero value + # Use AttributeDefs for guaranteed-accurate IDs + mask_attrs = ( + DoorLock.AttributeDefs.keypad_programming_event_mask, + DoorLock.AttributeDefs.rf_programming_event_mask, + DoorLock.AttributeDefs.rfid_programming_event_mask, + ) + + for attr in mask_attrs: + try: + if hasattr(cluster, "get"): + value = cluster.get(attr.name) + if value is not None and value != 0: + _LOGGER.debug( + "Lock %s: supports programming events (%s [0x%04x]=0x%04x)", + self.lock.entity_id, + attr.name, + attr.id, + value, + ) + return True + except Exception as err: + _LOGGER.debug( + "Lock %s: could not read %s [0x%04x]: %s", + self.lock.entity_id, + attr.name, + attr.id, + err, + ) + + _LOGGER.debug( + "Lock %s: no programming event mask attributes found, " + "will use drift detection fallback", + self.lock.entity_id, + ) + return False + + @callback + def subscribe_push_updates(self) -> None: + """Subscribe to push-based value updates via zigpy cluster listener.""" + if self._cluster_listener_unsub is not None: + return # Already subscribed + + cluster = self._get_door_lock_cluster() + if not cluster: + _LOGGER.debug( + "Lock %s: cannot subscribe to push updates - cluster not available", + self.lock.entity_id, + ) + return + + # Check programming event support if not already done + if self._supports_programming_events is None: + # Schedule async check - for now assume supported, will update on next refresh + self.hass.async_create_task( + self._async_detect_programming_support(), + f"Detect programming event support for {self.lock.entity_id}", + ) + + # Register as a listener on the cluster + cluster.add_listener(self) + self._cluster_listener_unsub = lambda: cluster.remove_listener(self) + + _LOGGER.debug( + "Lock %s: subscribed to Door Lock cluster events", + self.lock.entity_id, + ) + + async def _async_detect_programming_support(self) -> None: + """Detect programming event support and log result.""" + self._supports_programming_events = ( + await self._async_check_programming_event_support() + ) + if not self._supports_programming_events: + _LOGGER.info( + "Lock %s: programming event notifications not supported, " + "enabling drift detection (1 hour interval)", + self.lock.entity_id, + ) + + @callback + def unsubscribe_push_updates(self) -> None: + """Unsubscribe from push-based value updates.""" + if self._cluster_listener_unsub is not None: + self._cluster_listener_unsub() + self._cluster_listener_unsub = None + _LOGGER.debug( + "Lock %s: unsubscribed from Door Lock cluster events", + self.lock.entity_id, + ) + + def cluster_command( + self, + tsn: int, + command_id: int, + args: Any, + ) -> None: + """Handle incoming cluster commands from the lock. + + Called by zigpy when the lock sends a cluster command (client -> server). + """ + if command_id == DoorLock.ClientCommandDefs.programming_event_notification.id: + self._handle_programming_event(args) + elif command_id == DoorLock.ClientCommandDefs.operation_event_notification.id: + self._handle_operation_event(args) + + def _handle_programming_event(self, args: Any) -> None: + """Handle programming event notification (PIN added/deleted/changed). + + This triggers a coordinator refresh to pick up the new code state. + """ + # Extract event details + try: + event_code = ( + args.program_event_code + if hasattr(args, "program_event_code") + else args[1] + ) + user_id = args.user_id if hasattr(args, "user_id") else args[2] + except (AttributeError, IndexError, TypeError): + _LOGGER.debug( + "Lock %s: could not parse programming event args: %s", + self.lock.entity_id, + args, + ) + return + + _LOGGER.debug( + "Lock %s: programming event - code=%s, user_id=%s", + self.lock.entity_id, + event_code, + user_id, + ) + + # Trigger coordinator refresh to pick up the change + if self.coordinator: + self.hass.async_create_task( + self.coordinator.async_request_refresh(), + f"Refresh {self.lock.entity_id} after programming event", + ) + + def _handle_operation_event(self, args: Any) -> None: + """Handle operation event notification (lock/unlock with user ID). + + This fires a code slot event so automations can react to lock usage. + """ + # Extract event details + try: + source = ( + args.operation_event_source + if hasattr(args, "operation_event_source") + else args[0] + ) + event_code = ( + args.operation_event_code + if hasattr(args, "operation_event_code") + else args[1] + ) + user_id = args.user_id if hasattr(args, "user_id") else args[2] + except (AttributeError, IndexError, TypeError): + _LOGGER.debug( + "Lock %s: could not parse operation event args: %s", + self.lock.entity_id, + args, + ) + return + + _LOGGER.debug( + "Lock %s: operation event - source=%s, code=%s, user_id=%s", + self.lock.entity_id, + source, + event_code, + user_id, + ) + + # Determine if this is a lock or unlock event + to_locked = OPERATION_TO_LOCKED.get(event_code) + + # Build action text from source and event + source_name = OPERATION_SOURCE_NAMES.get(source, f"Source {source}") + action = "lock" if to_locked else "unlock" if to_locked is False else "event" + action_text = f"{source_name} {action} operation" + + # Fire code slot event (user_id 0 typically means no code was used) + self.async_fire_code_slot_event( + code_slot=user_id if user_id > 0 else None, + to_locked=to_locked, + action_text=action_text, + source_data={ + "source": source, + "event_code": event_code, + "user_id": user_id, + }, + ) diff --git a/requirements_test.txt b/requirements_test.txt index bc2e715b..dca6e636 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,3 +1,9 @@ +aiousbwatcher>=1.1.0 +ha-silabs-firmware-client>=0.3.0 pylint-strict-informational>=0.1 pytest>=8.0.2 pytest-homeassistant-custom-component==0.13.307 +universal-silabs-flasher>=0.1.0 +usb-devices>=0.4.5 +zha>=0.0.81 +zigpy>=0.60.0 diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py index 5ad6fd4f..bbdf469c 100644 --- a/tests/providers/conftest.py +++ b/tests/providers/conftest.py @@ -1,32 +1,56 @@ -"""Provide common Z-Wave JS fixtures for Lock Code Manager tests.""" +"""Provide common provider fixtures for Lock Code Manager tests.""" from __future__ import annotations import asyncio -from collections.abc import Generator +from collections.abc import Awaitable, Callable, Generator import copy +import itertools import json from pathlib import Path import sys +import time from typing import Any -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, create_autospec, patch +import warnings import pytest from pytest_homeassistant_custom_component.common import MockConfigEntry +from zigpy.application import ControllerApplication +import zigpy.config +import zigpy.device +from zigpy.profiles import zha as zha_profile +import zigpy.quirks +import zigpy.state +import zigpy.types +from zigpy.zcl.clusters import closures, general +from zigpy.zcl.foundation import Status +import zigpy.zdo.types as zdo_t from zwave_js_server.model.driver import Driver from zwave_js_server.model.node import Node from zwave_js_server.version import VersionInfo +from homeassistant.components.zha import const as zha_const +from homeassistant.components.zha.helpers import get_zha_gateway from homeassistant.components.zwave_js.const import DOMAIN as ZWAVE_JS_DOMAIN +from homeassistant.const import Platform from homeassistant.core import HomeAssistant from homeassistant.helpers import entity_registry as er +from homeassistant.setup import async_setup_component -FIXTURES_DIR = Path(__file__).parent / "fixtures" / "zwave_js" +ZWAVE_JS_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "zwave_js" +# ZHA endpoint signature constants +SIG_EP_INPUT = 1 +SIG_EP_OUTPUT = 2 +SIG_EP_PROFILE = 3 +SIG_EP_TYPE = 4 -def load_json_fixture(filename: str) -> dict[str, Any]: - """Load a fixture JSON file.""" - with open(FIXTURES_DIR / filename, encoding="utf-8") as f: + +def load_json_fixture(filename: str, provider: str = "zwave_js") -> dict[str, Any]: + """Load a fixture JSON file for a provider.""" + fixtures_dir = Path(__file__).parent / "fixtures" / provider + with open(fixtures_dir / filename, encoding="utf-8") as f: return json.load(f) @@ -217,3 +241,351 @@ async def lock_entity_fixture( lock_entries = [e for e in entries if e.domain == "lock"] assert len(lock_entries) == 1, f"Expected 1 lock entity, found {len(lock_entries)}" return lock_entries[0] + + +# ============================================================================= +# ZHA Fixtures +# ============================================================================= + + +@pytest.fixture +def mock_zha_radio_delays() -> Generator[None]: + """Mock ZHA radio manager delays to speed up tests.""" + with ( + patch( + "homeassistant.components.zha.radio_manager.CONNECT_DELAY_S", + 0, + ), + patch( + "homeassistant.components.zha.radio_manager.RETRY_DELAY_S", + 0, + ), + ): + yield + + +class _FakeZigbeeApp(ControllerApplication): + """Fake Zigbee application controller for testing.""" + + async def add_endpoint(self, descriptor: zdo_t.SimpleDescriptor): + """Add endpoint.""" + + async def connect(self): + """Connect.""" + + async def disconnect(self): + """Disconnect.""" + + async def force_remove(self, dev: zigpy.device.Device): + """Force remove device.""" + + async def load_network_info(self, *, load_devices: bool = False): + """Load network info.""" + + async def permit_ncp(self, time_s: int = 60): + """Permit NCP.""" + + async def permit_with_link_key( + self, + node: zigpy.types.EUI64, + link_key: zigpy.types.KeyData, + time_s: int = 60, + ): + """Permit with link key.""" + + async def reset_network_info(self): + """Reset network info.""" + + async def send_packet(self, packet: zigpy.types.ZigbeePacket): + """Send packet.""" + + async def start_network(self): + """Start network.""" + + async def write_network_info( + self, + *, + network_info: zigpy.state.NetworkInfo, + node_info: zigpy.state.NodeInfo, + ) -> None: + """Write network info.""" + + async def request( + self, + device: zigpy.device.Device, + profile: zigpy.types.uint16_t, + cluster: zigpy.types.uint16_t, + src_ep: zigpy.types.uint8_t, + dst_ep: zigpy.types.uint8_t, + sequence: zigpy.types.uint8_t, + data: bytes, + *, + expect_reply: bool = True, + use_ieee: bool = False, + extended_timeout: bool = False, + ): + """Request.""" + + async def move_network_to_channel( + self, new_channel: int, *, num_broadcasts: int = 5 + ) -> None: + """Move network to channel.""" + + def _persist_coordinator_model_strings_in_db(self) -> None: + """Persist coordinator model strings.""" + + +def _wrap_mock_instance(obj: Any) -> MagicMock: + """Auto-mock every attribute and method in an object.""" + mock = create_autospec(obj, spec_set=True, instance=True) + + for attr_name in dir(obj): + if attr_name.startswith("__") and attr_name not in {"__getitem__"}: + continue + + real_attr = getattr(obj, attr_name) + mock_attr = getattr(mock, attr_name) + + if callable(real_attr) and not hasattr(real_attr, "__aenter__"): + mock_attr.side_effect = real_attr + else: + setattr(mock, attr_name, real_attr) + + return mock + + +def patch_zha_cluster(cluster): + """Patch a ZHA cluster for testing.""" + cluster.PLUGGED_ATTR_READS = {} + cluster.bind = AsyncMock(return_value=[0]) + cluster.configure_reporting = AsyncMock(return_value=[[]]) + cluster.configure_reporting_multiple = AsyncMock(return_value=[]) + cluster.handle_cluster_request = MagicMock() + cluster.read_attributes = AsyncMock(return_value=[{}, {}]) + cluster.read_attributes_raw = AsyncMock(return_value=[]) + cluster.unbind = AsyncMock(return_value=[0]) + cluster.write_attributes = AsyncMock(return_value=[]) + cluster._write_attributes = AsyncMock(return_value=[]) + + +@pytest.fixture +async def zigpy_app_controller(): + """Zigpy ApplicationController fixture.""" + app = _FakeZigbeeApp( + { + zigpy.config.CONF_DATABASE: None, + zigpy.config.CONF_DEVICE: {zigpy.config.CONF_DEVICE_PATH: "/dev/null"}, + zigpy.config.CONF_STARTUP_ENERGY_SCAN: False, + zigpy.config.CONF_NWK_BACKUP_ENABLED: False, + zigpy.config.CONF_TOPO_SCAN_ENABLED: False, + zigpy.config.CONF_OTA: { + zigpy.config.CONF_OTA_ENABLED: False, + }, + } + ) + + app.state.node_info.nwk = 0x0000 + app.state.node_info.ieee = zigpy.types.EUI64.convert("00:15:8d:00:02:32:4f:32") + app.state.node_info.manufacturer = "Coordinator Manufacturer" + app.state.node_info.model = "Coordinator Model" + app.state.network_info.pan_id = 0x1234 + app.state.network_info.extended_pan_id = app.state.node_info.ieee + app.state.network_info.channel = 15 + app.state.network_info.network_key.key = zigpy.types.KeyData(range(16)) + app.state.counters = zigpy.state.CounterGroups() + + # Create a fake coordinator device + dev = app.add_device(nwk=app.state.node_info.nwk, ieee=app.state.node_info.ieee) + dev.node_desc = zdo_t.NodeDescriptor() + dev.node_desc.logical_type = zdo_t.LogicalType.Coordinator + dev.manufacturer = "Coordinator Manufacturer" + dev.model = "Coordinator Model" + + ep = dev.add_endpoint(1) + ep.profile_id = zha_profile.PROFILE_ID + ep.add_input_cluster(general.Basic.cluster_id) + ep.add_input_cluster(general.Groups.cluster_id) + + with patch("zigpy.device.Device.request", return_value=[Status.SUCCESS]): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + mock_app = _wrap_mock_instance(app) + mock_app.backups = _wrap_mock_instance(app.backups) + + # Ensure _concurrent_requests_semaphore has proper attributes + # This is needed by ZHA gateway's radio_concurrency property + # Use a simple object instead of MagicMock to ensure values are ints + # Include both max_value (older) and max_concurrency (newer) for compatibility + class MockSemaphore: + max_value = 8 + max_concurrency = 8 + + mock_app._concurrent_requests_semaphore = MockSemaphore() + + yield mock_app + + +@pytest.fixture(name="zha_config_entry") +async def zha_config_entry_fixture() -> MockConfigEntry: + """Fixture representing a ZHA config entry.""" + return MockConfigEntry( + version=5, + domain=zha_const.DOMAIN, + data={ + zigpy.config.CONF_DEVICE: { + zigpy.config.CONF_DEVICE_PATH: "/dev/ttyUSB0", + zigpy.config.CONF_DEVICE_BAUDRATE: 115200, + zigpy.config.CONF_DEVICE_FLOW_CONTROL: "hardware", + }, + zha_const.CONF_RADIO_TYPE: "ezsp", + }, + options={}, + ) + + +@pytest.fixture +def mock_zigpy_connect( + zigpy_app_controller: ControllerApplication, +) -> Generator[ControllerApplication]: + """Patch the zigpy radio connection with our mock application.""" + with ( + patch( + "bellows.zigbee.application.ControllerApplication.new", + return_value=zigpy_app_controller, + ), + patch( + "bellows.zigbee.application.ControllerApplication", + return_value=zigpy_app_controller, + ), + ): + yield zigpy_app_controller + + +@pytest.fixture +def setup_zha( + hass: HomeAssistant, + zha_config_entry: MockConfigEntry, + mock_zigpy_connect: ControllerApplication, + mock_zha_radio_delays: None, +) -> Callable[..., Awaitable[None]]: + """Set up ZHA component.""" + + async def _setup(config=None) -> None: + zha_config_entry.add_to_hass(hass) + config = config or {} + + # Only set up lock platform to speed up tests + with patch( + "homeassistant.components.zha.PLATFORMS", + (Platform.DEVICE_TRACKER, Platform.LOCK, Platform.SENSOR), + ): + status = await async_setup_component( + hass, + zha_const.DOMAIN, + {zha_const.DOMAIN: {zha_const.CONF_ENABLE_QUIRKS: False, **config}}, + ) + assert status is True + await hass.async_block_till_done() + + return _setup + + +@pytest.fixture +def zigpy_device_mock( + zigpy_app_controller, +) -> Callable[..., zigpy.device.Device]: + """Make a fake device using the specified cluster classes.""" + + def _mock_dev( + endpoints, + ieee="00:0d:6f:00:0a:90:69:e7", + manufacturer="FakeManufacturer", + model="FakeModel", + node_descriptor=b"\x02@\x807\x10\x7fd\x00\x00*d\x00\x00", + nwk=0xB79C, + patch_cluster_flag=True, + ): + """Make a fake device using the specified cluster classes.""" + device = zigpy.device.Device( + zigpy_app_controller, zigpy.types.EUI64.convert(ieee), nwk + ) + device.manufacturer = manufacturer + device.model = model + device.node_desc = zdo_t.NodeDescriptor.deserialize(node_descriptor)[0] + device.last_seen = time.time() + + for epid, ep in endpoints.items(): + endpoint = device.add_endpoint(epid) + endpoint.device_type = ep[SIG_EP_TYPE] + endpoint.profile_id = ep.get(SIG_EP_PROFILE, 0x0104) + endpoint.request = AsyncMock() + + for cluster_id in ep.get(SIG_EP_INPUT, []): + endpoint.add_input_cluster(cluster_id) + + for cluster_id in ep.get(SIG_EP_OUTPUT, []): + endpoint.add_output_cluster(cluster_id) + + device.status = zigpy.device.Status.ENDPOINTS_INIT + + # Allow zigpy to apply quirks + device = zigpy.quirks.get_device(device) + + if patch_cluster_flag: + for endpoint in (ep for epid, ep in device.endpoints.items() if epid): + endpoint.request = AsyncMock(return_value=[0]) + for cluster in itertools.chain( + endpoint.in_clusters.values(), endpoint.out_clusters.values() + ): + patch_zha_cluster(cluster) + + return device + + return _mock_dev + + +@pytest.fixture +def zigpy_lock_device( + zigpy_device_mock: Callable[..., zigpy.device.Device], +) -> zigpy.device.Device: + """Create a mock Zigbee lock device.""" + return zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [ + closures.DoorLock.cluster_id, + general.Basic.cluster_id, + ], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha_profile.DeviceType.DOOR_LOCK, + SIG_EP_PROFILE: zha_profile.PROFILE_ID, + } + }, + ieee="01:2d:6f:00:0a:90:69:e8", + manufacturer="Yale", + model="YRD256", + ) + + +@pytest.fixture +async def zha_lock_entity( + hass: HomeAssistant, + setup_zha: Callable[..., Awaitable[None]], + zigpy_lock_device: zigpy.device.Device, + zha_config_entry: MockConfigEntry, +) -> er.RegistryEntry: + """Set up ZHA with a lock device and return the lock entity.""" + await setup_zha() + gateway = get_zha_gateway(hass) + + gateway.get_or_create_device(zigpy_lock_device) + await gateway.async_device_initialized(zigpy_lock_device) + await hass.async_block_till_done(wait_background_tasks=True) + + ent_reg = er.async_get(hass) + entries = list( + er.async_entries_for_config_entry(ent_reg, zha_config_entry.entry_id) + ) + lock_entries = [e for e in entries if e.domain == "lock"] + assert len(lock_entries) == 1, f"Expected 1 lock entity, found {len(lock_entries)}" + return lock_entries[0] diff --git a/tests/providers/test_zha.py b/tests/providers/test_zha.py new file mode 100644 index 00000000..ef36b9af --- /dev/null +++ b/tests/providers/test_zha.py @@ -0,0 +1,338 @@ +"""Test the ZHA lock provider.""" + +from __future__ import annotations + +from datetime import timedelta +from unittest.mock import AsyncMock + +import pytest +from pytest_homeassistant_custom_component.common import MockConfigEntry +import zigpy.device +from zigpy.zcl.clusters.closures import DoorLock + +from homeassistant.components.zha.const import DOMAIN as ZHA_DOMAIN +from homeassistant.core import HomeAssistant +from homeassistant.helpers import device_registry as dr, entity_registry as er + +from custom_components.lock_code_manager.const import ( + CONF_LOCKS, + CONF_SLOTS, + DOMAIN, +) +from custom_components.lock_code_manager.providers.zha import ZHALock + + +@pytest.fixture(name="zha_lock") +async def zha_lock_fixture( + hass: HomeAssistant, + zha_lock_entity: er.RegistryEntry, + zha_config_entry: MockConfigEntry, + zigpy_lock_device: zigpy.device.Device, +) -> ZHALock: + """Create a ZHALock instance for testing.""" + dev_reg = dr.async_get(hass) + ent_reg = er.async_get(hass) + + lock = ZHALock( + hass=hass, + dev_reg=dev_reg, + ent_reg=ent_reg, + lock_config_entry=zha_config_entry, + lock=zha_lock_entity, + ) + return lock + + +# ============================================================================= +# Property tests +# ============================================================================= + + +async def test_domain(zha_lock: ZHALock) -> None: + """Test domain property returns zha.""" + assert zha_lock.domain == ZHA_DOMAIN + + +async def test_supports_push(zha_lock: ZHALock) -> None: + """Test that ZHA locks support push updates.""" + assert zha_lock.supports_push is True + + +async def test_connection_check_interval(zha_lock: ZHALock) -> None: + """Test that connection check interval is 30 seconds.""" + assert zha_lock.connection_check_interval == timedelta(seconds=30) + + +async def test_hard_refresh_interval_when_programming_events_supported( + zha_lock: ZHALock, +) -> None: + """Test hard refresh interval is None when programming events are supported.""" + zha_lock._supports_programming_events = True + assert zha_lock.hard_refresh_interval is None + + +async def test_hard_refresh_interval_when_programming_events_not_supported( + zha_lock: ZHALock, +) -> None: + """Test hard refresh interval is 1 hour when programming events not supported.""" + zha_lock._supports_programming_events = False + assert zha_lock.hard_refresh_interval == timedelta(hours=1) + + +# ============================================================================= +# Connection tests +# ============================================================================= + + +async def test_is_connection_up_when_available( + hass: HomeAssistant, + zha_lock: ZHALock, +) -> None: + """Test connection is up when device is available.""" + # The mock device should be available by default + assert await zha_lock.async_is_connection_up() is True + + +# ============================================================================= +# Cluster access tests +# ============================================================================= + + +async def test_get_door_lock_cluster( + hass: HomeAssistant, + zha_lock: ZHALock, + zigpy_lock_device: zigpy.device.Device, +) -> None: + """Test getting the Door Lock cluster.""" + cluster = zha_lock._get_door_lock_cluster() + assert cluster is not None + assert cluster.cluster_id == DoorLock.cluster_id + + +async def test_get_door_lock_cluster_caches_result( + hass: HomeAssistant, + zha_lock: ZHALock, +) -> None: + """Test that cluster is cached after first access.""" + cluster1 = zha_lock._get_door_lock_cluster() + cluster2 = zha_lock._get_door_lock_cluster() + assert cluster1 is cluster2 + + +# ============================================================================= +# Usercode tests +# ============================================================================= + + +async def test_get_usercodes( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, + zigpy_lock_device: zigpy.device.Device, +) -> None: + """Test reading usercodes from the lock.""" + # Create LCM config entry with slots + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_LOCKS: [zha_lock.lock.entity_id], + CONF_SLOTS: {"1": {}, "2": {}}, + }, + ) + lcm_entry.add_to_hass(hass) + + # Mock the cluster's get_pin_code method + cluster = zha_lock._get_door_lock_cluster() + + # Mock get_pin_code responses + async def mock_get_pin_code(slot_num): + if slot_num == 1: + # Return enabled slot with code + return type( + "Response", + (), + {"user_status": DoorLock.UserStatus.Enabled, "code": "1234"}, + )() + # Return disabled slot + return type( + "Response", + (), + {"user_status": DoorLock.UserStatus.Available, "code": ""}, + )() + + cluster.get_pin_code = AsyncMock(side_effect=mock_get_pin_code) + + await zha_lock.async_setup(lcm_entry) + + codes = await zha_lock.async_get_usercodes() + + assert codes[1] == "1234" + assert codes[2] == "" + + await zha_lock.async_unload(False) + + +async def test_set_usercode_calls_cluster( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test that set_usercode calls the cluster's set_pin_code.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + cluster.set_pin_code = AsyncMock(return_value=type("Response", (), {"status": 0})()) + + result = await zha_lock.async_set_usercode(3, "5678", "Test User") + + assert result is True + cluster.set_pin_code.assert_called_once_with( + 3, + DoorLock.UserStatus.Enabled, + DoorLock.UserType.Unrestricted, + "5678", + ) + + await zha_lock.async_unload(False) + + +async def test_clear_usercode_calls_cluster( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test that clear_usercode calls the cluster's clear_pin_code.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + cluster.clear_pin_code = AsyncMock( + return_value=type("Response", (), {"status": 0})() + ) + + result = await zha_lock.async_clear_usercode(3) + + assert result is True + cluster.clear_pin_code.assert_called_once_with(3) + + await zha_lock.async_unload(False) + + +# ============================================================================= +# Push update tests +# ============================================================================= + + +async def test_subscribe_push_updates( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test subscribing to push updates.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + # Subscribe to push updates + zha_lock.subscribe_push_updates() + + assert zha_lock._cluster_listener_unsub is not None + + # Unsubscribe + zha_lock.unsubscribe_push_updates() + assert zha_lock._cluster_listener_unsub is None + + await zha_lock.async_unload(False) + + +async def test_subscribe_is_idempotent( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test that calling subscribe multiple times is safe.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + zha_lock.subscribe_push_updates() + first_unsub = zha_lock._cluster_listener_unsub + + zha_lock.subscribe_push_updates() + assert zha_lock._cluster_listener_unsub is first_unsub + + zha_lock.unsubscribe_push_updates() + await zha_lock.async_unload(False) + + +# ============================================================================= +# Programming event support detection +# ============================================================================= + + +async def test_check_programming_event_support_with_mask( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test detecting programming event support via mask attributes.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + + # Mock get() to return a non-zero mask value + def mock_get(attr_name): + if attr_name == "keypad_programming_event_mask": + return 0x0001 + return None + + cluster.get = mock_get + + supports = await zha_lock._async_check_programming_event_support() + assert supports is True + + await zha_lock.async_unload(False) + + +async def test_check_programming_event_support_without_mask( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test detecting programming event not supported when no mask attributes.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + + # Mock get() to return None/0 for all mask attributes + cluster.get = lambda attr_name: None + + supports = await zha_lock._async_check_programming_event_support() + assert supports is False + + await zha_lock.async_unload(False)