Ver Fonte

update detection handoff to use shared memory

Blake Blackshear há 4 anos atrás
pai
commit
574ee2a46f
2 ficheiros alterados com 54 adições e 30 exclusões
  1. 26 17
      benchmark.py
  2. 28 13
      frigate/edgetpu.py

+ 26 - 17
benchmark.py

@@ -11,7 +11,7 @@ labels = load_labels('/labelmap.txt')
 ######
 # Minimal same process runner
 ######
-# object_detector = ObjectDetector()
+# object_detector = LocalObjectDetector()
 # tensor_input = np.expand_dims(np.full((300,300,3), 0, np.uint8), axis=0)
 
 # start = datetime.datetime.now().timestamp()
@@ -40,8 +40,8 @@ labels = load_labels('/labelmap.txt')
 ######
 # Separate process runner
 ######
-def start(id, num_detections, detection_queue):
-  object_detector = RemoteObjectDetector(str(id), '/labelmap.txt', detection_queue)
+def start(id, num_detections, detection_queue, event):
+  object_detector = RemoteObjectDetector(str(id), '/labelmap.txt', detection_queue, event)
   start = datetime.datetime.now().timestamp()
 
   frame_times = []
@@ -54,26 +54,35 @@ def start(id, num_detections, detection_queue):
   print(f"{id} - Processed for {duration:.2f} seconds.")
   print(f"{id} - Average frame processing time: {mean(frame_times)*1000:.2f}ms")
 
-edgetpu_process = EdgeTPUProcess()
+event = mp.Event()
+edgetpu_process = EdgeTPUProcess({'1': event})
 
-# start(1, 1000, edgetpu_process.detect_lock, edgetpu_process.detect_ready, edgetpu_process.frame_ready)
+start(1, 1000, edgetpu_process.detection_queue, event)
+print(f"Average raw inference speed: {edgetpu_process.avg_inference_speed.value*1000:.2f}ms")
 
 ####
 # Multiple camera processes
 ####
-camera_processes = []
-for x in range(0, 10):
-  camera_process = mp.Process(target=start, args=(x, 100, edgetpu_process.detection_queue))
-  camera_process.daemon = True
-  camera_processes.append(camera_process)
+# camera_processes = []
 
-start = datetime.datetime.now().timestamp()
+# pipes = {}
+# for x in range(0, 10):
+#   pipes[x] = mp.Pipe(duplex=False)
 
-for p in camera_processes:
-  p.start()
+# edgetpu_process = EdgeTPUProcess({str(key): value[1] for (key, value) in pipes.items()})
 
-for p in camera_processes:
-  p.join()
+# for x in range(0, 10):
+#   camera_process = mp.Process(target=start, args=(x, 100, edgetpu_process.detection_queue, pipes[x][0]))
+#   camera_process.daemon = True
+#   camera_processes.append(camera_process)
 
-duration = datetime.datetime.now().timestamp()-start
-print(f"Total - Processed for {duration:.2f} seconds.")
+# start = datetime.datetime.now().timestamp()
+
+# for p in camera_processes:
+#   p.start()
+
+# for p in camera_processes:
+#   p.join()
+
+# duration = datetime.datetime.now().timestamp()-start
+# print(f"Total - Processed for {duration:.2f} seconds.")

+ 28 - 13
frigate/edgetpu.py

@@ -102,12 +102,21 @@ class LocalObjectDetector(ObjectDetector):
         
         return detections
 
-def run_detector(detection_queue, result_connections: Dict[str, Connection], avg_speed, start, tf_device):
+def run_detector(detection_queue, out_events: Dict[str, mp.Event], avg_speed, start, tf_device):
     print(f"Starting detection process: {os.getpid()}")
     listen()
     frame_manager = SharedMemoryFrameManager()
     object_detector = LocalObjectDetector(tf_device=tf_device)
 
+    outputs = {}
+    for name in out_events.keys():
+        out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
+        out_np = np.ndarray((20,6), dtype=np.float32, buffer=out_shm.buf)
+        outputs[name] = {
+            'shm': out_shm,
+            'np': out_np
+        }
+    
     while True:
         connection_id = detection_queue.get()
         input_frame = frame_manager.get(connection_id, (1,300,300,3))
@@ -115,20 +124,21 @@ def run_detector(detection_queue, result_connections: Dict[str, Connection], avg
         if input_frame is None:
             continue
 
-        # detect and put the output in the plasma store
+        # detect and send the output
         start.value = datetime.datetime.now().timestamp()
         # TODO: what is the overhead for pickling this result vs writing back to shared memory?
         #       I could try using an Event() and waiting in the other process before looking in memory...
         detections = object_detector.detect_raw(input_frame)
-        result_connections[connection_id].send(detections)
         duration = datetime.datetime.now().timestamp()-start.value
+        outputs[connection_id]['np'][:] = detections[:]
+        out_events[connection_id].set()
         start.value = 0.0
 
         avg_speed.value = (avg_speed.value*9 + duration)/10
         
 class EdgeTPUProcess():
-    def __init__(self, result_connections, tf_device=None):
-        self.result_connections = result_connections
+    def __init__(self, out_events, tf_device=None):
+        self.out_events = out_events
         self.detection_queue = mp.Queue()
         self.avg_inference_speed = mp.Value('d', 0.01)
         self.detection_start = mp.Value('d', 0.0)
@@ -149,19 +159,21 @@ class EdgeTPUProcess():
         self.detection_start.value = 0.0
         if (not self.detect_process is None) and self.detect_process.is_alive():
             self.stop()
-        self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.result_connections, self.avg_inference_speed, self.detection_start, self.tf_device))
+        self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.out_events, self.avg_inference_speed, self.detection_start, self.tf_device))
         self.detect_process.daemon = True
         self.detect_process.start()
 
 class RemoteObjectDetector():
-    def __init__(self, name, labels, detection_queue, result_connection: Connection):
+    def __init__(self, name, labels, detection_queue, event):
         self.labels = load_labels(labels)
         self.name = name
         self.fps = EventsPerSecond()
         self.detection_queue = detection_queue
-        self.result_connection = result_connection
+        self.event = event
         self.shm = mp.shared_memory.SharedMemory(name=self.name, create=True, size=300*300*3)
         self.np_shm = np.ndarray((1,300,300,3), dtype=np.uint8, buffer=self.shm.buf)
+        self.out_shm = mp.shared_memory.SharedMemory(name=f"out-{self.name}", create=True, size=20*6*4)
+        self.out_np_shm = np.ndarray((20,6), dtype=np.float32, buffer=self.out_shm.buf)
     
     def detect(self, tensor_input, threshold=.4):
         detections = []
@@ -169,13 +181,16 @@ class RemoteObjectDetector():
         # copy input to shared memory
         # TODO: what if I just write it there in the first place?
         self.np_shm[:] = tensor_input[:]
+        self.event.clear()
         self.detection_queue.put(self.name)
-        if self.result_connection.poll(10):
-            raw_detections = self.result_connection.recv()
-        else:
-            return detections
+        self.event.wait()
+        
+        # if self.result_connection.poll(10):
+        #     raw_detections = self.result_connection.recv()
+        # else:
+        #     return detections
 
-        for d in raw_detections:
+        for d in self.out_np_shm:
             if d[1] < threshold:
                 break
             detections.append((