Bläddra i källkod

capture ffmpeg in a dedicated process

Blake Blackshear 4 år sedan
förälder
incheckning
eafde6c677
2 ändrade filer med 106 tillägg och 87 borttagningar
  1. 36 76
      detect_objects.py
  2. 70 11
      frigate/video.py

+ 36 - 76
detect_objects.py

@@ -18,7 +18,7 @@ import logging
 from flask import Flask, Response, make_response, jsonify, request
 import paho.mqtt.client as mqtt
 
-from frigate.video import track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
+from frigate.video import capture_camera, track_camera, get_ffmpeg_input, get_frame_shape, CameraCapture, start_or_restart_ffmpeg
 from frigate.object_processing import TrackedObjectProcessor
 from frigate.events import EventProcessor
 from frigate.util import EventsPerSecond
@@ -78,13 +78,14 @@ GLOBAL_OBJECT_CONFIG = CONFIG.get('objects', {})
 WEB_PORT = CONFIG.get('web_port', 5000)
 DETECTORS = CONFIG.get('detectors', {'coral': {'type': 'edgetpu', 'device': 'usb'}})
 
-class CameraWatchdog(threading.Thread):
-    def __init__(self, camera_processes, config, detectors, detection_queue, tracked_objects_queue, stop_event):
+class FrigateWatchdog(threading.Thread):
+    def __init__(self, camera_processes, config, detectors, detection_queue, out_events, tracked_objects_queue, stop_event):
         threading.Thread.__init__(self)
         self.camera_processes = camera_processes
         self.config = config
         self.detectors = detectors
         self.detection_queue = detection_queue
+        self.out_events = out_events
         self.tracked_objects_queue = tracked_objects_queue
         self.stop_event = stop_event
 
@@ -116,38 +117,16 @@ class CameraWatchdog(threading.Thread):
                 process = camera_process['process']
                 if not process.is_alive():
                     print(f"Track process for {name} is not alive. Starting again...")
+                    camera_process['camera_fps'].value = 0.0
                     camera_process['process_fps'].value = 0.0
                     camera_process['detection_fps'].value = 0.0
                     camera_process['read_start'].value = 0.0
-                    process = mp.Process(target=track_camera, args=(name, self.config[name], camera_process['frame_queue'],
-                        camera_process['frame_shape'], self.detection_queue, self.tracked_objects_queue, 
-                        camera_process['process_fps'], camera_process['detection_fps'],
-                        camera_process['read_start'], self.stop_event))
+                    process = mp.Process(target=track_camera, args=(name, self.config,
+                        self.detection_queue, self.out_events[name], self.tracked_objects_queue, camera_process, self.stop_event))
                     process.daemon = True
                     camera_process['process'] = process
                     process.start()
                     print(f"Track process started for {name}: {process.pid}")
-                
-                if not camera_process['capture_thread'].is_alive():
-                    frame_shape = camera_process['frame_shape']
-                    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-                    ffmpeg_process = start_or_restart_ffmpeg(camera_process['ffmpeg_cmd'], frame_size)
-                    camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, camera_process['frame_queue'], 
-                        camera_process['take_frame'], camera_process['camera_fps'], self.stop_event)
-                    camera_capture.start()
-                    camera_process['ffmpeg_process'] = ffmpeg_process
-                    camera_process['capture_thread'] = camera_capture
-                elif now - camera_process['capture_thread'].current_frame.value > 5:
-                    print(f"No frames received from {name} in 5 seconds. Exiting ffmpeg...")
-                    ffmpeg_process = camera_process['ffmpeg_process']
-                    ffmpeg_process.terminate()
-                    try:
-                        print("Waiting for ffmpeg to exit gracefully...")
-                        ffmpeg_process.communicate(timeout=30)
-                    except sp.TimeoutExpired:
-                        print("FFmpeg didnt exit. Force killing...")
-                        ffmpeg_process.kill()
-                        ffmpeg_process.communicate()
 
 def main():
     stop_event = threading.Event()
@@ -210,7 +189,7 @@ def main():
             detectors[name] = EdgeTPUProcess(detection_queue, out_events=out_events, tf_device=detector['device'])
 
     # create the camera processes
-    camera_processes = {}
+    camera_process_info = {}
     for name, config in CONFIG['cameras'].items():
         # Merge the ffmpeg config with the global config
         ffmpeg = config.get('ffmpeg', {})
@@ -248,37 +227,24 @@ def main():
                 ffmpeg_output_args +
                 ['pipe:'])
         
+        config['ffmpeg_cmd'] = ffmpeg_cmd
+        
         if 'width' in config and 'height' in config:
             frame_shape = (config['height'], config['width'], 3)
         else:
             frame_shape = get_frame_shape(ffmpeg_input)
         
         config['frame_shape'] = frame_shape
+        config['take_frame'] = config.get('take_frame', 1)
 
-        frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-        take_frame = config.get('take_frame', 1)
-
-        detection_frame = mp.Value('d', 0.0)
-
-        ffmpeg_process = start_or_restart_ffmpeg(ffmpeg_cmd, frame_size)
-        frame_queue = mp.Queue(maxsize=2)
-        camera_fps = EventsPerSecond()
-        camera_fps.start()
-        camera_capture = CameraCapture(name, ffmpeg_process, frame_shape, frame_queue, take_frame, camera_fps, stop_event)
-        camera_capture.start()
-
-        camera_processes[name] = {
-            'camera_fps': camera_fps,
-            'take_frame': take_frame,
+        camera_process_info[name] = {
+            'camera_fps': mp.Value('d', 0.0),
+            'skipped_fps': mp.Value('d', 0.0),
             'process_fps': mp.Value('d', 0.0),
             'detection_fps': mp.Value('d', 0.0),
-            'detection_frame': detection_frame,
+            'detection_frame': mp.Value('d', 0.0),
             'read_start': mp.Value('d', 0.0),
-            'ffmpeg_process': ffmpeg_process,
-            'ffmpeg_cmd': ffmpeg_cmd,
-            'frame_queue': frame_queue,
-            'frame_shape': frame_shape,
-            'capture_thread': camera_capture
+            'frame_queue': mp.Queue(maxsize=2)
         }
 
         # merge global object config into camera object config
@@ -292,41 +258,38 @@ def main():
             'filters': object_filters
         }
 
-        camera_process = mp.Process(target=track_camera, args=(name, config, frame_queue, frame_shape,
-            detection_queue, out_events[name], tracked_objects_queue, camera_processes[name]['process_fps'], 
-            camera_processes[name]['detection_fps'], 
-            camera_processes[name]['read_start'], camera_processes[name]['detection_frame'], stop_event))
+        capture_process = mp.Process(target=capture_camera, args=(name, config,
+            camera_process_info[name], stop_event))
+        capture_process.daemon = True
+        camera_process_info[name]['capture_process'] = capture_process
+
+        camera_process = mp.Process(target=track_camera, args=(name, config,
+            detection_queue, out_events[name], tracked_objects_queue, camera_process_info[name], stop_event))
         camera_process.daemon = True
-        camera_processes[name]['process'] = camera_process
+        camera_process_info[name]['process'] = camera_process
 
     # start the camera_processes
-    for name, camera_process in camera_processes.items():
+    for name, camera_process in camera_process_info.items():
+        camera_process['capture_process'].start()
+        print(f"Camera capture process started for {name}: {camera_process['capture_process'].pid}")
         camera_process['process'].start()
-        print(f"Camera_process started for {name}: {camera_process['process'].pid}")
+        print(f"Camera process started for {name}: {camera_process['process'].pid}")
 
-    event_processor = EventProcessor(CONFIG, camera_processes, CACHE_DIR, CLIPS_DIR, event_queue, stop_event)
+    event_processor = EventProcessor(CONFIG, camera_process_info, CACHE_DIR, CLIPS_DIR, event_queue, stop_event)
     event_processor.start()
     
     object_processor = TrackedObjectProcessor(CONFIG['cameras'], client, MQTT_TOPIC_PREFIX, tracked_objects_queue, event_queue, stop_event)
     object_processor.start()
     
-    camera_watchdog = CameraWatchdog(camera_processes, CONFIG['cameras'], detectors, detection_queue, tracked_objects_queue, stop_event)
-    camera_watchdog.start()
+    frigate_watchdog = FrigateWatchdog(camera_process_info, CONFIG['cameras'], detectors, detection_queue, out_events, tracked_objects_queue, stop_event)
+    frigate_watchdog.start()
 
     def receiveSignal(signalNumber, frame):
         print('Received:', signalNumber)
         stop_event.set()
         event_processor.join()
         object_processor.join()
-        camera_watchdog.join()
-        for camera_name, camera_process in camera_processes.items():
-            camera_process['capture_thread'].join()
-            # cleanup the frame queue
-            while not camera_process['frame_queue'].empty():
-                frame_time = camera_process['frame_queue'].get()
-                shm = mp.shared_memory.SharedMemory(name=f"{camera_name}{frame_time}")
-                shm.close()
-                shm.unlink()
+        frigate_watchdog.join()
 
         for detector in detectors.values():
             detector.stop()
@@ -371,19 +334,16 @@ def main():
 
         total_detection_fps = 0
 
-        for name, camera_stats in camera_processes.items():
+        for name, camera_stats in camera_process_info.items():
             total_detection_fps += camera_stats['detection_fps'].value
-            capture_thread = camera_stats['capture_thread']
             stats[name] = {
-                'camera_fps': round(capture_thread.fps.eps(), 2),
+                'camera_fps': round(camera_stats['camera_fps'].value, 2),
                 'process_fps': round(camera_stats['process_fps'].value, 2),
-                'skipped_fps': round(capture_thread.skipped_fps.eps(), 2),
+                'skipped_fps': round(camera_stats['skipped_fps'].value, 2),
                 'detection_fps': round(camera_stats['detection_fps'].value, 2),
-                'read_start': camera_stats['read_start'].value,
                 'pid': camera_stats['process'].pid,
-                'ffmpeg_pid': camera_stats['ffmpeg_process'].pid,
+                'capture_pid': camera_stats['capture_process'].pid,
                 'frame_info': {
-                    'read': capture_thread.current_frame.value,
                     'detect': camera_stats['detection_frame'].value,
                     'process': object_processor.camera_data[name]['current_frame_time']
                 }

+ 70 - 11
frigate/video.py

@@ -116,13 +116,17 @@ def start_or_restart_ffmpeg(ffmpeg_cmd, frame_size, ffmpeg_process=None):
     return process
 
 def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: FrameManager, 
-    frame_queue, take_frame: int, fps:EventsPerSecond, skipped_fps: EventsPerSecond, 
+    frame_queue, take_frame: int, fps:mp.Value, skipped_fps: mp.Value, 
     stop_event: mp.Event, current_frame: mp.Value):
 
     frame_num = 0
     frame_size = frame_shape[0] * frame_shape[1] * 3 // 2
-    skipped_fps.start()
+    frame_rate = EventsPerSecond()
+    skipped_eps = EventsPerSecond()
+    skipped_eps.start()
     while True:
+        fps.value = frame_rate.eps()
+        skipped_fps = skipped_eps.eps()
         if stop_event.is_set():
             print(f"{camera_name}: stop event set. exiting capture thread...")
             break
@@ -142,17 +146,17 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram
           
           continue
 
-        fps.update()
+        frame_rate.update()
 
         frame_num += 1
         if (frame_num % take_frame) != 0:
-            skipped_fps.update()
+            skipped_eps.update()
             frame_manager.delete(frame_name)
             continue
 
         # if the queue is full, skip this frame
         if frame_queue.full():
-            skipped_fps.update()
+            skipped_eps.update()
             frame_manager.delete(frame_name)
             continue
 
@@ -162,6 +166,51 @@ def capture_frames(ffmpeg_process, camera_name, frame_shape, frame_manager: Fram
         # add to the queue
         frame_queue.put(current_frame.value)
 
+class CameraWatchdog(threading.Thread):
+    def __init__(self, name, config, frame_queue, camera_fps, stop_event):
+        threading.Thread.__init__(self)
+        self.name = name
+        self.config = config
+        self.capture_thread = None
+        self.ffmpeg_process = None
+        self.stop_event = stop_event
+        self.camera_fps = camera_fps
+        self.frame_queue = frame_queue
+        self.frame_shape = self.config['frame_shape']
+        self.frame_size = self.frame_shape[0] * self.frame_shape[1] * 3 // 2
+
+    def run(self):
+        self.start_ffmpeg()
+        time.sleep(10)
+        while True:
+            if self.stop_event.is_set():
+                print(f"Exiting watchdog...")
+                break
+
+            now = datetime.datetime.now().timestamp()
+
+            if not self.capture_thread.is_alive():
+                self.start_ffmpeg()
+            elif now - self.capture_thread.current_frame.value > 5:
+                print(f"No frames received from {self.name} in 5 seconds. Exiting ffmpeg...")
+                self.ffmpeg_process.terminate()
+                try:
+                    print("Waiting for ffmpeg to exit gracefully...")
+                    self.ffmpeg_process.communicate(timeout=30)
+                except sp.TimeoutExpired:
+                    print("FFmpeg didnt exit. Force killing...")
+                    self.ffmpeg_process.kill()
+                    self.ffmpeg_process.communicate()
+            
+            # wait a bit before checking again
+            time.sleep(10)
+    
+    def start_ffmpeg(self):
+      self.ffmpeg_process = start_or_restart_ffmpeg(self.config['ffmpeg_cmd'], self.frame_size)
+      self.capture_thread = CameraCapture(self.name, self.ffmpeg_process, self.frame_shape, self.frame_queue, 
+          self.config['take_frame'], self.camera_fps, self.stop_event)
+      self.capture_thread.start()
+
 class CameraCapture(threading.Thread):
     def __init__(self, name, ffmpeg_process, frame_shape, frame_queue, take_frame, fps, stop_event):
         threading.Thread.__init__(self)
@@ -183,11 +232,18 @@ class CameraCapture(threading.Thread):
         capture_frames(self.ffmpeg_process, self.name, self.frame_shape, self.frame_manager, self.frame_queue, self.take_frame,
             self.fps, self.skipped_fps, self.stop_event, self.current_frame)
 
-def track_camera(name, config, frame_queue, frame_shape, detection_queue, result_connection, detected_objects_queue, fps, detection_fps, read_start, detection_frame, stop_event):
-    print(f"Starting process for {name}: {os.getpid()}")
+def capture_camera(name, config, process_info, stop_event):
+    frame_queue = process_info['frame_queue']
+    camera_watchdog = CameraWatchdog(name, config, frame_queue, process_info['camera_fps'], stop_event)
+    camera_watchdog.start()
+    camera_watchdog.join()
+
+def track_camera(name, config, detection_queue, result_connection, detected_objects_queue, process_info, stop_event):
     listen()
 
-    detection_frame.value = 0.0
+    frame_queue = process_info['frame_queue']
+
+    frame_shape = config['frame_shape']
 
     # Merge the tracked object config with the global config
     camera_objects_config = config.get('objects', {})
@@ -223,7 +279,7 @@ def track_camera(name, config, frame_queue, frame_shape, detection_queue, result
     frame_manager = SharedMemoryFrameManager()
 
     process_frames(name, frame_queue, frame_shape, frame_manager, motion_detector, object_detector,
-        object_tracker, detected_objects_queue, fps, detection_fps, detection_frame, objects_to_track, object_filters, mask, stop_event)
+        object_tracker, detected_objects_queue, process_info, objects_to_track, object_filters, mask, stop_event)
 
     print(f"{name}: exiting subprocess")
 
@@ -259,10 +315,14 @@ def detect(object_detector, frame, region, objects_to_track, object_filters, mas
 def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape, 
     frame_manager: FrameManager, motion_detector: MotionDetector, 
     object_detector: RemoteObjectDetector, object_tracker: ObjectTracker,
-    detected_objects_queue: mp.Queue, fps: mp.Value, detection_fps: mp.Value, current_frame_time: mp.Value,
+    detected_objects_queue: mp.Queue, process_info: Dict,
     objects_to_track: List[str], object_filters: Dict, mask, stop_event: mp.Event,
     exit_on_empty: bool = False):
     
+    fps = process_info['process_fps']
+    detection_fps = process_info['detection_fps']
+    current_frame_time = process_info['detection_frame']
+
     fps_tracker = EventsPerSecond()
     fps_tracker.start()
 
@@ -276,7 +336,6 @@ def process_frames(camera_name: str, frame_queue: mp.Queue, frame_shape,
         except queue.Empty:
             continue
 
-        
         current_frame_time.value = frame_time
 
         frame = frame_manager.get(f"{camera_name}{frame_time}", (frame_shape[0]*3//2, frame_shape[1]))