diff --git a/python_transport/tests/test_arguments.py b/python_transport/tests/test_arguments.py index 9d2ebc52..80cbf43c 100644 --- a/python_transport/tests/test_arguments.py +++ b/python_transport/tests/test_arguments.py @@ -18,6 +18,7 @@ env_vars["WM_SERVICES_MQTT_PERSIST_SESSION"] = True env_vars["WM_SERVICES_MQTT_FORCE_UNSECURE"] = True env_vars["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] = True +env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"] = True env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000 env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128 @@ -60,11 +61,13 @@ file_vars["gateway_version"] = env_vars["WM_GW_VERSION"] file_vars["ignored_endpoints_filter"] = env_vars["WM_GW_IGNORED_ENDPOINTS_FILTER"] file_vars["whitened_endpoints_filter"] = env_vars["WM_GW_WHITENED_ENDPOINTS_FILTER"] +file_vars["mqtt_retain_flag_not_supported"] = env_vars["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"] booleans = [ "WM_SERVICES_MQTT_PERSIST_SESSION", "WM_SERVICES_MQTT_FORCE_UNSECURE", "WM_SERVICES_MQTT_ALLOW_UNTRUSTED", + "WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", ] @@ -139,6 +142,14 @@ def content_tests(settings, vcopy): vcopy["WM_SERVICES_MQTT_ALLOW_UNTRUSTED"] == settings.mqtt_allow_untrusted ) + if "WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED" not in vcopy: + assert settings.mqtt_retain_flag_not_supported is False + else: + assert ( + vcopy["WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED"] + == settings.mqtt_retain_flag_not_supported + ) + assert vcopy["WM_SERVICES_MQTT_RECONNECT_DELAY"] == settings.mqtt_reconnect_delay assert ( vcopy["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] @@ -199,6 +210,7 @@ def test_defaults(): assert settings.mqtt_persist_session is False assert settings.mqtt_force_unsecure is False assert settings.mqtt_allow_untrusted is False + assert settings.mqtt_retain_flag_not_supported is False assert settings.mqtt_reconnect_delay == 0 assert settings.buffering_max_buffered_packets == 0 assert settings.buffering_max_delay_without_publish == 0 diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index cdf69756..36d1adf4 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -43,6 +43,9 @@ def __init__( # Keep track of latest published packet self._publish_monitor = PublishMonitor() + # Load special settings for broker compatibility + self.retain_supported = not settings.mqtt_retain_flag_not_supported + if settings.mqtt_use_websocket: transport = "websockets" self._use_websockets = True @@ -233,7 +236,7 @@ def _get_socket(self): def _set_last_will(self, topic, data): # Set Last wil message - self._client.will_set(topic, data, qos=2, retain=True) + self._client.will_set(topic, data, qos=2, retain=self.retain_supported) def run(self): self.running = True @@ -278,6 +281,9 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain): retain: Is it a retain message """ + # Clear retain flag if not supported + retain = retain and self.retain_supported + mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid self._unpublished_mid_set.add(mid) @@ -296,6 +302,12 @@ def publish(self, topic, payload, qos=1, retain=False) -> None: self._publish_monitor.on_publish_request() def subscribe(self, topic, cb, qos=2) -> None: + """ Method to subscribe to mqtt topic + Args: + topic: Topic to subscribe to + cb: Callback to call on message reception + qos: Qos to use. + """ logging.debug("Subscribing to: {}".format(topic)) self._client.subscribe(topic, qos) self._client.message_callback_add(topic, cb) diff --git a/python_transport/wirepas_gateway/protocol/topic_helper.py b/python_transport/wirepas_gateway/protocol/topic_helper.py index a5c26ae4..85f8a8fb 100644 --- a/python_transport/wirepas_gateway/protocol/topic_helper.py +++ b/python_transport/wirepas_gateway/protocol/topic_helper.py @@ -74,6 +74,10 @@ def make_otap_set_target_scratchpad_request_topic(gw_id="+", sink_id="+"): def make_get_gateway_info_request_topic(gw_id): return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)]) + @staticmethod + def make_get_gw_status_request_topic(): + return TopicGenerator._make_request_topic("get_gw_status", []) + ################## # Response Part ################## @@ -125,6 +129,10 @@ def make_otap_set_target_scratchpad_response_topic(gw_id="+", sink_id="+"): def make_get_gateway_info_response_topic(gw_id): return TopicGenerator._make_response_topic("get_gw_info", [str(gw_id)]) + @staticmethod + def make_get_gw_status_response_topic(gw_id): + return TopicGenerator._make_response_topic("get_gw_status", [str(gw_id)]) + ################## # Event Part ################## diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 3a0849e5..4d2363c7 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -156,6 +156,67 @@ def initialize_sink(self, name): sink.cost = self.minimum_sink_cost +class SendGatewayStatusThread(Thread): + def __init__( + self, + period_s, + mqtt_wrapper, + gw_id, + ): + """ + Thread sending periodically Gateway Status to the MQTT broker. + Args: + period_s: the period to send the status + mqtt_wrapper: the mqtt wrapper to publish gateway status on + gw_id: the id of the gateway to be use for MQTT topic + """ + Thread.__init__(self) + + # Daemonize thread to exit with full process + self.daemon = True + + # How often to send status + self.period_s = period_s + self.mqtt_wrapper = mqtt_wrapper + + self.gw_id = gw_id + self.running = False + + def publish_status(self): + """ + Publish the gateway status in the MQTT Broker. + """ + event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE) + status_topic = TopicGenerator.make_status_topic(self.gw_id) + logging.info("Publishing periodic gateway status!") + self.mqtt_wrapper.publish(status_topic, event_online.payload, qos=1) + + def is_connected(self): + """ + Check if the gateway is connected with the MQTT broker. + """ + return self.mqtt_wrapper.connected + + def run(self): + """ + Main loop that send periodically the status of the gateway to the MQTT Broker. + """ + self.running = True + + while self.running: + if self.is_connected(): + self.publish_status() + + # Wait for period_s seconds + sleep(self.period_s) + + def stop(self): + """ + Stop the periodical sending gateway status thread. + """ + self.running = False + + class TransportService(BusClient): """ Implementation of gateway to backend protocol @@ -183,9 +244,12 @@ def __init__(self, settings, **kwargs): self.gw_model = settings.gateway_model self.gw_version = settings.gateway_version + # Does broker support retain flag + self.retain_supported = not settings.mqtt_retain_flag_not_supported + self.whitened_ep_filter = settings.whitened_endpoints_filter - last_will_topic = TopicGenerator.make_status_topic(self.gw_id) + self.make_status_topic = TopicGenerator.make_status_topic(self.gw_id) last_will_message = wmm.StatusEvent( self.gw_id, wmm.GatewayState.OFFLINE ).payload @@ -194,7 +258,7 @@ def __init__(self, settings, **kwargs): settings, self._on_mqtt_wrapper_termination_cb, self._on_connect, - last_will_topic, + self.make_status_topic, last_will_message, ) @@ -227,6 +291,18 @@ def __init__(self, settings, **kwargs): else: self.data_event_id = None + if not self.retain_supported: + logging.info("Gateway status is sent every %d seconds", settings.mqtt_send_gateway_status_period_s) + + self.send_status_thread = SendGatewayStatusThread( + settings.mqtt_send_gateway_status_period_s, + self.mqtt_wrapper, + self.gw_id, + ) + self.send_status_thread.start() + else: + logging.info("Retain flag is supported by the gateway") + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -239,9 +315,12 @@ def _on_mqtt_wrapper_termination_cb(self): def _set_status(self): event_online = wmm.StatusEvent(self.gw_id, wmm.GatewayState.ONLINE) - topic = TopicGenerator.make_status_topic(self.gw_id) - - self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True) + self.mqtt_wrapper.publish( + self.make_status_topic, + event_online.payload, + qos=1, + retain=self.retain_supported + ) def _on_connect(self): # Register for get gateway info @@ -283,13 +362,17 @@ def _on_connect(self): topic, self._on_otap_set_target_scratchpad_request_received ) + topic = TopicGenerator.make_get_gw_status_request_topic() + self.mqtt_wrapper.subscribe( + topic, self._on_get_gw_status_request_received + ) + # Register ourself to our status in case someone else (by mistake) # update our status. # It will work only if we are allowed to register for event topic # at broker level - topic = TopicGenerator.make_status_topic(self.gw_id) self.mqtt_wrapper.subscribe( - topic, self._on_own_status_received + self.make_status_topic, self._on_own_status_received ) self._set_status() @@ -746,6 +829,29 @@ def _on_otap_set_target_scratchpad_request_received( self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread + def _on_get_gw_status_request_received( + self, client, userdata, message + ): + # pylint: disable=unused-argument + res = wmm.GatewayResultCode.GW_RES_OK + logging.info("Get gateway status request received") + try: + request = wmm.GetGatewayStatusRequest.from_payload( + message.payload + ) + except wmm.GatewayAPIParsingException as e: + logging.error(str(e)) + return + + response = wmm.GetGatewayStatusResponse( + request.req_id, self.gw_id, res, wmm.GatewayState.ONLINE + ) + topic = TopicGenerator.make_get_gw_status_response_topic( + self.gw_id + ) + + self.mqtt_wrapper.publish(topic, response.payload, qos=1) def parse_setting_list(list_setting): """ This function parse ep list specified from setting file or cmd line diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index 2f45b1ec..417f7181 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -363,6 +363,31 @@ def add_mqtt(self): ), ) + self.mqtt.add_argument( + "--mqtt_retain_flag_not_supported", + default=os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", False), + type=self.str2bool, + nargs="?", + const=True, + help=( + "Set to true if broker support retain flag" + "Gateway is sending its status periodically only" + "if retain option is not supported" + "otherwise status is retained in the MQTT" + ), + ) + + self.mqtt.add_argument( + "--mqtt_send_gateway_status_period_s", + default=os.environ.get("WM_SERVICES_MQTT_SEND_GATEWAY_STATUS_PERIOD_S", 20), + action="store", + type=self.str2int, + help=( + "Period in seconds of a gateway to send its status" + "to the backend if retain option is not supported" + ), + ) + def add_buffering_settings(self): """ Parameters used to avoid black hole case """ self.buffering.add_argument(