edgetpu.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import os
  2. import datetime
  3. import hashlib
  4. import multiprocessing as mp
  5. from abc import ABC, abstractmethod
  6. import numpy as np
  7. import pyarrow.plasma as plasma
  8. import tflite_runtime.interpreter as tflite
  9. from tflite_runtime.interpreter import load_delegate
  10. from frigate.util import EventsPerSecond, listen
  11. def load_labels(path, encoding='utf-8'):
  12. """Loads labels from file (with or without index numbers).
  13. Args:
  14. path: path to label file.
  15. encoding: label file encoding.
  16. Returns:
  17. Dictionary mapping indices to labels.
  18. """
  19. with open(path, 'r', encoding=encoding) as f:
  20. lines = f.readlines()
  21. if not lines:
  22. return {}
  23. if lines[0].split(' ', maxsplit=1)[0].isdigit():
  24. pairs = [line.split(' ', maxsplit=1) for line in lines]
  25. return {int(index): label.strip() for index, label in pairs}
  26. else:
  27. return {index: line.strip() for index, line in enumerate(lines)}
  28. class ObjectDetector(ABC):
  29. @abstractmethod
  30. def detect(self, tensor_input, threshold = .4):
  31. pass
  32. class LocalObjectDetector(ObjectDetector):
  33. def __init__(self, tf_device=None, labels=None):
  34. self.fps = EventsPerSecond()
  35. if labels is None:
  36. self.labels = {}
  37. else:
  38. self.labels = load_labels(labels)
  39. device_config = {"device": "usb"}
  40. if not tf_device is None:
  41. device_config = {"device": tf_device}
  42. edge_tpu_delegate = None
  43. try:
  44. print(f"Attempting to load TPU as {device_config['device']}")
  45. edge_tpu_delegate = load_delegate('libedgetpu.so.1.0', device_config)
  46. print("TPU found")
  47. except ValueError:
  48. try:
  49. print(f"Attempting to load TPU as pci:0")
  50. edge_tpu_delegate = load_delegate('libedgetpu.so.1.0', {"device": "pci:0"})
  51. print("PCIe TPU found")
  52. except ValueError:
  53. print("No EdgeTPU detected. Falling back to CPU.")
  54. if edge_tpu_delegate is None:
  55. self.interpreter = tflite.Interpreter(
  56. model_path='/cpu_model.tflite')
  57. else:
  58. self.interpreter = tflite.Interpreter(
  59. model_path='/edgetpu_model.tflite',
  60. experimental_delegates=[edge_tpu_delegate])
  61. self.interpreter.allocate_tensors()
  62. self.tensor_input_details = self.interpreter.get_input_details()
  63. self.tensor_output_details = self.interpreter.get_output_details()
  64. def detect(self, tensor_input, threshold=.4):
  65. detections = []
  66. raw_detections = self.detect_raw(tensor_input)
  67. for d in raw_detections:
  68. if d[1] < threshold:
  69. break
  70. detections.append((
  71. self.labels[int(d[0])],
  72. float(d[1]),
  73. (d[2], d[3], d[4], d[5])
  74. ))
  75. self.fps.update()
  76. return detections
  77. def detect_raw(self, tensor_input):
  78. self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
  79. self.interpreter.invoke()
  80. boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
  81. label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
  82. scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
  83. detections = np.zeros((20,6), np.float32)
  84. for i, score in enumerate(scores):
  85. detections[i] = [label_codes[i], score, boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]]
  86. return detections
  87. def run_detector(detection_queue, avg_speed, start, tf_device):
  88. print(f"Starting detection process: {os.getpid()}")
  89. listen()
  90. plasma_client = plasma.connect("/tmp/plasma")
  91. object_detector = LocalObjectDetector(tf_device=tf_device)
  92. while True:
  93. object_id_str = detection_queue.get()
  94. object_id_hash = hashlib.sha1(str.encode(object_id_str))
  95. object_id = plasma.ObjectID(object_id_hash.digest())
  96. object_id_out = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{object_id_str}")).digest())
  97. input_frame = plasma_client.get(object_id, timeout_ms=0)
  98. if input_frame is plasma.ObjectNotAvailable:
  99. continue
  100. # detect and put the output in the plasma store
  101. start.value = datetime.datetime.now().timestamp()
  102. plasma_client.put(object_detector.detect_raw(input_frame), object_id_out)
  103. duration = datetime.datetime.now().timestamp()-start.value
  104. start.value = 0.0
  105. avg_speed.value = (avg_speed.value*9 + duration)/10
  106. class EdgeTPUProcess():
  107. def __init__(self, tf_device=None):
  108. self.detection_queue = mp.Queue()
  109. self.avg_inference_speed = mp.Value('d', 0.01)
  110. self.detection_start = mp.Value('d', 0.0)
  111. self.detect_process = None
  112. self.tf_device = tf_device
  113. self.start_or_restart()
  114. def start_or_restart(self):
  115. self.detection_start.value = 0.0
  116. if (not self.detect_process is None) and self.detect_process.is_alive():
  117. self.detect_process.terminate()
  118. print("Waiting for detection process to exit gracefully...")
  119. self.detect_process.join(timeout=30)
  120. if self.detect_process.exitcode is None:
  121. print("Detection process didnt exit. Force killing...")
  122. self.detect_process.kill()
  123. self.detect_process.join()
  124. self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start, self.tf_device))
  125. self.detect_process.daemon = True
  126. self.detect_process.start()
  127. class RemoteObjectDetector():
  128. def __init__(self, name, labels, detection_queue):
  129. self.labels = load_labels(labels)
  130. self.name = name
  131. self.fps = EventsPerSecond()
  132. self.plasma_client = plasma.connect("/tmp/plasma")
  133. self.detection_queue = detection_queue
  134. def detect(self, tensor_input, threshold=.4):
  135. detections = []
  136. now = f"{self.name}-{str(datetime.datetime.now().timestamp())}"
  137. object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest())
  138. object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest())
  139. self.plasma_client.put(tensor_input, object_id_frame)
  140. self.detection_queue.put(now)
  141. raw_detections = self.plasma_client.get(object_id_detections, timeout_ms=10000)
  142. if raw_detections is plasma.ObjectNotAvailable:
  143. self.plasma_client.delete([object_id_frame])
  144. return detections
  145. for d in raw_detections:
  146. if d[1] < threshold:
  147. break
  148. detections.append((
  149. self.labels[int(d[0])],
  150. float(d[1]),
  151. (d[2], d[3], d[4], d[5])
  152. ))
  153. self.plasma_client.delete([object_id_frame, object_id_detections])
  154. self.fps.update()
  155. return detections