Browse Source

label threads and implements stats endpoint

Blake Blackshear 5 năm trước cách đây
mục cha
commit
b6130e77ff
7 tập tin đã thay đổi với 192 bổ sung110 xóa
  1. 1 0
      Dockerfile
  2. 29 5
      detect_objects.py
  3. 4 1
      frigate/mqtt.py
  4. 20 78
      frigate/object_detection.py
  5. 4 1
      frigate/objects.py
  6. 70 1
      frigate/util.py
  7. 64 24
      frigate/video.py

+ 1 - 0
Dockerfile

@@ -45,6 +45,7 @@ RUN apt-get -qq update && apt-get -qq install --no-install-recommends -y \
  python3-pip \
  python3-pil \
  python3-numpy \
+ python3-prctl \
  libc++1 \
  libc++abi1 \
  libunwind8 \

+ 29 - 5
detect_objects.py

@@ -3,11 +3,12 @@ import time
 import queue
 import yaml
 import numpy as np
-from flask import Flask, Response, make_response
+from flask import Flask, Response, make_response, jsonify
 import paho.mqtt.client as mqtt
 
 from frigate.video import Camera
 from frigate.object_detection import PreppedQueueProcessor
+from frigate.util import EventsPerSecond
 
 with open('/config/config.yml') as f:
     CONFIG = yaml.safe_load(f)
@@ -70,20 +71,27 @@ def main():
     client.connect(MQTT_HOST, MQTT_PORT, 60)
     client.loop_start()
     
-    # Queue for prepped frames, max size set to (number of cameras * 5)
-    max_queue_size = len(CONFIG['cameras'].items())*5
-    prepped_frame_queue = queue.PriorityQueue(max_queue_size)
+    # Queue for prepped frames, max size set to number of regions * 3
+    max_queue_size = sum([len(camera['regions'])*3 for name, camera in CONFIG['cameras'].items()])
+    prepped_frame_queue = queue.Queue(max_queue_size)
 
     cameras = {}
     for name, config in CONFIG['cameras'].items():
         cameras[name] = Camera(name, FFMPEG_DEFAULT_CONFIG, GLOBAL_OBJECT_CONFIG, config, 
             prepped_frame_queue, client, MQTT_TOPIC_PREFIX)
 
+    fps_tracker = EventsPerSecond()
+    queue_full_tracker = EventsPerSecond()
+
     prepped_queue_processor = PreppedQueueProcessor(
         cameras,
-        prepped_frame_queue
+        prepped_frame_queue,
+        fps_tracker,
+        queue_full_tracker
     )
     prepped_queue_processor.start()
+    fps_tracker.start()
+    queue_full_tracker.start()
 
     for name, camera in cameras.items():
         camera.start()
@@ -97,6 +105,22 @@ def main():
         # return a healh
         return "Frigate is running. Alive and healthy!"
 
+    @app.route('/debug/stats')
+    def stats():
+        stats = {
+            'coral': {
+                'fps': fps_tracker.eps(),
+                'inference_speed': prepped_queue_processor.avg_inference_speed,
+                'queue_length': prepped_frame_queue.qsize(),
+                'queue_full_events_per_min': queue_full_tracker.eps(60)
+            }
+        }
+
+        for name, camera in cameras.items():
+            stats[name] = camera.stats()
+
+        return jsonify(stats)
+
     @app.route('/<camera_name>/<label>/best.jpg')
     def best(camera_name, label):
         if camera_name in cameras:

+ 4 - 1
frigate/mqtt.py

@@ -1,6 +1,7 @@
 import json
 import cv2
 import threading
+import prctl
 from collections import Counter, defaultdict
 
 class MqttObjectPublisher(threading.Thread):
@@ -13,6 +14,7 @@ class MqttObjectPublisher(threading.Thread):
         self.best_frames = best_frames
 
     def run(self):
+        prctl.set_name("MqttObjectPublisher")
         current_object_status = defaultdict(lambda: 'OFF')
         while True:
             # wait until objects have been parsed
@@ -35,7 +37,8 @@ class MqttObjectPublisher(threading.Thread):
                     self.client.publish(self.topic_prefix+'/'+obj_name, new_status, retain=False)
                     # send the snapshot over mqtt if we have it as well
                     if obj_name in self.best_frames.best_frames:
-                        ret, jpg = cv2.imencode('.jpg', self.best_frames.best_frames[obj_name])
+                        best_frame = cv2.cvtColor(self.best_frames.best_frames[obj_name], cv2.COLOR_RGB2BGR)
+                        ret, jpg = cv2.imencode('.jpg', best_frame)
                         if ret:
                             jpg_bytes = jpg.tobytes()
                             self.client.publish(self.topic_prefix+'/'+obj_name+'/snapshot', jpg_bytes, retain=True)

+ 20 - 78
frigate/object_detection.py

@@ -2,12 +2,13 @@ import datetime
 import time
 import cv2
 import threading
+import prctl
 import numpy as np
 from edgetpu.detection.engine import DetectionEngine
 from . util import tonumpyarray, LABELS, PATH_TO_CKPT
 
 class PreppedQueueProcessor(threading.Thread):
-    def __init__(self, cameras, prepped_frame_queue):
+    def __init__(self, cameras, prepped_frame_queue, fps, queue_full):
 
         threading.Thread.__init__(self)
         self.cameras = cameras
@@ -16,89 +17,33 @@ class PreppedQueueProcessor(threading.Thread):
         # Load the edgetpu engine and labels
         self.engine = DetectionEngine(PATH_TO_CKPT)
         self.labels = LABELS
+        self.fps = fps
+        self.queue_full = queue_full
+        self.avg_inference_speed = 10
 
     def run(self):
+        prctl.set_name("PreppedQueueProcessor")
         # process queue...
         while True:
+            if self.prepped_frame_queue.full():
+                self.queue_full.update()
+
             frame = self.prepped_frame_queue.get()
 
             # Actual detection.
-            objects = self.engine.DetectWithInputTensor(frame['frame'], threshold=0.5, top_k=5)
-            # print(self.engine.get_inference_time())
-
-            # parse and pass detected objects back to the camera
-            # TODO: just send this back with all the same info you received and objects as a new property
-            parsed_objects = []
-            for obj in objects:
-                parsed_objects.append({
-                            'region_id': frame['region_id'],
-                            'frame_time': frame['frame_time'],
-                            'name': str(self.labels[obj.label_id]),
-                            'score': float(obj.score),
-                            'box': obj.bounding_box.flatten().tolist()
-                        })
-            self.cameras[frame['camera_name']].add_objects(parsed_objects)
-
-
-# should this be a region class?
-class FramePrepper(threading.Thread):
-    def __init__(self, camera_name, shared_frame, frame_time, frame_ready, 
-        frame_lock,
-        region_size, region_x_offset, region_y_offset, region_id,
-        prepped_frame_queue):
-
-        threading.Thread.__init__(self)
-        self.camera_name = camera_name
-        self.shared_frame = shared_frame
-        self.frame_time = frame_time
-        self.frame_ready = frame_ready
-        self.frame_lock = frame_lock
-        self.region_size = region_size
-        self.region_x_offset = region_x_offset
-        self.region_y_offset = region_y_offset
-        self.region_id = region_id
-        self.prepped_frame_queue = prepped_frame_queue
+            frame['detected_objects'] = self.engine.DetectWithInputTensor(frame['frame'], threshold=0.5, top_k=5)
+            self.fps.update()
+            self.avg_inference_speed = (self.avg_inference_speed*9 + self.engine.get_inference_time())/10
 
-    def run(self):
-        frame_time = 0.0
-        while True:
-            now = datetime.datetime.now().timestamp()
-
-            with self.frame_ready:
-                # if there isnt a frame ready for processing or it is old, wait for a new frame
-                if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5:
-                    self.frame_ready.wait()
-            
-            # make a copy of the cropped frame
-            with self.frame_lock:
-                cropped_frame = self.shared_frame[self.region_y_offset:self.region_y_offset+self.region_size, self.region_x_offset:self.region_x_offset+self.region_size].copy()
-                frame_time = self.frame_time.value
-            
-            # Resize to 300x300 if needed
-            if cropped_frame.shape != (300, 300, 3):
-                cropped_frame = cv2.resize(cropped_frame, dsize=(300, 300), interpolation=cv2.INTER_LINEAR)
-            # Expand dimensions since the model expects images to have shape: [1, 300, 300, 3]
-            frame_expanded = np.expand_dims(cropped_frame, axis=0)
-
-            # add the frame to the queue
-            if not self.prepped_frame_queue.full():
-                self.prepped_frame_queue.put({
-                    'camera_name': self.camera_name,
-                    'frame_time': frame_time,
-                    'frame': frame_expanded.flatten().copy(),
-                    'region_size': self.region_size,
-                    'region_id': self.region_id,
-                    'region_x_offset': self.region_x_offset,
-                    'region_y_offset': self.region_y_offset
-                })
-            else:
-                print("queue full. moving on")
+            self.cameras[frame['camera_name']].add_objects(frame)
 
 class RegionRequester(threading.Thread):
     def __init__(self, camera):
+        threading.Thread.__init__(self)
         self.camera = camera
 
     def run(self):
+        prctl.set_name("RegionRequester")
         frame_time = 0.0
         while True:
             now = datetime.datetime.now().timestamp()
@@ -110,27 +55,27 @@ class RegionRequester(threading.Thread):
             
             # make a copy of the frame_time
             frame_time = self.camera.frame_time.value
-            
+
             for index, region in enumerate(self.camera.config['regions']):
                 # queue with priority 1
-                self.camera.resize_queue.put((1, {
+                self.camera.resize_queue.put({
                     'camera_name': self.camera.name,
                     'frame_time': frame_time,
                     'region_id': index,
                     'size': region['size'],
                     'x_offset': region['x_offset'],
                     'y_offset': region['y_offset']
-                }))
+                })
 
 class RegionPrepper(threading.Thread):
     def __init__(self, frame_cache, resize_request_queue, prepped_frame_queue):
-
         threading.Thread.__init__(self)
         self.frame_cache = frame_cache
         self.resize_request_queue = resize_request_queue
         self.prepped_frame_queue = prepped_frame_queue
 
     def run(self):
+        prctl.set_name("RegionPrepper")
         while True:
 
             resize_request = self.resize_request_queue.get()
@@ -153,7 +98,4 @@ class RegionPrepper(threading.Thread):
             # add the frame to the queue
             if not self.prepped_frame_queue.full():
                 resize_request['frame'] = frame_expanded.flatten().copy()
-                # add to queue with priority 1
-                self.prepped_frame_queue.put((1, resize_request))
-            else:
-                print("queue full. moving on")
+                self.prepped_frame_queue.put(resize_request)

+ 4 - 1
frigate/objects.py

@@ -2,6 +2,7 @@ import time
 import datetime
 import threading
 import cv2
+import prctl
 import numpy as np
 from . util import draw_box_with_label
 
@@ -12,6 +13,7 @@ class ObjectCleaner(threading.Thread):
         self._detected_objects = detected_objects
 
     def run(self):
+        prctl.set_name("ObjectCleaner")
         while True:
 
             # wait a bit before checking for expired frames
@@ -47,6 +49,7 @@ class BestFrames(threading.Thread):
         self.best_frames = {}
 
     def run(self):
+        prctl.set_name("BestFrames")
         while True:
 
             # wait until objects have been parsed
@@ -80,4 +83,4 @@ class BestFrames(threading.Thread):
                     time_to_show = datetime.datetime.fromtimestamp(obj['frame_time']).strftime("%m/%d/%Y %H:%M:%S")
                     cv2.putText(best_frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
                     
-                    self.best_frames[name] = cv2.cvtColor(best_frame, cv2.COLOR_RGB2BGR)
+                    self.best_frames[name] = best_frame

+ 70 - 1
frigate/util.py

@@ -1,5 +1,8 @@
+import datetime
+import collections
 import numpy as np
 import cv2
+import threading
 import matplotlib.pyplot as plt
 
 # Function to read labels from text files.
@@ -12,6 +15,31 @@ def ReadLabelFile(file_path):
         ret[int(pair[0])] = pair[1].strip()
     return ret
 
+def calculate_region(frame_shape, xmin, ymin, xmax, ymax):    
+    # size is 50% larger than longest edge
+    size = max(xmax-xmin, ymax-ymin)
+    # if the size is too big to fit in the frame
+    if size > min(frame_shape[0], frame_shape[1]):
+        size = min(frame_shape[0], frame_shape[1])
+
+    # x_offset is midpoint of bounding box minus half the size
+    x_offset = int(((xmax-xmin)/2+xmin)-size/2)
+    # if outside the image
+    if x_offset < 0:
+        x_offset = 0
+    elif x_offset > (frame_shape[1]-size):
+        x_offset = (frame_shape[1]-size)
+
+    # x_offset is midpoint of bounding box minus half the size
+    y_offset = int(((ymax-ymin)/2+ymin)-size/2)
+    # if outside the image
+    if y_offset < 0:
+        y_offset = 0
+    elif y_offset > (frame_shape[0]-size):
+        y_offset = (frame_shape[0]-size)
+
+    return (size, x_offset, y_offset)
+
 # convert shared memory array into numpy array
 def tonumpyarray(mp_arr):
     return np.frombuffer(mp_arr.get_obj(), dtype=np.uint8)
@@ -47,4 +75,45 @@ cmap = plt.cm.get_cmap('tab10', len(LABELS.keys()))
 
 COLOR_MAP = {}
 for key, val in LABELS.items():
-    COLOR_MAP[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])
+    COLOR_MAP[val] = tuple(int(round(255 * c)) for c in cmap(key)[:3])
+
+class QueueMerger():
+    def __init__(self, from_queues, to_queue):
+        self.from_queues = from_queues
+        self.to_queue = to_queue
+        self.merge_threads = []
+
+    def start(self):
+        for from_q in self.from_queues:
+            self.merge_threads.append(QueueTransfer(from_q,self.to_queue))
+
+class QueueTransfer(threading.Thread):
+    def __init__(self, from_queue, to_queue):
+        threading.Thread.__init__(self)
+        self.from_queue = from_queue
+        self.to_queue = to_queue
+
+    def run(self):
+        while True:
+            self.to_queue.put(self.from_queue.get())
+
+class EventsPerSecond:
+    def __init__(self, max_events=1000):
+        self._start = None
+        self._max_events = max_events
+        self._timestamps = []
+    
+    def start(self):
+        self._start = datetime.datetime.now().timestamp()
+
+    def update(self):
+        self._timestamps.append(datetime.datetime.now().timestamp())
+        # truncate the list when it goes 100 over the max_size
+        if len(self._timestamps) > self._max_events+100:
+            self._timestamps = self._timestamps[(1-self._max_events):]
+
+    def eps(self, last_n_seconds=10):
+		# compute the (approximate) events in the last n seconds
+        now = datetime.datetime.now().timestamp()
+        seconds = min(now-self._start, last_n_seconds)
+        return len([t for t in self._timestamps if t > (now-last_n_seconds)]) / seconds

+ 64 - 24
frigate/video.py

@@ -8,9 +8,10 @@ import ctypes
 import multiprocessing as mp
 import subprocess as sp
 import numpy as np
+import prctl
 from collections import defaultdict
-from . util import tonumpyarray, draw_box_with_label
-from . object_detection import FramePrepper, RegionPrepper, RegionRequester
+from . util import tonumpyarray, LABELS, draw_box_with_label, calculate_region, EventsPerSecond
+from . object_detection import RegionPrepper, RegionRequester
 from . objects import ObjectCleaner, BestFrames
 from . mqtt import MqttObjectPublisher
 
@@ -25,23 +26,13 @@ class FrameTracker(threading.Thread):
         self.recent_frames = recent_frames
 
     def run(self):
-        frame_time = 0.0
+        prctl.set_name("FrameTracker")
         while True:
-            now = datetime.datetime.now().timestamp()
             # wait for a frame
             with self.frame_ready:
-                # if there isnt a frame ready for processing or it is old, wait for a signal
-                if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5:
-                    self.frame_ready.wait()
-            
-            # lock and make a copy of the frame
-            with self.frame_lock: 
-                frame = self.shared_frame.copy()
-                frame_time = self.frame_time.value
-            
-            # add the frame to recent frames
-            self.recent_frames[frame_time] = frame
+                self.frame_ready.wait()
 
+            now = datetime.datetime.now().timestamp()
             # delete any old frames
             stored_frame_times = list(self.recent_frames.keys())
             for k in stored_frame_times:
@@ -67,7 +58,7 @@ class CameraWatchdog(threading.Thread):
         self.camera = camera
 
     def run(self):
-
+        prctl.set_name("CameraWatchdog")
         while True:
             # wait a bit before checking
             time.sleep(10)
@@ -84,6 +75,7 @@ class CameraCapture(threading.Thread):
         self.camera = camera
 
     def run(self):
+        prctl.set_name("CameraCapture")
         frame_num = 0
         while True:
             if self.camera.ffmpeg_process.poll() != None:
@@ -108,10 +100,13 @@ class CameraCapture(threading.Thread):
                     .frombuffer(raw_image, np.uint8)
                     .reshape(self.camera.frame_shape)
                 )
+                self.camera.frame_cache[self.camera.frame_time.value] = self.camera.current_frame.copy()
             # Notify with the condition that a new frame is ready
             with self.camera.frame_ready:
                 self.camera.frame_ready.notify_all()
 
+            self.camera.fps.update()
+
 class Camera:
     def __init__(self, name, ffmpeg_config, global_objects_config, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
         self.name = name
@@ -148,7 +143,7 @@ class Camera:
 
         # Queue for prepped frames, max size set to (number of regions * 5)
         max_queue_size = len(self.config['regions'])*5
-        self.resize_queue = queue.PriorityQueue(max_queue_size)
+        self.resize_queue = queue.Queue(max_queue_size)
         
         # initialize the frame cache
         self.cached_frame_with_objects = {
@@ -158,6 +153,7 @@ class Camera:
 
         self.ffmpeg_process = None
         self.capture_thread = None
+        self.fps = EventsPerSecond()
 
         # for each region, merge the object config
         self.detection_prep_threads = []
@@ -173,6 +169,7 @@ class Camera:
 
         # start a thread to queue resize requests for regions
         self.region_requester = RegionRequester(self)
+        self.region_requester.start()
 
         # start a thread to cache recent frames for processing
         self.frame_tracker = FrameTracker(self.current_frame, self.frame_time, 
@@ -234,6 +231,7 @@ class Camera:
         self.capture_thread = CameraCapture(self)
         print("Starting a new capture thread...")
         self.capture_thread.start()
+        self.fps.start()
     
     def start_ffmpeg(self):
         ffmpeg_cmd = (['ffmpeg'] +
@@ -261,20 +259,30 @@ class Camera:
     def get_capture_pid(self):
         return self.ffmpeg_process.pid
     
-    def add_objects(self, objects):
+    def add_objects(self, frame):
+        objects = frame['detected_objects']
+
         if len(objects) == 0:
             return
 
-        for obj in objects:
+        for raw_obj in objects:
+            obj = {
+                'score': float(raw_obj.score),
+                'box': raw_obj.bounding_box.flatten().tolist(),
+                'name': str(LABELS[raw_obj.label_id]),
+                'frame_time': frame['frame_time'],
+                'region_id': frame['region_id']
+            }
+
             # find the matching region
-            region = self.regions[obj['region_id']]
+            region = self.regions[frame['region_id']]
 
             # Compute some extra properties
             obj.update({
-                'xmin': int((obj['box'][0] * region['size']) + region['x_offset']),
-                'ymin': int((obj['box'][1] * region['size']) + region['y_offset']),
-                'xmax': int((obj['box'][2] * region['size']) + region['x_offset']),
-                'ymax': int((obj['box'][3] * region['size']) + region['y_offset'])
+                'xmin': int((obj['box'][0] * frame['size']) + frame['x_offset']),
+                'ymin': int((obj['box'][1] * frame['size']) + frame['y_offset']),
+                'xmax': int((obj['box'][2] * frame['size']) + frame['x_offset']),
+                'ymax': int((obj['box'][3] * frame['size']) + frame['y_offset'])
             })
             
             # Compute the area
@@ -307,6 +315,29 @@ class Camera:
                 # if the object is in a masked location, don't add it to detected objects
                 if self.mask[y_location][x_location] == [0]:
                     continue
+            
+            # look to see if the bounding box is too close to the region border and the region border is not the edge of the frame
+            # if ((frame['x_offset'] > 0 and obj['box'][0] < 0.01) or 
+            #     (frame['y_offset'] > 0 and obj['box'][1] < 0.01) or
+            #     (frame['x_offset']+frame['size'] < self.frame_shape[1] and obj['box'][2] > 0.99) or
+            #     (frame['y_offset']+frame['size'] < self.frame_shape[0] and obj['box'][3] > 0.99)):
+
+            #     size, x_offset, y_offset = calculate_region(self.frame_shape, obj['xmin'], obj['ymin'], obj['xmax'], obj['ymax'])
+                # This triggers WAY too often with stationary objects on the edge of a region. 
+                # Every frame triggers it and fills the queue...
+                # I need to create a new region and add it to the list of regions, but 
+                # it needs to check for a duplicate region first.
+
+                # self.resize_queue.put({
+                #     'camera_name': self.name,
+                #     'frame_time': frame['frame_time'],
+                #     'region_id': frame['region_id'],
+                #     'size': size,
+                #     'x_offset': x_offset,
+                #     'y_offset': y_offset
+                # })
+                # print('object too close to region border')
+                #continue
 
             self.detected_objects.append(obj)
 
@@ -315,6 +346,12 @@ class Camera:
     
     def get_best(self, label):
         return self.best_frames.best_frames.get(label)
+
+    def stats(self):
+        return {
+            'camera_fps': self.fps.eps(60),
+            'resize_queue': self.resize_queue.qsize()
+        }
     
     def get_current_frame_with_objects(self):
         # make a copy of the current detected objects
@@ -340,6 +377,9 @@ class Camera:
         # print a timestamp
         time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
         cv2.putText(frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
+        
+        # print fps
+        cv2.putText(frame, str(self.fps.eps())+'FPS', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)
 
         # convert to BGR
         frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)