Procházet zdrojové kódy

use a thread to capture frames from the subprocess so it can be killed properly

blakeblackshear před 6 roky
rodič
revize
700bd1e3ef
1 změnil soubory, kde provedl 92 přidání a 83 odebrání
  1. 92 83
      frigate/video.py

+ 92 - 83
frigate/video.py

@@ -12,60 +12,6 @@ from . object_detection import FramePrepper
 from . objects import ObjectCleaner, BestPersonFrame
 from . mqtt import MqttObjectPublisher
 
-# fetch the frames as fast a possible and store current frame in a shared memory array
-def fetch_frames(shared_arr, shared_frame_time, frame_lock, frame_ready, frame_shape, rtsp_url, take_frame=1, ffmpeg_hwaccel_args=[]):
-    # convert shared memory array into numpy and shape into image array
-    arr = tonumpyarray(shared_arr).reshape(frame_shape)
-    frame_size = frame_shape[0] * frame_shape[1] * frame_shape[2]
-
-    ffmpeg_global_args = [
-        '-hide_banner', '-loglevel', 'panic'
-    ]
-    ffmpeg_input_args = [
-        '-avoid_negative_ts', 'make_zero', 
-        '-fflags', 'nobuffer',
-        '-flags', 'low_delay',
-        '-strict', 'experimental',
-        '-fflags', '+genpts', 
-        '-rtsp_transport', 'tcp', 
-        '-stimeout', '5000000', 
-        '-use_wallclock_as_timestamps', '1'
-    ]
-
-    ffmpeg_cmd = (['ffmpeg'] +
-        ffmpeg_global_args +
-        ffmpeg_hwaccel_args +
-        ffmpeg_input_args +
-        ['-i', rtsp_url,
-        '-f', 'rawvideo',
-        '-pix_fmt', 'rgb24',
-        'pipe:'])
-
-    print(" ".join(ffmpeg_cmd))
-    
-    pipe = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=frame_size)
-
-    frame_num = 0
-    while True:
-        raw_image = pipe.stdout.read(frame_size)
-        frame_num += 1
-        if (frame_num % take_frame) != 0:
-            continue
-        frame = (
-            np
-            .frombuffer(raw_image, np.uint8)
-            .reshape(frame_shape)
-        )
-
-        with frame_lock:
-            shared_frame_time.value = datetime.datetime.now().timestamp()
-            arr[:] = frame
-        # Notify with the condition that a new frame is ready
-        with frame_ready:
-            frame_ready.notify_all()
-
-    pipe.stdout.flush()
-
 # Stores 2 seconds worth of frames when motion is detected so they can be used for other threads
 class FrameTracker(threading.Thread):
     def __init__(self, shared_frame, frame_time, frame_ready, frame_lock, recent_frames):
@@ -125,13 +71,48 @@ class CameraWatchdog(threading.Thread):
 
         while True:
             # wait a bit before checking
-            time.sleep(60)
+            time.sleep(10)
 
-            if (datetime.datetime.now().timestamp() - self.camera.shared_frame_time.value) > 2:
+            if (datetime.datetime.now().timestamp() - self.camera.frame_time.value) > 2:
                 print("last frame is more than 2 seconds old, restarting camera capture...")
                 self.camera.start_or_restart_capture()
                 time.sleep(5)
 
+# Thread to read the stdout of the ffmpeg process and update the current frame
+class CameraCapture(threading.Thread):
+    def __init__(self, camera):
+        threading.Thread.__init__(self)
+        self.camera = camera
+
+    def run(self):
+        frame_num = 0
+        while True:
+            if self.camera.ffmpeg_process.poll() != None:
+                print("ffmpeg process is not running. exiting capture thread...")
+                break
+
+            raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size)
+
+            if len(raw_image) == 0:
+                print("ffmpeg didnt return a frame. something is wrong. exiting capture thread...")
+                break
+
+            frame_num += 1
+            if (frame_num % self.camera.take_frame) != 0:
+                continue
+
+            with self.camera.frame_lock:
+                self.camera.frame_time.value = datetime.datetime.now().timestamp()
+                
+                self.camera.current_frame[:] = (
+                    np
+                    .frombuffer(raw_image, np.uint8)
+                    .reshape(self.camera.frame_shape)
+                )
+            # Notify with the condition that a new frame is ready
+            with self.camera.frame_ready:
+                self.camera.frame_ready.notify_all()
+
 class Camera:
     def __init__(self, name, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
         self.name = name
@@ -143,15 +124,14 @@ class Camera:
         self.ffmpeg_hwaccel_args = self.config.get('ffmpeg_hwaccel_args', [])
         self.regions = self.config['regions']
         self.frame_shape = get_frame_shape(self.rtsp_url)
+        self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
         self.mqtt_client = mqtt_client
         self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name)
 
-        # compute the flattened array length from the shape of the frame
-        flat_array_length = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
-        # create shared array for storing the full frame image data
-        self.shared_frame_array = mp.Array(ctypes.c_uint8, flat_array_length)
+        # create a numpy array for the current frame in initialize to zeros
+        self.current_frame = np.zeros(self.frame_shape, np.uint8)
         # create shared value for storing the frame_time
-        self.shared_frame_time = mp.Value('d', 0.0)
+        self.frame_time = mp.Value('d', 0.0)
         # Lock to control access to the frame
         self.frame_lock = mp.Lock()
         # Condition for notifying that a new frame is ready
@@ -159,10 +139,8 @@ class Camera:
         # Condition for notifying that objects were parsed
         self.objects_parsed = mp.Condition()
 
-        # shape current frame so it can be treated as a numpy image
-        self.shared_frame_np = tonumpyarray(self.shared_frame_array).reshape(self.frame_shape)
-
-        self.capture_process = None
+        self.ffmpeg_process = None
+        self.capture_thread = None
 
         # for each region, create a separate thread to resize the region and prep for detection
         self.detection_prep_threads = []
@@ -175,8 +153,8 @@ class Camera:
                 region['threshold'] = 0.5
             self.detection_prep_threads.append(FramePrepper(
                 self.name,
-                self.shared_frame_np,
-                self.shared_frame_time,
+                self.current_frame,
+                self.frame_time,
                 self.frame_ready,
                 self.frame_lock,
                 region['size'], region['x_offset'], region['y_offset'], region['threshold'],
@@ -184,7 +162,7 @@ class Camera:
             ))
         
         # start a thread to store recent motion frames for processing
-        self.frame_tracker = FrameTracker(self.shared_frame_np, self.shared_frame_time, 
+        self.frame_tracker = FrameTracker(self.current_frame, self.frame_time, 
             self.frame_ready, self.frame_lock, self.recent_frames)
         self.frame_tracker.start()
 
@@ -215,20 +193,51 @@ class Camera:
 
 
     def start_or_restart_capture(self):
-        if not self.capture_process is None:
-            print("Terminating the existing capture process...")
-            self.capture_process.terminate()
-            del self.capture_process
-            self.capture_process = None
+        if not self.ffmpeg_process is None:
+            print("Killing the existing ffmpeg process...")
+            self.ffmpeg_process.kill()
+            self.ffmpeg_process.wait()
+            print("Waiting for the capture thread to exit...")
+            self.capture_thread.join()
+            self.ffmpeg_process = None
+            self.capture_thread = None
             
         # create the process to capture frames from the RTSP stream and store in a shared array
-        print("Creating a new capture process...")
-        self.capture_process = mp.Process(target=fetch_frames, args=(self.shared_frame_array, 
-            self.shared_frame_time, self.frame_lock, self.frame_ready, self.frame_shape, 
-            self.rtsp_url, self.take_frame, self.ffmpeg_hwaccel_args))
-        self.capture_process.daemon = True
-        print("Starting a new capture process...")
-        self.capture_process.start()
+        print("Creating a new ffmpeg process...")
+        self.start_ffmpeg()
+        
+        print("Creating a new capture thread...")
+        self.capture_thread = CameraCapture(self)
+        print("Starting a new capture thread...")
+        self.capture_thread.start()
+    
+    def start_ffmpeg(self):
+        ffmpeg_global_args = [
+            '-hide_banner', '-loglevel', 'panic'
+        ]
+        ffmpeg_input_args = [
+            '-avoid_negative_ts', 'make_zero', 
+            '-fflags', 'nobuffer',
+            '-flags', 'low_delay',
+            '-strict', 'experimental',
+            '-fflags', '+genpts', 
+            '-rtsp_transport', 'tcp', 
+            '-stimeout', '5000000', 
+            '-use_wallclock_as_timestamps', '1'
+        ]
+
+        ffmpeg_cmd = (['ffmpeg'] +
+            ffmpeg_global_args +
+            self.ffmpeg_hwaccel_args +
+            ffmpeg_input_args +
+            ['-i', self.rtsp_url,
+            '-f', 'rawvideo',
+            '-pix_fmt', 'rgb24',
+            'pipe:'])
+
+        print(" ".join(ffmpeg_cmd))
+        
+        self.ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout = sp.PIPE, bufsize=self.frame_size)
     
     def start(self):
         self.start_or_restart_capture()
@@ -238,10 +247,10 @@ class Camera:
         self.watchdog.start()
     
     def join(self):
-        self.capture_process.join()
+        self.capture_thread.join()
     
     def get_capture_pid(self):
-        return self.capture_process.pid
+        return self.ffmpeg_process.pid
     
     def add_objects(self, objects):
         if len(objects) == 0:
@@ -291,7 +300,7 @@ class Camera:
         detected_objects = self.detected_objects.copy()
         # lock and make a copy of the current frame
         with self.frame_lock:
-            frame = self.shared_frame_np.copy()
+            frame = self.current_frame.copy()
 
         # draw the bounding boxes on the screen
         for obj in detected_objects: