From f6f8b6945834cf6cc65d5962116f1e52199dc623 Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Fri, 22 Nov 2024 10:21:41 +0100 Subject: [PATCH 1/3] Implementation of the Keep Alive thread in the transport service. --- docker/docker-compose/README.md | 3 + python_transport/wirepas_gateway/__init__.py | 1 + .../keep_alive_service/__init__.py | 5 + .../keep_alive_service/keep_alive_service.py | 245 ++++++++++++++++++ .../wirepas_gateway/transport_service.py | 14 +- .../wirepas_gateway/utils/argument_tools.py | 30 +++ 6 files changed, 297 insertions(+), 1 deletion(-) create mode 100644 python_transport/wirepas_gateway/keep_alive_service/__init__.py create mode 100644 python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py diff --git a/docker/docker-compose/README.md b/docker/docker-compose/README.md index 1993b8a2..b697bfaa 100644 --- a/docker/docker-compose/README.md +++ b/docker/docker-compose/README.md @@ -74,6 +74,9 @@ The list of all possible parameters to configure the transport service are given | WM_GW_VERSION | Version of the gateway | None | Any string | | WM_GW_IGNORED_ENDPOINTS_FILTER | Destination endpoints list to ignore (not published) | None | List of endpoints (i.e. [1,2,3]), a range of endpoints (i.e. [1-3]), or a combination of both | | WM_GW_WHITENED_ENDPOINTS_FILTER | Destination endpoints list to whiten (no payload content, only size) | None | List of endpoints (i.e. [1,2,3]), a range of endpoints (i.e. [1-3]), or a combination of both | +| WM_KEEP_ALIVE_ACTIVATE | Activate the keep alive service thread to send those messages periodically to the network. | False | "yes", "true", "t", "y", "1","no", "false", "f", "n", "0", "" | +| WM_KEEP_ALIVE_INTERVAL_S | The interval in seconds between keep-alive messages if those are activated with WM_KEEP_ALIVE_ACTIVATE. | 300 | Any positive integer | +| WM_KEEP_ALIVE_TIMEZONE_OFFSET_MN | Time zone offset from UTC in minutes (-840 to +720) to be sent in keep alive messages if those are activated with WM_KEEP_ALIVE_ACTIVATE. | 0 | Any integer between -840 and 720 | | WM_DEBUG_LEVEL | Configure log level for the transport service. Please be aware that levels such as debug should not be used in a production system | info | debug, info, critical, fatal, error, warning | diff --git a/python_transport/wirepas_gateway/__init__.py b/python_transport/wirepas_gateway/__init__.py index 6e204a1c..b79ec19f 100644 --- a/python_transport/wirepas_gateway/__init__.py +++ b/python_transport/wirepas_gateway/__init__.py @@ -12,6 +12,7 @@ # flake8: noqa from . import dbus +from . import keep_alive_service from . import protocol from . import utils diff --git a/python_transport/wirepas_gateway/keep_alive_service/__init__.py b/python_transport/wirepas_gateway/keep_alive_service/__init__.py new file mode 100644 index 00000000..96b6f20e --- /dev/null +++ b/python_transport/wirepas_gateway/keep_alive_service/__init__.py @@ -0,0 +1,5 @@ +# Copyright 2024 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. +# +from .keep_alive_service import * diff --git a/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py new file mode 100644 index 00000000..312ac19e --- /dev/null +++ b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py @@ -0,0 +1,245 @@ +# Copyright 2024 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. +# +from enum import IntEnum +import logging +import struct +from time import time, sleep +from threading import Thread + +import wirepas_mesh_messaging as wmm + + +KEEP_ALIVE_SERVICE_VERSION = 0x01 +KEEP_ALIVE_SRC_EP = 67 +KEEP_ALIVE_DST_EP = 67 + +# Timeouts and periods used for the keep alive service. +WM_MSG_RETRY_PERIOD_S = 1 +KEEP_ALIVE_MSG_RECONNECTION_PERIOD_S = 60 + +BROADCAST_ADDRESS = 0xFFFFFFFF + + +class KeepAliveType(IntEnum): + """ Keep alive fields TLV type enumerate. """ + VERSION_TYPE = 0x01 + GATEWAY_STATUS_TYPE = 0x02 + RTC_TIMESTAMP_TYPE = 0x03 + TIME_ZONE_OFFSET_TYPE = 0x04 + KEEP_ALIVE_INTERVAL_TYPE = 0x05 + + +class KeepAliveMessage(): + """ + Class to store keep alive message attributes. + + Attributes: + version: The version number for the keep-alive message. + gateway_status: The running status of the gateway. + Bit 0: Backhaul (MQTT broker) Connectivity (0 = Disconnected, 1 = Connected) + Bits 1-7: Reserved for future use or other status indicators + rtc_timestamp_ms: Unix epoch timestamp in ms (milliseconds since January 1, 1970). + timezone_offset_mn: Time zone offset from UTC in minutes (-840 to +720). + keep_alive_interval_s: Interval in seconds until the next keepalive message is expected. + """ + def __init__(self, version, gateway_status=None, rtc_timestamp_ms=None, + timezone_offset_mn=None, keep_alive_interval_s=None): + self.version = version + self.gateway_status = gateway_status + self.rtc_timestamp_ms = rtc_timestamp_ms + self.timezone_offset_mn = timezone_offset_mn + self.keep_alive_interval_s = keep_alive_interval_s + + @classmethod + def encode_tlv_item(cls, elt_type, length, value, packing): + """ + Encode a new TLV item in little endian. + + Args: + elt_type (int): Type of the element to encode. + length (int): Number of bytes of the value to be encoded. + value: Value to encode. Note: Value should have a specific type + corresponding to the packing parameter. + packing (str): String representing the format characters allowing + the conversion between C and Python bytes when packing the value. + See https://docs.python.org/3/library/struct.html#format-characters. + """ + assert (0 <= elt_type <= 0xFF), "A TLV type must be include between 0 and 255" + assert isinstance(length, int), "A TLV length must be an integer" + return bytes(struct.pack(" bool: + """ + Send the keep alive message to the network. + Returns True if the keep alive message could be sent to the sink, False otherwise. + + Args: + sink: Sink to send the keep alive message to. + """ + retries_left = 3 + res = wmm.GatewayResultCode.GW_RES_UNKNOWN_ERROR + + while retries_left > 0 and res != wmm.GatewayResultCode.GW_RES_OK: + retries_left -= 1 + keep_alive_message = self.prepare_keep_alive_msg() + payload = keep_alive_message.encode_tlv() + logging.debug("Send the following keep alive payload to sink %s: %s", + sink.sink_id, payload.hex()) + + res = sink.send_data( + dst=BROADCAST_ADDRESS, + src_ep=KEEP_ALIVE_SRC_EP, + dst_ep=KEEP_ALIVE_DST_EP, + qos=0, + initial_time=0, + data=payload + ) + if res != wmm.GatewayResultCode.GW_RES_OK and retries_left > 0: + sleep(WM_MSG_RETRY_PERIOD_S) + logging.debug("Retry sending the keep alive message that couldn't be sent to %s sink: %s. ", + sink.sink_id, res) + + if res != wmm.GatewayResultCode.GW_RES_OK: + logging.error("Keep alive message couldn't be sent to %s sink: %s", + sink.sink_id, res) + return False + + return True + + def wait_for_next_keep_alive_message_iteration(self, time_to_wait, start_timer=None): + """ Wait for the next keep alive message iteration. """ + if start_timer: + time_to_wait = max(time_to_wait - (time() - start_timer), 0) + + sleep(time_to_wait) + + def run(self): + """ Main loop that send periodically keep alive message to the network. """ + while True: + # Put a timer so that the message are periodic with a good precision + start_timer = time() + + # Get current connected sinks + current_sinks = set([sink.sink_id for sink in self.sink_manager.get_sinks()]) + current_connected_sinks = set() + + if not current_sinks: + logging.error("No sinks are detected!") + self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s) + continue + + # Send keep alive messages to all sinks + logging.info("Send a keep alive message to the network") + for sink_id in current_sinks: + if self.send_keep_alive_msg_to_sink(self.sink_manager.get_sink(sink_id)): + current_connected_sinks.add(sink_id) + else: + self.disconnected_sinks.add(sink_id) + + # Find sinks that disappeared and those which reconnected during this iteration + non_detected_sinks = self.connected_sinks.difference(current_sinks) + reconnected_sinks = current_connected_sinks.intersection(self.disconnected_sinks) + + # Update sinks current status + self.connected_sinks = current_connected_sinks + self.disconnected_sinks.difference_update(current_connected_sinks) + self.disconnected_sinks.update(non_detected_sinks) + + # If a sink reconnected, send 4 other keep alive messages in the next minutes + if reconnected_sinks: + messages_to_send = 4 + logging.debug("Sending keep alive messages more often to reconnected sinks: %s", + ' '.join(reconnected_sinks)) + self.wait_for_next_keep_alive_message_iteration(KEEP_ALIVE_MSG_RECONNECTION_PERIOD_S, start_timer) + + for _ in range(messages_to_send): + start_timer = time() + for sink_id in reconnected_sinks: + self.send_keep_alive_msg_to_sink(self.sink_manager.get_sink(sink_id)) + + self.wait_for_next_keep_alive_message_iteration(KEEP_ALIVE_MSG_RECONNECTION_PERIOD_S, start_timer) + else: # Else wait for sending the next keep alive message + self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s, start_timer) diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index b0338a30..d392576a 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -11,6 +11,7 @@ from threading import Thread, Event from wirepas_gateway.dbus.dbus_client import BusClient +from wirepas_gateway.keep_alive_service import KeepAliveServiceThread from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser from wirepas_gateway.protocol.mqtt_wrapper import MQTTWrapper from wirepas_gateway.utils import ParserHelper @@ -432,6 +433,17 @@ def __init__(self, settings, **kwargs): ) self.status_thread.start() + # Run the keep alive service if it is activated + self.keep_alive_service = None + if settings.activate_keep_alive_service: + self.keep_alive_service = KeepAliveServiceThread( + self.sink_manager, + self.mqtt_wrapper, + settings.keep_alive_interval_s, + settings.keep_alive_timezone_offset_mn + ) + self.keep_alive_service.start() + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -441,7 +453,6 @@ def _on_mqtt_wrapper_termination_cb(self): logging.error("MQTT wrapper ends. Terminate the program") self.stop_dbus_client() - def update_gateway_status_dec(fn): """ Decorator to update the gateway status when needed @@ -1060,6 +1071,7 @@ def main(): parse.add_buffering_settings() parse.add_debug_settings() parse.add_deprecated_args() + parse.add_keep_alive_config() settings = parse.settings() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index a5d78f80..eebbc17e 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -563,6 +563,36 @@ def add_filtering_config(self): ), ) + def add_keep_alive_config(self): + self.filtering.add_argument( + "--activate_keep_alive_service", + default=os.environ.get("WM_KEEP_ALIVE_ACTIVATE", False), + type=self.str2bool, + nargs="?", + const=True, + help=("Default to False. Activate the keep alive service thread. " + "Note: Gateway time is supposed to be synchronized " + "with the NTP server before launching the service.") + ) + + self.filtering.add_argument( + "--keep_alive_interval_s", + default=os.environ.get("WM_KEEP_ALIVE_INTERVAL_S", 300), + action="store", + type=int, + help=("Default to 300 seconds. " + "The interval in seconds between keep-alive messages.") + ) + + self.filtering.add_argument( + "--keep_alive_timezone_offset_mn", + default=os.environ.get("WM_KEEP_ALIVE_TIMEZONE_OFFSET_MN", 0), + action="store", + type=int, + help=("Default to 0. Time zone offset from UTC in minutes (-840 to +720) " + "to be sent in keep alive messages."), + ) + def dump(self, path): """ dumps the arguments into a file """ with open(path, "w") as f: From 1f77443ca1e088aa0df98176cad53c38e92ebc37 Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Fri, 10 Jan 2025 17:53:22 +0100 Subject: [PATCH 2/3] Fix issues in the keep alive service based on PR#281. --- docker/docker-compose/README.md | 2 +- docker/transport_service/Dockerfile | 2 +- python_transport/requirements.txt | 3 + .../keep_alive_service/keep_alive_service.py | 107 ++++++++---------- .../wirepas_gateway/transport_service.py | 2 +- .../wirepas_gateway/utils/argument_tools.py | 15 ++- 6 files changed, 63 insertions(+), 68 deletions(-) diff --git a/docker/docker-compose/README.md b/docker/docker-compose/README.md index b697bfaa..a89a61e3 100644 --- a/docker/docker-compose/README.md +++ b/docker/docker-compose/README.md @@ -76,7 +76,7 @@ The list of all possible parameters to configure the transport service are given | WM_GW_WHITENED_ENDPOINTS_FILTER | Destination endpoints list to whiten (no payload content, only size) | None | List of endpoints (i.e. [1,2,3]), a range of endpoints (i.e. [1-3]), or a combination of both | | WM_KEEP_ALIVE_ACTIVATE | Activate the keep alive service thread to send those messages periodically to the network. | False | "yes", "true", "t", "y", "1","no", "false", "f", "n", "0", "" | | WM_KEEP_ALIVE_INTERVAL_S | The interval in seconds between keep-alive messages if those are activated with WM_KEEP_ALIVE_ACTIVATE. | 300 | Any positive integer | -| WM_KEEP_ALIVE_TIMEZONE_OFFSET_MN | Time zone offset from UTC in minutes (-840 to +720) to be sent in keep alive messages if those are activated with WM_KEEP_ALIVE_ACTIVATE. | 0 | Any integer between -840 and 720 | +| WM_KEEP_ALIVE_TIMEZONE_NAME | Time zone name used to set the timezone offset in the keep alive message. Check https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List to see the list of timezone identifiers: for example "Etc/UTC" | "Etc/UTC" | Any valid timezone identifiers. | | WM_DEBUG_LEVEL | Configure log level for the transport service. Please be aware that levels such as debug should not be used in a production system | info | debug, info, critical, fatal, error, warning | diff --git a/docker/transport_service/Dockerfile b/docker/transport_service/Dockerfile index 7a8784c6..66b681e5 100644 --- a/docker/transport_service/Dockerfile +++ b/docker/transport_service/Dockerfile @@ -27,7 +27,7 @@ RUN ./utils/generate_wheel.sh USER wirepas # Needed by pydbus -RUN pip3 install PyGObject~=3.0 --user +RUN pip3 install PyGObject~=3.0 pytz==2024.2 --user # Install protobuf from source to get the UPB implementation RUN pip3 install dist/wirepas_gateway*.whl --no-binary protobuf --user diff --git a/python_transport/requirements.txt b/python_transport/requirements.txt index 281f287e..85114bc5 100644 --- a/python_transport/requirements.txt +++ b/python_transport/requirements.txt @@ -9,3 +9,6 @@ PyYAML==6.0.1 # mqtt paho-mqtt==1.4.0 + +# pytz +pytz==2024.2 \ No newline at end of file diff --git a/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py index 312ac19e..f137cafe 100644 --- a/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py +++ b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py @@ -2,10 +2,12 @@ # # See file LICENSE for full license details. # +from datetime import datetime from enum import IntEnum import logging import struct -from time import time, sleep +import pytz +from time import monotonic, sleep, time from threading import Thread import wirepas_mesh_messaging as wmm @@ -17,13 +19,15 @@ # Timeouts and periods used for the keep alive service. WM_MSG_RETRY_PERIOD_S = 1 -KEEP_ALIVE_MSG_RECONNECTION_PERIOD_S = 60 + +# Maximum number of time the service is trying to send a message to sinks. +KEEP_ALIVE_MSG_RETRIES_NUMBER = 3 BROADCAST_ADDRESS = 0xFFFFFFFF class KeepAliveType(IntEnum): - """ Keep alive fields TLV type enumerate. """ + """Keep alive fields TLV type enumerate.""" VERSION_TYPE = 0x01 GATEWAY_STATUS_TYPE = 0x02 RTC_TIMESTAMP_TYPE = 0x03 @@ -52,8 +56,8 @@ def __init__(self, version, gateway_status=None, rtc_timestamp_ms=None, self.timezone_offset_mn = timezone_offset_mn self.keep_alive_interval_s = keep_alive_interval_s - @classmethod - def encode_tlv_item(cls, elt_type, length, value, packing): + @staticmethod + def _encode_tlv_item(elt_type, length, value, packing): """ Encode a new TLV item in little endian. @@ -61,10 +65,10 @@ def encode_tlv_item(cls, elt_type, length, value, packing): elt_type (int): Type of the element to encode. length (int): Number of bytes of the value to be encoded. value: Value to encode. Note: Value should have a specific type - corresponding to the packing parameter. + corresponding to the packing parameter. packing (str): String representing the format characters allowing - the conversion between C and Python bytes when packing the value. - See https://docs.python.org/3/library/struct.html#format-characters. + the conversion between C and Python bytes when packing the value. + See https://docs.python.org/3/library/struct.html#format-characters. """ assert (0 <= elt_type <= 0xFF), "A TLV type must be include between 0 and 255" assert isinstance(length, int), "A TLV length must be an integer" @@ -83,23 +87,23 @@ def encode_tlv(self): f"keep_alive_interval_s={self.keep_alive_interval_s}") buffer = bytes() - buffer += KeepAliveMessage.encode_tlv_item( - KeepAliveType.VERSION_TYPE, 1, KEEP_ALIVE_SERVICE_VERSION, "B" + buffer += KeepAliveMessage._encode_tlv_item( + KeepAliveType.VERSION_TYPE, 1, self.version, "B" ) if self.gateway_status is not None: - buffer += KeepAliveMessage.encode_tlv_item( + buffer += KeepAliveMessage._encode_tlv_item( KeepAliveType.GATEWAY_STATUS_TYPE, 1, self.gateway_status, "B" ) if self.rtc_timestamp_ms is not None: - buffer += KeepAliveMessage.encode_tlv_item( + buffer += KeepAliveMessage._encode_tlv_item( KeepAliveType.RTC_TIMESTAMP_TYPE, 8, self.rtc_timestamp_ms, "Q", ) if self.timezone_offset_mn is not None: - buffer += KeepAliveMessage.encode_tlv_item( + buffer += KeepAliveMessage._encode_tlv_item( KeepAliveType.TIME_ZONE_OFFSET_TYPE, 2, self.timezone_offset_mn, "h" ) if self.keep_alive_interval_s is not None: - buffer += KeepAliveMessage.encode_tlv_item( + buffer += KeepAliveMessage._encode_tlv_item( KeepAliveType.KEEP_ALIVE_INTERVAL_TYPE, 2, self.keep_alive_interval_s, "H", ) @@ -109,7 +113,7 @@ def encode_tlv(self): class KeepAliveServiceThread(Thread): def __init__(self, sink_manager, mqtt_wrapper, keep_alive_interval_s=300, - keep_alive_timezone_offset_mn=0): + keep_alive_timezone_name="Etc/UTC"): """ Thread sending periodically keep alive messages to the network. Args: @@ -117,8 +121,10 @@ def __init__(self, sink_manager, mqtt_wrapper, mqtt_wrapper: The mqtt wrapper to get access to queue level of the mqtt broker. keep_alive_interval_s (int): Default to 300 seconds. The interval in seconds between keep-alive messages. - keep_alive_timezone_offset_mn (int): Default to 0. - Time zone offset from UTC in minutes (-840 to +720) of the keep alive message. + keep_alive_timezone_name (str): Default to "Etc/UTC". + Time zone name used to set the timezone offset in the keep alive message. + Check https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List + to see the list of timezone identifiers: for example "Etc/UTC" """ Thread.__init__(self) @@ -128,19 +134,28 @@ def __init__(self, sink_manager, mqtt_wrapper, self.sink_manager = sink_manager self.mqtt_wrapper = mqtt_wrapper - self.disconnected_sinks = set() # All sinks that are detected as disconnected to the gateway - self.connected_sinks = set() # All sinks that are detected as connected to the gateway + # All sinks that are detected as disconnected to the gateway + self.disconnected_sinks = set() + # All sinks that are detected as connected to the gateway + self.connected_sinks = set() self.keep_alive_interval_s = keep_alive_interval_s - self.keep_alive_timezone_offset_mn = keep_alive_timezone_offset_mn + try: + self.keep_alive_timezone = pytz.timezone(keep_alive_timezone_name) + except pytz.UnknownTimeZoneError: + logging.error("%s is not a valid timezone name.", + self.keep_alive_timezone) + self.keep_alive_timezone = pytz.timezone("Etc/UTC") + + def get_timezone_offset_mns(self): + """ Return the timezone offset in minutes. """ + local_time = datetime.now(self.keep_alive_timezone) + return int(local_time.utcoffset().total_seconds() / 60) def prepare_keep_alive_msg(self): """ Prepare and return a keep alive message. """ - rtc_timestamp_ms = None - time_zone_offset = None - rtc_timestamp_ms = int(time() * 1000) - time_zone_offset = self.keep_alive_timezone_offset_mn + time_zone_offset = self.get_timezone_offset_mns() gateway_status = int(self.mqtt_wrapper.connected) keep_alive_msg = KeepAliveMessage(KEEP_ALIVE_SERVICE_VERSION, @@ -154,12 +169,13 @@ def prepare_keep_alive_msg(self): def send_keep_alive_msg_to_sink(self, sink) -> bool: """ Send the keep alive message to the network. - Returns True if the keep alive message could be sent to the sink, False otherwise. + Returns True if the keep alive message could be sent to the sink, + False otherwise. Args: sink: Sink to send the keep alive message to. """ - retries_left = 3 + retries_left = KEEP_ALIVE_MSG_RETRIES_NUMBER res = wmm.GatewayResultCode.GW_RES_UNKNOWN_ERROR while retries_left > 0 and res != wmm.GatewayResultCode.GW_RES_OK: @@ -192,7 +208,7 @@ def send_keep_alive_msg_to_sink(self, sink) -> bool: def wait_for_next_keep_alive_message_iteration(self, time_to_wait, start_timer=None): """ Wait for the next keep alive message iteration. """ if start_timer: - time_to_wait = max(time_to_wait - (time() - start_timer), 0) + time_to_wait = max(time_to_wait - (monotonic() - start_timer), 0) sleep(time_to_wait) @@ -200,11 +216,10 @@ def run(self): """ Main loop that send periodically keep alive message to the network. """ while True: # Put a timer so that the message are periodic with a good precision - start_timer = time() + start_timer = monotonic() # Get current connected sinks - current_sinks = set([sink.sink_id for sink in self.sink_manager.get_sinks()]) - current_connected_sinks = set() + current_sinks = [sink.sink_id for sink in self.sink_manager.get_sinks()] if not current_sinks: logging.error("No sinks are detected!") @@ -214,32 +229,6 @@ def run(self): # Send keep alive messages to all sinks logging.info("Send a keep alive message to the network") for sink_id in current_sinks: - if self.send_keep_alive_msg_to_sink(self.sink_manager.get_sink(sink_id)): - current_connected_sinks.add(sink_id) - else: - self.disconnected_sinks.add(sink_id) - - # Find sinks that disappeared and those which reconnected during this iteration - non_detected_sinks = self.connected_sinks.difference(current_sinks) - reconnected_sinks = current_connected_sinks.intersection(self.disconnected_sinks) - - # Update sinks current status - self.connected_sinks = current_connected_sinks - self.disconnected_sinks.difference_update(current_connected_sinks) - self.disconnected_sinks.update(non_detected_sinks) - - # If a sink reconnected, send 4 other keep alive messages in the next minutes - if reconnected_sinks: - messages_to_send = 4 - logging.debug("Sending keep alive messages more often to reconnected sinks: %s", - ' '.join(reconnected_sinks)) - self.wait_for_next_keep_alive_message_iteration(KEEP_ALIVE_MSG_RECONNECTION_PERIOD_S, start_timer) - - for _ in range(messages_to_send): - start_timer = time() - for sink_id in reconnected_sinks: - self.send_keep_alive_msg_to_sink(self.sink_manager.get_sink(sink_id)) - - self.wait_for_next_keep_alive_message_iteration(KEEP_ALIVE_MSG_RECONNECTION_PERIOD_S, start_timer) - else: # Else wait for sending the next keep alive message - self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s, start_timer) + self.send_keep_alive_msg_to_sink(self.sink_manager.get_sink(sink_id)) + + self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s, start_timer) diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index d392576a..17485d0f 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -440,7 +440,7 @@ def __init__(self, settings, **kwargs): self.sink_manager, self.mqtt_wrapper, settings.keep_alive_interval_s, - settings.keep_alive_timezone_offset_mn + settings.keep_alive_timezone_name ) self.keep_alive_service.start() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index eebbc17e..507c0a46 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -585,12 +585,15 @@ def add_keep_alive_config(self): ) self.filtering.add_argument( - "--keep_alive_timezone_offset_mn", - default=os.environ.get("WM_KEEP_ALIVE_TIMEZONE_OFFSET_MN", 0), - action="store", - type=int, - help=("Default to 0. Time zone offset from UTC in minutes (-840 to +720) " - "to be sent in keep alive messages."), + "--keep_alive_timezone_name", + default=os.environ.get("WM_KEEP_ALIVE_TIMEZONE_NAME", "Etc/UTC"), + type=self.str2none, + nargs="?", + const=True, + help=("Default to 'Etc/UTC'. Time zone name used to set the " + "timezone offset in the keep alive message. Check " + "https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List " + "to see the list of timezone identifiers."), ) def dump(self, path): From 4e343c229da391bfea30c98965152a5249234b2c Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Mon, 14 Apr 2025 10:39:48 +0200 Subject: [PATCH 3/3] Update the keep alive service to the latest version --- docker/transport_service/Dockerfile | 2 +- .../keep_alive_service/keep_alive_service.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docker/transport_service/Dockerfile b/docker/transport_service/Dockerfile index 66b681e5..7a8784c6 100644 --- a/docker/transport_service/Dockerfile +++ b/docker/transport_service/Dockerfile @@ -27,7 +27,7 @@ RUN ./utils/generate_wheel.sh USER wirepas # Needed by pydbus -RUN pip3 install PyGObject~=3.0 pytz==2024.2 --user +RUN pip3 install PyGObject~=3.0 --user # Install protobuf from source to get the UPB implementation RUN pip3 install dist/wirepas_gateway*.whl --no-binary protobuf --user diff --git a/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py index f137cafe..8acef769 100644 --- a/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py +++ b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py @@ -44,8 +44,8 @@ class KeepAliveMessage(): gateway_status: The running status of the gateway. Bit 0: Backhaul (MQTT broker) Connectivity (0 = Disconnected, 1 = Connected) Bits 1-7: Reserved for future use or other status indicators - rtc_timestamp_ms: Unix epoch timestamp in ms (milliseconds since January 1, 1970). - timezone_offset_mn: Time zone offset from UTC in minutes (-840 to +720). + rtc_timestamp_ms: Unix epoch timestamp (seconds since January 1, 1970). + timezone_offset_mn: Time zone offset from UTC in minutes (-720 to +840). keep_alive_interval_s: Interval in seconds until the next keepalive message is expected. """ def __init__(self, version, gateway_status=None, rtc_timestamp_ms=None, @@ -94,11 +94,11 @@ def encode_tlv(self): buffer += KeepAliveMessage._encode_tlv_item( KeepAliveType.GATEWAY_STATUS_TYPE, 1, self.gateway_status, "B" ) - if self.rtc_timestamp_ms is not None: + if self.rtc_timestamp_ms is not None and \ + self.timezone_offset_mn is not None: buffer += KeepAliveMessage._encode_tlv_item( - KeepAliveType.RTC_TIMESTAMP_TYPE, 8, self.rtc_timestamp_ms, "Q", + KeepAliveType.RTC_TIMESTAMP_TYPE, 4, self.rtc_timestamp_ms, "I", ) - if self.timezone_offset_mn is not None: buffer += KeepAliveMessage._encode_tlv_item( KeepAliveType.TIME_ZONE_OFFSET_TYPE, 2, self.timezone_offset_mn, "h" ) @@ -154,7 +154,7 @@ def get_timezone_offset_mns(self): def prepare_keep_alive_msg(self): """ Prepare and return a keep alive message. """ - rtc_timestamp_ms = int(time() * 1000) + rtc_timestamp_ms = int(time()) time_zone_offset = self.get_timezone_offset_mns() gateway_status = int(self.mqtt_wrapper.connected)