diff --git a/docker/docker-compose/README.md b/docker/docker-compose/README.md index 1993b8a2..a89a61e3 100644 --- a/docker/docker-compose/README.md +++ b/docker/docker-compose/README.md @@ -74,6 +74,9 @@ The list of all possible parameters to configure the transport service are given | WM_GW_VERSION | Version of the gateway | None | Any string | | WM_GW_IGNORED_ENDPOINTS_FILTER | Destination endpoints list to ignore (not published) | None | List of endpoints (i.e. [1,2,3]), a range of endpoints (i.e. [1-3]), or a combination of both | | WM_GW_WHITENED_ENDPOINTS_FILTER | Destination endpoints list to whiten (no payload content, only size) | None | List of endpoints (i.e. [1,2,3]), a range of endpoints (i.e. [1-3]), or a combination of both | +| WM_KEEP_ALIVE_ACTIVATE | Activate the keep alive service thread to send those messages periodically to the network. | False | "yes", "true", "t", "y", "1","no", "false", "f", "n", "0", "" | +| WM_KEEP_ALIVE_INTERVAL_S | The interval in seconds between keep-alive messages if those are activated with WM_KEEP_ALIVE_ACTIVATE. | 300 | Any positive integer | +| WM_KEEP_ALIVE_TIMEZONE_NAME | Time zone name used to set the timezone offset in the keep alive message. Check https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List to see the list of timezone identifiers: for example "Etc/UTC" | "Etc/UTC" | Any valid timezone identifiers. | | WM_DEBUG_LEVEL | Configure log level for the transport service. Please be aware that levels such as debug should not be used in a production system | info | debug, info, critical, fatal, error, warning | diff --git a/python_transport/requirements.txt b/python_transport/requirements.txt index 281f287e..85114bc5 100644 --- a/python_transport/requirements.txt +++ b/python_transport/requirements.txt @@ -9,3 +9,6 @@ PyYAML==6.0.1 # mqtt paho-mqtt==1.4.0 + +# pytz +pytz==2024.2 \ No newline at end of file diff --git a/python_transport/wirepas_gateway/__init__.py b/python_transport/wirepas_gateway/__init__.py index 6e204a1c..b79ec19f 100644 --- a/python_transport/wirepas_gateway/__init__.py +++ b/python_transport/wirepas_gateway/__init__.py @@ -12,6 +12,7 @@ # flake8: noqa from . import dbus +from . import keep_alive_service from . import protocol from . import utils diff --git a/python_transport/wirepas_gateway/keep_alive_service/__init__.py b/python_transport/wirepas_gateway/keep_alive_service/__init__.py new file mode 100644 index 00000000..96b6f20e --- /dev/null +++ b/python_transport/wirepas_gateway/keep_alive_service/__init__.py @@ -0,0 +1,5 @@ +# Copyright 2024 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. +# +from .keep_alive_service import * diff --git a/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py new file mode 100644 index 00000000..8acef769 --- /dev/null +++ b/python_transport/wirepas_gateway/keep_alive_service/keep_alive_service.py @@ -0,0 +1,234 @@ +# Copyright 2024 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. +# +from datetime import datetime +from enum import IntEnum +import logging +import struct +import pytz +from time import monotonic, sleep, time +from threading import Thread + +import wirepas_mesh_messaging as wmm + + +KEEP_ALIVE_SERVICE_VERSION = 0x01 +KEEP_ALIVE_SRC_EP = 67 +KEEP_ALIVE_DST_EP = 67 + +# Timeouts and periods used for the keep alive service. +WM_MSG_RETRY_PERIOD_S = 1 + +# Maximum number of time the service is trying to send a message to sinks. +KEEP_ALIVE_MSG_RETRIES_NUMBER = 3 + +BROADCAST_ADDRESS = 0xFFFFFFFF + + +class KeepAliveType(IntEnum): + """Keep alive fields TLV type enumerate.""" + VERSION_TYPE = 0x01 + GATEWAY_STATUS_TYPE = 0x02 + RTC_TIMESTAMP_TYPE = 0x03 + TIME_ZONE_OFFSET_TYPE = 0x04 + KEEP_ALIVE_INTERVAL_TYPE = 0x05 + + +class KeepAliveMessage(): + """ + Class to store keep alive message attributes. + + Attributes: + version: The version number for the keep-alive message. + gateway_status: The running status of the gateway. + Bit 0: Backhaul (MQTT broker) Connectivity (0 = Disconnected, 1 = Connected) + Bits 1-7: Reserved for future use or other status indicators + rtc_timestamp_ms: Unix epoch timestamp (seconds since January 1, 1970). + timezone_offset_mn: Time zone offset from UTC in minutes (-720 to +840). + keep_alive_interval_s: Interval in seconds until the next keepalive message is expected. + """ + def __init__(self, version, gateway_status=None, rtc_timestamp_ms=None, + timezone_offset_mn=None, keep_alive_interval_s=None): + self.version = version + self.gateway_status = gateway_status + self.rtc_timestamp_ms = rtc_timestamp_ms + self.timezone_offset_mn = timezone_offset_mn + self.keep_alive_interval_s = keep_alive_interval_s + + @staticmethod + def _encode_tlv_item(elt_type, length, value, packing): + """ + Encode a new TLV item in little endian. + + Args: + elt_type (int): Type of the element to encode. + length (int): Number of bytes of the value to be encoded. + value: Value to encode. Note: Value should have a specific type + corresponding to the packing parameter. + packing (str): String representing the format characters allowing + the conversion between C and Python bytes when packing the value. + See https://docs.python.org/3/library/struct.html#format-characters. + """ + assert (0 <= elt_type <= 0xFF), "A TLV type must be include between 0 and 255" + assert isinstance(length, int), "A TLV length must be an integer" + return bytes(struct.pack(" bool: + """ + Send the keep alive message to the network. + Returns True if the keep alive message could be sent to the sink, + False otherwise. + + Args: + sink: Sink to send the keep alive message to. + """ + retries_left = KEEP_ALIVE_MSG_RETRIES_NUMBER + res = wmm.GatewayResultCode.GW_RES_UNKNOWN_ERROR + + while retries_left > 0 and res != wmm.GatewayResultCode.GW_RES_OK: + retries_left -= 1 + keep_alive_message = self.prepare_keep_alive_msg() + payload = keep_alive_message.encode_tlv() + logging.debug("Send the following keep alive payload to sink %s: %s", + sink.sink_id, payload.hex()) + + res = sink.send_data( + dst=BROADCAST_ADDRESS, + src_ep=KEEP_ALIVE_SRC_EP, + dst_ep=KEEP_ALIVE_DST_EP, + qos=0, + initial_time=0, + data=payload + ) + if res != wmm.GatewayResultCode.GW_RES_OK and retries_left > 0: + sleep(WM_MSG_RETRY_PERIOD_S) + logging.debug("Retry sending the keep alive message that couldn't be sent to %s sink: %s. ", + sink.sink_id, res) + + if res != wmm.GatewayResultCode.GW_RES_OK: + logging.error("Keep alive message couldn't be sent to %s sink: %s", + sink.sink_id, res) + return False + + return True + + def wait_for_next_keep_alive_message_iteration(self, time_to_wait, start_timer=None): + """ Wait for the next keep alive message iteration. """ + if start_timer: + time_to_wait = max(time_to_wait - (monotonic() - start_timer), 0) + + sleep(time_to_wait) + + def run(self): + """ Main loop that send periodically keep alive message to the network. """ + while True: + # Put a timer so that the message are periodic with a good precision + start_timer = monotonic() + + # Get current connected sinks + current_sinks = [sink.sink_id for sink in self.sink_manager.get_sinks()] + + if not current_sinks: + logging.error("No sinks are detected!") + self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s) + continue + + # Send keep alive messages to all sinks + logging.info("Send a keep alive message to the network") + for sink_id in current_sinks: + self.send_keep_alive_msg_to_sink(self.sink_manager.get_sink(sink_id)) + + self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s, start_timer) diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index b0338a30..17485d0f 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -11,6 +11,7 @@ from threading import Thread, Event from wirepas_gateway.dbus.dbus_client import BusClient +from wirepas_gateway.keep_alive_service import KeepAliveServiceThread from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser from wirepas_gateway.protocol.mqtt_wrapper import MQTTWrapper from wirepas_gateway.utils import ParserHelper @@ -432,6 +433,17 @@ def __init__(self, settings, **kwargs): ) self.status_thread.start() + # Run the keep alive service if it is activated + self.keep_alive_service = None + if settings.activate_keep_alive_service: + self.keep_alive_service = KeepAliveServiceThread( + self.sink_manager, + self.mqtt_wrapper, + settings.keep_alive_interval_s, + settings.keep_alive_timezone_name + ) + self.keep_alive_service.start() + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -441,7 +453,6 @@ def _on_mqtt_wrapper_termination_cb(self): logging.error("MQTT wrapper ends. Terminate the program") self.stop_dbus_client() - def update_gateway_status_dec(fn): """ Decorator to update the gateway status when needed @@ -1060,6 +1071,7 @@ def main(): parse.add_buffering_settings() parse.add_debug_settings() parse.add_deprecated_args() + parse.add_keep_alive_config() settings = parse.settings() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index a5d78f80..507c0a46 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -563,6 +563,39 @@ def add_filtering_config(self): ), ) + def add_keep_alive_config(self): + self.filtering.add_argument( + "--activate_keep_alive_service", + default=os.environ.get("WM_KEEP_ALIVE_ACTIVATE", False), + type=self.str2bool, + nargs="?", + const=True, + help=("Default to False. Activate the keep alive service thread. " + "Note: Gateway time is supposed to be synchronized " + "with the NTP server before launching the service.") + ) + + self.filtering.add_argument( + "--keep_alive_interval_s", + default=os.environ.get("WM_KEEP_ALIVE_INTERVAL_S", 300), + action="store", + type=int, + help=("Default to 300 seconds. " + "The interval in seconds between keep-alive messages.") + ) + + self.filtering.add_argument( + "--keep_alive_timezone_name", + default=os.environ.get("WM_KEEP_ALIVE_TIMEZONE_NAME", "Etc/UTC"), + type=self.str2none, + nargs="?", + const=True, + help=("Default to 'Etc/UTC'. Time zone name used to set the " + "timezone offset in the keep alive message. Check " + "https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List " + "to see the list of timezone identifiers."), + ) + def dump(self, path): """ dumps the arguments into a file """ with open(path, "w") as f: