# events.py
  1. import datetime
  2. import json
  3. import logging
  4. import os
  5. import queue
  6. import subprocess as sp
  7. import threading
  8. import time
  9. from collections import defaultdict
  10. from pathlib import Path
  11. import psutil
  12. from frigate.config import FrigateConfig
  13. from frigate.const import RECORD_DIR, CLIPS_DIR, CACHE_DIR
  14. from frigate.models import Event
  15. from peewee import fn
  # Module-level logger used by the event threads defined in this file.
  logger = logging.getLogger(__name__)
  class EventProcessor(threading.Thread):
      """Thread that turns camera events into ``Event`` rows and optional clips.

      Events are read from ``event_queue`` as ``(event_type, camera, event_data)``
      tuples. The thread keeps an index of the segment files found in
      ``CACHE_DIR``; when an ``'end'`` event arrives for a non-false-positive
      event it optionally concatenates the matching segments into an mp4 under
      ``CLIPS_DIR``, creates the ``Event`` row, and finally puts
      ``(event_id, camera)`` on ``event_processed_queue``.
      """

      def __init__(self, config, camera_processes, event_queue, event_processed_queue, stop_event):
          threading.Thread.__init__(self)
          self.name = 'event_processor'
          self.config = config
          self.camera_processes = camera_processes
          # cache file name -> {'path', 'camera', 'start_time', 'duration'}
          self.cached_clips = {}
          self.event_queue = event_queue
          self.event_processed_queue = event_processed_queue
          # event id -> event_data for events that have started but not ended
          self.events_in_process = {}
          self.stop_event = stop_event

      @staticmethod
      def _files_in_use():
          """Return the names of CACHE_DIR files currently opened by an ffmpeg process."""
          in_use = set()
          for process in psutil.process_iter():
              try:
                  if process.name() != 'ffmpeg':
                      continue
                  for opened in process.open_files() or []:
                      if opened.path.startswith(CACHE_DIR):
                          in_use.add(os.path.basename(opened.path))
              except (psutil.Error, OSError):
                  # best effort: a process may exit or deny access mid-scan
                  continue
          return in_use

      @staticmethod
      def _parse_cache_name(file_name):
          """Split ``<camera>-<YYYYmmddHHMMSS>.<ext>`` into ``(camera, start_datetime)``.

          The camera part may itself contain dashes; only the last dash separates
          it from the timestamp. Raises ``ValueError`` if the timestamp part does
          not match the expected format.
          """
          camera, _, stamp = file_name.rpartition('-')
          start_time = datetime.datetime.strptime(stamp.split('.')[0], '%Y%m%d%H%M%S')
          return camera, start_time

      @staticmethod
      def _ffprobe_duration_cmd(path):
          """Return the ffprobe argument vector that prints the duration of *path*.

          An argument list (no shell) is used so that file names containing
          spaces or shell metacharacters are passed through untouched.
          """
          return [
              'ffprobe',
              '-v',
              'error',
              '-show_entries',
              'format=duration',
              '-of',
              'default=noprint_wrappers=1:nokey=1',
              path,
          ]

      def _probe_duration(self, path):
          """Return the duration of *path* in seconds, or ``None`` if ffprobe cannot provide one."""
          probe = sp.run(self._ffprobe_duration_cmd(path), stdout=sp.PIPE)
          if probe.returncode != 0:
              return None
          try:
              return float(probe.stdout.decode('utf-8').strip())
          except ValueError:
              # ffprobe exited cleanly but printed no usable number
              return None

      @staticmethod
      def _playlist_lines(sorted_clips, playlist_start, playlist_end, cache_dir):
          """Build ffmpeg concat-playlist lines covering ``playlist_start``..``playlist_end``.

          *sorted_clips* must be ordered by ``start_time``. Clips that end before
          the window are skipped and iteration stops at the first clip starting
          after it. ``inpoint``/``outpoint`` (whole seconds, relative to the clip
          start) trim the clips that straddle the window edges.
          """
          lines = []
          for clip in sorted_clips:
              clip_start = clip['start_time']
              clip_end = clip_start + clip['duration']
              # clip ends before playlist start time, skip
              if clip_end < playlist_start:
                  continue
              # clip starts after playlist ends, finish
              if clip_start > playlist_end:
                  break
              lines.append(f"file '{os.path.join(cache_dir, clip['path'])}'")
              # if this is the starting clip, add an inpoint
              if clip_start < playlist_start:
                  lines.append(f"inpoint {int(playlist_start-clip_start)}")
              # if this is the ending clip, add an outpoint
              if clip_end > playlist_end:
                  lines.append(f"outpoint {int(playlist_end-clip_start)}")
          return lines

      def _camera_clips(self, camera):
          """Return the cached clips of *camera*, sorted by start time."""
          return sorted(
              (c for c in self.cached_clips.values() if c['camera'] == camera),
              key=lambda c: c['start_time'],
          )

      def refresh_cache(self):
          """Index new segment files in CACHE_DIR and delete the ones no longer needed.

          Files currently opened by ffmpeg, or already indexed, are left alone.
          A new file whose duration cannot be read with ffprobe is deleted as a
          bad file; a file whose name does not carry a parseable timestamp is
          ignored. Indexed clips that ended more than 90 seconds before the
          earliest in-progress event (capped at ``config.clips.max_seconds`` in
          the past) are removed from the index and from disk.
          """
          cached_files = os.listdir(CACHE_DIR)
          files_in_use = self._files_in_use()

          for f in cached_files:
              if f in files_in_use or f in self.cached_clips:
                  continue

              try:
                  camera, start_time = self._parse_cache_name(f)
              except ValueError:
                  # not a segment file we know how to index; don't kill the thread over it
                  logger.debug(f"Ignoring cache file with unexpected name: {f}")
                  continue

              path = os.path.join(CACHE_DIR, f)
              duration = self._probe_duration(path)
              if duration is None:
                  logger.info(f"bad file: {f}")
                  os.remove(path)
                  continue

              self.cached_clips[f] = {
                  'path': f,
                  'camera': camera,
                  'start_time': start_time.timestamp(),
                  'duration': duration
              }

          now = datetime.datetime.now().timestamp()
          if self.events_in_process:
              earliest_event = min(e['start_time'] for e in self.events_in_process.values())
          else:
              earliest_event = now

          # if the earliest event exceeds the max seconds, cap it
          max_seconds = self.config.clips.max_seconds
          if now - earliest_event > max_seconds:
              earliest_event = now - max_seconds

          for f, data in list(self.cached_clips.items()):
              if earliest_event-90 > data['start_time']+data['duration']:
                  del self.cached_clips[f]
                  logger.debug(f"Cleaning up cached file {f}")
                  os.remove(os.path.join(CACHE_DIR,f))

      def create_clip(self, camera, event_data, pre_capture, post_capture):
          """Concatenate cached segments into ``CLIPS_DIR/<camera>-<event id>.mp4``.

          The clip spans ``start_time - pre_capture`` to ``end_time + post_capture``
          of *event_data*. If the cache does not yet reach the end of that window
          the cache is refreshed every 5 seconds, at most five times.

          Returns ``True`` when ffmpeg exits with status 0, ``False`` when the
          needed segments never showed up or ffmpeg failed.
          """
          playlist_start = event_data['start_time']-pre_capture
          playlist_end = event_data['end_time']+post_capture

          # if there are no clips in the cache or we are still waiting on a needed file check every 5 seconds
          sorted_clips = self._camera_clips(camera)
          wait_count = 0
          while len(sorted_clips) == 0 or sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < playlist_end:
              if wait_count > 4:
                  logger.warning(f"Unable to create clip for {camera} and event {event_data['id']}. There were no cache files for this event.")
                  return False
              logger.debug(f"No cache clips for {camera}. Waiting...")
              time.sleep(5)
              self.refresh_cache()
              sorted_clips = self._camera_clips(camera)
              wait_count += 1

          playlist_lines = self._playlist_lines(sorted_clips, playlist_start, playlist_end, CACHE_DIR)

          clip_name = f"{camera}-{event_data['id']}"
          ffmpeg_cmd = [
              'ffmpeg',
              '-y',
              '-protocol_whitelist',
              'pipe,file',
              '-f',
              'concat',
              '-safe',
              '0',
              '-i',
              '-',
              '-c',
              'copy',
              '-movflags',
              '+faststart',
              f"{os.path.join(CLIPS_DIR, clip_name)}.mp4"
          ]

          # the playlist is fed to ffmpeg on stdin ('-i -')
          p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
          if p.returncode != 0:
              logger.error(p.stderr)
              return False
          return True

      def run(self):
          """Process queued events until ``stop_event`` is set.

          The cache is refreshed on every received event and whenever the queue
          stays empty for 10 seconds.
          """
          while not self.stop_event.is_set():
              try:
                  event_type, camera, event_data = self.event_queue.get(timeout=10)
              except queue.Empty:
                  if not self.stop_event.is_set():
                      self.refresh_cache()
                  continue

              logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
              self.refresh_cache()

              if event_type == 'start':
                  self.events_in_process[event_data['id']] = event_data

              if event_type == 'end':
                  clips_config = self.config.cameras[camera].clips

                  if not event_data['false_positive']:
                      clip_created = False
                      if clips_config.enabled and (clips_config.objects is None or event_data['label'] in clips_config.objects):
                          clip_created = self.create_clip(camera, event_data, clips_config.pre_capture, clips_config.post_capture)

                      Event.create(
                          id=event_data['id'],
                          label=event_data['label'],
                          camera=camera,
                          start_time=event_data['start_time'],
                          end_time=event_data['end_time'],
                          top_score=event_data['top_score'],
                          false_positive=event_data['false_positive'],
                          zones=list(event_data['entered_zones']),
                          thumbnail=event_data['thumbnail'],
                          has_clip=clip_created,
                          has_snapshot=event_data['has_snapshot'],
                      )
                  # tolerate an 'end' whose 'start' was never recorded by this thread
                  self.events_in_process.pop(event_data['id'], None)
                  self.event_processed_queue.put((event_data['id'], camera))

          logger.info("Exiting event processor...")
  class EventCleanup(threading.Thread):
      """Thread that periodically expires stored clips/snapshots and prunes empty events.

      Media files are removed from ``CLIPS_DIR`` once an event is older than the
      retention configured for its label, and the matching ``has_clip`` /
      ``has_snapshot`` flag is cleared. Events with neither flag set are then
      deleted from the database.
      """

      def __init__(self, config: FrigateConfig, stop_event):
          threading.Thread.__init__(self)
          self.name = 'event_cleanup'
          self.config = config
          self.stop_event = stop_event
          # cameras present in the config when the thread was created
          self.camera_keys = list(self.config.cameras.keys())

      def _expire_matching(self, camera_filter, retain_config, file_extension, update_params):
          """Expire media for all events selected by *camera_filter*.

          For every distinct label among those events, the retention period is
          ``retain_config.objects[label]`` (falling back to
          ``retain_config.default``), in days. Media files of events that started
          before that cutoff are deleted and *update_params* is applied to the
          same rows.
          """
          distinct_labels = (Event.select(Event.label)
                             .where(camera_filter)
                             .distinct())

          # loop over object types in db
          for l in distinct_labels:
              # get expiration time for this label
              expire_days = retain_config.objects.get(l.label, retain_config.default)
              expire_after = (datetime.datetime.now() - datetime.timedelta(days=expire_days)).timestamp()
              expired = (
                  camera_filter,
                  Event.start_time < expire_after,
                  Event.label == l.label,
              )

              # delete the media from disk
              for event in Event.select().where(*expired):
                  media_name = f"{event.camera}-{event.id}"
                  media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}")
                  media_path.unlink(missing_ok=True)

              # clear the has_clip / has_snapshot flag on the same db entries
              Event.update(update_params).where(*expired).execute()

      def expire(self, media):
          """Expire ``'clips'`` (mp4) when *media* is ``'clips'``, otherwise snapshots (jpg).

          Events from cameras that are not in the config use the global retention
          settings; events from configured cameras use that camera's settings.
          """
          # Decide once. The file path used to be stored back into ``media``, so
          # every later ``media == 'clips'`` test was false and clip expiry fell
          # through to the snapshot retention settings.
          expiring_clips = media == 'clips'

          if expiring_clips:
              retain_config = self.config.clips.retain
              file_extension = 'mp4'
              update_params = {'has_clip': False}
          else:
              retain_config = self.config.snapshots.retain
              file_extension = 'jpg'
              update_params = {'has_snapshot': False}

          ## Expire events from unlisted cameras based on the global config
          self._expire_matching(
              Event.camera.not_in(self.camera_keys),
              retain_config,
              file_extension,
              update_params,
          )

          ## Expire events from cameras based on the camera config
          for name, camera in self.config.cameras.items():
              if expiring_clips:
                  retain_config = camera.clips.retain
              else:
                  retain_config = camera.snapshots.retain
              self._expire_matching(
                  Event.camera == name,
                  retain_config,
                  file_extension,
                  update_params,
              )

      def run(self):
          """Run the expiry pass every 60 ten-second ticks until ``stop_event`` is set."""
          counter = 0
          while True:
              if self.stop_event.is_set():
                  logger.info("Exiting event cleanup...")
                  break

              # only expire events every 10 minutes, but check for stop events every 10 seconds
              time.sleep(10)
              counter = counter + 1
              if counter < 60:
                  continue
              counter = 0

              self.expire('clips')
              self.expire('snapshots')

              # drop events from db where has_clip and has_snapshot are false
              delete_query = (
                  Event.delete()
                  .where(Event.has_clip == False,
                         Event.has_snapshot == False)
              )
              delete_query.execute()