From 3c2b85aa0d8fbfd7bd314081dd0d0197ccb3bef3 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Sun, 4 Jan 2026 23:17:58 -0500 Subject: [PATCH 01/12] Add ZHA (Zigbee Home Automation) lock provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for ZHA locks by accessing the zigpy DoorLock cluster (0x0101) directly. This enables user code management for Zigbee locks via: - get_pin_code command (0x06) for reading codes - set_pin_code command (0x05) for setting codes - clear_pin_code command (0x07) for clearing codes Uses polling mode (5 min interval) since ZHA doesn't support push updates for user codes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../lock_code_manager/providers/__init__.py | 2 + .../lock_code_manager/providers/zha.py | 315 ++++++++++++++++++ 2 files changed, 317 insertions(+) create mode 100644 custom_components/lock_code_manager/providers/zha.py diff --git a/custom_components/lock_code_manager/providers/__init__.py b/custom_components/lock_code_manager/providers/__init__.py index 6973cd41..93b2d4df 100644 --- a/custom_components/lock_code_manager/providers/__init__.py +++ b/custom_components/lock_code_manager/providers/__init__.py @@ -8,9 +8,11 @@ from ._base import BaseLock from .virtual import VirtualLock +from .zha import ZHALock from .zwave_js import ZWaveJSLock INTEGRATIONS_CLASS_MAP: dict[str, type[BaseLock]] = { "virtual": VirtualLock, + "zha": ZHALock, "zwave_js": ZWaveJSLock, } diff --git a/custom_components/lock_code_manager/providers/zha.py b/custom_components/lock_code_manager/providers/zha.py new file mode 100644 index 00000000..a7c81c30 --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha.py @@ -0,0 +1,315 @@ +"""Module for ZHA (Zigbee Home Automation) locks.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import timedelta +import logging +from typing import TYPE_CHECKING + +from homeassistant.components.zha.const import ( + DOMAIN as ZHA_DOMAIN, +) +from homeassistant.config_entries import ConfigEntry + +from ..const import CONF_LOCKS, CONF_SLOTS, DOMAIN +from ..data import get_entry_data +from ..exceptions import LockDisconnected +from ._base import BaseLock + +if TYPE_CHECKING: + from zigpy.zcl.clusters.closures import DoorLock + +_LOGGER = logging.getLogger(__name__) + +# Door Lock cluster ID +CLUSTER_ID_DOOR_LOCK = 0x0101 + +# User status values per ZCL spec +USER_STATUS_AVAILABLE = 0x00 +USER_STATUS_ENABLED = 0x01 +USER_STATUS_DISABLED = 0x03 + +# User type values per ZCL spec +USER_TYPE_UNRESTRICTED = 0x00 + + +def _get_zha_gateway(hass): + """Get the ZHA gateway proxy.""" + if ZHA_DOMAIN not in hass.data: + return None + # ZHA stores gateway in runtime_data on the config entry + for entry in hass.config_entries.async_entries(ZHA_DOMAIN): + if hasattr(entry, "runtime_data") and entry.runtime_data: + return getattr(entry.runtime_data, "gateway_proxy", None) + return None + + +@dataclass(repr=False, eq=False) +class ZHALock(BaseLock): + """Class to represent ZHA lock.""" + + lock_config_entry: ConfigEntry = field(repr=False) + _door_lock_cluster: DoorLock | None = field(init=False, default=None) + _endpoint_id: int | None = field(init=False, default=None) + + @property + def domain(self) -> str: + """Return integration domain.""" + return ZHA_DOMAIN + + @property + def usercode_scan_interval(self) -> timedelta: + """Return scan interval for usercodes. + + ZHA locks don't support push updates for user codes, so we poll. + Use a longer interval to reduce Zigbee traffic. + """ + return timedelta(minutes=5) + + @property + def hard_refresh_interval(self) -> timedelta | None: + """Return interval for hard refresh. + + Since we can't subscribe to user code changes, periodic refresh + helps detect out-of-band changes. + """ + return timedelta(hours=1) + + @property + def connection_check_interval(self) -> timedelta | None: + """Return interval for connection checks.""" + return timedelta(seconds=30) + + def _get_door_lock_cluster(self) -> DoorLock | None: + """Get the Door Lock cluster for this device.""" + if self._door_lock_cluster is not None: + return self._door_lock_cluster + + gateway = _get_zha_gateway(self.hass) + if not gateway: + _LOGGER.debug("ZHA gateway not available") + return None + + # Get device from entity + entity_ref = gateway.get_entity_reference(self.lock.entity_id) + if not entity_ref: + _LOGGER.debug("Could not find entity reference for %s", self.lock.entity_id) + return None + + device_proxy = entity_ref.entity_data.device_proxy + if not device_proxy: + _LOGGER.debug("Could not find device proxy for %s", self.lock.entity_id) + return None + + # Get the underlying zigpy device + zha_device = device_proxy.device + if not zha_device: + _LOGGER.debug("Could not find ZHA device for %s", self.lock.entity_id) + return None + + # Find the Door Lock cluster + for endpoint_id, endpoint in zha_device.endpoints.items(): + if endpoint_id == 0: # Skip ZDO endpoint + continue + for cluster in endpoint.in_clusters.values(): + if cluster.cluster_id == CLUSTER_ID_DOOR_LOCK: + self._door_lock_cluster = cluster + self._endpoint_id = endpoint_id + _LOGGER.debug( + "Found Door Lock cluster on endpoint %s for %s", + endpoint_id, + self.lock.entity_id, + ) + return cluster + + _LOGGER.warning("Could not find Door Lock cluster for %s", self.lock.entity_id) + return None + + async def async_is_connection_up(self) -> bool: + """Return whether connection to lock is up.""" + gateway = _get_zha_gateway(self.hass) + if not gateway: + return False + + entity_ref = gateway.get_entity_reference(self.lock.entity_id) + if not entity_ref: + return False + + device_proxy = entity_ref.entity_data.device_proxy + if not device_proxy: + return False + + # Check if device is available + return device_proxy.device.available + + async def async_get_usercodes(self) -> dict[int, int | str]: + """Get dictionary of code slots and usercodes.""" + cluster = self._get_door_lock_cluster() + if not cluster: + raise LockDisconnected("Door Lock cluster not available") + + if not await self.async_is_connection_up(): + raise LockDisconnected("Lock not connected") + + # Get configured code slots for this lock + code_slots = { + int(code_slot) + for entry in self.hass.config_entries.async_entries(DOMAIN) + for code_slot in get_entry_data(entry, CONF_SLOTS, {}) + if self.lock.entity_id in get_entry_data(entry, CONF_LOCKS, []) + } + + data: dict[int, int | str] = {} + + for slot_num in code_slots: + try: + # Call get_pin_code cluster command (0x06) + result = await cluster.get_pin_code(slot_num) + _LOGGER.debug( + "Lock %s slot %s get_pin_code result: %s", + self.lock.entity_id, + slot_num, + result, + ) + + # Parse result - format depends on zigpy version + # Typically returns a foundation.Status and response fields + if hasattr(result, "user_status"): + user_status = result.user_status + pin_code = getattr(result, "code", "") or "" + elif isinstance(result, (list, tuple)) and len(result) >= 4: + # Result format: [user_id, user_status, user_type, code] + user_status = result[1] + pin_code = result[3] if len(result) > 3 else "" + else: + _LOGGER.warning( + "Unexpected get_pin_code response format for %s slot %s: %s", + self.lock.entity_id, + slot_num, + result, + ) + data[slot_num] = "" + continue + + # Check if slot is in use + if user_status == USER_STATUS_ENABLED: + # Convert bytes to string if needed + if isinstance(pin_code, bytes): + pin_code = pin_code.decode("utf-8", errors="ignore") + data[slot_num] = str(pin_code) if pin_code else "" + else: + data[slot_num] = "" + + except Exception as err: + _LOGGER.debug( + "Failed to get PIN for %s slot %s: %s", + self.lock.entity_id, + slot_num, + err, + ) + # Fall back to assuming empty if we can't read + data[slot_num] = "" + + return data + + async def async_set_usercode( + self, code_slot: int, usercode: int | str, name: str | None = None + ) -> bool: + """Set a usercode on a code slot.""" + cluster = self._get_door_lock_cluster() + if not cluster: + raise LockDisconnected("Door Lock cluster not available") + + if not await self.async_is_connection_up(): + raise LockDisconnected("Lock not connected") + + try: + # Call set_pin_code cluster command (0x05) + # Parameters: user_id, user_status, user_type, pin_code + result = await cluster.set_pin_code( + code_slot, + USER_STATUS_ENABLED, + USER_TYPE_UNRESTRICTED, + str(usercode), + ) + _LOGGER.debug( + "Lock %s slot %s set_pin_code result: %s", + self.lock.entity_id, + code_slot, + result, + ) + + # Check result status + if hasattr(result, "status"): + if result.status != 0: + _LOGGER.warning( + "set_pin_code failed for %s slot %s: status %s", + self.lock.entity_id, + code_slot, + result.status, + ) + raise LockDisconnected( + f"set_pin_code failed: status {result.status}" + ) + + return True + + except Exception as err: + _LOGGER.error( + "Failed to set PIN for %s slot %s: %s", + self.lock.entity_id, + code_slot, + err, + ) + raise LockDisconnected(f"Failed to set PIN: {err}") from err + + async def async_clear_usercode(self, code_slot: int) -> bool: + """Clear a usercode on a code slot.""" + cluster = self._get_door_lock_cluster() + if not cluster: + raise LockDisconnected("Door Lock cluster not available") + + if not await self.async_is_connection_up(): + raise LockDisconnected("Lock not connected") + + try: + # Call clear_pin_code cluster command (0x07) + result = await cluster.clear_pin_code(code_slot) + _LOGGER.debug( + "Lock %s slot %s clear_pin_code result: %s", + self.lock.entity_id, + code_slot, + result, + ) + + # Check result status + if hasattr(result, "status"): + if result.status != 0: + _LOGGER.warning( + "clear_pin_code failed for %s slot %s: status %s", + self.lock.entity_id, + code_slot, + result.status, + ) + raise LockDisconnected( + f"clear_pin_code failed: status {result.status}" + ) + + return True + + except Exception as err: + _LOGGER.error( + "Failed to clear PIN for %s slot %s: %s", + self.lock.entity_id, + code_slot, + err, + ) + raise LockDisconnected(f"Failed to clear PIN: {err}") from err + + async def async_hard_refresh_codes(self) -> dict[int, int | str]: + """Perform hard refresh and return all codes. + + For ZHA, we just re-read all codes from the lock. + """ + return await self.async_get_usercodes() From f4d9d362b1ca99ea4b71e261cf4a04c09d52c014 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Sun, 4 Jan 2026 23:33:54 -0500 Subject: [PATCH 02/12] Add zha to after_dependencies in manifest MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- custom_components/lock_code_manager/manifest.json | 1 + 1 file changed, 1 insertion(+) diff --git a/custom_components/lock_code_manager/manifest.json b/custom_components/lock_code_manager/manifest.json index e51e14ff..1fad6e93 100644 --- a/custom_components/lock_code_manager/manifest.json +++ b/custom_components/lock_code_manager/manifest.json @@ -6,6 +6,7 @@ "frontend", "lovelace", "virtual", + "zha", "zwave_js" ], "codeowners": [ From 7b7026c3254c8000c01995e183e4a05450b2f8ad Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 00:35:04 -0500 Subject: [PATCH 03/12] Add push update support via zigpy cluster listeners MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscribe to Door Lock cluster events directly via zigpy to receive: - programming_event_notification (0x21): PIN code added/deleted/changed → triggers coordinator refresh to sync new code state - operation_event_notification (0x20): lock/unlock with user ID → fires code slot event for automations This eliminates the need for 5-minute polling on locks that support these notifications, significantly reducing Zigbee traffic and improving battery life. Also adds zigpy to test dependencies. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../lock_code_manager/providers/zha.py | 212 +++++++++++++++++- requirements_test.txt | 1 + 2 files changed, 203 insertions(+), 10 deletions(-) diff --git a/custom_components/lock_code_manager/providers/zha.py b/custom_components/lock_code_manager/providers/zha.py index a7c81c30..44b2fb7b 100644 --- a/custom_components/lock_code_manager/providers/zha.py +++ b/custom_components/lock_code_manager/providers/zha.py @@ -2,15 +2,17 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass, field from datetime import timedelta import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from homeassistant.components.zha.const import ( DOMAIN as ZHA_DOMAIN, ) from homeassistant.config_entries import ConfigEntry +from homeassistant.core import callback from ..const import CONF_LOCKS, CONF_SLOTS, DOMAIN from ..data import get_entry_data @@ -25,6 +27,10 @@ # Door Lock cluster ID CLUSTER_ID_DOOR_LOCK = 0x0101 +# Cluster command IDs +CMD_OPERATION_EVENT_NOTIFICATION = 0x20 +CMD_PROGRAMMING_EVENT_NOTIFICATION = 0x21 + # User status values per ZCL spec USER_STATUS_AVAILABLE = 0x00 USER_STATUS_ENABLED = 0x01 @@ -33,6 +39,39 @@ # User type values per ZCL spec USER_TYPE_UNRESTRICTED = 0x00 +# Operation event source values +OPERATION_SOURCE_KEYPAD = 0x00 +OPERATION_SOURCE_RF = 0x01 +OPERATION_SOURCE_MANUAL = 0x02 +OPERATION_SOURCE_RFID = 0x03 + +# Operation event codes (subset relevant for lock/unlock) +OPERATION_LOCK = 0x01 +OPERATION_UNLOCK = 0x02 +OPERATION_LOCK_FAILURE_INVALID_PIN = 0x03 +OPERATION_UNLOCK_FAILURE_INVALID_PIN = 0x05 +OPERATION_KEY_LOCK = 0x08 +OPERATION_KEY_UNLOCK = 0x09 +OPERATION_AUTO_LOCK = 0x0A +OPERATION_MANUAL_LOCK = 0x0D +OPERATION_MANUAL_UNLOCK = 0x0E + +# Map operation events to locked state +OPERATION_TO_LOCKED: dict[int, bool] = { + OPERATION_LOCK: True, + OPERATION_KEY_LOCK: True, + OPERATION_AUTO_LOCK: True, + OPERATION_MANUAL_LOCK: True, + OPERATION_UNLOCK: False, + OPERATION_KEY_UNLOCK: False, + OPERATION_MANUAL_UNLOCK: False, +} + +# Programming event codes +PROGRAMMING_PIN_ADDED = 0x02 +PROGRAMMING_PIN_DELETED = 0x03 +PROGRAMMING_PIN_CHANGED = 0x04 + def _get_zha_gateway(hass): """Get the ZHA gateway proxy.""" @@ -47,11 +86,17 @@ def _get_zha_gateway(hass): @dataclass(repr=False, eq=False) class ZHALock(BaseLock): - """Class to represent ZHA lock.""" + """Class to represent ZHA lock. + + Supports push updates via zigpy cluster listeners for: + - programming_event_notification (0x21): PIN code added/deleted/changed + - operation_event_notification (0x20): lock/unlock operations with user ID + """ lock_config_entry: ConfigEntry = field(repr=False) _door_lock_cluster: DoorLock | None = field(init=False, default=None) _endpoint_id: int | None = field(init=False, default=None) + _cluster_listener_unsub: Callable[[], None] | None = field(init=False, default=None) @property def domain(self) -> str: @@ -59,22 +104,23 @@ def domain(self) -> str: return ZHA_DOMAIN @property - def usercode_scan_interval(self) -> timedelta: - """Return scan interval for usercodes. + def supports_push(self) -> bool: + """Return whether this lock supports push-based updates. - ZHA locks don't support push updates for user codes, so we poll. - Use a longer interval to reduce Zigbee traffic. + ZHA locks support push updates via zigpy cluster listeners for + programming_event_notification and operation_event_notification. """ - return timedelta(minutes=5) + return True @property def hard_refresh_interval(self) -> timedelta | None: """Return interval for hard refresh. - Since we can't subscribe to user code changes, periodic refresh - helps detect out-of-band changes. + Disabled since we receive programming_event_notification when codes + change. Locks that don't support this notification will just have + slightly stale data until manually refreshed. """ - return timedelta(hours=1) + return None @property def connection_check_interval(self) -> timedelta | None: @@ -313,3 +359,149 @@ async def async_hard_refresh_codes(self) -> dict[int, int | str]: For ZHA, we just re-read all codes from the lock. """ return await self.async_get_usercodes() + + # Push update support via zigpy cluster listeners + + @callback + def subscribe_push_updates(self) -> None: + """Subscribe to push-based value updates via zigpy cluster listener.""" + if self._cluster_listener_unsub is not None: + return # Already subscribed + + cluster = self._get_door_lock_cluster() + if not cluster: + _LOGGER.debug( + "Lock %s: cannot subscribe to push updates - cluster not available", + self.lock.entity_id, + ) + return + + # Register as a listener on the cluster + cluster.add_listener(self) + self._cluster_listener_unsub = lambda: cluster.remove_listener(self) + + _LOGGER.debug( + "Lock %s: subscribed to Door Lock cluster events", + self.lock.entity_id, + ) + + @callback + def unsubscribe_push_updates(self) -> None: + """Unsubscribe from push-based value updates.""" + if self._cluster_listener_unsub is not None: + self._cluster_listener_unsub() + self._cluster_listener_unsub = None + _LOGGER.debug( + "Lock %s: unsubscribed from Door Lock cluster events", + self.lock.entity_id, + ) + + def cluster_command( + self, + tsn: int, + command_id: int, + args: Any, + ) -> None: + """Handle incoming cluster commands from the lock. + + Called by zigpy when the lock sends a cluster command (client -> server). + """ + if command_id == CMD_PROGRAMMING_EVENT_NOTIFICATION: + self._handle_programming_event(args) + elif command_id == CMD_OPERATION_EVENT_NOTIFICATION: + self._handle_operation_event(args) + + def _handle_programming_event(self, args: Any) -> None: + """Handle programming event notification (PIN added/deleted/changed). + + This triggers a coordinator refresh to pick up the new code state. + """ + # Extract event details + try: + event_code = ( + args.program_event_code + if hasattr(args, "program_event_code") + else args[1] + ) + user_id = args.user_id if hasattr(args, "user_id") else args[2] + except (AttributeError, IndexError, TypeError): + _LOGGER.debug( + "Lock %s: could not parse programming event args: %s", + self.lock.entity_id, + args, + ) + return + + _LOGGER.debug( + "Lock %s: programming event - code=%s, user_id=%s", + self.lock.entity_id, + event_code, + user_id, + ) + + # Trigger coordinator refresh to pick up the change + if self.coordinator: + self.hass.async_create_task( + self.coordinator.async_request_refresh(), + f"Refresh {self.lock.entity_id} after programming event", + ) + + def _handle_operation_event(self, args: Any) -> None: + """Handle operation event notification (lock/unlock with user ID). + + This fires a code slot event so automations can react to lock usage. + """ + # Extract event details + try: + source = ( + args.operation_event_source + if hasattr(args, "operation_event_source") + else args[0] + ) + event_code = ( + args.operation_event_code + if hasattr(args, "operation_event_code") + else args[1] + ) + user_id = args.user_id if hasattr(args, "user_id") else args[2] + except (AttributeError, IndexError, TypeError): + _LOGGER.debug( + "Lock %s: could not parse operation event args: %s", + self.lock.entity_id, + args, + ) + return + + _LOGGER.debug( + "Lock %s: operation event - source=%s, code=%s, user_id=%s", + self.lock.entity_id, + source, + event_code, + user_id, + ) + + # Determine if this is a lock or unlock event + to_locked = OPERATION_TO_LOCKED.get(event_code) + + # Build action text from source and event + source_names = { + OPERATION_SOURCE_KEYPAD: "Keypad", + OPERATION_SOURCE_RF: "RF", + OPERATION_SOURCE_MANUAL: "Manual", + OPERATION_SOURCE_RFID: "RFID", + } + source_name = source_names.get(source, f"Source {source}") + action = "lock" if to_locked else "unlock" if to_locked is False else "event" + action_text = f"{source_name} {action} operation" + + # Fire code slot event (user_id 0 typically means no code was used) + self.async_fire_code_slot_event( + code_slot=user_id if user_id > 0 else None, + to_locked=to_locked, + action_text=action_text, + source_data={ + "source": source, + "event_code": event_code, + "user_id": user_id, + }, + ) diff --git a/requirements_test.txt b/requirements_test.txt index 809de08c..52308ed6 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,3 +1,4 @@ pylint-strict-informational>=0.1 pytest>=8.0.2 pytest-homeassistant-custom-component==0.13.306 +zigpy>=0.60.0 From 27412fe081d60edfb1cbc5635720cdc62f5cd944 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 00:38:34 -0500 Subject: [PATCH 04/12] Detect programming event support and fallback to drift detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Check programming event mask attributes to determine if the lock supports programming_event_notification. If not supported, enable 1-hour drift detection interval as fallback. This ensures locks that don't support programming events still get periodic code sync, while locks that do support them avoid unnecessary polling. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../lock_code_manager/providers/zha.py | 93 ++++++++++++++++++- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/custom_components/lock_code_manager/providers/zha.py b/custom_components/lock_code_manager/providers/zha.py index 44b2fb7b..38a6e883 100644 --- a/custom_components/lock_code_manager/providers/zha.py +++ b/custom_components/lock_code_manager/providers/zha.py @@ -84,6 +84,12 @@ def _get_zha_gateway(hass): return None +# Programming event mask attribute IDs +ATTR_KEYPAD_PROGRAMMING_EVENT_MASK = 0x0045 +ATTR_RF_PROGRAMMING_EVENT_MASK = 0x0046 +ATTR_RFID_PROGRAMMING_EVENT_MASK = 0x0047 + + @dataclass(repr=False, eq=False) class ZHALock(BaseLock): """Class to represent ZHA lock. @@ -91,12 +97,16 @@ class ZHALock(BaseLock): Supports push updates via zigpy cluster listeners for: - programming_event_notification (0x21): PIN code added/deleted/changed - operation_event_notification (0x20): lock/unlock operations with user ID + + If the lock doesn't support programming event notifications (detected via + event mask attributes), falls back to drift detection polling. """ lock_config_entry: ConfigEntry = field(repr=False) _door_lock_cluster: DoorLock | None = field(init=False, default=None) _endpoint_id: int | None = field(init=False, default=None) _cluster_listener_unsub: Callable[[], None] | None = field(init=False, default=None) + _supports_programming_events: bool | None = field(init=False, default=None) @property def domain(self) -> str: @@ -107,8 +117,9 @@ def domain(self) -> str: def supports_push(self) -> bool: """Return whether this lock supports push-based updates. - ZHA locks support push updates via zigpy cluster listeners for - programming_event_notification and operation_event_notification. + Always True - we subscribe to cluster events for operation notifications + (lock/unlock with user ID). Programming event support is checked separately + to determine if we need drift detection fallback. """ return True @@ -116,10 +127,11 @@ def supports_push(self) -> bool: def hard_refresh_interval(self) -> timedelta | None: """Return interval for hard refresh. - Disabled since we receive programming_event_notification when codes - change. Locks that don't support this notification will just have - slightly stale data until manually refreshed. + Returns 1 hour if the lock doesn't support programming event notifications + (detected during setup), otherwise None to disable drift detection. """ + if self._supports_programming_events is False: + return timedelta(hours=1) return None @property @@ -362,6 +374,57 @@ async def async_hard_refresh_codes(self) -> dict[int, int | str]: # Push update support via zigpy cluster listeners + async def _async_check_programming_event_support(self) -> bool: + """Check if the lock supports programming event notifications. + + Reads the programming event mask attributes to determine if the lock + will send programming_event_notification when codes change. + """ + cluster = self._get_door_lock_cluster() + if not cluster: + return False + + # Check if any programming event mask attribute has a non-zero value + mask_attrs = [ + ATTR_KEYPAD_PROGRAMMING_EVENT_MASK, + ATTR_RF_PROGRAMMING_EVENT_MASK, + ATTR_RFID_PROGRAMMING_EVENT_MASK, + ] + + for attr_id in mask_attrs: + try: + # Try to read the attribute from cache first + attr_name = { + ATTR_KEYPAD_PROGRAMMING_EVENT_MASK: "keypad_programming_event_mask", + ATTR_RF_PROGRAMMING_EVENT_MASK: "rf_programming_event_mask", + ATTR_RFID_PROGRAMMING_EVENT_MASK: "rfid_programming_event_mask", + }.get(attr_id) + + if attr_name and hasattr(cluster, "get"): + value = cluster.get(attr_name) + if value is not None and value != 0: + _LOGGER.debug( + "Lock %s: supports programming events (%s=0x%04x)", + self.lock.entity_id, + attr_name, + value, + ) + return True + except Exception as err: + _LOGGER.debug( + "Lock %s: could not read %s: %s", + self.lock.entity_id, + attr_name, + err, + ) + + _LOGGER.debug( + "Lock %s: no programming event mask attributes found, " + "will use drift detection fallback", + self.lock.entity_id, + ) + return False + @callback def subscribe_push_updates(self) -> None: """Subscribe to push-based value updates via zigpy cluster listener.""" @@ -376,6 +439,14 @@ def subscribe_push_updates(self) -> None: ) return + # Check programming event support if not already done + if self._supports_programming_events is None: + # Schedule async check - for now assume supported, will update on next refresh + self.hass.async_create_task( + self._async_detect_programming_support(), + f"Detect programming event support for {self.lock.entity_id}", + ) + # Register as a listener on the cluster cluster.add_listener(self) self._cluster_listener_unsub = lambda: cluster.remove_listener(self) @@ -385,6 +456,18 @@ def subscribe_push_updates(self) -> None: self.lock.entity_id, ) + async def _async_detect_programming_support(self) -> None: + """Detect programming event support and log result.""" + self._supports_programming_events = ( + await self._async_check_programming_event_support() + ) + if not self._supports_programming_events: + _LOGGER.info( + "Lock %s: programming event notifications not supported, " + "enabling drift detection (1 hour interval)", + self.lock.entity_id, + ) + @callback def unsubscribe_push_updates(self) -> None: """Unsubscribe from push-based value updates.""" From cdafd2dbdcfbec3aece9bf3517478d7197df1c96 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 00:53:45 -0500 Subject: [PATCH 05/12] Refactor ZHA provider into package structure using zigpy types MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Convert providers/zha.py to providers/zha/ package - Use zigpy types directly (DoorLock.UserStatus, DoorLock.OperationEvent, etc.) - Keep only custom mappings in const.py (OPERATION_TO_LOCKED, OPERATION_SOURCE_NAMES) - Extract get_zha_gateway helper to helpers.py 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../providers/zha/__init__.py | 7 + .../lock_code_manager/providers/zha/const.py | 31 ++++ .../providers/zha/helpers.py | 21 +++ .../providers/{zha.py => zha/provider.py} | 146 +++++------------- 4 files changed, 94 insertions(+), 111 deletions(-) create mode 100644 custom_components/lock_code_manager/providers/zha/__init__.py create mode 100644 custom_components/lock_code_manager/providers/zha/const.py create mode 100644 custom_components/lock_code_manager/providers/zha/helpers.py rename custom_components/lock_code_manager/providers/{zha.py => zha/provider.py} (82%) diff --git a/custom_components/lock_code_manager/providers/zha/__init__.py b/custom_components/lock_code_manager/providers/zha/__init__.py new file mode 100644 index 00000000..6beb9f9c --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha/__init__.py @@ -0,0 +1,7 @@ +"""ZHA (Zigbee Home Automation) lock provider package.""" + +from __future__ import annotations + +from .provider import ZHALock + +__all__ = ["ZHALock"] diff --git a/custom_components/lock_code_manager/providers/zha/const.py b/custom_components/lock_code_manager/providers/zha/const.py new file mode 100644 index 00000000..fec0f615 --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha/const.py @@ -0,0 +1,31 @@ +"""Constants for ZHA (Zigbee Home Automation) lock provider. + +Custom mappings specific to this provider. ZCL types come directly from zigpy. +""" + +from __future__ import annotations + +from zigpy.zcl.clusters.closures import DoorLock + +# Map operation events to locked state (True = locked, False = unlocked) +OPERATION_TO_LOCKED: dict[DoorLock.OperationEvent, bool] = { + DoorLock.OperationEvent.Lock: True, + DoorLock.OperationEvent.KeyLock: True, + DoorLock.OperationEvent.AutoLock: True, + DoorLock.OperationEvent.Manual_Lock: True, + DoorLock.OperationEvent.ScheduleLock: True, + DoorLock.OperationEvent.OnTouchLock: True, + DoorLock.OperationEvent.Unlock: False, + DoorLock.OperationEvent.KeyUnlock: False, + DoorLock.OperationEvent.Manual_Unlock: False, + DoorLock.OperationEvent.ScheduleUnlock: False, +} + +# Map operation source to human-readable name +OPERATION_SOURCE_NAMES: dict[DoorLock.OperationEventSource, str] = { + DoorLock.OperationEventSource.Keypad: "Keypad", + DoorLock.OperationEventSource.RF: "RF", + DoorLock.OperationEventSource.Manual: "Manual", + DoorLock.OperationEventSource.RFID: "RFID", + DoorLock.OperationEventSource.Indeterminate: "Unknown", +} diff --git a/custom_components/lock_code_manager/providers/zha/helpers.py b/custom_components/lock_code_manager/providers/zha/helpers.py new file mode 100644 index 00000000..61c331c9 --- /dev/null +++ b/custom_components/lock_code_manager/providers/zha/helpers.py @@ -0,0 +1,21 @@ +"""Helper functions for ZHA lock provider.""" + +from __future__ import annotations + +from homeassistant.components.zha.const import DOMAIN as ZHA_DOMAIN +from homeassistant.core import HomeAssistant + + +def get_zha_gateway(hass: HomeAssistant): + """Get the ZHA gateway proxy. + + Returns the gateway proxy from the ZHA integration's runtime data, + or None if ZHA is not loaded or has no gateway. + """ + if ZHA_DOMAIN not in hass.data: + return None + # ZHA stores gateway in runtime_data on the config entry + for entry in hass.config_entries.async_entries(ZHA_DOMAIN): + if hasattr(entry, "runtime_data") and entry.runtime_data: + return getattr(entry.runtime_data, "gateway_proxy", None) + return None diff --git a/custom_components/lock_code_manager/providers/zha.py b/custom_components/lock_code_manager/providers/zha/provider.py similarity index 82% rename from custom_components/lock_code_manager/providers/zha.py rename to custom_components/lock_code_manager/providers/zha/provider.py index 38a6e883..6c10cb5a 100644 --- a/custom_components/lock_code_manager/providers/zha.py +++ b/custom_components/lock_code_manager/providers/zha/provider.py @@ -1,4 +1,4 @@ -"""Module for ZHA (Zigbee Home Automation) locks.""" +"""ZHA lock provider implementation.""" from __future__ import annotations @@ -6,89 +6,23 @@ from dataclasses import dataclass, field from datetime import timedelta import logging -from typing import TYPE_CHECKING, Any +from typing import Any -from homeassistant.components.zha.const import ( - DOMAIN as ZHA_DOMAIN, -) +from zigpy.zcl.clusters.closures import DoorLock + +from homeassistant.components.zha.const import DOMAIN as ZHA_DOMAIN from homeassistant.config_entries import ConfigEntry from homeassistant.core import callback -from ..const import CONF_LOCKS, CONF_SLOTS, DOMAIN -from ..data import get_entry_data -from ..exceptions import LockDisconnected -from ._base import BaseLock - -if TYPE_CHECKING: - from zigpy.zcl.clusters.closures import DoorLock +from ...const import CONF_LOCKS, CONF_SLOTS, DOMAIN +from ...data import get_entry_data +from ...exceptions import LockDisconnected +from .._base import BaseLock +from .const import OPERATION_SOURCE_NAMES, OPERATION_TO_LOCKED +from .helpers import get_zha_gateway _LOGGER = logging.getLogger(__name__) -# Door Lock cluster ID -CLUSTER_ID_DOOR_LOCK = 0x0101 - -# Cluster command IDs -CMD_OPERATION_EVENT_NOTIFICATION = 0x20 -CMD_PROGRAMMING_EVENT_NOTIFICATION = 0x21 - -# User status values per ZCL spec -USER_STATUS_AVAILABLE = 0x00 -USER_STATUS_ENABLED = 0x01 -USER_STATUS_DISABLED = 0x03 - -# User type values per ZCL spec -USER_TYPE_UNRESTRICTED = 0x00 - -# Operation event source values -OPERATION_SOURCE_KEYPAD = 0x00 -OPERATION_SOURCE_RF = 0x01 -OPERATION_SOURCE_MANUAL = 0x02 -OPERATION_SOURCE_RFID = 0x03 - -# Operation event codes (subset relevant for lock/unlock) -OPERATION_LOCK = 0x01 -OPERATION_UNLOCK = 0x02 -OPERATION_LOCK_FAILURE_INVALID_PIN = 0x03 -OPERATION_UNLOCK_FAILURE_INVALID_PIN = 0x05 -OPERATION_KEY_LOCK = 0x08 -OPERATION_KEY_UNLOCK = 0x09 -OPERATION_AUTO_LOCK = 0x0A -OPERATION_MANUAL_LOCK = 0x0D -OPERATION_MANUAL_UNLOCK = 0x0E - -# Map operation events to locked state -OPERATION_TO_LOCKED: dict[int, bool] = { - OPERATION_LOCK: True, - OPERATION_KEY_LOCK: True, - OPERATION_AUTO_LOCK: True, - OPERATION_MANUAL_LOCK: True, - OPERATION_UNLOCK: False, - OPERATION_KEY_UNLOCK: False, - OPERATION_MANUAL_UNLOCK: False, -} - -# Programming event codes -PROGRAMMING_PIN_ADDED = 0x02 -PROGRAMMING_PIN_DELETED = 0x03 -PROGRAMMING_PIN_CHANGED = 0x04 - - -def _get_zha_gateway(hass): - """Get the ZHA gateway proxy.""" - if ZHA_DOMAIN not in hass.data: - return None - # ZHA stores gateway in runtime_data on the config entry - for entry in hass.config_entries.async_entries(ZHA_DOMAIN): - if hasattr(entry, "runtime_data") and entry.runtime_data: - return getattr(entry.runtime_data, "gateway_proxy", None) - return None - - -# Programming event mask attribute IDs -ATTR_KEYPAD_PROGRAMMING_EVENT_MASK = 0x0045 -ATTR_RF_PROGRAMMING_EVENT_MASK = 0x0046 -ATTR_RFID_PROGRAMMING_EVENT_MASK = 0x0047 - @dataclass(repr=False, eq=False) class ZHALock(BaseLock): @@ -144,7 +78,7 @@ def _get_door_lock_cluster(self) -> DoorLock | None: if self._door_lock_cluster is not None: return self._door_lock_cluster - gateway = _get_zha_gateway(self.hass) + gateway = get_zha_gateway(self.hass) if not gateway: _LOGGER.debug("ZHA gateway not available") return None @@ -171,7 +105,7 @@ def _get_door_lock_cluster(self) -> DoorLock | None: if endpoint_id == 0: # Skip ZDO endpoint continue for cluster in endpoint.in_clusters.values(): - if cluster.cluster_id == CLUSTER_ID_DOOR_LOCK: + if cluster.cluster_id == DoorLock.cluster_id: self._door_lock_cluster = cluster self._endpoint_id = endpoint_id _LOGGER.debug( @@ -186,7 +120,7 @@ def _get_door_lock_cluster(self) -> DoorLock | None: async def async_is_connection_up(self) -> bool: """Return whether connection to lock is up.""" - gateway = _get_zha_gateway(self.hass) + gateway = get_zha_gateway(self.hass) if not gateway: return False @@ -251,7 +185,7 @@ async def async_get_usercodes(self) -> dict[int, int | str]: continue # Check if slot is in use - if user_status == USER_STATUS_ENABLED: + if user_status == DoorLock.UserStatus.Enabled: # Convert bytes to string if needed if isinstance(pin_code, bytes): pin_code = pin_code.decode("utf-8", errors="ignore") @@ -287,8 +221,8 @@ async def async_set_usercode( # Parameters: user_id, user_status, user_type, pin_code result = await cluster.set_pin_code( code_slot, - USER_STATUS_ENABLED, - USER_TYPE_UNRESTRICTED, + DoorLock.UserStatus.Enabled, + DoorLock.UserType.Unrestricted, str(usercode), ) _LOGGER.debug( @@ -385,36 +319,32 @@ async def _async_check_programming_event_support(self) -> bool: return False # Check if any programming event mask attribute has a non-zero value - mask_attrs = [ - ATTR_KEYPAD_PROGRAMMING_EVENT_MASK, - ATTR_RF_PROGRAMMING_EVENT_MASK, - ATTR_RFID_PROGRAMMING_EVENT_MASK, - ] + # Use AttributeDefs for guaranteed-accurate IDs + mask_attrs = ( + DoorLock.AttributeDefs.keypad_programming_event_mask, + DoorLock.AttributeDefs.rf_programming_event_mask, + DoorLock.AttributeDefs.rfid_programming_event_mask, + ) - for attr_id in mask_attrs: + for attr in mask_attrs: try: - # Try to read the attribute from cache first - attr_name = { - ATTR_KEYPAD_PROGRAMMING_EVENT_MASK: "keypad_programming_event_mask", - ATTR_RF_PROGRAMMING_EVENT_MASK: "rf_programming_event_mask", - ATTR_RFID_PROGRAMMING_EVENT_MASK: "rfid_programming_event_mask", - }.get(attr_id) - - if attr_name and hasattr(cluster, "get"): - value = cluster.get(attr_name) + if hasattr(cluster, "get"): + value = cluster.get(attr.name) if value is not None and value != 0: _LOGGER.debug( - "Lock %s: supports programming events (%s=0x%04x)", + "Lock %s: supports programming events (%s [0x%04x]=0x%04x)", self.lock.entity_id, - attr_name, + attr.name, + attr.id, value, ) return True except Exception as err: _LOGGER.debug( - "Lock %s: could not read %s: %s", + "Lock %s: could not read %s [0x%04x]: %s", self.lock.entity_id, - attr_name, + attr.name, + attr.id, err, ) @@ -489,9 +419,9 @@ def cluster_command( Called by zigpy when the lock sends a cluster command (client -> server). """ - if command_id == CMD_PROGRAMMING_EVENT_NOTIFICATION: + if command_id == DoorLock.ClientCommandDefs.programming_event_notification.id: self._handle_programming_event(args) - elif command_id == CMD_OPERATION_EVENT_NOTIFICATION: + elif command_id == DoorLock.ClientCommandDefs.operation_event_notification.id: self._handle_operation_event(args) def _handle_programming_event(self, args: Any) -> None: @@ -567,13 +497,7 @@ def _handle_operation_event(self, args: Any) -> None: to_locked = OPERATION_TO_LOCKED.get(event_code) # Build action text from source and event - source_names = { - OPERATION_SOURCE_KEYPAD: "Keypad", - OPERATION_SOURCE_RF: "RF", - OPERATION_SOURCE_MANUAL: "Manual", - OPERATION_SOURCE_RFID: "RFID", - } - source_name = source_names.get(source, f"Source {source}") + source_name = OPERATION_SOURCE_NAMES.get(source, f"Source {source}") action = "lock" if to_locked else "unlock" if to_locked is False else "event" action_text = f"{source_name} {action} operation" From c7e01a8af6e7504bc1fe0309e896b9726c8d4c0e Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 01:24:34 -0500 Subject: [PATCH 06/12] Add ZHA provider tests and fix gateway/device access MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix get_zha_gateway helper to use HA's official get_zha_gateway_proxy - Fix _get_door_lock_cluster to access underlying zigpy device correctly (device_proxy.device is ZHA Device, .device again gets zigpy device) - Add comprehensive ZHA test fixtures adapted from HA core's ZHA tests - Add test_zha.py with 15 tests covering properties, cluster access, usercodes, push updates, and programming event detection - Fix _concurrent_requests_semaphore.max_value mock for gateway teardown 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../providers/zha/helpers.py | 13 +- .../providers/zha/provider.py | 10 +- tests/providers/conftest.py | 365 +++++++++++++++++- tests/providers/test_zha.py | 338 ++++++++++++++++ 4 files changed, 710 insertions(+), 16 deletions(-) create mode 100644 tests/providers/test_zha.py diff --git a/custom_components/lock_code_manager/providers/zha/helpers.py b/custom_components/lock_code_manager/providers/zha/helpers.py index 61c331c9..57709c13 100644 --- a/custom_components/lock_code_manager/providers/zha/helpers.py +++ b/custom_components/lock_code_manager/providers/zha/helpers.py @@ -2,7 +2,9 @@ from __future__ import annotations -from homeassistant.components.zha.const import DOMAIN as ZHA_DOMAIN +from homeassistant.components.zha.helpers import ( + get_zha_gateway_proxy as _get_zha_gateway_proxy, +) from homeassistant.core import HomeAssistant @@ -12,10 +14,7 @@ def get_zha_gateway(hass: HomeAssistant): Returns the gateway proxy from the ZHA integration's runtime data, or None if ZHA is not loaded or has no gateway. """ - if ZHA_DOMAIN not in hass.data: + try: + return _get_zha_gateway_proxy(hass) + except (KeyError, ValueError): return None - # ZHA stores gateway in runtime_data on the config entry - for entry in hass.config_entries.async_entries(ZHA_DOMAIN): - if hasattr(entry, "runtime_data") and entry.runtime_data: - return getattr(entry.runtime_data, "gateway_proxy", None) - return None diff --git a/custom_components/lock_code_manager/providers/zha/provider.py b/custom_components/lock_code_manager/providers/zha/provider.py index 6c10cb5a..6e82b31e 100644 --- a/custom_components/lock_code_manager/providers/zha/provider.py +++ b/custom_components/lock_code_manager/providers/zha/provider.py @@ -94,14 +94,20 @@ def _get_door_lock_cluster(self) -> DoorLock | None: _LOGGER.debug("Could not find device proxy for %s", self.lock.entity_id) return None - # Get the underlying zigpy device + # Get the underlying zigpy device (device_proxy.device is ZHA Device, + # device_proxy.device.device is the zigpy device) zha_device = device_proxy.device if not zha_device: _LOGGER.debug("Could not find ZHA device for %s", self.lock.entity_id) return None + zigpy_device = zha_device.device + if not zigpy_device: + _LOGGER.debug("Could not find zigpy device for %s", self.lock.entity_id) + return None + # Find the Door Lock cluster - for endpoint_id, endpoint in zha_device.endpoints.items(): + for endpoint_id, endpoint in zigpy_device.endpoints.items(): if endpoint_id == 0: # Skip ZDO endpoint continue for cluster in endpoint.in_clusters.values(): diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py index 5ad6fd4f..c4990f2e 100644 --- a/tests/providers/conftest.py +++ b/tests/providers/conftest.py @@ -1,32 +1,56 @@ -"""Provide common Z-Wave JS fixtures for Lock Code Manager tests.""" +"""Provide common provider fixtures for Lock Code Manager tests.""" from __future__ import annotations import asyncio -from collections.abc import Generator +from collections.abc import Callable, Coroutine, Generator import copy +import itertools import json from pathlib import Path import sys +import time from typing import Any -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, create_autospec, patch +import warnings import pytest from pytest_homeassistant_custom_component.common import MockConfigEntry +from zigpy.application import ControllerApplication +import zigpy.config +import zigpy.device +from zigpy.profiles import zha as zha_profile +import zigpy.quirks +import zigpy.state +import zigpy.types +from zigpy.zcl.clusters import closures, general +from zigpy.zcl.foundation import Status +import zigpy.zdo.types as zdo_t from zwave_js_server.model.driver import Driver from zwave_js_server.model.node import Node from zwave_js_server.version import VersionInfo +from homeassistant.components.zha import const as zha_const +from homeassistant.components.zha.helpers import get_zha_gateway from homeassistant.components.zwave_js.const import DOMAIN as ZWAVE_JS_DOMAIN +from homeassistant.const import Platform from homeassistant.core import HomeAssistant from homeassistant.helpers import entity_registry as er +from homeassistant.setup import async_setup_component -FIXTURES_DIR = Path(__file__).parent / "fixtures" / "zwave_js" +ZWAVE_JS_FIXTURES_DIR = Path(__file__).parent / "fixtures" / "zwave_js" +# ZHA endpoint signature constants +SIG_EP_INPUT = 1 +SIG_EP_OUTPUT = 2 +SIG_EP_PROFILE = 3 +SIG_EP_TYPE = 4 -def load_json_fixture(filename: str) -> dict[str, Any]: - """Load a fixture JSON file.""" - with open(FIXTURES_DIR / filename, encoding="utf-8") as f: + +def load_json_fixture(filename: str, provider: str = "zwave_js") -> dict[str, Any]: + """Load a fixture JSON file for a provider.""" + fixtures_dir = Path(__file__).parent / "fixtures" / provider + with open(fixtures_dir / filename, encoding="utf-8") as f: return json.load(f) @@ -217,3 +241,330 @@ async def lock_entity_fixture( lock_entries = [e for e in entries if e.domain == "lock"] assert len(lock_entries) == 1, f"Expected 1 lock entity, found {len(lock_entries)}" return lock_entries[0] + + +# ============================================================================= +# ZHA Fixtures +# ============================================================================= + + +class _FakeZigbeeApp(ControllerApplication): + """Fake Zigbee application controller for testing.""" + + async def add_endpoint(self, descriptor: zdo_t.SimpleDescriptor): + """Add endpoint.""" + + async def connect(self): + """Connect.""" + + async def disconnect(self): + """Disconnect.""" + + async def force_remove(self, dev: zigpy.device.Device): + """Force remove device.""" + + async def load_network_info(self, *, load_devices: bool = False): + """Load network info.""" + + async def permit_ncp(self, time_s: int = 60): + """Permit NCP.""" + + async def permit_with_link_key( + self, + node: zigpy.types.EUI64, + link_key: zigpy.types.KeyData, + time_s: int = 60, + ): + """Permit with link key.""" + + async def reset_network_info(self): + """Reset network info.""" + + async def send_packet(self, packet: zigpy.types.ZigbeePacket): + """Send packet.""" + + async def start_network(self): + """Start network.""" + + async def write_network_info( + self, + *, + network_info: zigpy.state.NetworkInfo, + node_info: zigpy.state.NodeInfo, + ) -> None: + """Write network info.""" + + async def request( + self, + device: zigpy.device.Device, + profile: zigpy.types.uint16_t, + cluster: zigpy.types.uint16_t, + src_ep: zigpy.types.uint8_t, + dst_ep: zigpy.types.uint8_t, + sequence: zigpy.types.uint8_t, + data: bytes, + *, + expect_reply: bool = True, + use_ieee: bool = False, + extended_timeout: bool = False, + ): + """Request.""" + + async def move_network_to_channel( + self, new_channel: int, *, num_broadcasts: int = 5 + ) -> None: + """Move network to channel.""" + + def _persist_coordinator_model_strings_in_db(self) -> None: + """Persist coordinator model strings.""" + + +def _wrap_mock_instance(obj: Any) -> MagicMock: + """Auto-mock every attribute and method in an object.""" + mock = create_autospec(obj, spec_set=True, instance=True) + + for attr_name in dir(obj): + if attr_name.startswith("__") and attr_name not in {"__getitem__"}: + continue + + real_attr = getattr(obj, attr_name) + mock_attr = getattr(mock, attr_name) + + if callable(real_attr) and not hasattr(real_attr, "__aenter__"): + mock_attr.side_effect = real_attr + else: + setattr(mock, attr_name, real_attr) + + return mock + + +def patch_zha_cluster(cluster): + """Patch a ZHA cluster for testing.""" + cluster.PLUGGED_ATTR_READS = {} + cluster.bind = AsyncMock(return_value=[0]) + cluster.configure_reporting = AsyncMock(return_value=[[]]) + cluster.configure_reporting_multiple = AsyncMock(return_value=[]) + cluster.handle_cluster_request = MagicMock() + cluster.read_attributes = AsyncMock(return_value=[{}, {}]) + cluster.read_attributes_raw = AsyncMock(return_value=[]) + cluster.unbind = AsyncMock(return_value=[0]) + cluster.write_attributes = AsyncMock(return_value=[]) + cluster._write_attributes = AsyncMock(return_value=[]) + + +@pytest.fixture +async def zigpy_app_controller(): + """Zigpy ApplicationController fixture.""" + app = _FakeZigbeeApp( + { + zigpy.config.CONF_DATABASE: None, + zigpy.config.CONF_DEVICE: {zigpy.config.CONF_DEVICE_PATH: "/dev/null"}, + zigpy.config.CONF_STARTUP_ENERGY_SCAN: False, + zigpy.config.CONF_NWK_BACKUP_ENABLED: False, + zigpy.config.CONF_TOPO_SCAN_ENABLED: False, + zigpy.config.CONF_OTA: { + zigpy.config.CONF_OTA_ENABLED: False, + }, + } + ) + + app.state.node_info.nwk = 0x0000 + app.state.node_info.ieee = zigpy.types.EUI64.convert("00:15:8d:00:02:32:4f:32") + app.state.node_info.manufacturer = "Coordinator Manufacturer" + app.state.node_info.model = "Coordinator Model" + app.state.network_info.pan_id = 0x1234 + app.state.network_info.extended_pan_id = app.state.node_info.ieee + app.state.network_info.channel = 15 + app.state.network_info.network_key.key = zigpy.types.KeyData(range(16)) + app.state.counters = zigpy.state.CounterGroups() + + # Create a fake coordinator device + dev = app.add_device(nwk=app.state.node_info.nwk, ieee=app.state.node_info.ieee) + dev.node_desc = zdo_t.NodeDescriptor() + dev.node_desc.logical_type = zdo_t.LogicalType.Coordinator + dev.manufacturer = "Coordinator Manufacturer" + dev.model = "Coordinator Model" + + ep = dev.add_endpoint(1) + ep.profile_id = zha_profile.PROFILE_ID + ep.add_input_cluster(general.Basic.cluster_id) + ep.add_input_cluster(general.Groups.cluster_id) + + with patch("zigpy.device.Device.request", return_value=[Status.SUCCESS]): + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + mock_app = _wrap_mock_instance(app) + mock_app.backups = _wrap_mock_instance(app.backups) + + # Ensure _concurrent_requests_semaphore has a proper max_value + # This is needed by ZHA gateway's radio_concurrency property + mock_semaphore = MagicMock() + mock_semaphore.max_value = 8 # Default concurrent requests limit + mock_app._concurrent_requests_semaphore = mock_semaphore + + yield mock_app + + +@pytest.fixture(name="zha_config_entry") +async def zha_config_entry_fixture() -> MockConfigEntry: + """Fixture representing a ZHA config entry.""" + return MockConfigEntry( + version=5, + domain=zha_const.DOMAIN, + data={ + zigpy.config.CONF_DEVICE: { + zigpy.config.CONF_DEVICE_PATH: "/dev/ttyUSB0", + zigpy.config.CONF_DEVICE_BAUDRATE: 115200, + zigpy.config.CONF_DEVICE_FLOW_CONTROL: "hardware", + }, + zha_const.CONF_RADIO_TYPE: "ezsp", + }, + options={}, + ) + + +@pytest.fixture +def mock_zigpy_connect( + zigpy_app_controller: ControllerApplication, +) -> Generator[ControllerApplication]: + """Patch the zigpy radio connection with our mock application.""" + with ( + patch( + "bellows.zigbee.application.ControllerApplication.new", + return_value=zigpy_app_controller, + ), + patch( + "bellows.zigbee.application.ControllerApplication", + return_value=zigpy_app_controller, + ), + ): + yield zigpy_app_controller + + +@pytest.fixture +def setup_zha( + hass: HomeAssistant, + zha_config_entry: MockConfigEntry, + mock_zigpy_connect: ControllerApplication, +) -> Callable[..., Coroutine[None]]: + """Set up ZHA component.""" + + async def _setup(config=None) -> None: + zha_config_entry.add_to_hass(hass) + config = config or {} + + # Only set up lock platform to speed up tests + with patch( + "homeassistant.components.zha.PLATFORMS", + (Platform.DEVICE_TRACKER, Platform.LOCK, Platform.SENSOR), + ): + status = await async_setup_component( + hass, + zha_const.DOMAIN, + {zha_const.DOMAIN: {zha_const.CONF_ENABLE_QUIRKS: False, **config}}, + ) + assert status is True + await hass.async_block_till_done() + + return _setup + + +@pytest.fixture +def zigpy_device_mock( + zigpy_app_controller, +) -> Callable[..., zigpy.device.Device]: + """Make a fake device using the specified cluster classes.""" + + def _mock_dev( + endpoints, + ieee="00:0d:6f:00:0a:90:69:e7", + manufacturer="FakeManufacturer", + model="FakeModel", + node_descriptor=b"\x02@\x807\x10\x7fd\x00\x00*d\x00\x00", + nwk=0xB79C, + patch_cluster_flag=True, + ): + """Make a fake device using the specified cluster classes.""" + device = zigpy.device.Device( + zigpy_app_controller, zigpy.types.EUI64.convert(ieee), nwk + ) + device.manufacturer = manufacturer + device.model = model + device.node_desc = zdo_t.NodeDescriptor.deserialize(node_descriptor)[0] + device.last_seen = time.time() + + for epid, ep in endpoints.items(): + endpoint = device.add_endpoint(epid) + endpoint.device_type = ep[SIG_EP_TYPE] + endpoint.profile_id = ep.get(SIG_EP_PROFILE, 0x0104) + endpoint.request = AsyncMock() + + for cluster_id in ep.get(SIG_EP_INPUT, []): + endpoint.add_input_cluster(cluster_id) + + for cluster_id in ep.get(SIG_EP_OUTPUT, []): + endpoint.add_output_cluster(cluster_id) + + device.status = zigpy.device.Status.ENDPOINTS_INIT + + # Allow zigpy to apply quirks + device = zigpy.quirks.get_device(device) + + if patch_cluster_flag: + for endpoint in (ep for epid, ep in device.endpoints.items() if epid): + endpoint.request = AsyncMock(return_value=[0]) + for cluster in itertools.chain( + endpoint.in_clusters.values(), endpoint.out_clusters.values() + ): + patch_zha_cluster(cluster) + + return device + + return _mock_dev + + +@pytest.fixture +def zigpy_lock_device( + zigpy_device_mock: Callable[..., zigpy.device.Device], +) -> zigpy.device.Device: + """Create a mock Zigbee lock device.""" + return zigpy_device_mock( + { + 1: { + SIG_EP_INPUT: [ + closures.DoorLock.cluster_id, + general.Basic.cluster_id, + ], + SIG_EP_OUTPUT: [], + SIG_EP_TYPE: zha_profile.DeviceType.DOOR_LOCK, + SIG_EP_PROFILE: zha_profile.PROFILE_ID, + } + }, + ieee="01:2d:6f:00:0a:90:69:e8", + manufacturer="Yale", + model="YRD256", + ) + + +@pytest.fixture +async def zha_lock_entity( + hass: HomeAssistant, + setup_zha: Callable[..., Coroutine[None]], + zigpy_lock_device: zigpy.device.Device, + zha_config_entry: MockConfigEntry, +) -> er.RegistryEntry: + """Set up ZHA with a lock device and return the lock entity.""" + await setup_zha() + gateway = get_zha_gateway(hass) + + gateway.get_or_create_device(zigpy_lock_device) + await gateway.async_device_initialized(zigpy_lock_device) + await hass.async_block_till_done(wait_background_tasks=True) + + ent_reg = er.async_get(hass) + entries = list( + er.async_entries_for_config_entry(ent_reg, zha_config_entry.entry_id) + ) + lock_entries = [e for e in entries if e.domain == "lock"] + assert len(lock_entries) == 1, f"Expected 1 lock entity, found {len(lock_entries)}" + return lock_entries[0] diff --git a/tests/providers/test_zha.py b/tests/providers/test_zha.py new file mode 100644 index 00000000..ef36b9af --- /dev/null +++ b/tests/providers/test_zha.py @@ -0,0 +1,338 @@ +"""Test the ZHA lock provider.""" + +from __future__ import annotations + +from datetime import timedelta +from unittest.mock import AsyncMock + +import pytest +from pytest_homeassistant_custom_component.common import MockConfigEntry +import zigpy.device +from zigpy.zcl.clusters.closures import DoorLock + +from homeassistant.components.zha.const import DOMAIN as ZHA_DOMAIN +from homeassistant.core import HomeAssistant +from homeassistant.helpers import device_registry as dr, entity_registry as er + +from custom_components.lock_code_manager.const import ( + CONF_LOCKS, + CONF_SLOTS, + DOMAIN, +) +from custom_components.lock_code_manager.providers.zha import ZHALock + + +@pytest.fixture(name="zha_lock") +async def zha_lock_fixture( + hass: HomeAssistant, + zha_lock_entity: er.RegistryEntry, + zha_config_entry: MockConfigEntry, + zigpy_lock_device: zigpy.device.Device, +) -> ZHALock: + """Create a ZHALock instance for testing.""" + dev_reg = dr.async_get(hass) + ent_reg = er.async_get(hass) + + lock = ZHALock( + hass=hass, + dev_reg=dev_reg, + ent_reg=ent_reg, + lock_config_entry=zha_config_entry, + lock=zha_lock_entity, + ) + return lock + + +# ============================================================================= +# Property tests +# ============================================================================= + + +async def test_domain(zha_lock: ZHALock) -> None: + """Test domain property returns zha.""" + assert zha_lock.domain == ZHA_DOMAIN + + +async def test_supports_push(zha_lock: ZHALock) -> None: + """Test that ZHA locks support push updates.""" + assert zha_lock.supports_push is True + + +async def test_connection_check_interval(zha_lock: ZHALock) -> None: + """Test that connection check interval is 30 seconds.""" + assert zha_lock.connection_check_interval == timedelta(seconds=30) + + +async def test_hard_refresh_interval_when_programming_events_supported( + zha_lock: ZHALock, +) -> None: + """Test hard refresh interval is None when programming events are supported.""" + zha_lock._supports_programming_events = True + assert zha_lock.hard_refresh_interval is None + + +async def test_hard_refresh_interval_when_programming_events_not_supported( + zha_lock: ZHALock, +) -> None: + """Test hard refresh interval is 1 hour when programming events not supported.""" + zha_lock._supports_programming_events = False + assert zha_lock.hard_refresh_interval == timedelta(hours=1) + + +# ============================================================================= +# Connection tests +# ============================================================================= + + +async def test_is_connection_up_when_available( + hass: HomeAssistant, + zha_lock: ZHALock, +) -> None: + """Test connection is up when device is available.""" + # The mock device should be available by default + assert await zha_lock.async_is_connection_up() is True + + +# ============================================================================= +# Cluster access tests +# ============================================================================= + + +async def test_get_door_lock_cluster( + hass: HomeAssistant, + zha_lock: ZHALock, + zigpy_lock_device: zigpy.device.Device, +) -> None: + """Test getting the Door Lock cluster.""" + cluster = zha_lock._get_door_lock_cluster() + assert cluster is not None + assert cluster.cluster_id == DoorLock.cluster_id + + +async def test_get_door_lock_cluster_caches_result( + hass: HomeAssistant, + zha_lock: ZHALock, +) -> None: + """Test that cluster is cached after first access.""" + cluster1 = zha_lock._get_door_lock_cluster() + cluster2 = zha_lock._get_door_lock_cluster() + assert cluster1 is cluster2 + + +# ============================================================================= +# Usercode tests +# ============================================================================= + + +async def test_get_usercodes( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, + zigpy_lock_device: zigpy.device.Device, +) -> None: + """Test reading usercodes from the lock.""" + # Create LCM config entry with slots + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={ + CONF_LOCKS: [zha_lock.lock.entity_id], + CONF_SLOTS: {"1": {}, "2": {}}, + }, + ) + lcm_entry.add_to_hass(hass) + + # Mock the cluster's get_pin_code method + cluster = zha_lock._get_door_lock_cluster() + + # Mock get_pin_code responses + async def mock_get_pin_code(slot_num): + if slot_num == 1: + # Return enabled slot with code + return type( + "Response", + (), + {"user_status": DoorLock.UserStatus.Enabled, "code": "1234"}, + )() + # Return disabled slot + return type( + "Response", + (), + {"user_status": DoorLock.UserStatus.Available, "code": ""}, + )() + + cluster.get_pin_code = AsyncMock(side_effect=mock_get_pin_code) + + await zha_lock.async_setup(lcm_entry) + + codes = await zha_lock.async_get_usercodes() + + assert codes[1] == "1234" + assert codes[2] == "" + + await zha_lock.async_unload(False) + + +async def test_set_usercode_calls_cluster( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test that set_usercode calls the cluster's set_pin_code.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + cluster.set_pin_code = AsyncMock(return_value=type("Response", (), {"status": 0})()) + + result = await zha_lock.async_set_usercode(3, "5678", "Test User") + + assert result is True + cluster.set_pin_code.assert_called_once_with( + 3, + DoorLock.UserStatus.Enabled, + DoorLock.UserType.Unrestricted, + "5678", + ) + + await zha_lock.async_unload(False) + + +async def test_clear_usercode_calls_cluster( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test that clear_usercode calls the cluster's clear_pin_code.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + cluster.clear_pin_code = AsyncMock( + return_value=type("Response", (), {"status": 0})() + ) + + result = await zha_lock.async_clear_usercode(3) + + assert result is True + cluster.clear_pin_code.assert_called_once_with(3) + + await zha_lock.async_unload(False) + + +# ============================================================================= +# Push update tests +# ============================================================================= + + +async def test_subscribe_push_updates( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test subscribing to push updates.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + # Subscribe to push updates + zha_lock.subscribe_push_updates() + + assert zha_lock._cluster_listener_unsub is not None + + # Unsubscribe + zha_lock.unsubscribe_push_updates() + assert zha_lock._cluster_listener_unsub is None + + await zha_lock.async_unload(False) + + +async def test_subscribe_is_idempotent( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test that calling subscribe multiple times is safe.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + zha_lock.subscribe_push_updates() + first_unsub = zha_lock._cluster_listener_unsub + + zha_lock.subscribe_push_updates() + assert zha_lock._cluster_listener_unsub is first_unsub + + zha_lock.unsubscribe_push_updates() + await zha_lock.async_unload(False) + + +# ============================================================================= +# Programming event support detection +# ============================================================================= + + +async def test_check_programming_event_support_with_mask( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test detecting programming event support via mask attributes.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + + # Mock get() to return a non-zero mask value + def mock_get(attr_name): + if attr_name == "keypad_programming_event_mask": + return 0x0001 + return None + + cluster.get = mock_get + + supports = await zha_lock._async_check_programming_event_support() + assert supports is True + + await zha_lock.async_unload(False) + + +async def test_check_programming_event_support_without_mask( + hass: HomeAssistant, + zha_lock: ZHALock, + zha_config_entry: MockConfigEntry, +) -> None: + """Test detecting programming event not supported when no mask attributes.""" + lcm_entry = MockConfigEntry( + domain=DOMAIN, + data={CONF_LOCKS: [], CONF_SLOTS: {}}, + ) + lcm_entry.add_to_hass(hass) + await zha_lock.async_setup(lcm_entry) + + cluster = zha_lock._get_door_lock_cluster() + + # Mock get() to return None/0 for all mask attributes + cluster.get = lambda attr_name: None + + supports = await zha_lock._async_check_programming_event_support() + assert supports is False + + await zha_lock.async_unload(False) From d31159ac5576a229837b14cb03121ded919f690a Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 14:50:05 -0500 Subject: [PATCH 07/12] Address Copilot review comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Simplify redundant len(result) > 3 check (already checked >= 4) - Let LockDisconnected propagate without re-wrapping in exception handlers - Extract duplicated connection check logic into _get_connected_cluster helper - Add return type hint to get_zha_gateway function Note: Unused constants (USER_STATUS_*) were already addressed in previous commits. Test coverage exists in tests/providers/test_zha.py. The eq=False dataclass setting is intentional for identity-based comparison. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- .../providers/zha/helpers.py | 4 +- .../providers/zha/provider.py | 41 ++++++++++--------- 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/custom_components/lock_code_manager/providers/zha/helpers.py b/custom_components/lock_code_manager/providers/zha/helpers.py index 57709c13..2cc61b66 100644 --- a/custom_components/lock_code_manager/providers/zha/helpers.py +++ b/custom_components/lock_code_manager/providers/zha/helpers.py @@ -2,13 +2,15 @@ from __future__ import annotations +from typing import Any + from homeassistant.components.zha.helpers import ( get_zha_gateway_proxy as _get_zha_gateway_proxy, ) from homeassistant.core import HomeAssistant -def get_zha_gateway(hass: HomeAssistant): +def get_zha_gateway(hass: HomeAssistant) -> Any | None: """Get the ZHA gateway proxy. Returns the gateway proxy from the ZHA integration's runtime data, diff --git a/custom_components/lock_code_manager/providers/zha/provider.py b/custom_components/lock_code_manager/providers/zha/provider.py index 6e82b31e..08b8e65b 100644 --- a/custom_components/lock_code_manager/providers/zha/provider.py +++ b/custom_components/lock_code_manager/providers/zha/provider.py @@ -124,6 +124,20 @@ def _get_door_lock_cluster(self) -> DoorLock | None: _LOGGER.warning("Could not find Door Lock cluster for %s", self.lock.entity_id) return None + async def _get_connected_cluster(self) -> DoorLock: + """Get the Door Lock cluster, ensuring connection is up. + + Raises LockDisconnected if cluster is unavailable or device is disconnected. + """ + cluster = self._get_door_lock_cluster() + if not cluster: + raise LockDisconnected("Door Lock cluster not available") + + if not await self.async_is_connection_up(): + raise LockDisconnected("Lock not connected") + + return cluster + async def async_is_connection_up(self) -> bool: """Return whether connection to lock is up.""" gateway = get_zha_gateway(self.hass) @@ -143,12 +157,7 @@ async def async_is_connection_up(self) -> bool: async def async_get_usercodes(self) -> dict[int, int | str]: """Get dictionary of code slots and usercodes.""" - cluster = self._get_door_lock_cluster() - if not cluster: - raise LockDisconnected("Door Lock cluster not available") - - if not await self.async_is_connection_up(): - raise LockDisconnected("Lock not connected") + cluster = await self._get_connected_cluster() # Get configured code slots for this lock code_slots = { @@ -179,7 +188,7 @@ async def async_get_usercodes(self) -> dict[int, int | str]: elif isinstance(result, (list, tuple)) and len(result) >= 4: # Result format: [user_id, user_status, user_type, code] user_status = result[1] - pin_code = result[3] if len(result) > 3 else "" + pin_code = result[3] else: _LOGGER.warning( "Unexpected get_pin_code response format for %s slot %s: %s", @@ -215,12 +224,7 @@ async def async_set_usercode( self, code_slot: int, usercode: int | str, name: str | None = None ) -> bool: """Set a usercode on a code slot.""" - cluster = self._get_door_lock_cluster() - if not cluster: - raise LockDisconnected("Door Lock cluster not available") - - if not await self.async_is_connection_up(): - raise LockDisconnected("Lock not connected") + cluster = await self._get_connected_cluster() try: # Call set_pin_code cluster command (0x05) @@ -253,6 +257,8 @@ async def async_set_usercode( return True + except LockDisconnected: + raise except Exception as err: _LOGGER.error( "Failed to set PIN for %s slot %s: %s", @@ -264,12 +270,7 @@ async def async_set_usercode( async def async_clear_usercode(self, code_slot: int) -> bool: """Clear a usercode on a code slot.""" - cluster = self._get_door_lock_cluster() - if not cluster: - raise LockDisconnected("Door Lock cluster not available") - - if not await self.async_is_connection_up(): - raise LockDisconnected("Lock not connected") + cluster = await self._get_connected_cluster() try: # Call clear_pin_code cluster command (0x07) @@ -296,6 +297,8 @@ async def async_clear_usercode(self, code_slot: int) -> bool: return True + except LockDisconnected: + raise except Exception as err: _LOGGER.error( "Failed to clear PIN for %s slot %s: %s", From c4f82d161f845cdcd63d8eb63192ad398424f145 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 14:55:06 -0500 Subject: [PATCH 08/12] Speed up ZHA tests by mocking radio manager delays MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ZHA integration has CONNECT_DELAY_S and RETRY_DELAY_S constants (both 1.0s) in radio_manager.py that are used for real hardware timing. These caused each ZHA test to spend ~1.1s in setup. Patching these to 0 in tests reduces: - ZHA tests: 18s → 3s - Full suite: 20s → 5.5s 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/providers/conftest.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py index c4990f2e..2a21e3cd 100644 --- a/tests/providers/conftest.py +++ b/tests/providers/conftest.py @@ -248,6 +248,22 @@ async def lock_entity_fixture( # ============================================================================= +@pytest.fixture +def mock_zha_radio_delays() -> Generator[None]: + """Mock ZHA radio manager delays to speed up tests.""" + with ( + patch( + "homeassistant.components.zha.radio_manager.CONNECT_DELAY_S", + 0, + ), + patch( + "homeassistant.components.zha.radio_manager.RETRY_DELAY_S", + 0, + ), + ): + yield + + class _FakeZigbeeApp(ControllerApplication): """Fake Zigbee application controller for testing.""" @@ -446,6 +462,7 @@ def setup_zha( hass: HomeAssistant, zha_config_entry: MockConfigEntry, mock_zigpy_connect: ControllerApplication, + mock_zha_radio_delays: None, ) -> Callable[..., Coroutine[None]]: """Set up ZHA component.""" From 0a53530f6b8fe7e52744cb9b07d6aab2b4e02921 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 14:58:02 -0500 Subject: [PATCH 09/12] Fix incomplete Coroutine type hints in test fixtures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changed Coroutine[None] to Awaitable[None] which is the correct type for async callables that return None. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/providers/conftest.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py index 2a21e3cd..4bd670e2 100644 --- a/tests/providers/conftest.py +++ b/tests/providers/conftest.py @@ -3,7 +3,7 @@ from __future__ import annotations import asyncio -from collections.abc import Callable, Coroutine, Generator +from collections.abc import Awaitable, Callable, Generator import copy import itertools import json @@ -463,7 +463,7 @@ def setup_zha( zha_config_entry: MockConfigEntry, mock_zigpy_connect: ControllerApplication, mock_zha_radio_delays: None, -) -> Callable[..., Coroutine[None]]: +) -> Callable[..., Awaitable[None]]: """Set up ZHA component.""" async def _setup(config=None) -> None: @@ -566,7 +566,7 @@ def zigpy_lock_device( @pytest.fixture async def zha_lock_entity( hass: HomeAssistant, - setup_zha: Callable[..., Coroutine[None]], + setup_zha: Callable[..., Awaitable[None]], zigpy_lock_device: zigpy.device.Device, zha_config_entry: MockConfigEntry, ) -> er.RegistryEntry: From 8a181e009803f07774addaf551fe00116aeabcb3 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 15:02:21 -0500 Subject: [PATCH 10/12] Add zha package to test requirements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ZHA provider imports from homeassistant.components.zha which requires the zha package to be installed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- requirements_test.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/requirements_test.txt b/requirements_test.txt index 52308ed6..eacacebc 100644 --- a/requirements_test.txt +++ b/requirements_test.txt @@ -1,4 +1,9 @@ +aiousbwatcher>=1.1.0 +ha-silabs-firmware-client>=0.3.0 pylint-strict-informational>=0.1 pytest>=8.0.2 pytest-homeassistant-custom-component==0.13.306 +universal-silabs-flasher>=0.1.0 +usb-devices>=0.4.5 +zha>=0.0.81 zigpy>=0.60.0 From 81c3a75ac9d5ef75a86c2a9a0a2acf336dda691c Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 16:13:24 -0500 Subject: [PATCH 11/12] Fix MockSemaphore to ensure max_value is an int MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ZHA gateway's radio_concurrency property uses max_value in arithmetic operations. Using MagicMock caused TypeError when comparing MagicMock with int. Using a simple class ensures the value is an actual integer. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/providers/conftest.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py index 4bd670e2..eefd019c 100644 --- a/tests/providers/conftest.py +++ b/tests/providers/conftest.py @@ -414,9 +414,11 @@ async def zigpy_app_controller(): # Ensure _concurrent_requests_semaphore has a proper max_value # This is needed by ZHA gateway's radio_concurrency property - mock_semaphore = MagicMock() - mock_semaphore.max_value = 8 # Default concurrent requests limit - mock_app._concurrent_requests_semaphore = mock_semaphore + # Use a simple object instead of MagicMock to ensure max_value is an int + class MockSemaphore: + max_value = 8 + + mock_app._concurrent_requests_semaphore = MockSemaphore() yield mock_app From 40b0f52d8ac6bf200dbc8ffbca509475643366d4 Mon Sep 17 00:00:00 2001 From: raman325 <7243222+raman325@users.noreply.github.com> Date: Mon, 5 Jan 2026 16:30:25 -0500 Subject: [PATCH 12/12] Add max_concurrency to MockSemaphore for newer ZHA compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Newer versions of ZHA access max_concurrency instead of max_value. Add both attributes to support both old and new ZHA versions. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- tests/providers/conftest.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/providers/conftest.py b/tests/providers/conftest.py index eefd019c..bbdf469c 100644 --- a/tests/providers/conftest.py +++ b/tests/providers/conftest.py @@ -412,11 +412,13 @@ async def zigpy_app_controller(): mock_app = _wrap_mock_instance(app) mock_app.backups = _wrap_mock_instance(app.backups) - # Ensure _concurrent_requests_semaphore has a proper max_value + # Ensure _concurrent_requests_semaphore has proper attributes # This is needed by ZHA gateway's radio_concurrency property - # Use a simple object instead of MagicMock to ensure max_value is an int + # Use a simple object instead of MagicMock to ensure values are ints + # Include both max_value (older) and max_concurrency (newer) for compatibility class MockSemaphore: max_value = 8 + max_concurrency = 8 mock_app._concurrent_requests_semaphore = MockSemaphore()