edgetpu.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. import os
  2. import datetime
  3. import hashlib
  4. import multiprocessing as mp
  5. import queue
  6. import logging
  7. from multiprocessing.connection import Connection
  8. from abc import ABC, abstractmethod
  9. from typing import Dict
  10. import numpy as np
  11. import tflite_runtime.interpreter as tflite
  12. from tflite_runtime.interpreter import load_delegate
  13. from frigate.util import EventsPerSecond, listen, SharedMemoryFrameManager
# Module-level logger, named after this module via __name__ so its records can
# be filtered/configured separately from the root logger.
logger = logging.getLogger(__name__)
  15. def load_labels(path, encoding='utf-8'):
  16. """Loads labels from file (with or without index numbers).
  17. Args:
  18. path: path to label file.
  19. encoding: label file encoding.
  20. Returns:
  21. Dictionary mapping indices to labels.
  22. """
  23. with open(path, 'r', encoding=encoding) as f:
  24. lines = f.readlines()
  25. if not lines:
  26. return {}
  27. if lines[0].split(' ', maxsplit=1)[0].isdigit():
  28. pairs = [line.split(' ', maxsplit=1) for line in lines]
  29. return {int(index): label.strip() for index, label in pairs}
  30. else:
  31. return {index: line.strip() for index, line in enumerate(lines)}
  32. class ObjectDetector(ABC):
  33. @abstractmethod
  34. def detect(self, tensor_input, threshold = .4):
  35. pass
  36. class LocalObjectDetector(ObjectDetector):
  37. def __init__(self, tf_device=None, labels=None):
  38. self.fps = EventsPerSecond()
  39. if labels is None:
  40. self.labels = {}
  41. else:
  42. self.labels = load_labels(labels)
  43. device_config = {"device": "usb"}
  44. if not tf_device is None:
  45. device_config = {"device": tf_device}
  46. edge_tpu_delegate = None
  47. if tf_device != 'cpu':
  48. try:
  49. logging.info(f"Attempting to load TPU as {device_config['device']}")
  50. edge_tpu_delegate = load_delegate('libedgetpu.so.1.0', device_config)
  51. logging.info("TPU found")
  52. except ValueError:
  53. logging.info("No EdgeTPU detected. Falling back to CPU.")
  54. if edge_tpu_delegate is None:
  55. self.interpreter = tflite.Interpreter(
  56. model_path='/cpu_model.tflite')
  57. else:
  58. self.interpreter = tflite.Interpreter(
  59. model_path='/edgetpu_model.tflite',
  60. experimental_delegates=[edge_tpu_delegate])
  61. self.interpreter.allocate_tensors()
  62. self.tensor_input_details = self.interpreter.get_input_details()
  63. self.tensor_output_details = self.interpreter.get_output_details()
  64. def detect(self, tensor_input, threshold=.4):
  65. detections = []
  66. raw_detections = self.detect_raw(tensor_input)
  67. for d in raw_detections:
  68. if d[1] < threshold:
  69. break
  70. detections.append((
  71. self.labels[int(d[0])],
  72. float(d[1]),
  73. (d[2], d[3], d[4], d[5])
  74. ))
  75. self.fps.update()
  76. return detections
  77. def detect_raw(self, tensor_input):
  78. self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
  79. self.interpreter.invoke()
  80. boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
  81. label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
  82. scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
  83. detections = np.zeros((20,6), np.float32)
  84. for i, score in enumerate(scores):
  85. detections[i] = [label_codes[i], score, boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]]
  86. return detections
  87. def run_detector(detection_queue: mp.Queue, out_events: Dict[str, mp.Event], avg_speed, start, tf_device):
  88. logging.info(f"Starting detection process: {os.getpid()}")
  89. listen()
  90. frame_manager = SharedMemoryFrameManager()
  91. object_detector = LocalObjectDetector(tf_device=tf_device)
  92. outputs = {}
  93. for name in out_events.keys():
  94. out_shm = mp.shared_memory.SharedMemory(name=f"out-{name}", create=False)
  95. out_np = np.ndarray((20,6), dtype=np.float32, buffer=out_shm.buf)
  96. outputs[name] = {
  97. 'shm': out_shm,
  98. 'np': out_np
  99. }
  100. while True:
  101. connection_id = detection_queue.get()
  102. input_frame = frame_manager.get(connection_id, (1,300,300,3))
  103. if input_frame is None:
  104. continue
  105. # detect and send the output
  106. start.value = datetime.datetime.now().timestamp()
  107. detections = object_detector.detect_raw(input_frame)
  108. duration = datetime.datetime.now().timestamp()-start.value
  109. outputs[connection_id]['np'][:] = detections[:]
  110. out_events[connection_id].set()
  111. start.value = 0.0
  112. avg_speed.value = (avg_speed.value*9 + duration)/10
  113. class EdgeTPUProcess():
  114. def __init__(self, detection_queue, out_events, tf_device=None):
  115. self.out_events = out_events
  116. self.detection_queue = detection_queue
  117. self.avg_inference_speed = mp.Value('d', 0.01)
  118. self.detection_start = mp.Value('d', 0.0)
  119. self.detect_process = None
  120. self.tf_device = tf_device
  121. self.start_or_restart()
  122. def stop(self):
  123. self.detect_process.terminate()
  124. logging.info("Waiting for detection process to exit gracefully...")
  125. self.detect_process.join(timeout=30)
  126. if self.detect_process.exitcode is None:
  127. logging.info("Detection process didnt exit. Force killing...")
  128. self.detect_process.kill()
  129. self.detect_process.join()
  130. def start_or_restart(self):
  131. self.detection_start.value = 0.0
  132. if (not self.detect_process is None) and self.detect_process.is_alive():
  133. self.stop()
  134. self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.out_events, self.avg_inference_speed, self.detection_start, self.tf_device))
  135. self.detect_process.daemon = True
  136. self.detect_process.start()
  137. class RemoteObjectDetector():
  138. def __init__(self, name, labels, detection_queue, event):
  139. self.labels = load_labels(labels)
  140. self.name = name
  141. self.fps = EventsPerSecond()
  142. self.detection_queue = detection_queue
  143. self.event = event
  144. self.shm = mp.shared_memory.SharedMemory(name=self.name, create=False)
  145. self.np_shm = np.ndarray((1,300,300,3), dtype=np.uint8, buffer=self.shm.buf)
  146. self.out_shm = mp.shared_memory.SharedMemory(name=f"out-{self.name}", create=False)
  147. self.out_np_shm = np.ndarray((20,6), dtype=np.float32, buffer=self.out_shm.buf)
  148. def detect(self, tensor_input, threshold=.4):
  149. detections = []
  150. # copy input to shared memory
  151. self.np_shm[:] = tensor_input[:]
  152. self.event.clear()
  153. self.detection_queue.put(self.name)
  154. result = self.event.wait(timeout=10.0)
  155. # if it timed out
  156. if result is None:
  157. return detections
  158. for d in self.out_np_shm:
  159. if d[1] < threshold:
  160. break
  161. detections.append((
  162. self.labels[int(d[0])],
  163. float(d[1]),
  164. (d[2], d[3], d[4], d[5])
  165. ))
  166. self.fps.update()
  167. return detections
  168. def cleanup(self):
  169. self.shm.unlink()
  170. self.out_shm.unlink()