Przeglądaj źródła

ensure frigate can exit gracefully

Blake Blackshear 4 lat temu
rodzic
commit
adcfe699c2
6 zmienionych plików z 68 dodań i 17 usunięć
  1. 31 8
      detect_objects.py
  2. 1 1
      frigate/edgetpu.py
  3. 6 1
      frigate/events.py
  4. 13 4
      frigate/object_processing.py
  5. 10 1
      frigate/util.py
  6. 7 2
      frigate/video.py

+ 31 - 8
detect_objects.py

@@ -1,4 +1,5 @@
 import os
+import signal
 import sys
 import traceback
 import signal
@@ -71,13 +72,14 @@ def start_plasma_store():
     return plasma_process
 
 class CameraWatchdog(threading.Thread):
-    def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process):
+    def __init__(self, camera_processes, config, tflite_process, tracked_objects_queue, plasma_process, stop_event):
         threading.Thread.__init__(self)
         self.camera_processes = camera_processes
         self.config = config
         self.tflite_process = tflite_process
         self.tracked_objects_queue = tracked_objects_queue
         self.plasma_process = plasma_process
+        self.stop_event = stop_event
 
     def run(self):
         time.sleep(10)
@@ -85,6 +87,10 @@ class CameraWatchdog(threading.Thread):
             # wait a bit before checking
             time.sleep(10)
 
+            if self.stop_event.is_set():
+                print(f"Exiting watchdog...")
+                break
+
             now = datetime.datetime.now().timestamp()
             
             # check the plasma process
@@ -125,7 +131,7 @@ class CameraWatchdog(threading.Thread):
                     frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
                     ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
                     camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'], 
-                        camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'])
+                        camera_process['take_frame'], camera_process['camera_fps'], camera_process['detection_frame'], self.stop_event)
                     camera_capture.start()
                     camera_process['ffmpeg_process'] = ffmpeg_process
                     camera_process['capture_thread'] = camera_capture
@@ -142,6 +148,7 @@ class CameraWatchdog(threading.Thread):
                         ffmpeg_process.communicate()
 
 def main():
+    stop_event = threading.Event()
     # connect to mqtt and setup last will
     def on_connect(client, userdata, flags, rc):
         print("On connect called")
@@ -176,7 +183,7 @@ def main():
         }
 
     # Queue for cameras to push tracked objects to
-    tracked_objects_queue = mp.SimpleQueue()
+    tracked_objects_queue = mp.Queue()
 
     # Queue for clip processing
     event_queue = mp.Queue()
@@ -232,10 +239,10 @@ def main():
         detection_frame = mp.Value('d', 0.0)
 
         ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
-        frame_queue = mp.SimpleQueue()
+        frame_queue = mp.Queue()
         camera_fps = EventsPerSecond()
         camera_fps.start()
-        camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame)
+        camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, detection_frame, stop_event)
         camera_capture.start()
 
         camera_processes[name] = {
@@ -263,15 +270,31 @@ def main():
         camera_process['process'].start()
         print(f"Camera_process started for {name}: {camera_process['process'].pid}")
 
-    event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue)
+    event_processor = EventProcessor(CONFIG['cameras'], camera_processes, '/cache', '/clips', event_queue, stop_event)
     event_processor.start()
     
-    object_processor = TrackedObjectProcessor(CONFIG['cameras'], CONFIG.get('zones', {}), client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue)
+    object_processor = TrackedObjectProcessor(CONFIG['cameras'], CONFIG.get('zones', {}), client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue,stop_event)
     object_processor.start()
     
-    camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process)
+    camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], tflite_process, tracked_objects_queue, plasma_process, stop_event)
     camera_watchdog.start()
 
+    def receiveSignal(signalNumber, frame):
+        print('Received:', signalNumber)
+        stop_event.set()
+        event_processor.join()
+        object_processor.join()
+        camera_watchdog.join()
+        for name, camera_process in camera_processes.items():
+            camera_process['capture_thread'].join()
+        rc = camera_watchdog.plasma_process.poll()
+        if rc == None:
+            camera_watchdog.plasma_process.terminate()
+        sys.exit()
+    
+    signal.signal(signal.SIGTERM, receiveSignal)
+    signal.signal(signal.SIGINT, receiveSignal)
+
     # create a flask app that encodes frames a mjpeg on demand
     app = Flask(__name__)
     log = logging.getLogger('werkzeug')

+ 1 - 1
frigate/edgetpu.py

@@ -87,7 +87,7 @@ def run_detector(detection_queue, avg_speed, start):
         
 class EdgeTPUProcess():
     def __init__(self):
-        self.detection_queue = mp.SimpleQueue()
+        self.detection_queue = mp.Queue()
         self.avg_inference_speed = mp.Value('d', 0.01)
         self.detection_start = mp.Value('d', 0.0)
         self.detect_process = None

+ 6 - 1
frigate/events.py

@@ -9,7 +9,7 @@ import subprocess as sp
 import queue
 
 class EventProcessor(threading.Thread):
-    def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue):
+    def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue, stop_event):
         threading.Thread.__init__(self)
         self.config = config
         self.camera_processes = camera_processes
@@ -18,6 +18,7 @@ class EventProcessor(threading.Thread):
         self.cached_clips = {}
         self.event_queue = event_queue
         self.events_in_process = {}
+        self.stop_event = stop_event
     
     def refresh_cache(self):
         cached_files = os.listdir(self.cache_dir)
@@ -133,6 +134,10 @@ class EventProcessor(threading.Thread):
 
     def run(self):
         while True:
+            if self.stop_event.is_set():
+                print(f"Exiting event processor...")
+                break
+            
             try:
                 event_type, camera, event_data = self.event_queue.get(timeout=10)
             except queue.Empty:

+ 13 - 4
frigate/object_processing.py

@@ -5,6 +5,7 @@ import time
 import copy
 import cv2
 import threading
+import queue
 import numpy as np
 from collections import Counter, defaultdict
 import itertools
@@ -51,7 +52,7 @@ def zone_filtered(obj, object_config):
     return False
 
 class TrackedObjectProcessor(threading.Thread):
-    def __init__(self, camera_config, zone_config, client, topic_prefix, tracked_objects_queue, event_queue):
+    def __init__(self, camera_config, zone_config, client, topic_prefix, tracked_objects_queue, event_queue, stop_event):
         threading.Thread.__init__(self)
         self.camera_config = camera_config
         self.zone_config = zone_config
@@ -59,6 +60,7 @@ class TrackedObjectProcessor(threading.Thread):
         self.topic_prefix = topic_prefix
         self.tracked_objects_queue = tracked_objects_queue
         self.event_queue = event_queue
+        self.stop_event = stop_event
         self.camera_data = defaultdict(lambda: {
             'best_objects': {},
             'object_status': defaultdict(lambda: defaultdict(lambda: 'OFF')),
@@ -89,7 +91,7 @@ class TrackedObjectProcessor(threading.Thread):
         for i, zone in enumerate(self.zone_data.values()):
             zone['color'] = tuple(int(round(255 * c)) for c in colors(i)[:3])
 
-        self.plasma_client = PlasmaManager()
+        self.plasma_client = PlasmaManager(self.stop_event)
         
     def get_best(self, camera, label):
         if label in self.camera_data[camera]['best_objects']:
@@ -102,7 +104,14 @@ class TrackedObjectProcessor(threading.Thread):
 
     def run(self):
         while True:
-            camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get()
+            if self.stop_event.is_set():
+                print(f"Exiting event processor...")
+                break
+
+            try:
+                camera, frame_time, current_tracked_objects = self.tracked_objects_queue.get(True, 10)
+            except queue.Empty:
+                continue
 
             camera_config = self.camera_config[camera]
             best_objects = self.camera_data[camera]['best_objects']
@@ -215,7 +224,7 @@ class TrackedObjectProcessor(threading.Thread):
             ###
             # Report over MQTT
             ###
-            
+
             # get the zones that are relevant for this camera
             relevant_zones = [zone for zone, config in self.zone_config.items() if camera in config]
             # for each zone

+ 10 - 1
frigate/util.py

@@ -140,11 +140,14 @@ def listen():
     signal.signal(signal.SIGUSR1, print_stack)
 
 class PlasmaManager:
-    def __init__(self):
+    def __init__(self, stop_event=None):
+        self.stop_event = stop_event
         self.connect()
     
     def connect(self):
         while True:
+            if self.stop_event != None and self.stop_event.is_set():
+                return
             try:
                 self.plasma_client = plasma.connect("/tmp/plasma")
                 return
@@ -155,6 +158,8 @@ class PlasmaManager:
     def get(self, name, timeout_ms=0):
         object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
         while True:
+            if self.stop_event != None and self.stop_event.is_set():
+                return
             try:
                 return self.plasma_client.get(object_id, timeout_ms=timeout_ms)
             except:
@@ -164,6 +169,8 @@ class PlasmaManager:
     def put(self, name, obj):
         object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
         while True:
+            if self.stop_event != None and self.stop_event.is_set():
+                return
             try:
                 self.plasma_client.put(obj, object_id)
                 return
@@ -175,6 +182,8 @@ class PlasmaManager:
     def delete(self, name):
         object_id = plasma.ObjectID(hashlib.sha1(str.encode(name)).digest())
         while True:
+            if self.stop_event != None and self.stop_event.is_set():
+                return
             try:
                 self.plasma_client.delete([object_id])
                 return

+ 7 - 2
frigate/video.py

@@ -116,7 +116,7 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
     return process
 
 class CameraCapture(threading.Thread):
-    def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, detection_frame):
+    def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, detection_frame, stop_event):
         threading.Thread.__init__(self)
         self.name = name
         self.frame_shape = frame_shape
@@ -125,16 +125,21 @@ class CameraCapture(threading.Thread):
         self.take_frame = take_frame
         self.fps = fps
         self.skipped_fps = EventsPerSecond()
-        self.plasma_client = PlasmaManager()
+        self.plasma_client = PlasmaManager(stop_event)
         self.ffmpeg_process = ffmpeg_process
         self.current_frame = 0
         self.last_frame = 0
         self.detection_frame = detection_frame
+        self.stop_event = stop_event
 
     def run(self):
         frame_num = 0
         self.skipped_fps.start()
         while True:
+            if self.stop_event.is_set():
+                print(f"{self.name}: stop event set. exiting capture thread...")
+                break
+
             if self.ffmpeg_process.poll() != None:
                 print(f"{self.name}: ffmpeg process is not running. exiting capture thread...")
                 break