Blake Blackshear 3 роки тому
батько
коміт
cbdf2c2c71

+ 0 - 3
docker/Dockerfile.base

@@ -38,9 +38,6 @@ RUN pip3 install \
     peewee_migrate \
     zeroconf \
     voluptuous\
-    Flask-Sockets \
-    gevent \
-    gevent-websocket \
     ws4py
 
 COPY --from=nginx /usr/local/nginx/ /usr/local/nginx/

+ 1 - 2
docker/Dockerfile.wheels

@@ -34,8 +34,7 @@ RUN pip3 wheel --wheel-dir=/wheels \
     matplotlib \
     click \
     setproctitle \
-    peewee \
-    gevent 
+    peewee
 
 FROM scratch
 

+ 6 - 1
docker/rootfs/usr/local/nginx/conf/nginx.conf

@@ -33,6 +33,11 @@ http {
       keepalive 1024;
     }
 
+    upstream mqtt_ws {
+      server localhost:5002;
+      keepalive 1024;
+    }
+
     upstream jsmpeg {
       server localhost:8082;
       keepalive 1024;
@@ -139,7 +144,7 @@ http {
         }
 
         location /ws {
-            proxy_pass http://frigate_api/ws;
+            proxy_pass http://mqtt_ws/;
             proxy_http_version 1.1;
             proxy_set_header Upgrade $http_upgrade;
             proxy_set_header Connection "Upgrade";

+ 18 - 19
frigate/app.py

@@ -2,26 +2,25 @@ import json
 import logging
 import multiprocessing as mp
 import os
+import signal
+import sys
+import threading
 from logging.handlers import QueueHandler
 from typing import Dict, List
-import sys
-import signal
 
 import yaml
-from gevent import pywsgi
-from geventwebsocket.handler import WebSocketHandler
 from peewee_migrate import Router
 from playhouse.sqlite_ext import SqliteExtDatabase
 from playhouse.sqliteq import SqliteQueueDatabase
 
 from frigate.config import FrigateConfig
-from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
+from frigate.const import CACHE_DIR, CLIPS_DIR, RECORD_DIR
 from frigate.edgetpu import EdgeTPUProcess
-from frigate.events import EventProcessor, EventCleanup
+from frigate.events import EventCleanup, EventProcessor
 from frigate.http import create_app
 from frigate.log import log_process, root_configurer
 from frigate.models import Event, Recordings
-from frigate.mqtt import create_mqtt_client
+from frigate.mqtt import create_mqtt_client, MqttSocketRelay
 from frigate.object_processing import TrackedObjectProcessor
 from frigate.output import output_frames
 from frigate.record import RecordingMaintainer
@@ -121,8 +120,8 @@ class FrigateApp:
         for log, level in self.config.logger.logs.items():
             logging.getLogger(log).setLevel(level)
 
-        if not "geventwebsocket.handler" in self.config.logger.logs:
-            logging.getLogger("geventwebsocket.handler").setLevel("ERROR")
+        if not "werkzeug" in self.config.logger.logs:
+            logging.getLogger("werkzeug").setLevel("ERROR")
 
     def init_queues(self):
         # Queues for clip processing
@@ -166,12 +165,18 @@ class FrigateApp:
             self.db,
             self.stats_tracking,
             self.detected_frames_processor,
-            self.mqtt_client,
+            # self.mqtt_client,
         )
 
     def init_mqtt(self):
         self.mqtt_client = create_mqtt_client(self.config, self.camera_metrics)
 
+    def start_mqtt_relay(self):
+        self.mqtt_relay = MqttSocketRelay(
+            self.mqtt_client, self.config.mqtt.topic_prefix
+        )
+        self.mqtt_relay.start()
+
     def start_detectors(self):
         model_shape = (self.config.model.height, self.config.model.width)
         for name in self.config.cameras.keys():
@@ -267,10 +272,6 @@ class FrigateApp:
             capture_process.start()
             logger.info(f"Capture process started for {name}: {capture_process.pid}")
 
-    def start_birdseye_outputter(self):
-        self.birdseye_outputter = BirdsEyeFrameOutputter(self.stop_event)
-        self.birdseye_outputter.start()
-
     def start_event_processor(self):
         self.event_processor = EventProcessor(
             self.config,
@@ -330,6 +331,7 @@ class FrigateApp:
         self.start_camera_capture_processes()
         self.init_stats()
         self.init_web_server()
+        self.start_mqtt_relay()
         self.start_event_processor()
         self.start_event_cleanup()
         self.start_recording_maintainer()
@@ -343,12 +345,8 @@ class FrigateApp:
 
         signal.signal(signal.SIGTERM, receiveSignal)
 
-        server = pywsgi.WSGIServer(
-            ("127.0.0.1", 5001), self.flask_app, handler_class=WebSocketHandler
-        )
-
         try:
-            server.serve_forever()
+            self.flask_app.run(host="127.0.0.1", port=5001, debug=False)
         except KeyboardInterrupt:
             pass
 
@@ -358,6 +356,7 @@ class FrigateApp:
         logger.info(f"Stopping...")
         self.stop_event.set()
 
+        self.mqtt_relay.stop()
         self.detected_frames_processor.join()
         self.event_processor.join()
         self.event_cleanup.join()

+ 3 - 90
frigate/http.py

@@ -11,7 +11,7 @@ from functools import reduce
 from pathlib import Path
 
 import cv2
-import gevent
+
 import numpy as np
 from flask import (
     Blueprint,
@@ -22,7 +22,7 @@ from flask import (
     make_response,
     request,
 )
-from flask_sockets import Sockets
+
 from peewee import SqliteDatabase, operator, fn, DoesNotExist, Value
 from playhouse.shortcuts import model_to_dict
 
@@ -35,74 +35,6 @@ from frigate.version import VERSION
 logger = logging.getLogger(__name__)
 
 bp = Blueprint("frigate", __name__)
-ws = Blueprint("ws", __name__)
-
-
-class MqttBackend:
-    """Interface for registering and updating WebSocket clients."""
-
-    def __init__(self, mqtt_client, topic_prefix):
-        self.clients = list()
-        self.mqtt_client = mqtt_client
-        self.topic_prefix = topic_prefix
-
-    def register(self, client):
-        """Register a WebSocket connection for Mqtt updates."""
-        self.clients.append(client)
-
-    def publish(self, message):
-        try:
-            json_message = json.loads(message)
-            json_message = {
-                "topic": f"{self.topic_prefix}/{json_message['topic']}",
-                "payload": json_message["payload"],
-                "retain": json_message.get("retain", False),
-            }
-        except:
-            logger.warning("Unable to parse websocket message as valid json.")
-            return
-
-        logger.debug(
-            f"Publishing mqtt message from websockets at {json_message['topic']}."
-        )
-        self.mqtt_client.publish(
-            json_message["topic"],
-            json_message["payload"],
-            retain=json_message["retain"],
-        )
-
-    def run(self):
-        def send(client, userdata, message):
-            """Sends mqtt messages to clients."""
-            try:
-                logger.debug(f"Received mqtt message on {message.topic}.")
-                ws_message = json.dumps(
-                    {
-                        "topic": message.topic.replace(f"{self.topic_prefix}/", ""),
-                        "payload": message.payload.decode(),
-                    }
-                )
-            except:
-                # if the payload can't be decoded don't relay to clients
-                logger.debug(
-                    f"MQTT payload for {message.topic} wasn't text. Skipping..."
-                )
-                return
-
-            for client in self.clients:
-                try:
-                    client.send(ws_message)
-                except:
-                    logger.debug(
-                        "Removing websocket client due to a closed connection."
-                    )
-                    self.clients.remove(client)
-
-        self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)
-
-    def start(self):
-        """Maintains mqtt subscription in the background."""
-        gevent.spawn(self.run)
 
 
 def create_app(
@@ -110,10 +42,8 @@ def create_app(
     database: SqliteDatabase,
     stats_tracking,
     detected_frames_processor,
-    mqtt_client,
 ):
     app = Flask(__name__)
-    sockets = Sockets(app)
 
     @app.before_request
     def _db_connect():
@@ -129,10 +59,6 @@ def create_app(
     app.detected_frames_processor = detected_frames_processor
 
     app.register_blueprint(bp)
-    sockets.register_blueprint(ws)
-
-    app.mqtt_backend = MqttBackend(mqtt_client, frigate_config.mqtt.topic_prefix)
-    app.mqtt_backend.start()
 
     return app
 
@@ -613,7 +539,7 @@ def vod(year_month, day, hour, camera):
 def imagestream(detected_frames_processor, camera_name, fps, height, draw_options):
     while True:
         # max out at specified FPS
-        gevent.sleep(1 / fps)
+        time.sleep(1 / fps)
         frame = detected_frames_processor.get_current_frame(camera_name, draw_options)
         if frame is None:
             frame = np.zeros((height, int(height * 16 / 9), 3), np.uint8)
@@ -626,16 +552,3 @@ def imagestream(detected_frames_processor, camera_name, fps, height, draw_option
             b"--frame\r\n"
             b"Content-Type: image/jpeg\r\n\r\n" + jpg.tobytes() + b"\r\n\r\n"
         )
-
-
-@ws.route("/ws")
-def echo_socket(socket):
-    current_app.mqtt_backend.register(socket)
-
-    while not socket.closed:
-        # Sleep to prevent *constant* context-switches.
-        gevent.sleep(0.1)
-
-        message = socket.receive()
-        if message:
-            current_app.mqtt_backend.publish(message)

+ 94 - 2
frigate/mqtt.py

@@ -1,7 +1,16 @@
+import json
 import logging
 import threading
+from wsgiref.simple_server import make_server
 
 import paho.mqtt.client as mqtt
+from ws4py.server.wsgirefserver import (
+    WebSocketWSGIHandler,
+    WebSocketWSGIRequestHandler,
+    WSGIServer,
+)
+from ws4py.server.wsgiutils import WebSocketWSGIApplication
+from ws4py.websocket import WebSocket
 
 from frigate.config import FrigateConfig
 
@@ -117,8 +126,15 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
         )
 
     if not mqtt_config.tls_ca_certs is None:
-        if not mqtt_config.tls_client_cert is None and not mqtt_config.tls_client_key is None:
-            client.tls_set(mqtt_config.tls_ca_certs, mqtt_config.tls_client_cert, mqtt_config.tls_client_key)
+        if (
+            not mqtt_config.tls_client_cert is None
+            and not mqtt_config.tls_client_key is None
+        ):
+            client.tls_set(
+                mqtt_config.tls_ca_certs,
+                mqtt_config.tls_client_cert,
+                mqtt_config.tls_client_key,
+            )
         else:
             client.tls_set(mqtt_config.tls_ca_certs)
     if not mqtt_config.tls_insecure is None:
@@ -151,3 +167,79 @@ def create_mqtt_client(config: FrigateConfig, camera_metrics):
         )
 
     return client
+
+
+class MqttSocketRelay:
+    def __init__(self, mqtt_client, topic_prefix):
+        self.mqtt_client = mqtt_client
+        self.topic_prefix = topic_prefix
+
+    def start(self):
+        class MqttWebSocket(WebSocket):
+            topic_prefix = self.topic_prefix
+            mqtt_client = self.mqtt_client
+
+            def received_message(self, message):
+                try:
+                    json_message = json.loads(message.data.decode("utf-8"))
+                    json_message = {
+                        "topic": f"{self.topic_prefix}/{json_message['topic']}",
+                        "payload": json_message["payload"],
+                        "retain": json_message.get("retain", False),
+                    }
+                except Exception as e:
+                    logger.warning("Unable to parse websocket message as valid json.")
+                    return
+
+                logger.debug(
+                    f"Publishing mqtt message from websockets at {json_message['topic']}."
+                )
+                self.mqtt_client.publish(
+                    json_message["topic"],
+                    json_message["payload"],
+                    retain=json_message["retain"],
+                )
+
+        # start a websocket server on 5002
+        WebSocketWSGIHandler.http_version = "1.1"
+        self.websocket_server = make_server(
+            "127.0.0.1",
+            5002,
+            server_class=WSGIServer,
+            handler_class=WebSocketWSGIRequestHandler,
+            app=WebSocketWSGIApplication(handler_cls=MqttWebSocket),
+        )
+        self.websocket_server.initialize_websockets_manager()
+        self.websocket_thread = threading.Thread(
+            target=self.websocket_server.serve_forever
+        )
+
+        def send(client, userdata, message):
+            """Sends mqtt messages to clients."""
+            try:
+                logger.debug(f"Received mqtt message on {message.topic}.")
+                ws_message = json.dumps(
+                    {
+                        "topic": message.topic.replace(f"{self.topic_prefix}/", ""),
+                        "payload": message.payload.decode(),
+                    }
+                )
+            except Exception as e:
+                # if the payload can't be decoded don't relay to clients
+                logger.debug(
+                    f"MQTT payload for {message.topic} wasn't text. Skipping..."
+                )
+                return
+
+            self.websocket_server.manager.broadcast(ws_message)
+
+        self.mqtt_client.message_callback_add(f"{self.topic_prefix}/#", send)
+
+        self.websocket_thread.start()
+
+    def stop(self):
+        self.websocket_server.manager.close_all()
+        self.websocket_server.manager.stop()
+        self.websocket_server.manager.join()
+        self.websocket_server.shutdown()
+        self.websocket_thread.join()