瀏覽代碼

relay mqtt to clients

Blake Blackshear 4 年之前
父節點
當前提交
718b4f3fd7
共有 2 個文件被更改,包括 41 次插入4 次删除
  1. 1 1
      frigate/app.py
  2. 40 3
      frigate/http.py

+ 1 - 1
frigate/app.py

@@ -137,7 +137,7 @@ class FrigateApp():
         self.stats_tracking = stats_init(self.camera_metrics, self.detectors)
         self.stats_tracking = stats_init(self.camera_metrics, self.detectors)
 
 
     def init_web_server(self):
     def init_web_server(self):
-        self.flask_app = create_app(self.config, self.db, self.stats_tracking, self.detected_frames_processor)
+        self.flask_app = create_app(self.config, self.db, self.stats_tracking, self.detected_frames_processor, self.mqtt_client)
 
 
     def init_mqtt(self):
     def init_mqtt(self):
         self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics)
         self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics)

+ 40 - 3
frigate/http.py

@@ -1,11 +1,13 @@
 import base64
 import base64
 import datetime
 import datetime
+import json
 import logging
 import logging
 import os
 import os
 import time
 import time
 from functools import reduce
 from functools import reduce
 
 
 import cv2
 import cv2
+import gevent
 import numpy as np
 import numpy as np
 from flask import (Blueprint, Flask, Response, current_app, jsonify,
 from flask import (Blueprint, Flask, Response, current_app, jsonify,
                    make_response, request)
                    make_response, request)
@@ -24,7 +26,37 @@ logger = logging.getLogger(__name__)
 bp = Blueprint('frigate', __name__)
 bp = Blueprint('frigate', __name__)
 ws = Blueprint('ws', __name__)
 ws = Blueprint('ws', __name__)
 
 
-def create_app(frigate_config, database: SqliteDatabase, stats_tracking, detected_frames_processor):
+class MqttBackend():
+    """Interface for registering and updating WebSocket clients."""
+
+    def __init__(self, mqtt_client, topic_prefix):
+        self.clients = list()
+        self.mqtt_client = mqtt_client
+        self.topic_prefix = topic_prefix
+
+    def register(self, client):
+        """Register a WebSocket connection for Mqtt updates."""
+        self.clients.append(client)
+
+    def run(self):
+        def send(client, userdata, message):
+            """Sends mqtt messages to clients."""
+            logger.info(f"Sending mqtt to ws clients {len(self.clients)}")
+            ws_message = json.dumps({
+                'topic': message.topic,
+                'payload': message.payload.decode()
+            })
+            for client in self.clients:
+                client.send(ws_message)
+        
+        logger.info(f"Subscribing to {self.topic_prefix}/#")
+        self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)
+
+    def start(self):
+        """Maintains mqtt subscription in the background."""
+        gevent.spawn(self.run)
+
+def create_app(frigate_config, database: SqliteDatabase, stats_tracking, detected_frames_processor, mqtt_client):
     app = Flask(__name__)
     app = Flask(__name__)
     sockets = Sockets(app)
     sockets = Sockets(app)
 
 
@@ -44,6 +76,9 @@ def create_app(frigate_config, database: SqliteDatabase, stats_tracking, detecte
     app.register_blueprint(bp)
     app.register_blueprint(bp)
     sockets.register_blueprint(ws)
     sockets.register_blueprint(ws)
 
 
+    app.mqtt_backend = MqttBackend(mqtt_client, frigate_config.mqtt.topic_prefix)
+    app.mqtt_backend.start()
+
     return app
     return app
 
 
 @bp.route('/')
 @bp.route('/')
@@ -311,6 +346,8 @@ def imagestream(detected_frames_processor, camera_name, fps, height, draw_option
 
 
 @ws.route('/ws')
 @ws.route('/ws')
 def echo_socket(socket):
 def echo_socket(socket):
+    current_app.mqtt_backend.register(socket)
+
     while not socket.closed:
     while not socket.closed:
-        message = socket.receive()
-        socket.send(message)
+        # Context switch while `ChatBackend.start` is running in the background.
+        gevent.sleep(0.1)