# video.py
import os
import time
import datetime
import cv2
import queue
import threading
import ctypes
import multiprocessing as mp
import subprocess as sp
import numpy as np
import prctl
import copy
import itertools
from collections import defaultdict
from frigate.util import tonumpyarray, LABELS, draw_box_with_label, calculate_region, EventsPerSecond
from frigate.object_detection import RegionPrepper, RegionRequester
from frigate.objects import ObjectCleaner, BestFrames, DetectedObjectsProcessor, RegionRefiner, ObjectTracker
from frigate.mqtt import MqttObjectPublisher

# Caches recent frames so they can be used by other threads; once more than
# 100 frames accumulate, the cache is trimmed back to the 50 most recent
class FrameTracker(threading.Thread):
    def __init__(self, frame_time, frame_ready, frame_lock, recent_frames):
        threading.Thread.__init__(self)
        self.frame_time = frame_time
        self.frame_ready = frame_ready
        self.frame_lock = frame_lock
        self.recent_frames = recent_frames

    def run(self):
        prctl.set_name(self.__class__.__name__)
        while True:
            # wait for a frame
            with self.frame_ready:
                self.frame_ready.wait()

            # delete any old frames
            stored_frame_times = list(self.recent_frames.keys())
            stored_frame_times.sort(reverse=True)
            if len(stored_frame_times) > 100:
                frames_to_delete = stored_frame_times[50:]
                for k in frames_to_delete:
                    del self.recent_frames[k]

def get_frame_shape(source):
    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(source)
    ret, frame = video.read()
    video.release()
    if not ret:
        raise RuntimeError("Unable to capture a frame from {}".format(source))
    return frame.shape
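
# For example, a 1080p stream would typically yield a shape of
# (1080, 1920, 3): height, width, channels (values here are illustrative).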

def get_ffmpeg_input(ffmpeg_input):
    frigate_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
    return ffmpeg_input.format(**frigate_vars)
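
# Example (hypothetical values): with FRIGATE_RTSP_PASSWORD=secret in the
# environment, get_ffmpeg_input('rtsp://admin:{FRIGATE_RTSP_PASSWORD}@10.0.0.1/live')
# returns 'rtsp://admin:secret@10.0.0.1/live'.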

class CameraWatchdog(threading.Thread):
    def __init__(self, camera):
        threading.Thread.__init__(self)
        self.camera = camera

    def run(self):
        prctl.set_name(self.__class__.__name__)
        while True:
            # wait a bit before checking
            time.sleep(10)

            if self.camera.frame_time.value != 0.0 and (datetime.datetime.now().timestamp() - self.camera.frame_time.value) > 300:
                print("last frame is more than 5 minutes old, restarting camera capture...")
                self.camera.start_or_restart_capture()
                time.sleep(5)

# Thread to read the stdout of the ffmpeg process and update the current frame
class CameraCapture(threading.Thread):
    def __init__(self, camera):
        threading.Thread.__init__(self)
        self.camera = camera

    def run(self):
        prctl.set_name(self.__class__.__name__)
        frame_num = 0
        while True:
            if self.camera.ffmpeg_process.poll() is not None:
                print("ffmpeg process is not running. exiting capture thread...")
                break

            raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size)

            if len(raw_image) == 0:
                print("ffmpeg didn't return a frame. something is wrong. exiting capture thread...")
                break

            frame_num += 1
            if (frame_num % self.camera.take_frame) != 0:
                continue

            with self.camera.frame_lock:
                # TODO: use frame_queue instead
                self.camera.frame_time.value = datetime.datetime.now().timestamp()
                self.camera.frame_cache[self.camera.frame_time.value] = (
                    np
                    .frombuffer(raw_image, np.uint8)
                    .reshape(self.camera.frame_shape)
                )
                self.camera.frame_queue.put(self.camera.frame_time.value)

            # Notify with the condition that a new frame is ready
            with self.camera.frame_ready:
                self.camera.frame_ready.notify_all()

            self.camera.fps.update()
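
# Note: each stdout.read(frame_size) above pulls exactly one raw frame:
# frame_size is height * width * channels bytes, so a 1920x1080 3-channel
# frame is 6,220,800 bytes (dimensions here are illustrative).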

class VideoWriter(threading.Thread):
    def __init__(self, camera):
        threading.Thread.__init__(self)
        self.camera = camera

    def run(self):
        prctl.set_name(self.__class__.__name__)
        while True:
            (frame_time, tracked_objects) = self.camera.frame_output_queue.get()
            # if len(self.camera.object_tracker.tracked_objects) == 0:
            #     continue
            # f = open(f"/debug/output/{self.camera.name}-{str(format(frame_time, '.8f'))}.jpg", 'wb')
            # f.write(self.camera.frame_with_objects(frame_time, tracked_objects))
            # f.close()

class Camera:
    def __init__(self, name, ffmpeg_config, global_objects_config, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
        self.name = name
        self.config = config
        self.detected_objects = defaultdict(lambda: [])
        self.frame_cache = {}
        self.last_processed_frame = None
        # queue for re-assembling frames in order
        self.frame_queue = queue.Queue()
        # track how many regions have been requested for a frame so we know when a frame is complete
        self.regions_in_process = {}
        # Lock to control access
        self.regions_in_process_lock = mp.Lock()
        self.finished_frame_queue = queue.Queue()
        self.refined_frame_queue = queue.Queue()
        self.frame_output_queue = queue.Queue()

        self.ffmpeg = config.get('ffmpeg', {})
        self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input'])
        self.ffmpeg_global_args = self.ffmpeg.get('global_args', ffmpeg_config['global_args'])
        self.ffmpeg_hwaccel_args = self.ffmpeg.get('hwaccel_args', ffmpeg_config['hwaccel_args'])
        self.ffmpeg_input_args = self.ffmpeg.get('input_args', ffmpeg_config['input_args'])
        self.ffmpeg_output_args = self.ffmpeg.get('output_args', ffmpeg_config['output_args'])

        camera_objects_config = config.get('objects', {})

        self.take_frame = self.config.get('take_frame', 1)
        self.regions = self.config['regions']
        self.frame_shape = get_frame_shape(self.ffmpeg_input)
        self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
        self.mqtt_client = mqtt_client
        self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name)

        # create shared value for storing the frame_time
        self.frame_time = mp.Value('d', 0.0)
        # Lock to control access to the frame
        self.frame_lock = mp.Lock()
        # Condition for notifying that a new frame is ready
        self.frame_ready = mp.Condition()
        # Condition for notifying that objects were tracked
        self.objects_tracked = mp.Condition()
        # Queue for prepped frames, max size set to (number of regions * 5)
        max_queue_size = len(self.config['regions']) * 5
        self.resize_queue = queue.Queue(max_queue_size)
        # Queue for raw detected objects
        self.detected_objects_queue = queue.Queue()
        self.detected_objects_processor = DetectedObjectsProcessor(self)
        self.detected_objects_processor.start()
        # initialize the frame cache
        self.cached_frame_with_objects = {
            'frame_bytes': [],
            'frame_time': 0
        }

        self.ffmpeg_process = None
        self.capture_thread = None
        self.fps = EventsPerSecond()

        # combine tracked objects lists
        self.objects_to_track = set().union(global_objects_config.get('track', ['person', 'car', 'truck']), camera_objects_config.get('track', []))

        # merge object filters
        global_object_filters = global_objects_config.get('filters', {})
        camera_object_filters = camera_objects_config.get('filters', {})
        objects_with_config = set().union(global_object_filters.keys(), camera_object_filters.keys())
        self.object_filters = {}
        for obj in objects_with_config:
            self.object_filters[obj] = {**global_object_filters.get(obj, {}), **camera_object_filters.get(obj, {})}

        # start a thread to track objects
        self.object_tracker = ObjectTracker(self, 10)
        self.object_tracker.start()

        # start a thread to write tracked frames to disk
        self.video_writer = VideoWriter(self)
        self.video_writer.start()

        # start a thread to queue resize requests for regions
        self.region_requester = RegionRequester(self)
        self.region_requester.start()

        # start a thread to cache recent frames for processing
        self.frame_tracker = FrameTracker(self.frame_time,
            self.frame_ready, self.frame_lock, self.frame_cache)
        self.frame_tracker.start()

        # start a thread to resize regions
        self.region_prepper = RegionPrepper(self.frame_cache, self.resize_queue, prepped_frame_queue)
        self.region_prepper.start()

        # start a thread to store the highest scoring recent frames for monitored object types
        self.best_frames = BestFrames(self)
        self.best_frames.start()

        # start a thread to expire objects from the detected objects list
        self.object_cleaner = ObjectCleaner(self)
        self.object_cleaner.start()

        # start a thread to refine regions when objects are clipped
        self.dynamic_region_fps = EventsPerSecond()
        self.region_refiner = RegionRefiner(self)
        self.region_refiner.start()
        self.dynamic_region_fps.start()

        # start a thread to publish object scores
        mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self)
        mqtt_publisher.start()

        # create a watchdog thread for capture process
        self.watchdog = CameraWatchdog(self)

        # load in the mask for object detection
        if 'mask' in self.config:
            self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE)
        else:
            self.mask = None

        if self.mask is None:
            self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1], 1), np.uint8)
            self.mask[:] = 255
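
    # A camera config is expected to look roughly like this (illustrative
    # values; only the keys read above are shown):
    #
    #   {
    #     'ffmpeg': {'input': 'rtsp://{FRIGATE_RTSP_PASSWORD}@10.0.0.1/live'},
    #     'take_frame': 1,
    #     'regions': [{'x_offset': 0, 'y_offset': 0, 'size': 300}],
    #     'objects': {'track': ['person'], 'filters': {'person': {}}},
    #     'mask': 'back-mask.bmp'
    #   }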

    def start_or_restart_capture(self):
        if self.ffmpeg_process is not None:
            print("Terminating the existing ffmpeg process...")
            self.ffmpeg_process.terminate()
            try:
                print("Waiting for ffmpeg to exit gracefully...")
                self.ffmpeg_process.wait(timeout=30)
            except sp.TimeoutExpired:
                print("FFmpeg didn't exit. Force killing...")
                self.ffmpeg_process.kill()
                self.ffmpeg_process.wait()

            print("Waiting for the capture thread to exit...")
            self.capture_thread.join()
            self.ffmpeg_process = None
            self.capture_thread = None

        # create the process to capture frames from the input stream and store in a shared array
        print("Creating a new ffmpeg process...")
        self.start_ffmpeg()

        print("Creating a new capture thread...")
        self.capture_thread = CameraCapture(self)
        print("Starting a new capture thread...")
        self.capture_thread.start()
        self.fps.start()

    def start_ffmpeg(self):
        ffmpeg_cmd = (['ffmpeg'] +
            self.ffmpeg_global_args +
            self.ffmpeg_hwaccel_args +
            self.ffmpeg_input_args +
            ['-i', self.ffmpeg_input] +
            self.ffmpeg_output_args +
            ['pipe:'])

        print(" ".join(ffmpeg_cmd))

        self.ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout=sp.PIPE, bufsize=self.frame_size)
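
    # The assembled command has the shape (placeholders, not literal values):
    #   ffmpeg <global_args> <hwaccel_args> <input_args> -i <input> <output_args> pipe:
    # With raw output args such as '-f rawvideo -pix_fmt rgb24', each frame
    # arrives on stdout as an uncompressed frame_size-byte buffer that
    # CameraCapture reshapes into a numpy array.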

    def start(self):
        self.start_or_restart_capture()
        self.watchdog.start()

    def join(self):
        self.capture_thread.join()

    def get_capture_pid(self):
        return self.ffmpeg_process.pid

    def get_best(self, label):
        return self.best_frames.best_frames.get(label)

    def stats(self):
        return {
            'camera_fps': self.fps.eps(60),
            'resize_queue': self.resize_queue.qsize(),
            'frame_queue': self.frame_queue.qsize(),
            'finished_frame_queue': self.finished_frame_queue.qsize(),
            'refined_frame_queue': self.refined_frame_queue.qsize(),
            'regions_in_process': self.regions_in_process,
            'dynamic_regions_per_sec': self.dynamic_region_fps.eps()
        }

    def frame_with_objects(self, frame_time, tracked_objects=None):
        frame = self.frame_cache[frame_time].copy()
        detected_objects = self.detected_objects[frame_time].copy()

        for region in self.regions:
            color = (255, 255, 255)
            cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                (region['x_offset'] + region['size'], region['y_offset'] + region['size']),
                color, 2)

        # draw the bounding boxes on the screen
        if tracked_objects is None:
            tracked_objects = copy.deepcopy(self.object_tracker.tracked_objects)

        for obj in detected_objects:
            draw_box_with_label(frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], f"{int(obj['score']*100)}% {obj['area']}", thickness=3)

        for id, obj in tracked_objects.items():
            color = (0, 255, 0) if obj['frame_time'] == frame_time else (255, 0, 0)
            draw_box_with_label(frame, obj['box']['xmin'], obj['box']['ymin'], obj['box']['xmax'], obj['box']['ymax'], obj['name'], f"{int(obj['score']*100)}% {obj['area']} {id}", color=color, thickness=1)

        # draw a timestamp
        time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
        cv2.putText(frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)

        # draw the fps
        cv2.putText(frame, str(self.fps.eps()) + ' FPS', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, fontScale=.8, color=(255, 255, 255), thickness=2)

        # convert to BGR
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        # encode the image into a jpg
        ret, jpg = cv2.imencode('.jpg', frame)
        return jpg.tobytes()

    def get_current_frame_with_objects(self):
        frame_time = self.last_processed_frame
        if frame_time == self.cached_frame_with_objects['frame_time']:
            return self.cached_frame_with_objects['frame_bytes']

        frame_bytes = self.frame_with_objects(frame_time)

        self.cached_frame_with_objects = {
            'frame_bytes': frame_bytes,
            'frame_time': frame_time
        }

        return frame_bytes
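
# A minimal usage sketch (hypothetical wiring; the queue, MQTT client, and
# config dicts would normally be built by the application entrypoint):
#
#   prepped_frame_queue = queue.Queue()
#   camera = Camera('back', ffmpeg_config, global_objects_config, config,
#                   prepped_frame_queue, mqtt_client, 'frigate')
#   camera.start()
#   jpg_bytes = camera.get_current_frame_with_objects()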