edgetpu.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. import datetime
  2. import logging
  3. import multiprocessing as mp
  4. import os
  5. import queue
  6. import signal
  7. import threading
  8. from abc import ABC, abstractmethod
  9. from typing import Dict
  10. import numpy as np
  11. from pycoral.adapters import detect
  12. import tflite_runtime.interpreter as tflite
  13. from setproctitle import setproctitle
  14. from tflite_runtime.interpreter import load_delegate
  15. from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen
  16. logger = logging.getLogger(__name__)
  17. def load_labels(path, encoding="utf-8"):
  18. """Loads labels from file (with or without index numbers).
  19. Args:
  20. path: path to label file.
  21. encoding: label file encoding.
  22. Returns:
  23. Dictionary mapping indices to labels.
  24. """
  25. with open(path, "r", encoding=encoding) as f:
  26. lines = f.readlines()
  27. if not lines:
  28. return {}
  29. if lines[0].split(" ", maxsplit=1)[0].isdigit():
  30. pairs = [line.split(" ", maxsplit=1) for line in lines]
  31. return {int(index): label.strip() for index, label in pairs}
  32. else:
  33. return {index: line.strip() for index, line in enumerate(lines)}
  34. class ObjectDetector(ABC):
  35. @abstractmethod
  36. def detect(self, tensor_input, threshold=0.4):
  37. pass
  38. class LocalObjectDetector(ObjectDetector):
  39. def __init__(self, tf_device=None, num_threads=3, labels=None):
  40. self.fps = EventsPerSecond()
  41. if labels is None:
  42. self.labels = {}
  43. else:
  44. self.labels = load_labels(labels)
  45. device_config = {"device": "usb"}
  46. if not tf_device is None:
  47. device_config = {"device": tf_device}
  48. edge_tpu_delegate = None
  49. if tf_device != "cpu":
  50. try:
  51. logger.info(f"Attempting to load TPU as {device_config['device']}")
  52. edge_tpu_delegate = load_delegate("libedgetpu.so.1.0", device_config)
  53. logger.info("TPU found")
  54. self.interpreter = tflite.Interpreter(
  55. model_path="/edgetpu_model.tflite",
  56. experimental_delegates=[edge_tpu_delegate],
  57. )
  58. except ValueError:
  59. logger.info("No EdgeTPU detected.")
  60. raise
  61. else:
  62. self.logger.error("****************************************************")
  63. self.logger.error(f"** CPU detectors are not recommended and should")
  64. self.logger.error(f"** only be used for testing or for trial purposes.")
  65. self.logger.error(("****************************************************"))
  66. self.interpreter = tflite.Interpreter(
  67. model_path="/cpu_model.tflite", num_threads=num_threads
  68. )
  69. self.interpreter.allocate_tensors()
  70. self.tensor_input_details = self.interpreter.get_input_details()
  71. self.tensor_output_details = self.interpreter.get_output_details()
  72. def detect(self, tensor_input, threshold=0.4):
  73. detections = []
  74. raw_detections = self.detect_raw(tensor_input)
  75. for d in raw_detections:
  76. if d[1] < threshold:
  77. break
  78. detections.append(
  79. (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5]))
  80. )
  81. self.fps.update()
  82. return detections
  83. def detect_raw(self, tensor_input):
  84. self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input)
  85. self.interpreter.invoke()
  86. objects = detect.get_objects(self.interpreter, 0.4)
  87. detections = np.zeros((20, 6), np.float32)
  88. for i, obj in enumerate(objects):
  89. if i == 20:
  90. break
  91. detections[i] = [
  92. obj.id,
  93. obj.score,
  94. obj.bbox.ymin,
  95. obj.bbox.xmin,
  96. obj.bbox.ymax,
  97. obj.bbox.xmax,
  98. ]
  99. return detections
  100. def run_detector(
  101. name: str,
  102. detection_queue: mp.Queue,
  103. out_events: Dict[str, mp.Event],
  104. avg_speed,
  105. start,
  106. model_shape,
  107. tf_device,
  108. num_threads,
  109. ):
  110. threading.current_thread().name = f"detector:{name}"
  111. logger = logging.getLogger(f"detector.{name}")
  112. logger.info(f"Starting detection process: {os.getpid()}")
  113. setproctitle(f"frigate.detector.{name}")
  114. listen()
  115. stop_event = mp.Event()
  116. def receiveSignal(signalNumber, frame):
  117. stop_event.set()
  118. signal.signal(signal.SIGTERM, receiveSignal)
  119. signal.signal(signal.SIGINT, receiveSignal)
  120. frame_manager = SharedMemoryFrameManager()
  121. object_detector = LocalObjectDetector(tf_device=tf_device, num_threads=num_threads)
  122. outputs = {}
  123. for name in out_events.keys():
  124. out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
  125. out_np = np.ndarray((20, 6), dtype=np.float32, buffer=out_shm.buf)
  126. outputs[name] = {"shm": out_shm, "np": out_np}
  127. while not stop_event.is_set():
  128. try:
  129. connection_id = detection_queue.get(timeout=5)
  130. except queue.Empty:
  131. continue
  132. input_frame = frame_manager.get(
  133. connection_id, (1, model_shape[0], model_shape[1], 3)
  134. )
  135. if input_frame is None:
  136. continue
  137. # detect and send the output
  138. start.value = datetime.datetime.now().timestamp()
  139. detections = object_detector.detect_raw(input_frame)
  140. duration = datetime.datetime.now().timestamp() - start.value
  141. outputs[connection_id]["np"][:] = detections[:]
  142. out_events[connection_id].set()
  143. start.value = 0.0
  144. avg_speed.value = (avg_speed.value * 9 + duration) / 10
  145. class EdgeTPUProcess:
  146. def __init__(
  147. self,
  148. name,
  149. detection_queue,
  150. out_events,
  151. model_shape,
  152. tf_device=None,
  153. num_threads=3,
  154. ):
  155. self.name = name
  156. self.out_events = out_events
  157. self.detection_queue = detection_queue
  158. self.avg_inference_speed = mp.Value("d", 0.01)
  159. self.detection_start = mp.Value("d", 0.0)
  160. self.detect_process = None
  161. self.model_shape = model_shape
  162. self.tf_device = tf_device
  163. self.num_threads = num_threads
  164. self.start_or_restart()
  165. def stop(self):
  166. self.detect_process.terminate()
  167. logging.info("Waiting for detection process to exit gracefully...")
  168. self.detect_process.join(timeout=30)
  169. if self.detect_process.exitcode is None:
  170. logging.info("Detection process didnt exit. Force killing...")
  171. self.detect_process.kill()
  172. self.detect_process.join()
  173. def start_or_restart(self):
  174. self.detection_start.value = 0.0
  175. if (not self.detect_process is None) and self.detect_process.is_alive():
  176. self.stop()
  177. self.detect_process = mp.Process(
  178. target=run_detector,
  179. name=f"detector:{self.name}",
  180. args=(
  181. self.name,
  182. self.detection_queue,
  183. self.out_events,
  184. self.avg_inference_speed,
  185. self.detection_start,
  186. self.model_shape,
  187. self.tf_device,
  188. self.num_threads,
  189. ),
  190. )
  191. self.detect_process.daemon = True
  192. self.detect_process.start()
  193. class RemoteObjectDetector:
  194. def __init__(self, name, labels, detection_queue, event, model_shape):
  195. self.labels = labels
  196. self.name = name
  197. self.fps = EventsPerSecond()
  198. self.detection_queue = detection_queue
  199. self.event = event
  200. self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False)
  201. self.np_shm = np.ndarray(
  202. (1, model_shape[0], model_shape[1], 3), dtype=np.uint8, buffer=self.shm.buf
  203. )
  204. self.out_shm = mp.shared_memory.SharedMemory(
  205. name=f"out-{self.name}", create=False
  206. )
  207. self.out_np_shm = np.ndarray((20, 6), dtype=np.float32, buffer=self.out_shm.buf)
  208. def detect(self, tensor_input, threshold=0.4):
  209. detections = []
  210. # copy input to shared memory
  211. self.np_shm[:] = tensor_input[:]
  212. self.event.clear()
  213. self.detection_queue.put(self.name)
  214. result = self.event.wait(timeout=10.0)
  215. # if it timed out
  216. if result is None:
  217. return detections
  218. for d in self.out_np_shm:
  219. if d[1] < threshold:
  220. break
  221. detections.append(
  222. (self.labels[int(d[0])], float(d[1]), (d[2], d[3], d[4], d[5]))
  223. )
  224. self.fps.update()
  225. return detections
  226. def cleanup(self):
  227. self.shm.unlink()
  228. self.out_shm.unlink()