# edgetpu.py
  1. import datetime
  2. import logging
  3. import multiprocessing as mp
  4. import os
  5. import queue
  6. import signal
  7. import threading
  8. from abc import ABC, abstractmethod
  9. from typing import Dict
  10. import numpy as np
  11. import tflite_runtime.interpreter as tflite
  12. from setproctitle import setproctitle
  13. from tflite_runtime.interpreter import load_delegate
  14. from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen, load_labels
# Module-level logger; child-process code below creates its own named loggers.
logger = logging.getLogger(__name__)
class ObjectDetector(ABC):
    """Abstract interface for object detectors.

    Implementations take a prepared input tensor and return a list of
    detections, filtered by a confidence threshold.
    """

    @abstractmethod
    def detect(self, tensor_input, threshold=0.4):
        """Return detections for ``tensor_input`` scoring at least ``threshold``."""
        pass
  20. class LocalObjectDetector(ObjectDetector):
  21. def __init__(self, tf_device=None, model_path=None, num_threads=3, labels=None):
  22. self.fps = EventsPerSecond()
  23. if labels is None:
  24. self.labels = {}
  25. else:
  26. self.labels = load_labels(labels)
  27. device_config = {"device": "usb"}
  28. if not tf_device is None:
  29. device_config = {"device": tf_device}
  30. edge_tpu_delegate = None
  31. if tf_device != "cpu":
  32. try:
  33. logger.info(f"Attempting to load TPU as {device_config['device']}")
  34. edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
  35. logger.info("TPU found")
  36. self.interpreter = tflite.Interpreter(
  37. model_path=model_path or "/edgetpu_model.tflite",
  38. experimental_delegates=[edge_tpu_delegate],
  39. )
  40. except ValueError:
  41. logger.error(
  42. "No EdgeTPU was detected. If you do not have a Coral device yet, you must configure CPU detectors."
  43. )
  44. raise
  45. else:
  46. logger.warning(
  47. "CPU detectors are not recommended and should only be used for testing or for trial purposes."
  48. )
  49. self.interpreter = tflite.Interpreter(
  50. model_path=model_path or "/cpu_model.tflite", num_threads=num_threads
  51. )
  52. self.interpreter.allocate_tensors()
  53. self.tensor_input_details = self.interpreter.get_input_details()
  54. self.tensor_output_details = self.interpreter.get_output_details()
  55. def detect(self, tensor_input, threshold=0.4):
  56. detections = []
  57. raw_detections = self.detect_raw(tensor_input)
  58. for d in raw_detections:
  59. if d[1] < threshold:
  60. break
  61. detections.append(
  62. (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5]))
  63. )
  64. self.fps.update()
  65. return detections
  66. def detect_raw(self, tensor_input):
  67. self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
  68. self.interpreter.invoke()
  69. boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0]
  70. class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0]
  71. scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0]
  72. count = int(
  73. self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0]
  74. )
  75. detections = np.zeros((20, 6), np.float32)
  76. for i in range(count):
  77. if scores[i] < 0.4 or i == 20:
  78. break
  79. detections[i] = [
  80. class_ids[i],
  81. float(scores[i]),
  82. boxes[i][0],
  83. boxes[i][1],
  84. boxes[i][2],
  85. boxes[i][3],
  86. ]
  87. return detections
  88. def run_detector(
  89. name: str,
  90. detection_queue: mp.Queue,
  91. out_events: Dict[str, mp.Event],
  92. avg_speed,
  93. start,
  94. model_path,
  95. model_shape,
  96. tf_device,
  97. num_threads,
  98. ):
  99. threading.current_thread().name = f"detector:{name}"
  100. logger = logging.getLogger(f"detector.{name}")
  101. logger.info(f"Starting detection process: {os.getpid()}")
  102. setproctitle(f"frigate.detector.{name}")
  103. listen()
  104. stop_event = mp.Event()
  105. def receiveSignal(signalNumber, frame):
  106. stop_event.set()
  107. signal.signal(signal.SIGTERM, receiveSignal)
  108. signal.signal(signal.SIGINT, receiveSignal)
  109. frame_manager = SharedMemoryFrameManager()
  110. object_detector = LocalObjectDetector(
  111. tf_device=tf_device, model_path=model_path, num_threads=num_threads
  112. )
  113. outputs = {}
  114. for name in out_events.keys():
  115. out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
  116. out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
  117. outputs[name] = {"shm": out_shm, "np": out_np}
  118. while not stop_event.is_set():
  119. try:
  120. connection_id = detection_queue.get(timeout=5)
  121. except queue.Empty:
  122. continue
  123. input_frame = frame_manager.get(
  124. connection_id, (1, model_shape[0], model_shape[1], 3)
  125. )
  126. if input_frame is None:
  127. continue
  128. # detect and send the output
  129. start.value = datetime.datetime.now().timestamp()
  130. detections = object_detector.detect_raw(input_frame)
  131. duration = datetime.datetime.now().timestamp() - start.value
  132. outputs[connection_id]["np"][:] = detections[:]
  133. out_events[connection_id].set()
  134. start.value = 0.0
  135. avg_speed.value = (avg_speed.value * 9 + duration) / 10
  136. class EdgeTPUProcess:
  137. def __init__(
  138. self,
  139. name,
  140. detection_queue,
  141. out_events,
  142. model_path,
  143. model_shape,
  144. tf_device=None,
  145. num_threads=3,
  146. ):
  147. self.name = name
  148. self.out_events = out_events
  149. self.detection_queue = detection_queue
  150. self.avg_inference_speed = mp.Value("d", 0.01)
  151. self.detection_start = mp.Value("d", 0.0)
  152. self.detect_process = None
  153. self.model_path = model_path
  154. self.model_shape = model_shape
  155. self.tf_device = tf_device
  156. self.num_threads = num_threads
  157. self.start_or_restart()
  158. def stop(self):
  159. self.detect_process.terminate()
  160. logging.info("Waiting for detection process to exit gracefully...")
  161. self.detect_process.join(timeout=30)
  162. if self.detect_process.exitcode is None:
  163. logging.info("Detection process didnt exit. Force killing...")
  164. self.detect_process.kill()
  165. self.detect_process.join()
  166. def start_or_restart(self):
  167. self.detection_start.value = 0.0
  168. if (not self.detect_process is None) and self.detect_process.is_alive():
  169. self.stop()
  170. self.detect_process = mp.Process(
  171. target=run_detector,
  172. name=f"detector:{self.name}",
  173. args=(
  174. self.name,
  175. self.detection_queue,
  176. self.out_events,
  177. self.avg_inference_speed,
  178. self.detection_start,
  179. self.model_path,
  180. self.model_shape,
  181. self.tf_device,
  182. self.num_threads,
  183. ),
  184. )
  185. self.detect_process.daemon = True
  186. self.detect_process.start()
  187. class RemoteObjectDetector:
  188. def __init__(self, name, labels, detection_queue, event, model_shape):
  189. self.labels = labels
  190. self.name = name
  191. self.fps = EventsPerSecond()
  192. self.detection_queue = detection_queue
  193. self.event = event
  194. self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False)
  195. self.np_shm = np.ndarray(
  196. (1, model_shape[0], model_shape[1], 3), dtype=np.uint8, buffer=self.shm.buf
  197. )
  198. self.out_shm = mp.shared_memory.SharedMemory(
  199. name=f"out-{self.name}", create=False
  200. )
  201. self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
  202. def detect(self, tensor_input, threshold=0.4):
  203. detections = []
  204. # copy input to shared memory
  205. self.np_shm[:] = tensor_input[:]
  206. self.event.clear()
  207. self.detection_queue.put(self.name)
  208. result = self.event.wait(timeout=10.0)
  209. # if it timed out
  210. if result is None:
  211. return detections
  212. for d in self.out_np_shm:
  213. if d[1] < threshold:
  214. break
  215. detections.append(
  216. (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5]))
  217. )
  218. self.fps.update()
  219. return detections
  220. def cleanup(self):
  221. self.shm.unlink()
  222. self.out_shm.unlink()