|
@@ -39,25 +39,39 @@ class MqttBackend():
|
|
|
self.clients.append(client)
|
|
|
|
|
|
def publish(self, message):
|
|
|
- json_message = json.loads(message)
|
|
|
- self.mqtt_client.publish(f"{self.topic_prefix}/{json_message['topic']}", json_message['payload'], retain=json_message['retain'])
|
|
|
+ try:
|
|
|
+ json_message = json.loads(message)
|
|
|
+ json_message = {
|
|
|
+ 'topic': f"{self.topic_prefix}/{json_message['topic']}",
|
|
|
+ 'payload': json_message.get['payload'],
|
|
|
+ 'retain': json_message.get('retain', False)
|
|
|
+ }
|
|
|
+ except:
|
|
|
+ logger.warning("Unable to parse websocket message as valid json.")
|
|
|
+ return
|
|
|
+
|
|
|
+ logger.debug(f"Publishing mqtt message from websockets at {json_message['topic']}.")
|
|
|
+ self.mqtt_client.publish(json_message['topic'], json_message['payload'], retain=json_message['retain'])
|
|
|
|
|
|
def run(self):
|
|
|
def send(client, userdata, message):
|
|
|
"""Sends mqtt messages to clients."""
|
|
|
try:
|
|
|
+ logger.debug(f"Received mqtt message on {message.topic}.")
|
|
|
ws_message = json.dumps({
|
|
|
'topic': message.topic.replace(f"{self.topic_prefix}/",""),
|
|
|
'payload': message.payload.decode()
|
|
|
})
|
|
|
except:
|
|
|
# if the payload can't be decoded don't relay to clients
|
|
|
+ logger.debug(f"MQTT payload for {message.topic} wasn't text. Skipping...")
|
|
|
return
|
|
|
|
|
|
for client in self.clients:
|
|
|
try:
|
|
|
client.send(ws_message)
|
|
|
except:
|
|
|
+ logger.debug("Removing websocket client due to a closed connection.")
|
|
|
self.clients.remove(client)
|
|
|
|
|
|
self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)
|