import datetime
import itertools
import logging
import multiprocessing as mp
import os
import queue
import random
import shutil
import string
import subprocess as sp
import threading
import time
from collections import defaultdict
from pathlib import Path

import psutil
from peewee import JOIN, DoesNotExist

from frigate.config import RetainModeEnum, FrigateConfig
from frigate.const import CACHE_DIR, RECORD_DIR
from frigate.models import Event, Recordings
from frigate.util import area

logger = logging.getLogger(__name__)

SECONDS_IN_DAY = 60 * 60 * 24


def remove_empty_directories(directory):
    # list all directories recursively and sort them by path,
    # longest first
    paths = sorted(
        [x[0] for x in os.walk(directory)],
        key=lambda p: len(str(p)),
        reverse=True,
    )
    for path in paths:
        # don't delete the parent
        if path == directory:
            continue
        if len(os.listdir(path)) == 0:
            os.rmdir(path)
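

# RecordingMaintainer moves finished recording segments out of the ffmpeg cache:
# it probes each cached .mp4 with ffprobe to determine its duration, checks the
# camera's retention settings and any overlapping events, and then either copies
# the segment into RECORD_DIR (adding a row to the Recordings table) or deletes it.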


class RecordingMaintainer(threading.Thread):
    def __init__(
        self, config: FrigateConfig, recordings_info_queue: mp.Queue, stop_event
    ):
        threading.Thread.__init__(self)
        self.name = "recording_maint"
        self.config = config
        self.recordings_info_queue = recordings_info_queue
        self.stop_event = stop_event
        self.first_pass = True
        self.recordings_info = defaultdict(list)
        self.end_time_cache = {}

    def move_files(self):
        cache_files = sorted(
            [
                d
                for d in os.listdir(CACHE_DIR)
                if os.path.isfile(os.path.join(CACHE_DIR, d))
                and d.endswith(".mp4")
                and not d.startswith("clip_")
            ]
        )

        # find any cache files that ffmpeg is still writing to
        files_in_use = []
        for process in psutil.process_iter():
            try:
                if process.name() != "ffmpeg":
                    continue
                flist = process.open_files()
                if flist:
                    for nt in flist:
                        if nt.path.startswith(CACHE_DIR):
                            files_in_use.append(nt.path.split("/")[-1])
            except (psutil.AccessDenied, psutil.NoSuchProcess):
                continue

        # group recordings by camera
        grouped_recordings = defaultdict(list)
        for f in cache_files:
            # Skip files currently in use
            if f in files_in_use:
                continue

            cache_path = os.path.join(CACHE_DIR, f)
            basename = os.path.splitext(f)[0]
            camera, date = basename.rsplit("-", maxsplit=1)
            start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
            grouped_recordings[camera].append(
                {
                    "cache_path": cache_path,
                    "start_time": start_time,
                }
            )

        # delete all cached files past the most recent 5
        keep_count = 5
        for camera in grouped_recordings.keys():
            if len(grouped_recordings[camera]) > keep_count:
                to_remove = grouped_recordings[camera][:-keep_count]
                for f in to_remove:
                    Path(f["cache_path"]).unlink(missing_ok=True)
                    self.end_time_cache.pop(f["cache_path"], None)
                grouped_recordings[camera] = grouped_recordings[camera][-keep_count:]

        for camera, recordings in grouped_recordings.items():
            # clear out all the recording info for old frames
            while (
                len(self.recordings_info[camera]) > 0
                and self.recordings_info[camera][0][0]
                < recordings[0]["start_time"].timestamp()
            ):
                self.recordings_info[camera].pop(0)

            # get all events with the end time after the start of the oldest cache file
            # or with end_time None
            events: Event = (
                Event.select()
                .where(
                    Event.camera == camera,
                    (Event.end_time == None)
                    | (Event.end_time >= recordings[0]["start_time"].timestamp()),
                    Event.has_clip,
                )
                .order_by(Event.start_time)
            )

            for r in recordings:
                cache_path = r["cache_path"]
                start_time = r["start_time"]

                # Just delete the cache file if recordings are turned off
                if (
                    camera not in self.config.cameras
                    or not self.config.cameras[camera].record.enabled
                ):
                    Path(cache_path).unlink(missing_ok=True)
                    self.end_time_cache.pop(cache_path, None)
                    continue

                if cache_path in self.end_time_cache:
                    end_time, duration = self.end_time_cache[cache_path]
                else:
                    ffprobe_cmd = [
                        "ffprobe",
                        "-v",
                        "error",
                        "-show_entries",
                        "format=duration",
                        "-of",
                        "default=noprint_wrappers=1:nokey=1",
                        cache_path,
                    ]
                    p = sp.run(ffprobe_cmd, capture_output=True)
                    if p.returncode == 0:
                        duration = float(p.stdout.decode().strip())
                        end_time = start_time + datetime.timedelta(seconds=duration)
                        self.end_time_cache[cache_path] = (end_time, duration)
                    else:
                        logger.warning(
                            f"Discarding a corrupt recording segment: {cache_path}"
                        )
                        Path(cache_path).unlink(missing_ok=True)
                        continue

                # if the cached file's start_time is earlier than the camera's retain days,
                # it can only be kept if it overlaps an event
                if start_time <= (
                    (
                        datetime.datetime.now()
                        - datetime.timedelta(
                            days=self.config.cameras[camera].record.retain.days
                        )
                    )
                ):
                    # check if the cached segment overlaps with any events
                    overlaps = False
                    for event in events:
                        # if the event starts in the future, stop checking events
                        # and remove this segment
                        if event.start_time > end_time.timestamp():
                            overlaps = False
                            Path(cache_path).unlink(missing_ok=True)
                            self.end_time_cache.pop(cache_path, None)
                            break

                        # if the event is in progress or ends after the recording starts, keep it
                        # and stop looking at events
                        if (
                            event.end_time is None
                            or event.end_time >= start_time.timestamp()
                        ):
                            overlaps = True
                            break

                    if overlaps:
                        record_mode = self.config.cameras[
                            camera
                        ].record.events.retain.mode
                        # move from cache to recordings immediately
                        self.store_segment(
                            camera,
                            start_time,
                            end_time,
                            duration,
                            cache_path,
                            record_mode,
                        )
                # otherwise the segment is still within the camera's retain days,
                # so store it according to the camera's retain mode
                else:
                    record_mode = self.config.cameras[camera].record.retain.mode
                    self.store_segment(
                        camera, start_time, end_time, duration, cache_path, record_mode
                    )

    def segment_stats(self, camera, start_time, end_time):
        active_count = 0
        motion_count = 0
        for frame in self.recordings_info[camera]:
            # frame is after the end time of the segment
            if frame[0] > end_time.timestamp():
                break

            # frame is before the start time of the segment
            if frame[0] < start_time.timestamp():
                continue

            # frame[1] holds the tracked objects for this frame
            active_count += len(
                [
                    o
                    for o in frame[1]
                    if not o["false_positive"] and o["motionless_count"] == 0
                ]
            )

            # frame[2] holds the motion boxes for this frame
            motion_count += sum([area(box) for box in frame[2]])

        return (motion_count, active_count)

    def store_segment(
        self,
        camera,
        start_time,
        end_time,
        duration,
        cache_path,
        store_mode: RetainModeEnum,
    ):
        motion_count, active_count = self.segment_stats(camera, start_time, end_time)

        # check if the segment shouldn't be stored
        if (store_mode == RetainModeEnum.motion and motion_count == 0) or (
            store_mode == RetainModeEnum.active_objects and active_count == 0
        ):
            Path(cache_path).unlink(missing_ok=True)
            self.end_time_cache.pop(cache_path, None)
            return

        directory = os.path.join(
            RECORD_DIR, start_time.strftime("%Y-%m/%d/%H"), camera
        )

        if not os.path.exists(directory):
            os.makedirs(directory)

        file_name = start_time.strftime("%M.%S.mp4")
        file_path = os.path.join(directory, file_name)

        try:
            start_frame = datetime.datetime.now().timestamp()
            # copy then delete is required when recordings are stored on some network drives
            shutil.copyfile(cache_path, file_path)
            logger.debug(
                f"Copied {file_path} in {datetime.datetime.now().timestamp()-start_frame} seconds."
            )
            os.remove(cache_path)

            rand_id = "".join(
                random.choices(string.ascii_lowercase + string.digits, k=6)
            )
            Recordings.create(
                id=f"{start_time.timestamp()}-{rand_id}",
                camera=camera,
                path=file_path,
                start_time=start_time.timestamp(),
                end_time=end_time.timestamp(),
                duration=duration,
                motion=motion_count,
                # TODO: update this to store list of active objects at some point
                objects=active_count,
            )
        except Exception as e:
            logger.error(f"Unable to store recording segment {cache_path}")
            Path(cache_path).unlink(missing_ok=True)
            logger.error(e)

        # clear end_time cache
        self.end_time_cache.pop(cache_path, None)

    def run(self):
        # Check for new files every 5 seconds
        wait_time = 5
        while not self.stop_event.wait(wait_time):
            run_start = datetime.datetime.now().timestamp()

            # empty the recordings info queue
            while True:
                try:
                    (
                        camera,
                        frame_time,
                        current_tracked_objects,
                        motion_boxes,
                        regions,
                    ) = self.recordings_info_queue.get(False)

                    if self.config.cameras[camera].record.enabled:
                        self.recordings_info[camera].append(
                            (
                                frame_time,
                                current_tracked_objects,
                                motion_boxes,
                                regions,
                            )
                        )
                except queue.Empty:
                    break

            try:
                self.move_files()
            except Exception as e:
                logger.error(
                    "Error occurred when attempting to maintain recording cache"
                )
                logger.error(e)

            duration = datetime.datetime.now().timestamp() - run_start
            wait_time = max(0, 5 - duration)
            if wait_time == 0 and not self.first_pass:
                logger.warning(
                    "Cache is taking longer than 5 seconds to clear. Your recordings disk may be too slow."
                )
            if self.first_pass:
                self.first_pass = False

        logger.info("Exiting recording maintenance...")
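

# The maintainer's run() loop drains recordings_info_queue; each queue entry is a
# 5-tuple of (camera, frame_time, current_tracked_objects, motion_boxes, regions).
# A minimal sketch of the producer side, which lives elsewhere in Frigate; the
# camera name and frame variables below are illustrative placeholders only:
#
#   recordings_info_queue.put(
#       ("front_door", frame_time, current_tracked_objects, motion_boxes, regions)
#   )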


class RecordingCleanup(threading.Thread):
    """Cleans up expired recordings, temporary clips, and empty directories."""

    def __init__(self, config: FrigateConfig, stop_event):
        threading.Thread.__init__(self)
        self.name = "recording_cleanup"
        self.config = config
        self.stop_event = stop_event

    def clean_tmp_clips(self):
        # delete any tmp clips that are more than a minute old
        for p in Path("/tmp/cache").rglob("clip_*.mp4"):
            logger.debug(f"Checking tmp clip {p}.")
            if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1):
                logger.debug("Deleting tmp clip.")
                p.unlink(missing_ok=True)

    def expire_recordings(self):
        logger.debug("Start expire recordings (new).")

        logger.debug("Start deleted cameras.")
        # Handle deleted cameras
        expire_days = self.config.record.retain.days
        expire_before = (
            datetime.datetime.now() - datetime.timedelta(days=expire_days)
        ).timestamp()
        no_camera_recordings: Recordings = Recordings.select().where(
            Recordings.camera.not_in(list(self.config.cameras.keys())),
            Recordings.end_time < expire_before,
        )

        deleted_recordings = set()
        for recording in no_camera_recordings:
            Path(recording.path).unlink(missing_ok=True)
            deleted_recordings.add(recording.id)

        logger.debug(f"Expiring {len(deleted_recordings)} recordings")
        Recordings.delete().where(Recordings.id << deleted_recordings).execute()
        logger.debug("End deleted cameras.")

        logger.debug("Start all cameras.")
        for camera, config in self.config.cameras.items():
            logger.debug(f"Start camera: {camera}.")
            # When deleting recordings without events, we have to keep at LEAST the configured max clip duration
            min_end = (
                datetime.datetime.now()
                - datetime.timedelta(seconds=config.record.events.max_seconds)
            ).timestamp()
            expire_days = config.record.retain.days
            expire_before = (
                datetime.datetime.now() - datetime.timedelta(days=expire_days)
            ).timestamp()
            expire_date = min(min_end, expire_before)

            # Get recordings to check for expiration
            recordings: Recordings = (
                Recordings.select()
                .where(
                    Recordings.camera == camera,
                    Recordings.end_time < expire_date,
                )
                .order_by(Recordings.start_time)
            )

            # Get all the events to check against
            events: Event = (
                Event.select()
                .where(
                    Event.camera == camera,
                    # need to ensure segments for all events starting
                    # before the expire date are included
                    Event.start_time < expire_date,
                    Event.has_clip,
                )
                .order_by(Event.start_time)
                .objects()
            )

            # loop over recordings and see if they overlap with any non-expired events
            # TODO: expire segments based on segment stats according to config
            event_start = 0
            deleted_recordings = set()
            for recording in recordings.objects().iterator():
                keep = False
                # Now look for a reason to keep this recording segment
                for idx in range(event_start, len(events)):
                    event = events[idx]

                    # if the event starts in the future, stop checking events
                    # and let this recording segment expire
                    if event.start_time > recording.end_time:
                        keep = False
                        break

                    # if the event is in progress or ends after the recording starts, keep it
                    # and stop looking at events
                    if event.end_time is None or event.end_time >= recording.start_time:
                        keep = True
                        break

                    # if the event ends before this recording segment starts, skip
                    # this event and check the next event for an overlap.
                    # since the events and recordings are sorted, we can skip events
                    # that end before the previous recording segment started on future segments
                    if event.end_time < recording.start_time:
                        event_start = idx

                # Delete recordings outside of the retention window or based on the retention mode
                if (
                    not keep
                    or (
                        config.record.events.retain.mode == RetainModeEnum.motion
                        and recording.motion == 0
                    )
                    or (
                        config.record.events.retain.mode
                        == RetainModeEnum.active_objects
                        and recording.objects == 0
                    )
                ):
                    Path(recording.path).unlink(missing_ok=True)
                    deleted_recordings.add(recording.id)

            logger.debug(f"Expiring {len(deleted_recordings)} recordings")
            Recordings.delete().where(Recordings.id << deleted_recordings).execute()

            logger.debug(f"End camera: {camera}.")

        logger.debug("End all cameras.")
        logger.debug("End expire recordings (new).")

    def expire_files(self):
        logger.debug("Start expire files (legacy).")

        default_expire = (
            datetime.datetime.now().timestamp()
            - SECONDS_IN_DAY * self.config.record.retain.days
        )
        delete_before = {}

        for name, camera in self.config.cameras.items():
            delete_before[name] = (
                datetime.datetime.now().timestamp()
                - SECONDS_IN_DAY * camera.record.retain.days
            )

        # find all the recordings older than the oldest recording in the db
        try:
            oldest_recording = Recordings.select().order_by(Recordings.start_time).get()

            p = Path(oldest_recording.path)
            oldest_timestamp = p.stat().st_mtime - 1
        except DoesNotExist:
            oldest_timestamp = datetime.datetime.now().timestamp()
        except FileNotFoundError:
            logger.warning(f"Unable to find file from recordings database: {p}")
            Recordings.delete().where(Recordings.id == oldest_recording.id).execute()
            return

        logger.debug(f"Oldest recording in the db: {oldest_timestamp}")
        process = sp.run(
            ["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"],
            capture_output=True,
            text=True,
        )
        files_to_check = process.stdout.splitlines()

        for f in files_to_check:
            p = Path(f)
            try:
                if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire):
                    p.unlink(missing_ok=True)
            except FileNotFoundError:
                logger.warning(f"Attempted to expire missing file: {f}")

        logger.debug("End expire files (legacy).")

    def sync_recordings(self):
        logger.debug("Start sync recordings.")

        # get all recordings in the db
        recordings: Recordings = Recordings.select()

        # get all recording files on disk
        process = sp.run(
            ["find", RECORD_DIR, "-type", "f"],
            capture_output=True,
            text=True,
        )
        files_on_disk = process.stdout.splitlines()

        recordings_to_delete = []
        for recording in recordings.objects().iterator():
            if recording.path not in files_on_disk:
                recordings_to_delete.append(recording.id)

        logger.debug(
            f"Deleting {len(recordings_to_delete)} recordings with missing files"
        )
        Recordings.delete().where(Recordings.id << recordings_to_delete).execute()

        logger.debug("End sync recordings.")

    def run(self):
        # on startup sync recordings with disk (disabled due to too much CPU usage)
        # self.sync_recordings()

        # Expire tmp clips every minute; expire recordings and clean up directories
        # every expire_interval minutes.
        for counter in itertools.cycle(range(self.config.record.expire_interval)):
            if self.stop_event.wait(60):
                logger.info("Exiting recording cleanup...")
                break

            self.clean_tmp_clips()

            if counter == 0:
                self.expire_recordings()
                self.expire_files()
                remove_empty_directories(RECORD_DIR)
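

# A minimal wiring sketch (not part of Frigate's startup code) showing how these
# threads might be started together. It assumes the Peewee models are already bound
# to a database and that FrigateConfig.parse_file() is available to load the config;
# the config path below is a placeholder.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # placeholder config path; adjust to however the config is actually loaded
    config = FrigateConfig.parse_file("/config/config.yml")

    stop_event = mp.Event()
    recordings_info_queue: mp.Queue = mp.Queue()

    maintainer = RecordingMaintainer(config, recordings_info_queue, stop_event)
    cleanup = RecordingCleanup(config, stop_event)
    maintainer.start()
    cleanup.start()

    try:
        # keep the main thread alive until interrupted
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        stop_event.set()
        maintainer.join()
        cleanup.join()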