events.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import datetime
  2. import json
  3. import logging
  4. import os
  5. import queue
  6. import subprocess as sp
  7. import threading
  8. import time
  9. from collections import defaultdict
  10. import psutil
  11. from frigate.models import Event
  12. logger = logging.getLogger(__name__)
  13. class EventProcessor(threading.Thread):
  14. def __init__(self, config, camera_processes, event_queue, stop_event):
  15. threading.Thread.__init__(self)
  16. self.name = 'event_processor'
  17. self.config = config
  18. self.cache_dir = self.config.save_clips.cache_dir
  19. self.clips_dir = self.config.save_clips.clips_dir
  20. self.camera_processes = camera_processes
  21. self.cached_clips = {}
  22. self.event_queue = event_queue
  23. self.events_in_process = {}
  24. self.stop_event = stop_event
  25. def refresh_cache(self):
  26. cached_files = os.listdir(self.cache_dir)
  27. files_in_use = []
  28. for process_data in self.camera_processes.values():
  29. try:
  30. ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_pid'].value)
  31. flist = ffmpeg_process.open_files()
  32. if flist:
  33. for nt in flist:
  34. if nt.path.startswith(self.cache_dir):
  35. files_in_use.append(nt.path.split('/')[-1])
  36. except:
  37. continue
  38. for f in cached_files:
  39. if f in files_in_use or f in self.cached_clips:
  40. continue
  41. camera = '-'.join(f.split('-')[:-1])
  42. start_time = datetime.datetime.strptime(f.split('-')[-1].split('.')[0], '%Y%m%d%H%M%S')
  43. ffprobe_cmd = " ".join([
  44. 'ffprobe',
  45. '-v',
  46. 'error',
  47. '-show_entries',
  48. 'format=duration',
  49. '-of',
  50. 'default=noprint_wrappers=1:nokey=1',
  51. f"{os.path.join(self.cache_dir,f)}"
  52. ])
  53. p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True)
  54. (output, err) = p.communicate()
  55. p_status = p.wait()
  56. if p_status == 0:
  57. duration = float(output.decode('utf-8').strip())
  58. else:
  59. logger.info(f"bad file: {f}")
  60. os.remove(os.path.join(self.cache_dir,f))
  61. continue
  62. self.cached_clips[f] = {
  63. 'path': f,
  64. 'camera': camera,
  65. 'start_time': start_time.timestamp(),
  66. 'duration': duration
  67. }
  68. if len(self.events_in_process) > 0:
  69. earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time']
  70. else:
  71. earliest_event = datetime.datetime.now().timestamp()
  72. # if the earliest event exceeds the max seconds, cap it
  73. max_seconds = self.config.save_clips.max_seconds
  74. if datetime.datetime.now().timestamp()-earliest_event > max_seconds:
  75. earliest_event = datetime.datetime.now().timestamp()-max_seconds
  76. for f, data in list(self.cached_clips.items()):
  77. if earliest_event-90 > data['start_time']+data['duration']:
  78. del self.cached_clips[f]
  79. os.remove(os.path.join(self.cache_dir,f))
  80. def create_clip(self, camera, event_data, pre_capture):
  81. # get all clips from the camera with the event sorted
  82. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  83. while sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']:
  84. time.sleep(5)
  85. self.refresh_cache()
  86. # get all clips from the camera with the event sorted
  87. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  88. playlist_start = event_data['start_time']-pre_capture
  89. playlist_end = event_data['end_time']+5
  90. playlist_lines = []
  91. for clip in sorted_clips:
  92. # clip ends before playlist start time, skip
  93. if clip['start_time']+clip['duration'] < playlist_start:
  94. continue
  95. # clip starts after playlist ends, finish
  96. if clip['start_time'] > playlist_end:
  97. break
  98. playlist_lines.append(f"file '{os.path.join(self.cache_dir,clip['path'])}'")
  99. # if this is the starting clip, add an inpoint
  100. if clip['start_time'] < playlist_start:
  101. playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}")
  102. # if this is the ending clip, add an outpoint
  103. if clip['start_time']+clip['duration'] > playlist_end:
  104. playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}")
  105. clip_name = f"{camera}-{event_data['id']}"
  106. ffmpeg_cmd = [
  107. 'ffmpeg',
  108. '-y',
  109. '-protocol_whitelist',
  110. 'pipe,file',
  111. '-f',
  112. 'concat',
  113. '-safe',
  114. '0',
  115. '-i',
  116. '-',
  117. '-c',
  118. 'copy',
  119. f"{os.path.join(self.clips_dir, clip_name)}.mp4"
  120. ]
  121. p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
  122. if p.returncode != 0:
  123. logger.error(p.stderr)
  124. return
  125. def run(self):
  126. while True:
  127. if self.stop_event.is_set():
  128. logger.info(f"Exiting event processor...")
  129. break
  130. try:
  131. event_type, camera, event_data = self.event_queue.get(timeout=10)
  132. except queue.Empty:
  133. if not self.stop_event.is_set():
  134. self.refresh_cache()
  135. continue
  136. self.refresh_cache()
  137. save_clips_config = self.config.cameras[camera].save_clips
  138. # if save clips is not enabled for this camera, just continue
  139. if not save_clips_config.enabled:
  140. continue
  141. # if specific objects are listed for this camera, only save clips for them
  142. # TODO: default to all tracked objects rather than checking for None
  143. if save_clips_config.objects:
  144. if not event_data['label'] in save_clips_config.objects:
  145. continue
  146. if event_type == 'start':
  147. self.events_in_process[event_data['id']] = event_data
  148. if event_type == 'end':
  149. Event.create(
  150. id=event_data['id'],
  151. label=event_data['label'],
  152. camera=camera,
  153. start_time=event_data['start_time'],
  154. end_time=event_data['end_time'],
  155. top_score=event_data['top_score'],
  156. false_positive=event_data['false_positive'],
  157. zones=list(event_data['entered_zones']),
  158. thumbnail=event_data['thumbnail']
  159. )
  160. if len(self.cached_clips) > 0 and not event_data['false_positive']:
  161. self.create_clip(camera, event_data, save_clips_config.pre_capture)
  162. del self.events_in_process[event_data['id']]