# record.py
  1. import datetime
  2. import itertools
  3. import logging
  4. import os
  5. import random
  6. import shutil
  7. import string
  8. import subprocess as sp
  9. import threading
  10. from collections import defaultdict
  11. from pathlib import Path
  12. import psutil
  13. from peewee import JOIN, DoesNotExist
  14. from frigate.config import FrigateConfig
  15. from frigate.const import CACHE_DIR, RECORD_DIR
  16. from frigate.models import Event, Recordings
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Seconds in one day; used below to convert retain_days into timestamp offsets.
SECONDS_IN_DAY = 60 * 60 * 24
  19. def remove_empty_directories(directory):
  20. # list all directories recursively and sort them by path,
  21. # longest first
  22. paths = sorted(
  23. [x[0] for x in os.walk(RECORD_DIR)],
  24. key=lambda p: len(str(p)),
  25. reverse=True,
  26. )
  27. for path in paths:
  28. # don't delete the parent
  29. if path == RECORD_DIR:
  30. continue
  31. if len(os.listdir(path)) == 0:
  32. os.rmdir(path)
  33. class RecordingMaintainer(threading.Thread):
  34. def __init__(self, config: FrigateConfig, stop_event):
  35. threading.Thread.__init__(self)
  36. self.name = "recording_maint"
  37. self.config = config
  38. self.stop_event = stop_event
  39. def move_files(self):
  40. cache_files = [
  41. d
  42. for d in os.listdir(CACHE_DIR)
  43. if os.path.isfile(os.path.join(CACHE_DIR, d))
  44. and d.endswith(".mp4")
  45. and not d.startswith("clip_")
  46. ]
  47. files_in_use = []
  48. for process in psutil.process_iter():
  49. try:
  50. if process.name() != "ffmpeg":
  51. continue
  52. flist = process.open_files()
  53. if flist:
  54. for nt in flist:
  55. if nt.path.startswith(CACHE_DIR):
  56. files_in_use.append(nt.path.split("/")[-1])
  57. except:
  58. continue
  59. # group recordings by camera
  60. grouped_recordings = defaultdict(list)
  61. for f in cache_files:
  62. # Skip files currently in use
  63. if f in files_in_use:
  64. continue
  65. cache_path = os.path.join(CACHE_DIR, f)
  66. basename = os.path.splitext(f)[0]
  67. camera, date = basename.rsplit("-", maxsplit=1)
  68. start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")
  69. grouped_recordings[camera].append(
  70. {
  71. "cache_path": cache_path,
  72. "start_time": start_time,
  73. }
  74. )
  75. for camera, recordings in grouped_recordings.items():
  76. # get all events with the end time after the start of the oldest cache file
  77. # or with end_time None
  78. events: Event = (
  79. Event.select()
  80. .where(
  81. Event.camera == camera,
  82. (Event.end_time == None)
  83. | (Event.end_time >= recordings[0]["start_time"]),
  84. Event.has_clip,
  85. )
  86. .order_by(Event.start_time)
  87. )
  88. for r in recordings:
  89. cache_path = r["cache_path"]
  90. start_time = r["start_time"]
  91. # Just delete files if recordings are turned off
  92. if (
  93. not camera in self.config.cameras
  94. or not self.config.cameras[camera].record.enabled
  95. ):
  96. Path(cache_path).unlink(missing_ok=True)
  97. continue
  98. ffprobe_cmd = [
  99. "ffprobe",
  100. "-v",
  101. "error",
  102. "-show_entries",
  103. "format=duration",
  104. "-of",
  105. "default=noprint_wrappers=1:nokey=1",
  106. f"{cache_path}",
  107. ]
  108. p = sp.run(ffprobe_cmd, capture_output=True)
  109. if p.returncode == 0:
  110. duration = float(p.stdout.decode().strip())
  111. end_time = start_time + datetime.timedelta(seconds=duration)
  112. else:
  113. logger.warning(f"Discarding a corrupt recording segment: {f}")
  114. Path(cache_path).unlink(missing_ok=True)
  115. continue
  116. # if cached file's start_time is earlier than the retain_days for the camera
  117. if start_time <= (
  118. (
  119. datetime.datetime.now()
  120. - datetime.timedelta(
  121. days=self.config.cameras[camera].record.retain_days
  122. )
  123. )
  124. ):
  125. # if the cached segment overlaps with the events:
  126. overlaps = False
  127. for event in events:
  128. # if the event starts in the future, stop checking events
  129. # and let this recording segment expire
  130. if event.start_time > end_time.timestamp():
  131. overlaps = False
  132. break
  133. # if the event is in progress or ends after the recording starts, keep it
  134. # and stop looking at events
  135. if event.end_time is None or event.end_time >= start_time:
  136. overlaps = True
  137. break
  138. if overlaps:
  139. # move from cache to recordings immediately
  140. self.store_segment(
  141. camera,
  142. start_time,
  143. end_time,
  144. duration,
  145. cache_path,
  146. )
  147. # else retain_days includes this segment
  148. else:
  149. self.store_segment(
  150. camera, start_time, end_time, duration, cache_path
  151. )
  152. if len(recordings) > 2:
  153. # delete all cached files past the most recent 2
  154. to_remove = sorted(recordings, key=lambda i: i["start_time"])[:-2]
  155. for f in to_remove:
  156. Path(cache_path).unlink(missing_ok=True)
  157. def store_segment(self, camera, start_time, end_time, duration, cache_path):
  158. directory = os.path.join(RECORD_DIR, start_time.strftime("%Y-%m/%d/%H"), camera)
  159. if not os.path.exists(directory):
  160. os.makedirs(directory)
  161. file_name = f"{start_time.strftime('%M.%S.mp4')}"
  162. file_path = os.path.join(directory, file_name)
  163. # copy then delete is required when recordings are stored on some network drives
  164. shutil.copyfile(cache_path, file_path)
  165. os.remove(cache_path)
  166. rand_id = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
  167. Recordings.create(
  168. id=f"{start_time.timestamp()}-{rand_id}",
  169. camera=camera,
  170. path=file_path,
  171. start_time=start_time.timestamp(),
  172. end_time=end_time.timestamp(),
  173. duration=duration,
  174. )
  175. def run(self):
  176. # Check for new files every 5 seconds
  177. wait_time = 5
  178. while not self.stop_event.wait(wait_time):
  179. run_start = datetime.datetime.now().timestamp()
  180. self.move_files()
  181. wait_time = max(0, 5 - (datetime.datetime.now().timestamp() - run_start))
  182. logger.info(f"Exiting recording maintenance...")
  183. class RecordingCleanup(threading.Thread):
  184. def __init__(self, config: FrigateConfig, stop_event):
  185. threading.Thread.__init__(self)
  186. self.name = "recording_cleanup"
  187. self.config = config
  188. self.stop_event = stop_event
  189. def clean_tmp_clips(self):
  190. # delete any clips more than 5 minutes old
  191. for p in Path("/tmp/cache").rglob("clip_*.mp4"):
  192. logger.debug(f"Checking tmp clip {p}.")
  193. if p.stat().st_mtime < (datetime.datetime.now().timestamp() - 60 * 1):
  194. logger.debug("Deleting tmp clip.")
  195. p.unlink(missing_ok=True)
  196. def expire_recordings(self):
  197. logger.debug("Start expire recordings (new).")
  198. logger.debug("Start deleted cameras.")
  199. # Handle deleted cameras
  200. expire_days = self.config.record.retain_days
  201. expire_before = (
  202. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  203. ).timestamp()
  204. no_camera_recordings: Recordings = Recordings.select().where(
  205. Recordings.camera.not_in(list(self.config.cameras.keys())),
  206. Recordings.end_time < expire_before,
  207. )
  208. deleted_recordings = set()
  209. for recording in no_camera_recordings:
  210. Path(recording.path).unlink(missing_ok=True)
  211. deleted_recordings.add(recording.id)
  212. logger.debug(f"Expiring {len(deleted_recordings)} recordings")
  213. Recordings.delete().where(Recordings.id << deleted_recordings).execute()
  214. logger.debug("End deleted cameras.")
  215. logger.debug("Start all cameras.")
  216. for camera, config in self.config.cameras.items():
  217. logger.debug(f"Start camera: {camera}.")
  218. # When deleting recordings without events, we have to keep at LEAST the configured max clip duration
  219. min_end = (
  220. datetime.datetime.now()
  221. - datetime.timedelta(seconds=config.record.events.max_seconds)
  222. ).timestamp()
  223. expire_days = config.record.retain_days
  224. expire_before = (
  225. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  226. ).timestamp()
  227. expire_date = min(min_end, expire_before)
  228. # Get recordings to check for expiration
  229. recordings: Recordings = (
  230. Recordings.select()
  231. .where(
  232. Recordings.camera == camera,
  233. Recordings.end_time < expire_date,
  234. )
  235. .order_by(Recordings.start_time)
  236. )
  237. # Get all the events to check against
  238. events: Event = (
  239. Event.select()
  240. .where(
  241. Event.camera == camera,
  242. # need to ensure segments for all events starting
  243. # before the expire date are included
  244. Event.start_time < expire_date,
  245. Event.has_clip,
  246. )
  247. .order_by(Event.start_time)
  248. .objects()
  249. )
  250. # loop over recordings and see if they overlap with any non-expired events
  251. event_start = 0
  252. deleted_recordings = set()
  253. for recording in recordings.objects().iterator():
  254. keep = False
  255. # Now look for a reason to keep this recording segment
  256. for idx in range(event_start, len(events)):
  257. event = events[idx]
  258. # if the event starts in the future, stop checking events
  259. # and let this recording segment expire
  260. if event.start_time > recording.end_time:
  261. keep = False
  262. break
  263. # if the event is in progress or ends after the recording starts, keep it
  264. # and stop looking at events
  265. if event.end_time is None or event.end_time >= recording.start_time:
  266. keep = True
  267. break
  268. # if the event ends before this recording segment starts, skip
  269. # this event and check the next event for an overlap.
  270. # since the events and recordings are sorted, we can skip events
  271. # that end before the previous recording segment started on future segments
  272. if event.end_time < recording.start_time:
  273. event_start = idx
  274. # Delete recordings outside of the retention window
  275. if not keep:
  276. Path(recording.path).unlink(missing_ok=True)
  277. deleted_recordings.add(recording.id)
  278. logger.debug(f"Expiring {len(deleted_recordings)} recordings")
  279. Recordings.delete().where(Recordings.id << deleted_recordings).execute()
  280. logger.debug(f"End camera: {camera}.")
  281. logger.debug("End all cameras.")
  282. logger.debug("End expire recordings (new).")
  283. def expire_files(self):
  284. logger.debug("Start expire files (legacy).")
  285. default_expire = (
  286. datetime.datetime.now().timestamp()
  287. - SECONDS_IN_DAY * self.config.record.retain_days
  288. )
  289. delete_before = {}
  290. for name, camera in self.config.cameras.items():
  291. delete_before[name] = (
  292. datetime.datetime.now().timestamp()
  293. - SECONDS_IN_DAY * camera.record.retain_days
  294. )
  295. # find all the recordings older than the oldest recording in the db
  296. try:
  297. oldest_recording = Recordings.select().order_by(Recordings.start_time).get()
  298. p = Path(oldest_recording.path)
  299. oldest_timestamp = p.stat().st_mtime - 1
  300. except DoesNotExist:
  301. oldest_timestamp = datetime.datetime.now().timestamp()
  302. logger.debug(f"Oldest recording in the db: {oldest_timestamp}")
  303. process = sp.run(
  304. ["find", RECORD_DIR, "-type", "f", "!", "-newermt", f"@{oldest_timestamp}"],
  305. capture_output=True,
  306. text=True,
  307. )
  308. files_to_check = process.stdout.splitlines()
  309. for f in files_to_check:
  310. p = Path(f)
  311. if p.stat().st_mtime < delete_before.get(p.parent.name, default_expire):
  312. p.unlink(missing_ok=True)
  313. logger.debug("End expire files (legacy).")
  314. def run(self):
  315. # Expire recordings every minute, clean directories every hour.
  316. for counter in itertools.cycle(range(60)):
  317. if self.stop_event.wait(60):
  318. logger.info(f"Exiting recording cleanup...")
  319. break
  320. self.expire_recordings()
  321. self.clean_tmp_clips()
  322. if counter == 0:
  323. self.expire_files()
  324. remove_empty_directories(RECORD_DIR)