video.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725
  1. import datetime
  2. import itertools
  3. import logging
  4. import multiprocessing as mp
  5. import queue
  6. import random
  7. import signal
  8. import subprocess as sp
  9. import threading
  10. import time
  11. from collections import defaultdict
  12. from typing import Dict, List
  13. import numpy as np
  14. from cv2 import cv2, reduce
  15. from setproctitle import setproctitle
  16. from frigate.config import CameraConfig, DetectConfig
  17. from frigate.edgetpu import RemoteObjectDetector
  18. from frigate.log import LogPipe
  19. from frigate.motion import MotionDetector
  20. from frigate.objects import ObjectTracker
  21. from frigate.util import (
  22. EventsPerSecond,
  23. FrameManager,
  24. SharedMemoryFrameManager,
  25. area,
  26. calculate_region,
  27. clipped,
  28. intersection,
  29. intersection_over_union,
  30. listen,
  31. yuv_region_2_rgb,
  32. )
  33. logger = logging.getLogger(__name__)
  34. def filtered(obj, objects_to_track, object_filters):
  35. object_name = obj[0]
  36. if not object_name in objects_to_track:
  37. return True
  38. if object_name in object_filters:
  39. obj_settings = object_filters[object_name]
  40. # if the min area is larger than the
  41. # detected object, don't add it to detected objects
  42. if obj_settings.min_area > obj[3]:
  43. return True
  44. # if the detected object is larger than the
  45. # max area, don't add it to detected objects
  46. if obj_settings.max_area < obj[3]:
  47. return True
  48. # if the score is lower than the min_score, skip
  49. if obj_settings.min_score > obj[1]:
  50. return True
  51. if not obj_settings.mask is None:
  52. # compute the coordinates of the object and make sure
  53. # the location isnt outside the bounds of the image (can happen from rounding)
  54. y_location = min(int(obj[2][3]), len(obj_settings.mask) - 1)
  55. x_location = min(
  56. int((obj[2][2] - obj[2][0]) / 2.0) + obj[2][0],
  57. len(obj_settings.mask[0]) - 1,
  58. )
  59. # if the object is in a masked location, don't add it to detected objects
  60. if obj_settings.mask[y_location][x_location] == 0:
  61. return True
  62. return False
  63. def create_tensor_input(frame, model_shape, region):
  64. cropped_frame = yuv_region_2_rgb(frame, region)
  65. # Resize to 300x300 if needed
  66. if cropped_frame.shape != (model_shape[0], model_shape[1], 3):
  67. cropped_frame = cv2.resize(
  68. cropped_frame, dsize=model_shape, interpolation=cv2.INTER_LINEAR
  69. )
  70. # Expand dimensions since the model expects images to have shape: [1, height, width, 3]
  71. return np.expand_dims(cropped_frame, axis=0)
  72. def stop_ffmpeg(ffmpeg_process, logger):
  73. logger.info("Terminating the existing ffmpeg process...")
  74. ffmpeg_process.terminate()
  75. try:
  76. logger.info("Waiting for ffmpeg to exit gracefully...")
  77. ffmpeg_process.communicate(timeout=30)
  78. except sp.TimeoutExpired:
  79. logger.info("FFmpeg didnt exit. Force killing...")
  80. ffmpeg_process.kill()
  81. ffmpeg_process.communicate()
  82. ffmpeg_process = None
  83. def start_or_restart_ffmpeg(
  84. ffmpeg_cmd, logger, logpipe: LogPipe, frame_size=None, ffmpeg_process=None
  85. ):
  86. if ffmpeg_process is not None:
  87. stop_ffmpeg(ffmpeg_process, logger)
  88. if frame_size is None:
  89. process = sp.Popen(
  90. ffmpeg_cmd,
  91. stdout=sp.DEVNULL,
  92. stderr=logpipe,
  93. stdin=sp.DEVNULL,
  94. start_new_session=True,
  95. )
  96. else:
  97. process = sp.Popen(
  98. ffmpeg_cmd,
  99. stdout=sp.PIPE,
  100. stderr=logpipe,
  101. stdin=sp.DEVNULL,
  102. bufsize=frame_size * 10,
  103. start_new_session=True,
  104. )
  105. return process
  106. def capture_frames(
  107. ffmpeg_process,
  108. camera_name,
  109. frame_shape,
  110. frame_manager: FrameManager,
  111. frame_queue,
  112. fps: mp.Value,
  113. skipped_fps: mp.Value,
  114. current_frame: mp.Value,
  115. ):
  116. frame_size = frame_shape[0] * frame_shape[1]
  117. frame_rate = EventsPerSecond()
  118. frame_rate.start()
  119. skipped_eps = EventsPerSecond()
  120. skipped_eps.start()
  121. while True:
  122. fps.value = frame_rate.eps()
  123. skipped_fps = skipped_eps.eps()
  124. current_frame.value = datetime.datetime.now().timestamp()
  125. frame_name = f"{camera_name}{current_frame.value}"
  126. frame_buffer = frame_manager.create(frame_name, frame_size)
  127. try:
  128. frame_buffer[:] = ffmpeg_process.stdout.read(frame_size)
  129. except Exception as e:
  130. logger.error(f"{camera_name}: Unable to read frames from ffmpeg process.")
  131. if ffmpeg_process.poll() != None:
  132. logger.error(
  133. f"{camera_name}: ffmpeg process is not running. exiting capture thread..."
  134. )
  135. frame_manager.delete(frame_name)
  136. break
  137. continue
  138. frame_rate.update()
  139. # if the queue is full, skip this frame
  140. if frame_queue.full():
  141. skipped_eps.update()
  142. frame_manager.delete(frame_name)
  143. continue
  144. # close the frame
  145. frame_manager.close(frame_name)
  146. # add to the queue
  147. frame_queue.put(current_frame.value)
  148. class CameraWatchdog(threading.Thread):
  149. def __init__(
  150. self, camera_name, config, frame_queue, camera_fps, ffmpeg_pid, stop_event
  151. ):
  152. threading.Thread.__init__(self)
  153. self.logger = logging.getLogger(f"watchdog.{camera_name}")
  154. self.camera_name = camera_name
  155. self.config = config
  156. self.capture_thread = None
  157. self.ffmpeg_detect_process = None
  158. self.logpipe = LogPipe(f"ffmpeg.{self.camera_name}.detect", logging.ERROR)
  159. self.ffmpeg_other_processes = []
  160. self.camera_fps = camera_fps
  161. self.ffmpeg_pid = ffmpeg_pid
  162. self.frame_queue = frame_queue
  163. self.frame_shape = self.config.frame_shape_yuv
  164. self.frame_size = self.frame_shape[0] * self.frame_shape[1]
  165. self.stop_event = stop_event
  166. def run(self):
  167. self.start_ffmpeg_detect()
  168. for c in self.config.ffmpeg_cmds:
  169. if "detect" in c["roles"]:
  170. continue
  171. logpipe = LogPipe(
  172. f"ffmpeg.{self.camera_name}.{'_'.join(sorted(c['roles']))}",
  173. logging.ERROR,
  174. )
  175. self.ffmpeg_other_processes.append(
  176. {
  177. "cmd": c["cmd"],
  178. "logpipe": logpipe,
  179. "process": start_or_restart_ffmpeg(c["cmd"], self.logger, logpipe),
  180. }
  181. )
  182. time.sleep(10)
  183. while not self.stop_event.wait(10):
  184. now = datetime.datetime.now().timestamp()
  185. if not self.capture_thread.is_alive():
  186. self.logger.error(
  187. f"Ffmpeg process crashed unexpectedly for {self.camera_name}."
  188. )
  189. self.logger.error(
  190. "The following ffmpeg logs include the last 100 lines prior to exit."
  191. )
  192. self.logpipe.dump()
  193. self.start_ffmpeg_detect()
  194. elif now - self.capture_thread.current_frame.value > 20:
  195. self.logger.info(
  196. f"No frames received from {self.camera_name} in 20 seconds. Exiting ffmpeg..."
  197. )
  198. self.ffmpeg_detect_process.terminate()
  199. try:
  200. self.logger.info("Waiting for ffmpeg to exit gracefully...")
  201. self.ffmpeg_detect_process.communicate(timeout=30)
  202. except sp.TimeoutExpired:
  203. self.logger.info("FFmpeg didnt exit. Force killing...")
  204. self.ffmpeg_detect_process.kill()
  205. self.ffmpeg_detect_process.communicate()
  206. for p in self.ffmpeg_other_processes:
  207. poll = p["process"].poll()
  208. if poll is None:
  209. continue
  210. p["logpipe"].dump()
  211. p["process"] = start_or_restart_ffmpeg(
  212. p["cmd"], self.logger, p["logpipe"], ffmpeg_process=p["process"]
  213. )
  214. stop_ffmpeg(self.ffmpeg_detect_process, self.logger)
  215. for p in self.ffmpeg_other_processes:
  216. stop_ffmpeg(p["process"], self.logger)
  217. p["logpipe"].close()
  218. self.logpipe.close()
  219. def start_ffmpeg_detect(self):
  220. ffmpeg_cmd = [
  221. c["cmd"] for c in self.config.ffmpeg_cmds if "detect" in c["roles"]
  222. ][0]
  223. self.ffmpeg_detect_process = start_or_restart_ffmpeg(
  224. ffmpeg_cmd, self.logger, self.logpipe, self.frame_size
  225. )
  226. self.ffmpeg_pid.value = self.ffmpeg_detect_process.pid
  227. self.capture_thread = CameraCapture(
  228. self.camera_name,
  229. self.ffmpeg_detect_process,
  230. self.frame_shape,
  231. self.frame_queue,
  232. self.camera_fps,
  233. )
  234. self.capture_thread.start()
  235. class CameraCapture(threading.Thread):
  236. def __init__(self, camera_name, ffmpeg_process, frame_shape, frame_queue, fps):
  237. threading.Thread.__init__(self)
  238. self.name = f"capture:{camera_name}"
  239. self.camera_name = camera_name
  240. self.frame_shape = frame_shape
  241. self.frame_queue = frame_queue
  242. self.fps = fps
  243. self.skipped_fps = EventsPerSecond()
  244. self.frame_manager = SharedMemoryFrameManager()
  245. self.ffmpeg_process = ffmpeg_process
  246. self.current_frame = mp.Value("d", 0.0)
  247. self.last_frame = 0
  248. def run(self):
  249. self.skipped_fps.start()
  250. capture_frames(
  251. self.ffmpeg_process,
  252. self.camera_name,
  253. self.frame_shape,
  254. self.frame_manager,
  255. self.frame_queue,
  256. self.fps,
  257. self.skipped_fps,
  258. self.current_frame,
  259. )
  260. def capture_camera(name, config: CameraConfig, process_info):
  261. stop_event = mp.Event()
  262. def receiveSignal(signalNumber, frame):
  263. stop_event.set()
  264. signal.signal(signal.SIGTERM, receiveSignal)
  265. signal.signal(signal.SIGINT, receiveSignal)
  266. frame_queue = process_info["frame_queue"]
  267. camera_watchdog = CameraWatchdog(
  268. name,
  269. config,
  270. frame_queue,
  271. process_info["camera_fps"],
  272. process_info["ffmpeg_pid"],
  273. stop_event,
  274. )
  275. camera_watchdog.start()
  276. camera_watchdog.join()
  277. def track_camera(
  278. name,
  279. config: CameraConfig,
  280. model_shape,
  281. labelmap,
  282. detection_queue,
  283. result_connection,
  284. detected_objects_queue,
  285. process_info,
  286. ):
  287. stop_event = mp.Event()
  288. def receiveSignal(signalNumber, frame):
  289. stop_event.set()
  290. signal.signal(signal.SIGTERM, receiveSignal)
  291. signal.signal(signal.SIGINT, receiveSignal)
  292. threading.current_thread().name = f"process:{name}"
  293. setproctitle(f"frigate.process:{name}")
  294. listen()
  295. frame_queue = process_info["frame_queue"]
  296. detection_enabled = process_info["detection_enabled"]
  297. frame_shape = config.frame_shape
  298. objects_to_track = config.objects.track
  299. object_filters = config.objects.filters
  300. motion_detector = MotionDetector(frame_shape, config.motion)
  301. object_detector = RemoteObjectDetector(
  302. name, labelmap, detection_queue, result_connection, model_shape
  303. )
  304. object_tracker = ObjectTracker(config.detect)
  305. frame_manager = SharedMemoryFrameManager()
  306. process_frames(
  307. name,
  308. frame_queue,
  309. frame_shape,
  310. model_shape,
  311. config.detect,
  312. frame_manager,
  313. motion_detector,
  314. object_detector,
  315. object_tracker,
  316. detected_objects_queue,
  317. process_info,
  318. objects_to_track,
  319. object_filters,
  320. detection_enabled,
  321. stop_event,
  322. )
  323. logger.info(f"{name}: exiting subprocess")
  324. def box_overlaps(b1, b2):
  325. if b1[2] < b2[0] or b1[0] > b2[2] or b1[1] > b2[3] or b1[3] < b2[1]:
  326. return False
  327. return True
  328. def reduce_boxes(boxes, iou_threshold=0.0):
  329. clusters = []
  330. for box in boxes:
  331. matched = 0
  332. for cluster in clusters:
  333. if intersection_over_union(box, cluster) > iou_threshold:
  334. matched = 1
  335. cluster[0] = min(cluster[0], box[0])
  336. cluster[1] = min(cluster[1], box[1])
  337. cluster[2] = max(cluster[2], box[2])
  338. cluster[3] = max(cluster[3], box[3])
  339. if not matched:
  340. clusters.append(list(box))
  341. return [tuple(c) for c in clusters]
  342. def intersects_any(box_a, boxes):
  343. for box in boxes:
  344. if box_overlaps(box_a, box):
  345. return True
  346. return False
  347. def detect(
  348. object_detector, frame, model_shape, region, objects_to_track, object_filters
  349. ):
  350. tensor_input = create_tensor_input(frame, model_shape, region)
  351. detections = []
  352. region_detections = object_detector.detect(tensor_input)
  353. for d in region_detections:
  354. box = d[2]
  355. size = region[2] - region[0]
  356. x_min = int((box[1] * size) + region[0])
  357. y_min = int((box[0] * size) + region[1])
  358. x_max = int((box[3] * size) + region[0])
  359. y_max = int((box[2] * size) + region[1])
  360. det = (
  361. d[0],
  362. d[1],
  363. (x_min, y_min, x_max, y_max),
  364. (x_max - x_min) * (y_max - y_min),
  365. region,
  366. )
  367. # apply object filters
  368. if filtered(det, objects_to_track, object_filters):
  369. continue
  370. detections.append(det)
  371. return detections
  372. def process_frames(
  373. camera_name: str,
  374. frame_queue: mp.Queue,
  375. frame_shape,
  376. model_shape,
  377. detect_config: DetectConfig,
  378. frame_manager: FrameManager,
  379. motion_detector: MotionDetector,
  380. object_detector: RemoteObjectDetector,
  381. object_tracker: ObjectTracker,
  382. detected_objects_queue: mp.Queue,
  383. process_info: Dict,
  384. objects_to_track: List[str],
  385. object_filters,
  386. detection_enabled: mp.Value,
  387. stop_event,
  388. exit_on_empty: bool = False,
  389. ):
  390. fps = process_info["process_fps"]
  391. detection_fps = process_info["detection_fps"]
  392. current_frame_time = process_info["detection_frame"]
  393. fps_tracker = EventsPerSecond()
  394. fps_tracker.start()
  395. startup_scan_counter = 0
  396. while not stop_event.is_set():
  397. if exit_on_empty and frame_queue.empty():
  398. logger.info(f"Exiting track_objects...")
  399. break
  400. try:
  401. frame_time = frame_queue.get(True, 10)
  402. except queue.Empty:
  403. continue
  404. current_frame_time.value = frame_time
  405. frame = frame_manager.get(
  406. f"{camera_name}{frame_time}", (frame_shape[0] * 3 // 2, frame_shape[1])
  407. )
  408. if frame is None:
  409. logger.info(f"{camera_name}: frame {frame_time} is not in memory store.")
  410. continue
  411. # look for motion
  412. motion_boxes = motion_detector.detect(frame)
  413. regions = []
  414. # if detection is disabled
  415. if not detection_enabled.value:
  416. object_tracker.match_and_update(frame_time, [])
  417. else:
  418. # get stationary object ids
  419. # check every Nth frame for stationary objects
  420. # disappeared objects are not stationary
  421. # also check for overlapping motion boxes
  422. stationary_object_ids = [
  423. obj["id"]
  424. for obj in object_tracker.tracked_objects.values()
  425. # if there hasn't been motion for 10 frames
  426. if obj["motionless_count"] >= 10
  427. # and it isn't due for a periodic check
  428. and (
  429. detect_config.stationary.interval == 0
  430. or obj["motionless_count"] % detect_config.stationary.interval != 0
  431. )
  432. # and it hasn't disappeared
  433. and object_tracker.disappeared[obj["id"]] == 0
  434. # and it doesn't overlap with any current motion boxes
  435. and not intersects_any(obj["box"], motion_boxes)
  436. ]
  437. # get tracked object boxes that aren't stationary
  438. tracked_object_boxes = [
  439. obj["box"]
  440. for obj in object_tracker.tracked_objects.values()
  441. if not obj["id"] in stationary_object_ids
  442. ]
  443. # combine motion boxes with known locations of existing objects
  444. combined_boxes = reduce_boxes(motion_boxes + tracked_object_boxes)
  445. region_min_size = max(model_shape[0], model_shape[1])
  446. # compute regions
  447. regions = [
  448. calculate_region(
  449. frame_shape,
  450. a[0],
  451. a[1],
  452. a[2],
  453. a[3],
  454. region_min_size,
  455. multiplier=random.uniform(1.2, 1.5),
  456. )
  457. for a in combined_boxes
  458. ]
  459. # consolidate regions with heavy overlap
  460. regions = [
  461. calculate_region(
  462. frame_shape, a[0], a[1], a[2], a[3], region_min_size, multiplier=1.0
  463. )
  464. for a in reduce_boxes(regions, 0.4)
  465. ]
  466. # if starting up, get the next startup scan region
  467. if startup_scan_counter < 9:
  468. ymin = int(frame_shape[0] / 3 * startup_scan_counter / 3)
  469. ymax = int(frame_shape[0] / 3 + ymin)
  470. xmin = int(frame_shape[1] / 3 * startup_scan_counter / 3)
  471. xmax = int(frame_shape[1] / 3 + xmin)
  472. regions.append(
  473. calculate_region(
  474. frame_shape,
  475. xmin,
  476. ymin,
  477. xmax,
  478. ymax,
  479. region_min_size,
  480. multiplier=1.2,
  481. )
  482. )
  483. startup_scan_counter += 1
  484. # resize regions and detect
  485. # seed with stationary objects
  486. detections = [
  487. (
  488. obj["label"],
  489. obj["score"],
  490. obj["box"],
  491. obj["area"],
  492. obj["region"],
  493. )
  494. for obj in object_tracker.tracked_objects.values()
  495. if obj["id"] in stationary_object_ids
  496. ]
  497. for region in regions:
  498. detections.extend(
  499. detect(
  500. object_detector,
  501. frame,
  502. model_shape,
  503. region,
  504. objects_to_track,
  505. object_filters,
  506. )
  507. )
  508. #########
  509. # merge objects, check for clipped objects and look again up to 4 times
  510. #########
  511. refining = len(regions) > 0
  512. refine_count = 0
  513. while refining and refine_count < 4:
  514. refining = False
  515. # group by name
  516. detected_object_groups = defaultdict(lambda: [])
  517. for detection in detections:
  518. detected_object_groups[detection[0]].append(detection)
  519. selected_objects = []
  520. for group in detected_object_groups.values():
  521. # apply non-maxima suppression to suppress weak, overlapping bounding boxes
  522. boxes = [
  523. (o[2][0], o[2][1], o[2][2] - o[2][0], o[2][3] - o[2][1])
  524. for o in group
  525. ]
  526. confidences = [o[1] for o in group]
  527. idxs = cv2.dnn.NMSBoxes(boxes, confidences, 0.5, 0.4)
  528. for index in idxs:
  529. obj = group[index[0]]
  530. if clipped(obj, frame_shape):
  531. box = obj[2]
  532. # calculate a new region that will hopefully get the entire object
  533. region = calculate_region(
  534. frame_shape,
  535. box[0],
  536. box[1],
  537. box[2],
  538. box[3],
  539. region_min_size,
  540. )
  541. regions.append(region)
  542. selected_objects.extend(
  543. detect(
  544. object_detector,
  545. frame,
  546. model_shape,
  547. region,
  548. objects_to_track,
  549. object_filters,
  550. )
  551. )
  552. refining = True
  553. else:
  554. selected_objects.append(obj)
  555. # set the detections list to only include top, complete objects
  556. # and new detections
  557. detections = selected_objects
  558. if refining:
  559. refine_count += 1
  560. ## drop detections that overlap too much
  561. consolidated_detections = []
  562. # if detection was run on this frame, consolidate
  563. if len(regions) > 0:
  564. # group by name
  565. detected_object_groups = defaultdict(lambda: [])
  566. for detection in detections:
  567. detected_object_groups[detection[0]].append(detection)
  568. # loop over detections grouped by label
  569. for group in detected_object_groups.values():
  570. # if the group only has 1 item, skip
  571. if len(group) == 1:
  572. consolidated_detections.append(group[0])
  573. continue
  574. # sort smallest to largest by area
  575. sorted_by_area = sorted(group, key=lambda g: g[3])
  576. for current_detection_idx in range(0, len(sorted_by_area)):
  577. current_detection = sorted_by_area[current_detection_idx][2]
  578. overlap = 0
  579. for to_check_idx in range(
  580. min(current_detection_idx + 1, len(sorted_by_area)),
  581. len(sorted_by_area),
  582. ):
  583. to_check = sorted_by_area[to_check_idx][2]
  584. # if 90% of smaller detection is inside of another detection, consolidate
  585. if (
  586. area(intersection(current_detection, to_check))
  587. / area(current_detection)
  588. > 0.9
  589. ):
  590. overlap = 1
  591. break
  592. if overlap == 0:
  593. consolidated_detections.append(
  594. sorted_by_area[current_detection_idx]
  595. )
  596. # now that we have refined our detections, we need to track objects
  597. object_tracker.match_and_update(frame_time, consolidated_detections)
  598. # else, just update the frame times for the stationary objects
  599. else:
  600. object_tracker.update_frame_times(frame_time)
  601. # add to the queue if not full
  602. if detected_objects_queue.full():
  603. frame_manager.delete(f"{camera_name}{frame_time}")
  604. continue
  605. else:
  606. fps_tracker.update()
  607. fps.value = fps_tracker.eps()
  608. detected_objects_queue.put(
  609. (
  610. camera_name,
  611. frame_time,
  612. object_tracker.tracked_objects,
  613. motion_boxes,
  614. regions,
  615. )
  616. )
  617. detection_fps.value = object_detector.fps.eps()
  618. frame_manager.close(f"{camera_name}{frame_time}")