diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index 6f2dce2c..8ec96a4a 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -44,6 +44,10 @@ def __init__( # Variable to keep track of latest published packet self._timestamp_last_publish = datetime.now() + # load special settings for borker compatibility + self.max_qos = settings.mqtt_max_qos_supported + self.retain_supported = not settings.mqtt_retain_flag_not_supported + self._client = mqtt.Client( client_id=settings.gateway_id, clean_session=not settings.mqtt_persist_session, @@ -65,6 +69,7 @@ def __init__( self._client.username_pw_set(settings.mqtt_username, settings.mqtt_password) self._client.on_connect = self._on_connect + self._client.on_disconnect = self._on_disconnect self._client.on_publish = self._on_publish if last_will_topic is not None and last_will_data is not None: @@ -102,6 +107,10 @@ def _on_connect(self, client, userdata, flags, rc): if self.on_connect_cb is not None: self.on_connect_cb() + def _on_disconnect(self, client, userdata, rc): + if rc != 0: + self.logger.error("MQTT disconnect: %s (%s)", connack_string(rc), rc) + def _on_publish(self, client, userdata, mid): self._unpublished_mid_set.remove(mid) self._timestamp_last_publish = datetime.now() @@ -196,7 +205,7 @@ def _get_socket(self): def _set_last_will(self, topic, data): # Set Last wil message - self._client.will_set(topic, data, qos=2, retain=True) + self._client.will_set(topic, data, qos=1, retain=self.retain_supported) def run(self): self.running = True @@ -238,6 +247,13 @@ def _publish_from_wrapper_thread(self, topic, payload, qos, retain): retain: Is it a retain message """ + # Limit qos in case broker has a limit + if qos > self.max_qos: + qos = self.max_qos + + # Clear retain flag if not supported + retain = retain and self.retain_supported + mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid if self.publish_queue_size == 0: # Reset last published packet @@ -250,14 +266,32 @@ def publish(self, topic, payload, qos=1, retain=False) -> None: Args: topic: Topic to publish on payload: Payload - qos: Qos to use - retain: Is it a retain message + qos: Qos to use. Can be less than requested if broker does + not support it + retain: Is it a retain message. Can be discarded if broker + does not support it """ + # No need to check qos or retain at this stage as + # done later in real publish to broker + # Send it to the queue to be published from Mqtt thread self._publish_queue.put((topic, payload, qos, retain)) def subscribe(self, topic, cb, qos=2) -> None: + """ Method to subscribe to mqtt topic + + Args: + topic: Topic to publish on + cb: Callback to call on message reception + qos: Qos to use. Can be less than requested if broker does + not support it + + """ + # Limit qos in case broker has a limit + if qos > self.max_qos: + qos = self.max_qos + self.logger.debug("Subscribing to: {}".format(topic)) self._client.subscribe(topic, qos) self._client.message_callback_add(topic, cb) diff --git a/python_transport/wirepas_gateway/protocol/topic_helper.py b/python_transport/wirepas_gateway/protocol/topic_helper.py index 84a5f51d..76e4cf47 100644 --- a/python_transport/wirepas_gateway/protocol/topic_helper.py +++ b/python_transport/wirepas_gateway/protocol/topic_helper.py @@ -68,6 +68,10 @@ def make_otap_process_scratchpad_request_topic(gw_id="+", sink_id="+"): def make_get_gateway_info_request_topic(gw_id): return TopicGenerator._make_request_topic("get_gw_info", [str(gw_id)]) + @staticmethod + def make_get_gateway_status_request_topic(): + return TopicGenerator._make_request_topic("get_gw_status", []) + ################## # Response Part ################## diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 9448f2e4..a6bb5497 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -197,6 +197,9 @@ def __init__(self, settings, logger=None, **kwargs): self.gw_model = settings.gateway_model self.gw_version = settings.gateway_version + # Does broker support retain flag + self.retain_supported = not settings.mqtt_retain_flag_not_supported + self.whitened_ep_filter = settings.whitened_endpoints_filter last_will_topic = TopicGenerator.make_status_topic(self.gw_id) @@ -253,8 +256,9 @@ def _set_status(self): ) topic = TopicGenerator.make_status_topic(self.gw_id) - - self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True) + self.mqtt_wrapper.publish( + topic, event_online.payload, qos=1, retain=self.retain_supported + ) def _on_connect(self): # Register for get gateway info @@ -291,6 +295,14 @@ def _on_connect(self): topic, self._on_otap_process_scratchpad_request_received ) + if not self.retain_supported: + # In case retain flag is not supported, it will allow gateway to + # resend its status every time a backend ask for it + # If retain is supported, no need to subscribe as backend will + # already receive the last retained status + topic = TopicGenerator.make_get_gateway_status_request_topic() + self.mqtt_wrapper.subscribe(topic, self._on_get_status_request_received) + self._set_status() self.logger.info("MQTT connected!") @@ -646,6 +658,12 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message self.mqtt_wrapper.publish(topic, response.payload, qos=2) + @deferred_thread + def _on_get_status_request_received(self, client, userdata, message): + # This particular message has no payload + self.logger.debug("Get status request received") + self._set_status() + def parse_setting_list(list_setting): """ This function parse ep list specified from setting file or cmd line diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index f1651f21..a9cc57b2 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -344,6 +344,28 @@ def add_mqtt(self): ), ) + self.mqtt.add_argument( + "--mqtt_max_qos_supported", + default=os.environ.get("WM_SERVICES_MQTT_MAX_QOS_SUPPORTED", 2), + action="store", + type=self.str2int, + help=( + "Max qos supported by broker in case it has limitation" + "compare to MQTT specification that is 2." + ), + ) + + self.mqtt.add_argument( + "--mqtt_retain_flag_not_supported", + default=bool( + os.environ.get("WM_SERVICES_MQTT_RETAIN_FLAG_NOT_SUPPORTED", False) + ), + type=self.str2bool, + nargs="?", + const=True, + help=("Set to false if broker do not support retain flag"), + ) + def add_buffering_settings(self): """ Parameters used to avoid black hole case """ self.buffering.add_argument(