events.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. import datetime
  2. import json
  3. import logging
  4. import os
  5. import queue
  6. import subprocess as sp
  7. import threading
  8. import time
  9. from collections import defaultdict
  10. from pathlib import Path
  11. import psutil
  12. import shutil
  13. from frigate.config import FrigateConfig
  14. from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
  15. from frigate.models import Event
  16. from peewee import fn
  17. logger = logging.getLogger(__name__)
  18. class EventProcessor(threading.Thread):
  19. def __init__(self, config, camera_processes, event_queue, event_processed_queue, stop_event):
  20. threading.Thread.__init__(self)
  21. self.name = 'event_processor'
  22. self.config = config
  23. self.camera_processes = camera_processes
  24. self.cached_clips = {}
  25. self.event_queue = event_queue
  26. self.event_processed_queue = event_processed_queue
  27. self.events_in_process = {}
  28. self.stop_event = stop_event
  29. def should_create_clip(self, camera, event_data):
  30. if event_data['false_positive']:
  31. return False
  32. # if there are required zones and there is no overlap
  33. required_zones = self.config.cameras[camera].clips.required_zones
  34. if len(required_zones) > 0 and not set(event_data['entered_zones']) & set(required_zones):
  35. logger.debug(f"Not creating clip for {event_data['id']} because it did not enter required zones")
  36. return False
  37. return True
  38. def refresh_cache(self):
  39. cached_files = os.listdir(CACHE_DIR)
  40. files_in_use = []
  41. for process in psutil.process_iter():
  42. try:
  43. if process.name() != 'ffmpeg':
  44. continue
  45. flist = process.open_files()
  46. if flist:
  47. for nt in flist:
  48. if nt.path.startswith(CACHE_DIR):
  49. files_in_use.append(nt.path.split('/')[-1])
  50. except:
  51. continue
  52. for f in cached_files:
  53. if f in files_in_use or f in self.cached_clips:
  54. continue
  55. camera = '-'.join(f.split('-')[:-1])
  56. start_time = datetime.datetime.strptime(f.split('-')[-1].split('.')[0], '%Y%m%d%H%M%S')
  57. ffprobe_cmd = " ".join([
  58. 'ffprobe',
  59. '-v',
  60. 'error',
  61. '-show_entries',
  62. 'format=duration',
  63. '-of',
  64. 'default=noprint_wrappers=1:nokey=1',
  65. f"{os.path.join(CACHE_DIR,f)}"
  66. ])
  67. p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True)
  68. (output, err) = p.communicate()
  69. p_status = p.wait()
  70. if p_status == 0:
  71. duration = float(output.decode('utf-8').strip())
  72. else:
  73. logger.info(f"bad file: {f}")
  74. os.remove(os.path.join(CACHE_DIR,f))
  75. continue
  76. self.cached_clips[f] = {
  77. 'path': f,
  78. 'camera': camera,
  79. 'start_time': start_time.timestamp(),
  80. 'duration': duration
  81. }
  82. if len(self.events_in_process) > 0:
  83. earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time']
  84. else:
  85. earliest_event = datetime.datetime.now().timestamp()
  86. # if the earliest event exceeds the max seconds, cap it
  87. max_seconds = self.config.clips.max_seconds
  88. if datetime.datetime.now().timestamp()-earliest_event > max_seconds:
  89. earliest_event = datetime.datetime.now().timestamp()-max_seconds
  90. for f, data in list(self.cached_clips.items()):
  91. if earliest_event-90 > data['start_time']+data['duration']:
  92. del self.cached_clips[f]
  93. logger.debug(f"Cleaning up cached file {f}")
  94. os.remove(os.path.join(CACHE_DIR,f))
  95. # if we are still using more than 90% of the cache, proactively cleanup
  96. cache_usage = shutil.disk_usage("/tmp/cache")
  97. if cache_usage.used/cache_usage.total > .9 and cache_usage.free < 200000000 and len(self.cached_clips) > 0:
  98. logger.warning("More than 90% of the cache is used.")
  99. logger.warning("Consider increasing space available at /tmp/cache or reducing max_seconds in your clips config.")
  100. logger.warning("Proactively cleaning up the cache...")
  101. while cache_usage.used/cache_usage.total > .9:
  102. oldest_clip = min(self.cached_clips.values(), key=lambda x:x['start_time'])
  103. del self.cached_clips[oldest_clip['path']]
  104. os.remove(os.path.join(CACHE_DIR,oldest_clip['path']))
  105. cache_usage = shutil.disk_usage("/tmp/cache")
  106. def create_clip(self, camera, event_data, pre_capture, post_capture):
  107. # get all clips from the camera with the event sorted
  108. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  109. # if there are no clips in the cache or we are still waiting on a needed file check every 5 seconds
  110. wait_count = 0
  111. while len(sorted_clips) == 0 or sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']+post_capture:
  112. if wait_count > 4:
  113. logger.warning(f"Unable to create clip for {camera} and event {event_data['id']}. There were no cache files for this event.")
  114. return False
  115. logger.debug(f"No cache clips for {camera}. Waiting...")
  116. time.sleep(5)
  117. self.refresh_cache()
  118. # get all clips from the camera with the event sorted
  119. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  120. wait_count += 1
  121. playlist_start = event_data['start_time']-pre_capture
  122. playlist_end = event_data['end_time']+post_capture
  123. playlist_lines = []
  124. for clip in sorted_clips:
  125. # clip ends before playlist start time, skip
  126. if clip['start_time']+clip['duration'] < playlist_start:
  127. continue
  128. # clip starts after playlist ends, finish
  129. if clip['start_time'] > playlist_end:
  130. break
  131. playlist_lines.append(f"file '{os.path.join(CACHE_DIR,clip['path'])}'")
  132. # if this is the starting clip, add an inpoint
  133. if clip['start_time'] < playlist_start:
  134. playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}")
  135. # if this is the ending clip, add an outpoint
  136. if clip['start_time']+clip['duration'] > playlist_end:
  137. playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}")
  138. clip_name = f"{camera}-{event_data['id']}"
  139. ffmpeg_cmd = [
  140. 'ffmpeg',
  141. '-y',
  142. '-protocol_whitelist',
  143. 'pipe,file',
  144. '-f',
  145. 'concat',
  146. '-safe',
  147. '0',
  148. '-i',
  149. '-',
  150. '-c',
  151. 'copy',
  152. '-movflags',
  153. '+faststart',
  154. f"{os.path.join(CLIPS_DIR, clip_name)}.mp4"
  155. ]
  156. p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
  157. if p.returncode != 0:
  158. logger.error(p.stderr)
  159. return False
  160. return True
  161. def run(self):
  162. while True:
  163. if self.stop_event.is_set():
  164. logger.info(f"Exiting event processor...")
  165. break
  166. try:
  167. event_type, camera, event_data = self.event_queue.get(timeout=10)
  168. except queue.Empty:
  169. if not self.stop_event.is_set():
  170. self.refresh_cache()
  171. continue
  172. logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
  173. self.refresh_cache()
  174. if event_type == 'start':
  175. self.events_in_process[event_data['id']] = event_data
  176. if event_type == 'end':
  177. clips_config = self.config.cameras[camera].clips
  178. clip_created = False
  179. if self.should_create_clip(camera, event_data):
  180. if clips_config.enabled and (clips_config.objects is None or event_data['label'] in clips_config.objects):
  181. clip_created = self.create_clip(camera, event_data, clips_config.pre_capture, clips_config.post_capture)
  182. if clip_created or event_data['has_snapshot']:
  183. Event.create(
  184. id=event_data['id'],
  185. label=event_data['label'],
  186. camera=camera,
  187. start_time=event_data['start_time'],
  188. end_time=event_data['end_time'],
  189. top_score=event_data['top_score'],
  190. false_positive=event_data['false_positive'],
  191. zones=list(event_data['entered_zones']),
  192. thumbnail=event_data['thumbnail'],
  193. has_clip=clip_created,
  194. has_snapshot=event_data['has_snapshot'],
  195. )
  196. del self.events_in_process[event_data['id']]
  197. self.event_processed_queue.put((event_data['id'], camera))
  198. class EventCleanup(threading.Thread):
  199. def __init__(self, config: FrigateConfig, stop_event):
  200. threading.Thread.__init__(self)
  201. self.name = 'event_cleanup'
  202. self.config = config
  203. self.stop_event = stop_event
  204. self.camera_keys = list(self.config.cameras.keys())
  205. def expire(self, media):
  206. ## Expire events from unlisted cameras based on the global config
  207. if media == 'clips':
  208. retain_config = self.config.clips.retain
  209. file_extension = 'mp4'
  210. update_params = {'has_clip': False}
  211. else:
  212. retain_config = self.config.snapshots.retain
  213. file_extension = 'jpg'
  214. update_params = {'has_snapshot': False}
  215. distinct_labels = (Event.select(Event.label)
  216. .where(Event.camera.not_in(self.camera_keys))
  217. .distinct())
  218. # loop over object types in db
  219. for l in distinct_labels:
  220. # get expiration time for this label
  221. expire_days = retain_config.objects.get(l.label, retain_config.default)
  222. expire_after = (datetime.datetime.now() - datetime.timedelta(days=expire_days)).timestamp()
  223. # grab all events after specific time
  224. expired_events = (
  225. Event.select()
  226. .where(Event.camera.not_in(self.camera_keys),
  227. Event.start_time < expire_after,
  228. Event.label == l.label)
  229. )
  230. # delete the media from disk
  231. for event in expired_events:
  232. media_name = f"{event.camera}-{event.id}"
  233. media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}")
  234. media.unlink(missing_ok=True)
  235. # update the clips attribute for the db entry
  236. update_query = (
  237. Event.update(update_params)
  238. .where(Event.camera.not_in(self.camera_keys),
  239. Event.start_time < expire_after,
  240. Event.label == l.label)
  241. )
  242. update_query.execute()
  243. ## Expire events from cameras based on the camera config
  244. for name, camera in self.config.cameras.items():
  245. if media == 'clips':
  246. retain_config = camera.clips.retain
  247. else:
  248. retain_config = camera.snapshots.retain
  249. # get distinct objects in database for this camera
  250. distinct_labels = (Event.select(Event.label)
  251. .where(Event.camera == name)
  252. .distinct())
  253. # loop over object types in db
  254. for l in distinct_labels:
  255. # get expiration time for this label
  256. expire_days = retain_config.objects.get(l.label, retain_config.default)
  257. expire_after = (datetime.datetime.now() - datetime.timedelta(days=expire_days)).timestamp()
  258. # grab all events after specific time
  259. expired_events = (
  260. Event.select()
  261. .where(Event.camera == name,
  262. Event.start_time < expire_after,
  263. Event.label == l.label)
  264. )
  265. # delete the grabbed clips from disk
  266. for event in expired_events:
  267. media_name = f"{event.camera}-{event.id}"
  268. media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}")
  269. media.unlink(missing_ok=True)
  270. # update the clips attribute for the db entry
  271. update_query = (
  272. Event.update(update_params)
  273. .where( Event.camera == name,
  274. Event.start_time < expire_after,
  275. Event.label == l.label)
  276. )
  277. update_query.execute()
  278. def purge_duplicates(self):
  279. duplicate_query = """with grouped_events as (
  280. select id,
  281. label,
  282. camera,
  283. has_snapshot,
  284. has_clip,
  285. row_number() over (
  286. partition by label, camera, round(start_time/5,0)*5
  287. order by end_time-start_time desc
  288. ) as copy_number
  289. from event
  290. )
  291. select distinct id, camera, has_snapshot, has_clip from grouped_events
  292. where copy_number > 1;"""
  293. duplicate_events = Event.raw(duplicate_query)
  294. for event in duplicate_events:
  295. logger.debug(f"Removing duplicate: {event.id}")
  296. media_name = f"{event.camera}-{event.id}"
  297. if event.has_snapshot:
  298. media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
  299. media.unlink(missing_ok=True)
  300. if event.has_clip:
  301. media = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
  302. media.unlink(missing_ok=True)
  303. (Event.delete()
  304. .where( Event.id << [event.id for event in duplicate_events] )
  305. .execute())
  306. def run(self):
  307. counter = 0
  308. while(True):
  309. if self.stop_event.is_set():
  310. logger.info(f"Exiting event cleanup...")
  311. break
  312. # only expire events every 5 minutes, but check for stop events every 10 seconds
  313. time.sleep(10)
  314. counter = counter + 1
  315. if counter < 30:
  316. continue
  317. counter = 0
  318. self.expire('clips')
  319. self.expire('snapshots')
  320. self.purge_duplicates()
  321. # drop events from db where has_clip and has_snapshot are false
  322. delete_query = (
  323. Event.delete()
  324. .where( Event.has_clip == False,
  325. Event.has_snapshot == False)
  326. )
  327. delete_query.execute()