edgetpu.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import datetime
  2. import hashlib
  3. import logging
  4. import multiprocessing as mp
  5. import os
  6. import queue
  7. import threading
  8. from abc import ABC, abstractmethod
  9. from multiprocessing.connection import Connection
  10. from typing import Dict
  11. import numpy as np
  12. import tflite_runtime.interpreter as tflite
  13. from tflite_runtime.interpreter import load_delegate
  14. from frigate.util import EventsPerSecond, SharedMemoryFrameManager, listen
  15. logger = logging.getLogger(__name__)
  16. def load_labels(path, encoding='utf-8'):
  17. """Loads labels from file (with or without index numbers).
  18. Args:
  19. path: path to label file.
  20. encoding: label file encoding.
  21. Returns:
  22. Dictionary mapping indices to labels.
  23. """
  24. with open(path, 'r', encoding=encoding) as f:
  25. lines = f.readlines()
  26. if not lines:
  27. return {}
  28. if lines[0].split(' ', maxsplit=1)[0].isdigit():
  29. pairs = [line.split(' ', maxsplit=1) for line in lines]
  30. return {int(index): label.strip() for index, label in pairs}
  31. else:
  32. return {index: line.strip() for index, line in enumerate(lines)}
  33. class ObjectDetector(ABC):
  34. @abstractmethod
  35. def detect(self, tensor_input, threshold = .4):
  36. pass
  37. class LocalObjectDetector(ObjectDetector):
  38. def __init__(self, tf_device=None, labels=None):
  39. self.fps = EventsPerSecond()
  40. if labels is None:
  41. self.labels = {}
  42. else:
  43. self.labels = load_labels(labels)
  44. device_config = {"device": "usb"}
  45. if not tf_device is None:
  46. device_config = {"device": tf_device}
  47. edge_tpu_delegate = None
  48. if tf_device != 'cpu':
  49. try:
  50. logging.info(f"Attempting to load TPU as {device_config['device']}")
  51. edge_tpu_delegate = load_delegate('libedgetpu.so.1.0', device_config)
  52. logging.info("TPU found")
  53. except ValueError:
  54. logging.info("No EdgeTPU detected. Falling back to CPU.")
  55. if edge_tpu_delegate is None:
  56. self.interpreter = tflite.Interpreter(
  57. model_path='/cpu_model.tflite')
  58. else:
  59. self.interpreter = tflite.Interpreter(
  60. model_path='/edgetpu_model.tflite',
  61. experimental_delegates=[edge_tpu_delegate])
  62. self.interpreter.allocate_tensors()
  63. self.tensor_input_details = self.interpreter.get_input_details()
  64. self.tensor_output_details = self.interpreter.get_output_details()
  65. def detect(self, tensor_input, threshold=.4):
  66. detections = []
  67. raw_detections = self.detect_raw(tensor_input)
  68. for d in raw_detections:
  69. if d[1] < threshold:
  70. break
  71. detections.append((
  72. self.labels[int(d[0])],
  73. float(d[1]),
  74. (d[2], d[3], d[4], d[5])
  75. ))
  76. self.fps.update()
  77. return detections
  78. def detect_raw(self, tensor_input):
  79. self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
  80. self.interpreter.invoke()
  81. boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
  82. label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
  83. scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
  84. detections = np.zeros((20,6), np.float32)
  85. for i, score in enumerate(scores):
  86. detections[i] = [label_codes[i], score, boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]]
  87. return detections
  88. def run_detector(name: str, detection_queue: mp.Queue, out_events: Dict[str, mp.Event], avg_speed, start, tf_device):
  89. threading.current_thread().name = f"detector:{name}"
  90. logging.info(f"Starting detection process: {os.getpid()}")
  91. listen()
  92. frame_manager = SharedMemoryFrameManager()
  93. object_detector = LocalObjectDetector(tf_device=tf_device)
  94. outputs = {}
  95. for name in out_events.keys():
  96. out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
  97. out_np = np.ndarray((20,6), dtype=np.float32, buffer=out_shm.buf)
  98. outputs[name] = {
  99. 'shm': out_shm,
  100. 'np': out_np
  101. }
  102. while True:
  103. connection_id = detection_queue.get()
  104. input_frame = frame_manager.get(connection_id, (1,300,300,3))
  105. if input_frame is None:
  106. continue
  107. # detect and send the output
  108. start.value = datetime.datetime.now().timestamp()
  109. detections = object_detector.detect_raw(input_frame)
  110. duration = datetime.datetime.now().timestamp()-start.value
  111. outputs[connection_id]['np'][:] = detections[:]
  112. out_events[connection_id].set()
  113. start.value = 0.0
  114. avg_speed.value = (avg_speed.value*9 + duration)/10
  115. class EdgeTPUProcess():
  116. def __init__(self, name, detection_queue, out_events, tf_device=None):
  117. self.name = name
  118. self.out_events = out_events
  119. self.detection_queue = detection_queue
  120. self.avg_inference_speed = mp.Value('d', 0.01)
  121. self.detection_start = mp.Value('d', 0.0)
  122. self.detect_process = None
  123. self.tf_device = tf_device
  124. self.start_or_restart()
  125. def stop(self):
  126. self.detect_process.terminate()
  127. logging.info("Waiting for detection process to exit gracefully...")
  128. self.detect_process.join(timeout=30)
  129. if self.detect_process.exitcode is None:
  130. logging.info("Detection process didnt exit. Force killing...")
  131. self.detect_process.kill()
  132. self.detect_process.join()
  133. def start_or_restart(self):
  134. self.detection_start.value = 0.0
  135. if (not self.detect_process is None) and self.detect_process.is_alive():
  136. self.stop()
  137. self.detect_process = mp.Process(target=run_detector, name=f"detector:{self.name}", args=(self.name, self.detection_queue, self.out_events, self.avg_inference_speed, self.detection_start, self.tf_device))
  138. self.detect_process.daemon = True
  139. self.detect_process.start()
  140. class RemoteObjectDetector():
  141. def __init__(self, name, labels, detection_queue, event):
  142. self.labels = load_labels(labels)
  143. self.name = name
  144. self.fps = EventsPerSecond()
  145. self.detection_queue = detection_queue
  146. self.event = event
  147. self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False)
  148. self.np_shm = np.ndarray((1,300,300,3), dtype=np.uint8, buffer=self.shm.buf)
  149. self.out_shm = mp.shared_memory.SharedMemory(name=f"out-{self.name}", create=False)
  150. self.out_np_shm = np.ndarray((20,6), dtype=np.float32, buffer=self.out_shm.buf)
  151. def detect(self, tensor_input, threshold=.4):
  152. detections = []
  153. # copy input to shared memory
  154. self.np_shm[:] = tensor_input[:]
  155. self.event.clear()
  156. self.detection_queue.put(self.name)
  157. result = self.event.wait(timeout=10.0)
  158. # if it timed out
  159. if result is None:
  160. return detections
  161. for d in self.out_np_shm:
  162. if d[1] < threshold:
  163. break
  164. detections.append((
  165. self.labels[int(d[0])],
  166. float(d[1]),
  167. (d[2], d[3], d[4], d[5])
  168. ))
  169. self.fps.update()
  170. return detections
  171. def cleanup(self):
  172. self.shm.unlink()
  173. self.out_shm.unlink()