Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions python_transport/tests/test_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
env_vars["WM_SERVICES_MQTT_PERSIST_SESSION"] = True
env_vars["WM_SERVICES_MQTT_FORCE_UNSECURE"] = True
env_vars["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] = True
env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"] = True

env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000
env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128
Expand Down Expand Up @@ -60,11 +61,13 @@
file_vars["gateway_version"] = env_vars["WM_GW_VERSION"]
file_vars["ignored_endpoints_filter"] = env_vars["WM_GW_IGNORED_ENDPOINTS_FILTER"]
file_vars["whitened_endpoints_filter"] = env_vars["WM_GW_WHITENED_ENDPOINTS_FILTER"]
file_vars["mqtt_retain_flag_not_supported"] = env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Black would make changes.
line too long (100 > 79 characters)


booleans = [
"WM_SERVICES_MQTT_PERSIST_SESSION",
"WM_SERVICES_MQTT_FORCE_UNSECURE",
"WM_SERVICES_MQTT_ALLOW_UNTRUSTED",
"WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED",
]


Expand Down Expand Up @@ -139,6 +142,14 @@ def content_tests(settings, vcopy):
vcopy["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] == settings.mqtt_allow_untrusted
)

if "WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED" not in vcopy:
assert settings.mqtt_retain_flag_not_supported is False
else:
assert (
vcopy["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"]
== settings.mqtt_retain_flag_not_supported
)

assert vcopy["WM_SERVICES_MQTT_RECONNECT_DELAY"] == settings.mqtt_reconnect_delay
assert (
vcopy["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"]
Expand Down Expand Up @@ -199,6 +210,7 @@ def test_defaults():
assert settings.mqtt_persist_session is False
assert settings.mqtt_force_unsecure is False
assert settings.mqtt_allow_untrusted is False
assert settings.mqtt_retain_flag_not_supported is False
assert settings.mqtt_reconnect_delay == 0
assert settings.buffering_max_buffered_packets == 0
assert settings.buffering_max_delay_without_publish == 0
Expand Down
14 changes: 13 additions & 1 deletion python_transport/wirepas_gateway/protocol/mqtt_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def __init__(
# Keep track of latest published packet
self._publish_monitor = PublishMonitor()

# Load special settings for broker compatibility
self.retain_supported = not settings.mqtt_retain_flag_not_supported

if settings.mqtt_use_websocket:
transport = "websockets"
self._use_websockets = True
Expand Down Expand Up @@ -233,7 +236,7 @@ def _get_socket(self):

def _set_last_will(self, topic, data):
# Set Last wil message
self._client.will_set(topic, data, qos=2, retain=True)
self._client.will_set(topic, data, qos=2, retain=self.retain_supported)

def run(self):
self.running = True
Expand Down Expand Up @@ -278,6 +281,9 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain):
retain: Is it a retain message
"""
# Clear retain flag if not supported
retain = retain and self.retain_supported

mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid
self._unpublished_mid_set.add(mid)

Expand All @@ -296,6 +302,12 @@ def publish(self, topic, payload, qos=1, retain=False) -> None:
self._publish_monitor.on_publish_request()

def subscribe(self, topic, cb, qos=2) -> None:
""" Method to subscribe to mqtt topic
Args:
topic: Topic to subscribe to
cb: Callback to call on message reception
qos: Qos to use.
"""
logging.debug("Subscribing to: {}".format(topic))
self._client.subscribe(topic, qos)
self._client.message_callback_add(topic, cb)
Expand Down
8 changes: 8 additions & 0 deletions python_transport/wirepas_gateway/protocol/topic_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def make_otap_set_target_scratchpad_request_topic(gw_id="+", sink_id="+"):
def make_get_gateway_info_request_topic(gw_id):
return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)])

@staticmethod
def make_get_gw_status_request_topic():
return TopicGenerator._make_request_topic("get_gw_status", [])

##################
# Response Part
##################
Expand Down Expand Up @@ -125,6 +129,10 @@ def make_otap_set_target_scratchpad_response_topic(gw_id="+", sink_id="+"):
def make_get_gateway_info_response_topic(gw_id):
return TopicGenerator._make_response_topic("get_gw_info", [str(gw_id)])

@staticmethod
def make_get_gw_status_response_topic(gw_id):
return TopicGenerator._make_response_topic("get_gw_status", [str(gw_id)])

##################
# Event Part
##################
Expand Down
120 changes: 113 additions & 7 deletions python_transport/wirepas_gateway/transport_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,67 @@ def initialize_sink(self, name):
sink.cost = self.minimum_sink_cost


class SendGatewayStatusThread(Thread):
def __init__(
self,
period_s,
mqtt_wrapper,
gw_id,
):
"""
Thread sending periodically Gateway Status to the MQTT broker.
Args:
period_s: the period to send the status
mqtt_wrapper: the mqtt wrapper to publish gateway status on
gw_id: the id of the gateway to be use for MQTT topic
"""
Thread.__init__(self)

# Daemonize thread to exit with full process
self.daemon = True

# How often to send status
self.period_s = period_s
self.mqtt_wrapper = mqtt_wrapper

self.gw_id = gw_id
self.running = False

def publish_status(self):
"""
Publish the gateway status in the MQTT Broker.
"""
event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE)
status_topic = TopicGenerator.make_status_topic(self.gw_id)
logging.info("Publishing periodic gateway status!")
self.mqtt_wrapper.publish(status_topic, event_online.payload, qos=1)

def is_connected(self):
"""
Check if the gateway is connected with the MQTT broker.
"""
return self.mqtt_wrapper.connected

def run(self):
"""
Main loop that send periodically the status of the gateway to the MQTT Broker.
"""
self.running = True

while self.running:
if self.is_connected():
self.publish_status()

# Wait for period_s seconds
sleep(self.period_s)

def stop(self):
"""
Stop the periodical sending gateway status thread.
"""
self.running = False


class TransportService(BusClient):
"""
Implementation of gateway to backend protocol
Expand Down Expand Up @@ -183,9 +244,12 @@ def __init__(self, settings, **kwargs):
self.gw_model = settings.gateway_model
self.gw_version = settings.gateway_version

# Does broker support retain flag
self.retain_supported = not settings.mqtt_retain_flag_not_supported

self.whitened_ep_filter = settings.whitened_endpoints_filter

last_will_topic = TopicGenerator.make_status_topic(self.gw_id)
self.make_status_topic = TopicGenerator.make_status_topic(self.gw_id)
last_will_message = wmm.StatusEvent(
self.gw_id, wmm.GatewayState.OFFLINE
).payload
Expand All @@ -194,7 +258,7 @@ def __init__(self, settings, **kwargs):
settings,
self._on_mqtt_wrapper_termination_cb,
self._on_connect,
last_will_topic,
self.make_status_topic,
last_will_message,
)

Expand Down Expand Up @@ -227,6 +291,18 @@ def __init__(self, settings, **kwargs):
else:
self.data_event_id = None

if not self.retain_supported:
logging.info("Gateway status is sent every %d seconds", settings.mqtt_send_gateway_status_period_s)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (111 > 79 characters)


self.send_status_thread = SendGatewayStatusThread(
settings.mqtt_send_gateway_status_period_s,
self.mqtt_wrapper,
self.gw_id,
)
self.send_status_thread.start()
else:
logging.info("Retain flag is supported by the gateway")

def _on_mqtt_wrapper_termination_cb(self):
"""
Callback used to be informed when the MQTT wrapper has exited
Expand All @@ -239,9 +315,12 @@ def _on_mqtt_wrapper_termination_cb(self):
def _set_status(self):
event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE)

topic = TopicGenerator.make_status_topic(self.gw_id)

self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True)
self.mqtt_wrapper.publish(
self.make_status_topic,
event_online.payload,
qos=1,
retain=self.retain_supported
)

def _on_connect(self):
# Register for get gateway info
Expand Down Expand Up @@ -283,13 +362,17 @@ def _on_connect(self):
topic, self._on_otap_set_target_scratchpad_request_received
)

topic = TopicGenerator.make_get_gw_status_request_topic()
self.mqtt_wrapper.subscribe(
topic, self._on_get_gw_status_request_received
)

# Register ourself to our status in case someone else (by mistake)
# update our status.
# It will work only if we are allowed to register for event topic
# at broker level
topic = TopicGenerator.make_status_topic(self.gw_id)
self.mqtt_wrapper.subscribe(
topic, self._on_own_status_received
self.make_status_topic, self._on_own_status_received
)

self._set_status()
Expand Down Expand Up @@ -746,6 +829,29 @@ def _on_otap_set_target_scratchpad_request_received(

self.mqtt_wrapper.publish(topic, response.payload, qos=2)

@deferred_thread
def _on_get_gw_status_request_received(
self, client, userdata, message
):
# pylint: disable=unused-argument
res = wmm.GatewayResultCode.GW_RES_OK
logging.info("Get gateway status request received")
try:
request = wmm.GetGatewayStatusRequest.from_payload(
message.payload
)
except wmm.GatewayAPIParsingException as e:
logging.error(str(e))
return

response = wmm.GetGatewayStatusResponse(
request.req_id, self.gw_id, res, wmm.GatewayState.ONLINE
)
topic = TopicGenerator.make_get_gw_status_response_topic(
self.gw_id
)

self.mqtt_wrapper.publish(topic, response.payload, qos=1)

def parse_setting_list(list_setting):
""" This function parse ep list specified from setting file or cmd line
Expand Down
25 changes: 25 additions & 0 deletions python_transport/wirepas_gateway/utils/argument_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,31 @@ def add_mqtt(self):
),
)

self.mqtt.add_argument(
"--mqtt_retain_flag_not_supported",
default=os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", False),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (88 > 79 characters)

type=self.str2bool,
nargs="?",
const=True,
help=(
"Set to true if broker support retain flag"
"Gateway is sending its status periodically only"
"if retain option is not supported"
"otherwise status is retained in the MQTT"
),
)

self.mqtt.add_argument(
"--mqtt_send_gateway_status_period_s",
default=os.environ.get("WM_SERVICES_MQTT_SEND_GATEWAY_STATUS_PERIOD_S", 20),
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (88 > 79 characters)

action="store",
type=self.str2int,
help=(
"Period in seconds of a gateway to send its status"
"to the backend if retain option is not supported"
),
)

def add_buffering_settings(self):
""" Parameters used to avoid black hole case """
self.buffering.add_argument(
Expand Down