import os
import time
import datetime
import cv2
import threading
import ctypes
import multiprocessing as mp
import subprocess as sp
import numpy as np
from .util import tonumpyarray, draw_box_with_label
from .object_detection import FramePrepper
from .objects import ObjectCleaner, BestPersonFrame
from .mqtt import MqttObjectPublisher
# Stores 2 seconds' worth of recent frames so they can be used by other threads
class FrameTracker(threading.Thread):
    def __init__(self, shared_frame, frame_time, frame_ready, frame_lock, recent_frames):
        threading.Thread.__init__(self)
        self.shared_frame = shared_frame
        self.frame_time = frame_time
        self.frame_ready = frame_ready
        self.frame_lock = frame_lock
        self.recent_frames = recent_frames

    def run(self):
        frame_time = 0.0
        while True:
            now = datetime.datetime.now().timestamp()

            # wait for a frame
            with self.frame_ready:
                # if there isn't a frame ready for processing or it is old, wait for a signal
                if self.frame_time.value == frame_time or (now - self.frame_time.value) > 0.5:
                    self.frame_ready.wait()

            # lock and make a copy of the frame
            with self.frame_lock:
                frame = self.shared_frame.copy()
                frame_time = self.frame_time.value

            # add the frame to recent frames
            self.recent_frames[frame_time] = frame

            # delete any frames older than 2 seconds
            stored_frame_times = list(self.recent_frames.keys())
            for k in stored_frame_times:
                if (now - k) > 2:
                    del self.recent_frames[k]
def get_frame_shape(source):
    # capture a single frame and check the frame shape so the correct array
    # size can be allocated in memory
    video = cv2.VideoCapture(source)
    ret, frame = video.read()
    video.release()
    # fail loudly rather than crashing on frame.shape when the read fails
    if not ret:
        raise RuntimeError("Unable to capture a frame from {}".format(source))
    return frame.shape
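# Substitutes FRIGATE_* environment variables into the configured input string
# via str.format. Illustrative example (hypothetical values): with
# FRIGATE_RTSP_PASSWORD=secret in the environment,
# 'rtsp://admin:{FRIGATE_RTSP_PASSWORD}@192.168.1.10:554/live' becomes
# 'rtsp://admin:secret@192.168.1.10:554/live'.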
def get_ffmpeg_input(ffmpeg_input):
    frigate_vars = {k: v for k, v in os.environ.items() if k.startswith('FRIGATE_')}
    return ffmpeg_input.format(**frigate_vars)
class CameraWatchdog(threading.Thread):
    def __init__(self, camera):
        threading.Thread.__init__(self)
        self.camera = camera

    def run(self):
        while True:
            # wait a bit before checking
            time.sleep(10)

            if (datetime.datetime.now().timestamp() - self.camera.frame_time.value) > 10:
                print("last frame is more than 10 seconds old, restarting camera capture...")
                self.camera.start_or_restart_capture()
                time.sleep(5)
# Thread to read the stdout of the ffmpeg process and update the current frame
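# Note: the fixed-size stdout reads below assume the configured output args
# produce raw, headerless frames (e.g. rawvideo); a container format would
# break this framing.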
class CameraCapture(threading.Thread):
    def __init__(self, camera):
        threading.Thread.__init__(self)
        self.camera = camera

    def run(self):
        frame_num = 0
        while True:
            if self.camera.ffmpeg_process.poll() is not None:
                print("ffmpeg process is not running. exiting capture thread...")
                break

            raw_image = self.camera.ffmpeg_process.stdout.read(self.camera.frame_size)

            if len(raw_image) == 0:
                print("ffmpeg didn't return a frame. something is wrong. exiting capture thread...")
                break

            # only keep every nth frame as configured by take_frame
            frame_num += 1
            if (frame_num % self.camera.take_frame) != 0:
                continue

            with self.camera.frame_lock:
                self.camera.frame_time.value = datetime.datetime.now().timestamp()
                self.camera.current_frame[:] = (
                    np
                    .frombuffer(raw_image, np.uint8)
                    .reshape(self.camera.frame_shape)
                )

            # notify with the condition that a new frame is ready
            with self.camera.frame_ready:
                self.camera.frame_ready.notify_all()
class Camera:
    def __init__(self, name, ffmpeg_config, config, prepped_frame_queue, mqtt_client, mqtt_prefix):
        self.name = name
        self.config = config
        self.detected_objects = []
        self.recent_frames = {}

        # resolve ffmpeg settings, falling back to the global defaults
        self.ffmpeg = config.get('ffmpeg', {})
        self.ffmpeg_input = get_ffmpeg_input(self.ffmpeg['input'])
        self.ffmpeg_global_args = self.ffmpeg.get('global_args', ffmpeg_config['global_args'])
        self.ffmpeg_hwaccel_args = self.ffmpeg.get('hwaccel_args', ffmpeg_config['hwaccel_args'])
        self.ffmpeg_input_args = self.ffmpeg.get('input_args', ffmpeg_config['input_args'])
        self.ffmpeg_output_args = self.ffmpeg.get('output_args', ffmpeg_config['output_args'])

        self.take_frame = self.config.get('take_frame', 1)
        self.regions = self.config['regions']
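        # an illustrative region entry (keys taken from how regions are used
        # in this file; the values are placeholders, not recommendations):
        #   {'size': 300, 'x_offset': 0, 'y_offset': 0, 'threshold': 0.5,
        #    'min_person_area': 1000, 'max_person_area': 100000}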
        self.frame_shape = get_frame_shape(self.ffmpeg_input)
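        # one raw frame is rows * cols * channels bytes (one byte per channel);
        # frames are handled as RGB throughout and only converted to BGR in
        # get_current_frame_with_objects()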
        self.frame_size = self.frame_shape[0] * self.frame_shape[1] * self.frame_shape[2]
        self.mqtt_client = mqtt_client
        self.mqtt_topic_prefix = '{}/{}'.format(mqtt_prefix, self.name)

        # create a numpy array for the current frame and initialize it to zeros
        self.current_frame = np.zeros(self.frame_shape, np.uint8)
        # create a shared value for storing the frame_time
        self.frame_time = mp.Value('d', 0.0)
        # lock to control access to the frame
        self.frame_lock = mp.Lock()
        # condition for notifying that a new frame is ready
        self.frame_ready = mp.Condition()
        # condition for notifying that objects were parsed
        self.objects_parsed = mp.Condition()

        self.ffmpeg_process = None
        self.capture_thread = None

        # for each region, create a separate thread to resize the region and prep for detection
        self.detection_prep_threads = []
        for region in self.config['regions']:
            # set a default threshold of 0.5 if not defined
            if 'threshold' not in region:
                region['threshold'] = 0.5
            if not isinstance(region['threshold'], float):
                print('Threshold is not a float. Setting to 0.5 default.')
                region['threshold'] = 0.5

            self.detection_prep_threads.append(FramePrepper(
                self.name,
                self.current_frame,
                self.frame_time,
                self.frame_ready,
                self.frame_lock,
                region['size'], region['x_offset'], region['y_offset'], region['threshold'],
                prepped_frame_queue
            ))

        # start a thread to store recent frames for processing
        self.frame_tracker = FrameTracker(self.current_frame, self.frame_time,
            self.frame_ready, self.frame_lock, self.recent_frames)
        self.frame_tracker.start()

        # start a thread to store the highest scoring recent person frame
        self.best_person_frame = BestPersonFrame(self.objects_parsed, self.recent_frames, self.detected_objects)
        self.best_person_frame.start()

        # start a thread to expire objects from the detected objects list
        self.object_cleaner = ObjectCleaner(self.objects_parsed, self.detected_objects)
        self.object_cleaner.start()

        # start a thread to publish object scores (currently only person)
        mqtt_publisher = MqttObjectPublisher(self.mqtt_client, self.mqtt_topic_prefix, self.objects_parsed, self.detected_objects, self.best_person_frame)
        mqtt_publisher.start()

        # create a watchdog thread for the capture process
        self.watchdog = CameraWatchdog(self)

        # load in the mask for person detection
        if 'mask' in self.config:
            self.mask = cv2.imread("/config/{}".format(self.config['mask']), cv2.IMREAD_GRAYSCALE)
        else:
            self.mask = None

        # default to an all-255 mask (nothing masked out), stored 2D to match
        # the grayscale imread above
        if self.mask is None:
            self.mask = np.zeros((self.frame_shape[0], self.frame_shape[1]), np.uint8)
            self.mask[:] = 255
    def start_or_restart_capture(self):
        if self.ffmpeg_process is not None:
            print("Terminating the existing ffmpeg process...")
            self.ffmpeg_process.terminate()
            try:
                print("Waiting for ffmpeg to exit gracefully...")
                self.ffmpeg_process.wait(timeout=30)
            except sp.TimeoutExpired:
                print("FFmpeg didn't exit. Force killing...")
                self.ffmpeg_process.kill()
                self.ffmpeg_process.wait()

            print("Waiting for the capture thread to exit...")
            self.capture_thread.join()
            self.ffmpeg_process = None
            self.capture_thread = None

        # create the process to capture frames from the input stream and store in a shared array
        print("Creating a new ffmpeg process...")
        self.start_ffmpeg()

        print("Creating a new capture thread...")
        self.capture_thread = CameraCapture(self)
        print("Starting a new capture thread...")
        self.capture_thread.start()
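    # Assemble and launch the ffmpeg process. The resulting command has the shape:
    #   ffmpeg <global_args> <hwaccel_args> <input_args> -i <input> <output_args> pipe:
    # Raw frames are read from its stdout by CameraCapture in frame_size chunks.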
    def start_ffmpeg(self):
        ffmpeg_cmd = (['ffmpeg'] +
            self.ffmpeg_global_args +
            self.ffmpeg_hwaccel_args +
            self.ffmpeg_input_args +
            ['-i', self.ffmpeg_input] +
            self.ffmpeg_output_args +
            ['pipe:'])

        print(" ".join(ffmpeg_cmd))

        self.ffmpeg_process = sp.Popen(ffmpeg_cmd, stdout=sp.PIPE, bufsize=self.frame_size)
    def start(self):
        self.start_or_restart_capture()
        # start the object detection prep threads
        for detection_prep_thread in self.detection_prep_threads:
            detection_prep_thread.start()
        self.watchdog.start()

    def join(self):
        self.capture_thread.join()

    def get_capture_pid(self):
        return self.ffmpeg_process.pid
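    # Note on the mask check below: a mask pixel value of 0 discards a person
    # detection at that location; 255 (the default mask) keeps it.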
    def add_objects(self, objects):
        if len(objects) == 0:
            return

        for obj in objects:
            # store the object area to use in bounding box labels
            obj['area'] = (obj['xmax'] - obj['xmin']) * (obj['ymax'] - obj['ymin'])

            if obj['name'] == 'person':
                # find the matching region
                region = None
                for r in self.regions:
                    if (
                        obj['xmin'] >= r['x_offset'] and
                        obj['ymin'] >= r['y_offset'] and
                        obj['xmax'] <= r['x_offset'] + r['size'] and
                        obj['ymax'] <= r['y_offset'] + r['size']
                    ):
                        region = r
                        break

                # if the min person area is larger than the
                # detected person, don't add it to detected objects
                if region and 'min_person_area' in region and region['min_person_area'] > obj['area']:
                    continue

                # if the detected person is larger than the
                # max person area, don't add it to detected objects
                if region and 'max_person_area' in region and region['max_person_area'] < obj['area']:
                    continue

                # compute the point to check (bottom center of the bounding box)
                # and make sure it isn't outside the bounds of the image (can happen from rounding)
                y_location = min(int(obj['ymax']), len(self.mask) - 1)
                x_location = min(int((obj['xmax'] - obj['xmin']) / 2.0) + obj['xmin'], len(self.mask[0]) - 1)

                # if the person is in a masked location, skip it
                if self.mask[y_location][x_location] == 0:
                    continue

            self.detected_objects.append(obj)

        with self.objects_parsed:
            self.objects_parsed.notify_all()
    def get_best_person(self):
        return self.best_person_frame.best_frame
    def get_current_frame_with_objects(self):
        # make a copy of the current detected objects
        detected_objects = self.detected_objects.copy()

        # lock and make a copy of the current frame
        with self.frame_lock:
            frame = self.current_frame.copy()
            frame_time = self.frame_time.value

        # draw the bounding boxes on the frame
        for obj in detected_objects:
            label = "{}: {}% {}".format(obj['name'], int(obj['score'] * 100), int(obj['area']))
            draw_box_with_label(frame, obj['xmin'], obj['ymin'], obj['xmax'], obj['ymax'], label)

        # draw the regions on the frame
        for region in self.regions:
            color = (255, 255, 255)
            cv2.rectangle(frame, (region['x_offset'], region['y_offset']),
                (region['x_offset'] + region['size'], region['y_offset'] + region['size']),
                color, 2)

        # print a timestamp
        time_to_show = datetime.datetime.fromtimestamp(frame_time).strftime("%m/%d/%Y %H:%M:%S")
        cv2.putText(frame, time_to_show, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, fontScale=0.8, color=(255, 255, 255), thickness=2)

        # convert to BGR for OpenCV consumers (frames are stored as RGB)
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

        return frame
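# A minimal usage sketch (illustrative only; 'back', the config dicts, the
# queue, and the mqtt client are placeholders for whatever the caller wires up,
# and the input stream must be reachable when Camera is constructed):
#
#   camera = Camera('back', ffmpeg_config, config, prepped_frame_queue,
#                   mqtt_client, 'frigate')
#   camera.start()
#   camera.join()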