events.py

import datetime
import json
import logging
import os
import queue
import subprocess as sp
import threading
import time
from collections import defaultdict
from pathlib import Path

import psutil
import shutil

from frigate.config import FrigateConfig
from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
from frigate.models import Event
from peewee import fn

logger = logging.getLogger(__name__)

class EventProcessor(threading.Thread):
    def __init__(
        self, config, camera_processes, event_queue, event_processed_queue, stop_event
    ):
        threading.Thread.__init__(self)
        self.name = "event_processor"
        self.config = config
        self.camera_processes = camera_processes
        self.cached_clips = {}
        self.event_queue = event_queue
        self.event_processed_queue = event_processed_queue
        self.events_in_process = {}
        self.stop_event = stop_event

    def should_create_clip(self, camera, event_data):
        if event_data["false_positive"]:
            return False

        # if there are required zones and there is no overlap
        required_zones = self.config.cameras[camera].clips.required_zones
        if len(required_zones) > 0 and not set(event_data["entered_zones"]) & set(
            required_zones
        ):
            logger.debug(
                f"Not creating clip for {event_data['id']} because it did not enter required zones"
            )
            return False

        return True

    def refresh_cache(self):
        cached_files = os.listdir(CACHE_DIR)

        # find cache files that are still held open by an ffmpeg process
        files_in_use = []
        for process in psutil.process_iter():
            try:
                if process.name() != "ffmpeg":
                    continue
                flist = process.open_files()
                if flist:
                    for nt in flist:
                        if nt.path.startswith(CACHE_DIR):
                            files_in_use.append(nt.path.split("/")[-1])
            except Exception:
                continue

        # add new cache files to the index, probing each for its duration
        for f in cached_files:
            if f in files_in_use or f in self.cached_clips:
                continue

            # cache files are named <camera>-<YYYYmmddHHMMSS>.<ext>
            basename = os.path.splitext(f)[0]
            camera, date = basename.rsplit("-", maxsplit=1)
            start_time = datetime.datetime.strptime(date, "%Y%m%d%H%M%S")

            ffprobe_cmd = [
                "ffprobe",
                "-v",
                "error",
                "-show_entries",
                "format=duration",
                "-of",
                "default=noprint_wrappers=1:nokey=1",
                f"{os.path.join(CACHE_DIR, f)}",
            ]
            p = sp.run(ffprobe_cmd, capture_output=True)
            if p.returncode == 0:
                duration = float(p.stdout.decode().strip())
            else:
                logger.info(f"bad file: {f}")
                os.remove(os.path.join(CACHE_DIR, f))
                continue

            self.cached_clips[f] = {
                "path": f,
                "camera": camera,
                "start_time": start_time.timestamp(),
                "duration": duration,
            }

        if len(self.events_in_process) > 0:
            earliest_event = min(
                self.events_in_process.values(), key=lambda x: x["start_time"]
            )["start_time"]
        else:
            earliest_event = datetime.datetime.now().timestamp()

        # if the earliest event is more than max_seconds ago, cap it
        max_seconds = self.config.clips.max_seconds
        earliest_event = max(
            earliest_event, datetime.datetime.now().timestamp() - max_seconds
        )

        # drop cached clips that end well before the earliest in-progress event
        for f, data in list(self.cached_clips.items()):
            if earliest_event - 90 > data["start_time"] + data["duration"]:
                del self.cached_clips[f]
                logger.debug(f"Cleaning up cached file {f}")
                os.remove(os.path.join(CACHE_DIR, f))

        # if we are still using more than 90% of the cache, proactively cleanup
        cache_usage = shutil.disk_usage("/tmp/cache")
        if (
            cache_usage.used / cache_usage.total > 0.9
            and cache_usage.free < 200000000
            and len(self.cached_clips) > 0
        ):
            logger.warning("More than 90% of the cache is used.")
            logger.warning(
                "Consider increasing space available at /tmp/cache or reducing max_seconds in your clips config."
            )
            logger.warning("Proactively cleaning up the cache...")
            while cache_usage.used / cache_usage.total > 0.9:
                oldest_clip = min(
                    self.cached_clips.values(), key=lambda x: x["start_time"]
                )
                del self.cached_clips[oldest_clip["path"]]
                os.remove(os.path.join(CACHE_DIR, oldest_clip["path"]))
                cache_usage = shutil.disk_usage("/tmp/cache")

    def create_clip(self, camera, event_data, pre_capture, post_capture):
        # get all clips from the camera with the event sorted
        sorted_clips = sorted(
            [c for c in self.cached_clips.values() if c["camera"] == camera],
            key=lambda i: i["start_time"],
        )

        # if there are no clips in the cache, or we are still waiting on a needed file, check every 5 seconds
        wait_count = 0
        while (
            len(sorted_clips) == 0
            or sorted_clips[-1]["start_time"] + sorted_clips[-1]["duration"]
            < event_data["end_time"] + post_capture
        ):
            if wait_count > 4:
                logger.warning(
                    f"Unable to create clip for {camera} and event {event_data['id']}. There were no cache files for this event."
                )
                return False
            logger.debug(f"No cache clips for {camera}. Waiting...")
            time.sleep(5)
            self.refresh_cache()
            # get all clips from the camera with the event sorted
            sorted_clips = sorted(
                [c for c in self.cached_clips.values() if c["camera"] == camera],
                key=lambda i: i["start_time"],
            )
            wait_count += 1

        playlist_start = event_data["start_time"] - pre_capture
        playlist_end = event_data["end_time"] + post_capture
        playlist_lines = []
        for clip in sorted_clips:
            # clip ends before playlist start time, skip
            if clip["start_time"] + clip["duration"] < playlist_start:
                continue
            # clip starts after playlist ends, finish
            if clip["start_time"] > playlist_end:
                break
            playlist_lines.append(f"file '{os.path.join(CACHE_DIR, clip['path'])}'")
            # if this is the starting clip, add an inpoint
            if clip["start_time"] < playlist_start:
                playlist_lines.append(
                    f"inpoint {int(playlist_start - clip['start_time'])}"
                )
            # if this is the ending clip, add an outpoint
            if clip["start_time"] + clip["duration"] > playlist_end:
                playlist_lines.append(
                    f"outpoint {int(playlist_end - clip['start_time'])}"
                )

        clip_name = f"{camera}-{event_data['id']}"
        ffmpeg_cmd = [
            "ffmpeg",
            "-y",
            "-protocol_whitelist",
            "pipe,file",
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            "-",
            "-c",
            "copy",
            "-movflags",
            "+faststart",
            f"{os.path.join(CLIPS_DIR, clip_name)}.mp4",
        ]

        p = sp.run(
            ffmpeg_cmd,
            input="\n".join(playlist_lines),
            encoding="ascii",
            capture_output=True,
        )
        if p.returncode != 0:
            logger.error(p.stderr)
            return False

        return True
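
    # For reference, the playlist piped to ffmpeg's concat demuxer above looks
    # roughly like this (file names are illustrative):
    #
    #   file '/tmp/cache/back-20210101120000.mp4'
    #   inpoint 12
    #   file '/tmp/cache/back-20210101120010.mp4'
    #   outpoint 7
    #
    # inpoint/outpoint trim the first and last cached segments so the stream-copied
    # output covers only the pre_capture..post_capture window around the event.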

    def run(self):
        while True:
            if self.stop_event.is_set():
                logger.info("Exiting event processor...")
                break

            try:
                event_type, camera, event_data = self.event_queue.get(timeout=10)
            except queue.Empty:
                if not self.stop_event.is_set():
                    self.refresh_cache()
                continue

            logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
            self.refresh_cache()

            if event_type == "start":
                self.events_in_process[event_data["id"]] = event_data

            if event_type == "end":
                clips_config = self.config.cameras[camera].clips

                clip_created = False
                if self.should_create_clip(camera, event_data):
                    if clips_config.enabled and (
                        clips_config.objects is None
                        or event_data["label"] in clips_config.objects
                    ):
                        clip_created = self.create_clip(
                            camera,
                            event_data,
                            clips_config.pre_capture,
                            clips_config.post_capture,
                        )

                if clip_created or event_data["has_snapshot"]:
                    Event.create(
                        id=event_data["id"],
                        label=event_data["label"],
                        camera=camera,
                        start_time=event_data["start_time"],
                        end_time=event_data["end_time"],
                        top_score=event_data["top_score"],
                        false_positive=event_data["false_positive"],
                        zones=list(event_data["entered_zones"]),
                        thumbnail=event_data["thumbnail"],
                        has_clip=clip_created,
                        has_snapshot=event_data["has_snapshot"],
                    )

                del self.events_in_process[event_data["id"]]
                self.event_processed_queue.put((event_data["id"], camera))


class EventCleanup(threading.Thread):
    def __init__(self, config: FrigateConfig, stop_event):
        threading.Thread.__init__(self)
        self.name = "event_cleanup"
        self.config = config
        self.stop_event = stop_event
        self.camera_keys = list(self.config.cameras.keys())

    def expire(self, media):
        ## Expire events from unlisted cameras based on the global config
        if media == "clips":
            retain_config = self.config.clips.retain
            file_extension = "mp4"
            update_params = {"has_clip": False}
        else:
            retain_config = self.config.snapshots.retain
            file_extension = "jpg"
            update_params = {"has_snapshot": False}

        distinct_labels = (
            Event.select(Event.label)
            .where(Event.camera.not_in(self.camera_keys))
            .distinct()
        )

        # loop over object types in db
        for l in distinct_labels:
            # get expiration time for this label
            expire_days = retain_config.objects.get(l.label, retain_config.default)
            expire_after = (
                datetime.datetime.now() - datetime.timedelta(days=expire_days)
            ).timestamp()
            # grab all events older than the expiration cutoff
            expired_events = Event.select().where(
                Event.camera.not_in(self.camera_keys),
                Event.start_time < expire_after,
                Event.label == l.label,
            )
            # delete the media from disk
            for event in expired_events:
                media_name = f"{event.camera}-{event.id}"
                media_path = Path(
                    f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
                )
                media_path.unlink(missing_ok=True)
            # update the clips attribute for the db entry
            update_query = Event.update(update_params).where(
                Event.camera.not_in(self.camera_keys),
                Event.start_time < expire_after,
                Event.label == l.label,
            )
            update_query.execute()

        ## Expire events from cameras based on the camera config
        for name, camera in self.config.cameras.items():
            if media == "clips":
                retain_config = camera.clips.retain
            else:
                retain_config = camera.snapshots.retain
            # get distinct objects in database for this camera
            distinct_labels = (
                Event.select(Event.label).where(Event.camera == name).distinct()
            )

            # loop over object types in db
            for l in distinct_labels:
                # get expiration time for this label
                expire_days = retain_config.objects.get(l.label, retain_config.default)
                expire_after = (
                    datetime.datetime.now() - datetime.timedelta(days=expire_days)
                ).timestamp()
                # grab all events older than the expiration cutoff
                expired_events = Event.select().where(
                    Event.camera == name,
                    Event.start_time < expire_after,
                    Event.label == l.label,
                )
                # delete the grabbed clips from disk
                for event in expired_events:
                    media_name = f"{event.camera}-{event.id}"
                    media_path = Path(
                        f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
                    )
                    media_path.unlink(missing_ok=True)
                # update the clips attribute for the db entry
                update_query = Event.update(update_params).where(
                    Event.camera == name,
                    Event.start_time < expire_after,
                    Event.label == l.label,
                )
                update_query.execute()

    def purge_duplicates(self):
        # events for the same label and camera whose start times fall in the same
        # 5 second bucket are considered duplicates; keep the longest one
        duplicate_query = """with grouped_events as (
          select id,
            label,
            camera,
            has_snapshot,
            has_clip,
            row_number() over (
              partition by label, camera, round(start_time/5,0)*5
              order by end_time-start_time desc
            ) as copy_number
          from event
        )

        select distinct id, camera, has_snapshot, has_clip from grouped_events
        where copy_number > 1;"""

        duplicate_events = Event.raw(duplicate_query)
        for event in duplicate_events:
            logger.debug(f"Removing duplicate: {event.id}")
            media_name = f"{event.camera}-{event.id}"
            if event.has_snapshot:
                media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
                media_path.unlink(missing_ok=True)
            if event.has_clip:
                media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
                media_path.unlink(missing_ok=True)

        (
            Event.delete()
            .where(Event.id << [event.id for event in duplicate_events])
            .execute()
        )

    def run(self):
        counter = 0
        while True:
            if self.stop_event.is_set():
                logger.info("Exiting event cleanup...")
                break

            # only expire events every 5 minutes, but check for stop events every 10 seconds
            time.sleep(10)
            counter = counter + 1
            if counter < 30:
                continue
            counter = 0

            self.expire("clips")
            self.expire("snapshots")
            self.purge_duplicates()

            # drop events from db where has_clip and has_snapshot are false
            delete_query = Event.delete().where(
                Event.has_clip == False, Event.has_snapshot == False
            )
            delete_query.execute()
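

# Usage sketch (illustrative, not part of this module): Frigate's main process
# wires these threads up with shared queues and a stop event, roughly like the
# following. The queue types and exact call sites are assumptions; the real
# wiring lives in the application setup code.
#
#   stop_event = threading.Event()
#   event_queue = multiprocessing.Queue()
#   event_processed_queue = multiprocessing.Queue()
#
#   EventProcessor(
#       config, camera_processes, event_queue, event_processed_queue, stop_event
#   ).start()
#   EventCleanup(config, stop_event).start()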