Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docker/docker-compose/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ The list of all possible parameters to configure the transport service are given
| WM_GW_VERSION | Version of the gateway | None | Any string |
| WM_GW_IGNORED_ENDPOINTS_FILTER | Destination endpoints list to ignore (not published) | None | List of endpoints (i.e. [1,2,3]), a range of endpoints (i.e. [1-3]), or a combination of both |
| WM_GW_WHITENED_ENDPOINTS_FILTER | Destination endpoints list to whiten (no payload content, only size) | None | List of endpoints (i.e. [1,2,3]), a range of endpoints (i.e. [1-3]), or a combination of both |
| WM_KEEP_ALIVE_ACTIVATE | Activate the keep alive service thread to send those messages periodically to the network. | False | "yes", "true", "t", "y", "1","no", "false", "f", "n", "0", "" |
| WM_KEEP_ALIVE_INTERVAL_S | The interval in seconds between keep-alive messages if those are activated with WM_KEEP_ALIVE_ACTIVATE. | 300 | Any positive integer |
| WM_KEEP_ALIVE_TIMEZONE_NAME | Time zone name used to set the timezone offset in the keep alive message. Check https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List to see the list of timezone identifiers: for example "Etc/UTC" | "Etc/UTC" | Any valid timezone identifiers. |
| WM_DEBUG_LEVEL | Configure log level for the transport service. Please be aware that levels such as debug should not be used in a production system | info | debug, info, critical, fatal, error, warning |


Expand Down
3 changes: 3 additions & 0 deletions python_transport/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ PyYAML==6.0.1

# mqtt
paho-mqtt==1.4.0

# pytz
pytz==2024.2
1 change: 1 addition & 0 deletions python_transport/wirepas_gateway/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# flake8: noqa

from . import dbus
from . import keep_alive_service
from . import protocol
from . import utils

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright 2024 Wirepas Ltd licensed under Apache License, Version 2.0
#
# See file LICENSE for full license details.
#
from .keep_alive_service import *
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'.keep_alive_service.*' imported but unused

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@LePailleurThibault Is it a valid comment from hound?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, i don't know how this comment is generated and imports in a init file are never really "used".
However, for example, in the transport service there is this line
from wirepas_gateway.keep_alive_service import KeepAliveServiceThread.
The init.py is required but maybe not this line

Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
# Copyright 2024 Wirepas Ltd licensed under Apache License, Version 2.0
#
# See file LICENSE for full license details.
#
from datetime import datetime
from enum import IntEnum
import logging
import struct
import pytz
from time import monotonic, sleep, time
from threading import Thread

import wirepas_mesh_messaging as wmm


KEEP_ALIVE_SERVICE_VERSION = 0x01
KEEP_ALIVE_SRC_EP = 67
KEEP_ALIVE_DST_EP = 67

# Timeouts and periods used for the keep alive service.
WM_MSG_RETRY_PERIOD_S = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 1s enough ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well if a message can't be sent to a sink 3 times in a row in ~2s, there are definitely issue with this sink. So not sending a keep alive message for that sink wouldn't be that bad.


# Maximum number of time the service is trying to send a message to sinks.
KEEP_ALIVE_MSG_RETRIES_NUMBER = 3

BROADCAST_ADDRESS = 0xFFFFFFFF


class KeepAliveType(IntEnum):
"""Keep alive fields TLV type enumerate."""
VERSION_TYPE = 0x01
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Black would make changes.

GATEWAY_STATUS_TYPE = 0x02
RTC_TIMESTAMP_TYPE = 0x03
TIME_ZONE_OFFSET_TYPE = 0x04
KEEP_ALIVE_INTERVAL_TYPE = 0x05


class KeepAliveMessage():
"""
Class to store keep alive message attributes.

Attributes:
version: The version number for the keep-alive message.
gateway_status: The running status of the gateway.
Bit 0: Backhaul (MQTT broker) Connectivity (0 = Disconnected, 1 = Connected)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (88 > 79 characters)

Bits 1-7: Reserved for future use or other status indicators
rtc_timestamp_ms: Unix epoch timestamp (seconds since January 1, 1970).
timezone_offset_mn: Time zone offset from UTC in minutes (-720 to +840).
keep_alive_interval_s: Interval in seconds until the next keepalive message is expected.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (96 > 79 characters)

"""
def __init__(self, version, gateway_status=None, rtc_timestamp_ms=None,
timezone_offset_mn=None, keep_alive_interval_s=None):
self.version = version
self.gateway_status = gateway_status
self.rtc_timestamp_ms = rtc_timestamp_ms
self.timezone_offset_mn = timezone_offset_mn
self.keep_alive_interval_s = keep_alive_interval_s

@staticmethod
def _encode_tlv_item(elt_type, length, value, packing):
"""
Encode a new TLV item in little endian.

Args:
elt_type (int): Type of the element to encode.
length (int): Number of bytes of the value to be encoded.
value: Value to encode. Note: Value should have a specific type
corresponding to the packing parameter.
packing (str): String representing the format characters allowing
the conversion between C and Python bytes when packing the value.
See https://docs.python.org/3/library/struct.html#format-characters.
"""
assert (0 <= elt_type <= 0xFF), "A TLV type must be include between 0 and 255"
assert isinstance(length, int), "A TLV length must be an integer"
return bytes(struct.pack("<bb" + packing, elt_type, length, value))

def encode_tlv(self):
"""
Encode a keep alive message with TLV.
Each of the RTC item are encoding as:
Type of the item - Length of the value to encode - value to encode.
"""
logging.debug(f"Prepare keep alive message with version={self.version}, "
f"gateway_status={self.gateway_status}, "
f"rtc_timestamp_ms={self.rtc_timestamp_ms}, "
f"timezone_offset_mn={self.timezone_offset_mn} and "
f"keep_alive_interval_s={self.keep_alive_interval_s}")

buffer = bytes()
buffer += KeepAliveMessage._encode_tlv_item(
KeepAliveType.VERSION_TYPE, 1, self.version, "B"
)
if self.gateway_status is not None:
buffer += KeepAliveMessage._encode_tlv_item(
KeepAliveType.GATEWAY_STATUS_TYPE, 1, self.gateway_status, "B"
)
if self.rtc_timestamp_ms is not None and \
self.timezone_offset_mn is not None:
buffer += KeepAliveMessage._encode_tlv_item(
KeepAliveType.RTC_TIMESTAMP_TYPE, 4, self.rtc_timestamp_ms, "I",
)
buffer += KeepAliveMessage._encode_tlv_item(
KeepAliveType.TIME_ZONE_OFFSET_TYPE, 2, self.timezone_offset_mn, "h"
)
if self.keep_alive_interval_s is not None:
buffer += KeepAliveMessage._encode_tlv_item(
KeepAliveType.KEEP_ALIVE_INTERVAL_TYPE, 2, self.keep_alive_interval_s, "H",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (91 > 79 characters)

)

return buffer


class KeepAliveServiceThread(Thread):
def __init__(self, sink_manager, mqtt_wrapper,
keep_alive_interval_s=300,
keep_alive_timezone_name="Etc/UTC"):
""" Thread sending periodically keep alive messages to the network.

Args:
sink_manager: The sink manager to send sinks the keep alive messages.
mqtt_wrapper: The mqtt wrapper to get access to queue level of the mqtt broker.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (91 > 79 characters)

keep_alive_interval_s (int): Default to 300 seconds.
The interval in seconds between keep-alive messages.
keep_alive_timezone_name (str): Default to "Etc/UTC".
Time zone name used to set the timezone offset in the keep alive message.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (89 > 79 characters)

Check https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (87 > 79 characters)

to see the list of timezone identifiers: for example "Etc/UTC"
"""
Thread.__init__(self)

# Daemonize thread to exit with full process
self.daemon = True

self.sink_manager = sink_manager
self.mqtt_wrapper = mqtt_wrapper

# All sinks that are detected as disconnected to the gateway
self.disconnected_sinks = set()
# All sinks that are detected as connected to the gateway
self.connected_sinks = set()

self.keep_alive_interval_s = keep_alive_interval_s
try:
self.keep_alive_timezone = pytz.timezone(keep_alive_timezone_name)
except pytz.UnknownTimeZoneError:
logging.error("%s is not a valid timezone name.",
self.keep_alive_timezone)
self.keep_alive_timezone = pytz.timezone("Etc/UTC")

def get_timezone_offset_mns(self):
""" Return the timezone offset in minutes. """
local_time = datetime.now(self.keep_alive_timezone)
return int(local_time.utcoffset().total_seconds() / 60)

def prepare_keep_alive_msg(self):
""" Prepare and return a keep alive message. """
rtc_timestamp_ms = int(time())
time_zone_offset = self.get_timezone_offset_mns()

gateway_status = int(self.mqtt_wrapper.connected)
keep_alive_msg = KeepAliveMessage(KEEP_ALIVE_SERVICE_VERSION,
gateway_status,
rtc_timestamp_ms,
time_zone_offset,
self.keep_alive_interval_s)

return keep_alive_msg

def send_keep_alive_msg_to_sink(self, sink) -> bool:
"""
Send the keep alive message to the network.
Returns True if the keep alive message could be sent to the sink,
False otherwise.

Args:
sink: Sink to send the keep alive message to.
"""
retries_left = KEEP_ALIVE_MSG_RETRIES_NUMBER
res = wmm.GatewayResultCode.GW_RES_UNKNOWN_ERROR

while retries_left > 0 and res != wmm.GatewayResultCode.GW_RES_OK:
retries_left -= 1
keep_alive_message = self.prepare_keep_alive_msg()
payload = keep_alive_message.encode_tlv()
logging.debug("Send the following keep alive payload to sink %s: %s",
sink.sink_id, payload.hex())

res = sink.send_data(
dst=BROADCAST_ADDRESS,
src_ep=KEEP_ALIVE_SRC_EP,
dst_ep=KEEP_ALIVE_DST_EP,
qos=0,
initial_time=0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I recall right, it is already the default value, so could be skipped

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This argument is required by the method. I don't think there is any default value for the initial_time.

data=payload
)
if res != wmm.GatewayResultCode.GW_RES_OK and retries_left > 0:
sleep(WM_MSG_RETRY_PERIOD_S)
logging.debug("Retry sending the keep alive message that couldn't be sent to %s sink: %s. ",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (108 > 79 characters)

sink.sink_id, res)

if res != wmm.GatewayResultCode.GW_RES_OK:
logging.error("Keep alive message couldn't be sent to %s sink: %s",
sink.sink_id, res)
return False

return True

def wait_for_next_keep_alive_message_iteration(self, time_to_wait, start_timer=None):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (89 > 79 characters)

""" Wait for the next keep alive message iteration. """
if start_timer:
time_to_wait = max(time_to_wait - (monotonic() - start_timer), 0)

sleep(time_to_wait)

def run(self):
""" Main loop that send periodically keep alive message to the network. """
while True:
# Put a timer so that the message are periodic with a good precision
start_timer = monotonic()

# Get current connected sinks
current_sinks = [sink.sink_id for sink in self.sink_manager.get_sinks()]

if not current_sinks:
logging.error("No sinks are detected!")
self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (91 > 79 characters)

continue

# Send keep alive messages to all sinks
logging.info("Send a keep alive message to the network")
for sink_id in current_sinks:
self.send_keep_alive_msg_to_sink(self.sink_manager.get_sink(sink_id))

self.wait_for_next_keep_alive_message_iteration(self.keep_alive_interval_s, start_timer)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (100 > 79 characters)

14 changes: 13 additions & 1 deletion python_transport/wirepas_gateway/transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from threading import Thread, Event

from wirepas_gateway.dbus.dbus_client import BusClient
from wirepas_gateway.keep_alive_service import KeepAliveServiceThread
from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser
from wirepas_gateway.protocol.mqtt_wrapper import MQTTWrapper
from wirepas_gateway.utils import ParserHelper
Expand Down Expand Up @@ -432,6 +433,17 @@ def __init__(self, settings, **kwargs):
)
self.status_thread.start()

# Run the keep alive service if it is activated
self.keep_alive_service = None
if settings.activate_keep_alive_service:
self.keep_alive_service = KeepAliveServiceThread(
self.sink_manager,
self.mqtt_wrapper,
settings.keep_alive_interval_s,
settings.keep_alive_timezone_name
)
self.keep_alive_service.start()

def _on_mqtt_wrapper_termination_cb(self):
"""
Callback used to be informed when the MQTT wrapper has exited
Expand All @@ -441,7 +453,6 @@ def _on_mqtt_wrapper_termination_cb(self):
logging.error("MQTT wrapper ends. Terminate the program")
self.stop_dbus_client()


def update_gateway_status_dec(fn):
"""
Decorator to update the gateway status when needed
Expand Down Expand Up @@ -1060,6 +1071,7 @@ def main():
parse.add_buffering_settings()
parse.add_debug_settings()
parse.add_deprecated_args()
parse.add_keep_alive_config()

settings = parse.settings()

Expand Down
33 changes: 33 additions & 0 deletions python_transport/wirepas_gateway/utils/argument_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,39 @@ def add_filtering_config(self):
),
)

def add_keep_alive_config(self):
self.filtering.add_argument(
"--activate_keep_alive_service",
default=os.environ.get("WM_KEEP_ALIVE_ACTIVATE", False),
type=self.str2bool,
nargs="?",
const=True,
help=("Default to False. Activate the keep alive service thread. "
"Note: Gateway time is supposed to be synchronized "
"with the NTP server before launching the service.")
)

self.filtering.add_argument(
"--keep_alive_interval_s",
default=os.environ.get("WM_KEEP_ALIVE_INTERVAL_S", 300),
action="store",
type=int,
help=("Default to 300 seconds. "
"The interval in seconds between keep-alive messages.")
)

self.filtering.add_argument(
"--keep_alive_timezone_name",
default=os.environ.get("WM_KEEP_ALIVE_TIMEZONE_NAME", "Etc/UTC"),
type=self.str2none,
nargs="?",
const=True,
help=("Default to 'Etc/UTC'. Time zone name used to set the "
"timezone offset in the keep alive message. Check "
"https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List "
"to see the list of timezone identifiers."),
)

def dump(self, path):
""" dumps the arguments into a file """
with open(path, "w") as f:
Expand Down