edgetpu.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. import os
  2. import datetime
  3. import hashlib
  4. import multiprocessing as mp
  5. import numpy as np
  6. import SharedArray as sa
  7. import pyarrow.plasma as plasma
  8. import tflite_runtime.interpreter as tflite
  9. from tflite_runtime.interpreter import load_delegate
  10. from frigate.util import EventsPerSecond
  11. def load_labels(path, encoding='utf-8'):
  12. """Loads labels from file (with or without index numbers).
  13. Args:
  14. path: path to label file.
  15. encoding: label file encoding.
  16. Returns:
  17. Dictionary mapping indices to labels.
  18. """
  19. with open(path, 'r', encoding=encoding) as f:
  20. lines = f.readlines()
  21. if not lines:
  22. return {}
  23. if lines[0].split(' ', maxsplit=1)[0].isdigit():
  24. pairs = [line.split(' ', maxsplit=1) for line in lines]
  25. return {int(index): label.strip() for index, label in pairs}
  26. else:
  27. return {index: line.strip() for index, line in enumerate(lines)}
  28. class ObjectDetector():
  29. def __init__(self):
  30. edge_tpu_delegate = None
  31. try:
  32. edge_tpu_delegate = load_delegate('libedgetpu.so.1.0')
  33. except ValueError:
  34. print("No EdgeTPU detected. Falling back to CPU.")
  35. if edge_tpu_delegate is None:
  36. self.interpreter = tflite.Interpreter(
  37. model_path='/cpu_model.tflite')
  38. else:
  39. self.interpreter = tflite.Interpreter(
  40. model_path='/edgetpu_model.tflite',
  41. experimental_delegates=[edge_tpu_delegate])
  42. self.interpreter.allocate_tensors()
  43. self.tensor_input_details = self.interpreter.get_input_details()
  44. self.tensor_output_details = self.interpreter.get_output_details()
  45. def detect_raw(self, tensor_input):
  46. self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
  47. self.interpreter.invoke()
  48. boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
  49. label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
  50. scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
  51. detections = np.zeros((20,6), np.float32)
  52. for i, score in enumerate(scores):
  53. detections[i] = [label_codes[i], score, boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]]
  54. return detections
  55. def run_detector(detection_queue, avg_speed, start):
  56. print(f"Starting detection process: {os.getpid()}")
  57. plasma_client = plasma.connect("/tmp/plasma")
  58. object_detector = ObjectDetector()
  59. while True:
  60. object_id_str = detection_queue.get()
  61. object_id_hash = hashlib.sha1(str.encode(object_id_str))
  62. object_id = plasma.ObjectID(object_id_hash.digest())
  63. input_frame = plasma_client.get(object_id, timeout_ms=0)
  64. start.value = datetime.datetime.now().timestamp()
  65. # detect and put the output in the plasma store
  66. object_id_out = hashlib.sha1(str.encode(f"out-{object_id_str}")).digest()
  67. plasma_client.put(object_detector.detect_raw(input_frame), plasma.ObjectID(object_id_out))
  68. duration = datetime.datetime.now().timestamp()-start.value
  69. start.value = 0.0
  70. avg_speed.value = (avg_speed.value*9 + duration)/10
  71. class EdgeTPUProcess():
  72. def __init__(self):
  73. self.detection_queue = mp.Queue()
  74. self.avg_inference_speed = mp.Value('d', 0.01)
  75. self.detection_start = mp.Value('d', 0.0)
  76. self.detect_process = None
  77. self.start_or_restart()
  78. def start_or_restart(self):
  79. self.detection_start.value = 0.0
  80. if (not self.detect_process is None) and self.detect_process.is_alive():
  81. self.detect_process.terminate()
  82. print("Waiting for detection process to exit gracefully...")
  83. self.detect_process.join(timeout=30)
  84. if self.detect_process.exitcode is None:
  85. print("Detection process didnt exit. Force killing...")
  86. self.detect_process.kill()
  87. self.detect_process.join()
  88. self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start))
  89. self.detect_process.daemon = True
  90. self.detect_process.start()
  91. class RemoteObjectDetector():
  92. def __init__(self, name, labels, detection_queue):
  93. self.labels = load_labels(labels)
  94. self.name = name
  95. self.fps = EventsPerSecond()
  96. self.plasma_client = plasma.connect("/tmp/plasma")
  97. self.detection_queue = detection_queue
  98. def detect(self, tensor_input, threshold=.4):
  99. detections = []
  100. now = f"{self.name}-{str(datetime.datetime.now().timestamp())}"
  101. object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest())
  102. object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest())
  103. self.plasma_client.put(tensor_input, object_id_frame)
  104. self.detection_queue.put(now)
  105. raw_detections = self.plasma_client.get(object_id_detections)
  106. for d in raw_detections:
  107. if d[1] < threshold:
  108. break
  109. detections.append((
  110. self.labels[int(d[0])],
  111. float(d[1]),
  112. (d[2], d[3], d[4], d[5])
  113. ))
  114. self.plasma_client.delete([object_id_frame, object_id_detections])
  115. self.fps.update()
  116. return detections