edgetpu.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. import os
  2. import datetime
  3. import hashlib
  4. import multiprocessing as mp
  5. import numpy as np
  6. import SharedArray as sa
  7. import pyarrow.plasma as plasma
  8. import tflite_runtime.interpreter as tflite
  9. from tflite_runtime.interpreter import load_delegate
  10. from frigate.util import EventsPerSecond
  11. def load_labels(path, encoding='utf-8'):
  12. """Loads labels from file (with or without index numbers).
  13. Args:
  14. path: path to label file.
  15. encoding: label file encoding.
  16. Returns:
  17. Dictionary mapping indices to labels.
  18. """
  19. with open(path, 'r', encoding=encoding) as f:
  20. lines = f.readlines()
  21. if not lines:
  22. return {}
  23. if lines[0].split(' ', maxsplit=1)[0].isdigit():
  24. pairs = [line.split(' ', maxsplit=1) for line in lines]
  25. return {int(index): label.strip() for index, label in pairs}
  26. else:
  27. return {index: line.strip() for index, line in enumerate(lines)}
  28. class ObjectDetector():
  29. def __init__(self):
  30. edge_tpu_delegate = None
  31. try:
  32. edge_tpu_delegate = load_delegate('libedgetpu.so.1.0')
  33. except ValueError:
  34. print("No EdgeTPU detected. Falling back to CPU.")
  35. if edge_tpu_delegate is None:
  36. self.interpreter = tflite.Interpreter(
  37. model_path='/cpu_model.tflite')
  38. else:
  39. self.interpreter = tflite.Interpreter(
  40. model_path='/edgetpu_model.tflite',
  41. experimental_delegates=[edge_tpu_delegate])
  42. self.interpreter.allocate_tensors()
  43. self.tensor_input_details = self.interpreter.get_input_details()
  44. self.tensor_output_details = self.interpreter.get_output_details()
  45. def detect_raw(self, tensor_input):
  46. self.interpreter.set_tensor(self.tensor_input_details[0]['index'], tensor_input)
  47. self.interpreter.invoke()
  48. boxes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[0]['index']))
  49. label_codes = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[1]['index']))
  50. scores = np.squeeze(self.interpreter.get_tensor(self.tensor_output_details[2]['index']))
  51. detections = np.zeros((20,6), np.float32)
  52. for i, score in enumerate(scores):
  53. detections[i] = [label_codes[i], score, boxes[i][0], boxes[i][1], boxes[i][2], boxes[i][3]]
  54. return detections
  55. def run_detector(detection_queue, avg_speed, start):
  56. print(f"Starting detection process: {os.getpid()}")
  57. plasma_client = plasma.connect("/tmp/plasma")
  58. object_detector = ObjectDetector()
  59. while True:
  60. object_id_str = detection_queue.get()
  61. object_id_hash = hashlib.sha1(str.encode(object_id_str))
  62. object_id = plasma.ObjectID(object_id_hash.digest())
  63. object_id_out = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{object_id_str}")).digest())
  64. input_frame = plasma_client.get(object_id, timeout_ms=0)
  65. if input_frame is plasma.ObjectNotAvailable:
  66. continue
  67. # detect and put the output in the plasma store
  68. start.value = datetime.datetime.now().timestamp()
  69. plasma_client.put(object_detector.detect_raw(input_frame), object_id_out)
  70. duration = datetime.datetime.now().timestamp()-start.value
  71. start.value = 0.0
  72. avg_speed.value = (avg_speed.value*9 + duration)/10
  73. class EdgeTPUProcess():
  74. def __init__(self):
  75. self.detection_queue = mp.Queue()
  76. self.avg_inference_speed = mp.Value('d', 0.01)
  77. self.detection_start = mp.Value('d', 0.0)
  78. self.detect_process = None
  79. self.start_or_restart()
  80. def start_or_restart(self):
  81. self.detection_start.value = 0.0
  82. if (not self.detect_process is None) and self.detect_process.is_alive():
  83. self.detect_process.terminate()
  84. print("Waiting for detection process to exit gracefully...")
  85. self.detect_process.join(timeout=30)
  86. if self.detect_process.exitcode is None:
  87. print("Detection process didnt exit. Force killing...")
  88. self.detect_process.kill()
  89. self.detect_process.join()
  90. self.detect_process = mp.Process(target=run_detector, args=(self.detection_queue, self.avg_inference_speed, self.detection_start))
  91. self.detect_process.daemon = True
  92. self.detect_process.start()
  93. class RemoteObjectDetector():
  94. def __init__(self, name, labels, detection_queue):
  95. self.labels = load_labels(labels)
  96. self.name = name
  97. self.fps = EventsPerSecond()
  98. self.plasma_client = plasma.connect("/tmp/plasma")
  99. self.detection_queue = detection_queue
  100. def detect(self, tensor_input, threshold=.4):
  101. detections = []
  102. now = f"{self.name}-{str(datetime.datetime.now().timestamp())}"
  103. object_id_frame = plasma.ObjectID(hashlib.sha1(str.encode(now)).digest())
  104. object_id_detections = plasma.ObjectID(hashlib.sha1(str.encode(f"out-{now}")).digest())
  105. self.plasma_client.put(tensor_input, object_id_frame)
  106. self.detection_queue.put(now)
  107. raw_detections = self.plasma_client.get(object_id_detections, timeout_ms=10000)
  108. if raw_detections is plasma.ObjectNotAvailable:
  109. self.plasma_client.delete([object_id_frame])
  110. return detections
  111. for d in raw_detections:
  112. if d[1] < threshold:
  113. break
  114. detections.append((
  115. self.labels[int(d[0])],
  116. float(d[1]),
  117. (d[2], d[3], d[4], d[5])
  118. ))
  119. self.plasma_client.delete([object_id_frame, object_id_detections])
  120. self.fps.update()
  121. return detections