events.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. import os
  2. import time
  3. import psutil
  4. import threading
  5. import logging
  6. from collections import defaultdict
  7. import json
  8. import datetime
  9. import subprocess as sp
  10. import queue
  11. from frigate.models import Event
  12. logger = logging.getLogger(__name__)
  13. class EventProcessor(threading.Thread):
  14. def __init__(self, config, camera_processes, event_queue, stop_event):
  15. threading.Thread.__init__(self)
  16. self.name = 'event_processor'
  17. self.config = config
  18. self.cache_dir = self.config.save_clips.cache_dir
  19. self.clips_dir = self.config.save_clips.clips_dir
  20. self.camera_processes = camera_processes
  21. self.cached_clips = {}
  22. self.event_queue = event_queue
  23. self.events_in_process = {}
  24. self.stop_event = stop_event
  25. def refresh_cache(self):
  26. cached_files = os.listdir(self.cache_dir)
  27. files_in_use = []
  28. for process_data in self.camera_processes.values():
  29. try:
  30. ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_pid'].value)
  31. flist = ffmpeg_process.open_files()
  32. if flist:
  33. for nt in flist:
  34. if nt.path.startswith(self.cache_dir):
  35. files_in_use.append(nt.path.split('/')[-1])
  36. except:
  37. continue
  38. for f in cached_files:
  39. if f in files_in_use or f in self.cached_clips:
  40. continue
  41. camera = '-'.join(f.split('-')[:-1])
  42. start_time = datetime.datetime.strptime(f.split('-')[-1].split('.')[0], '%Y%m%d%H%M%S')
  43. ffprobe_cmd = " ".join([
  44. 'ffprobe',
  45. '-v',
  46. 'error',
  47. '-show_entries',
  48. 'format=duration',
  49. '-of',
  50. 'default=noprint_wrappers=1:nokey=1',
  51. f"{os.path.join(self.cache_dir,f)}"
  52. ])
  53. p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True)
  54. (output, err) = p.communicate()
  55. p_status = p.wait()
  56. if p_status == 0:
  57. duration = float(output.decode('utf-8').strip())
  58. else:
  59. logger.info(f"bad file: {f}")
  60. os.remove(os.path.join(self.cache_dir,f))
  61. continue
  62. self.cached_clips[f] = {
  63. 'path': f,
  64. 'camera': camera,
  65. 'start_time': start_time.timestamp(),
  66. 'duration': duration
  67. }
  68. if len(self.events_in_process) > 0:
  69. earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time']
  70. else:
  71. earliest_event = datetime.datetime.now().timestamp()
  72. # if the earliest event exceeds the max seconds, cap it
  73. max_seconds = self.config.save_clips.max_seconds
  74. if datetime.datetime.now().timestamp()-earliest_event > max_seconds:
  75. earliest_event = datetime.datetime.now().timestamp()-max_seconds
  76. for f, data in list(self.cached_clips.items()):
  77. if earliest_event-90 > data['start_time']+data['duration']:
  78. del self.cached_clips[f]
  79. os.remove(os.path.join(self.cache_dir,f))
  80. def create_clip(self, camera, event_data, pre_capture):
  81. # get all clips from the camera with the event sorted
  82. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  83. while sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']:
  84. time.sleep(5)
  85. self.refresh_cache()
  86. # get all clips from the camera with the event sorted
  87. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  88. playlist_start = event_data['start_time']-pre_capture
  89. playlist_end = event_data['end_time']+5
  90. playlist_lines = []
  91. for clip in sorted_clips:
  92. # clip ends before playlist start time, skip
  93. if clip['start_time']+clip['duration'] < playlist_start:
  94. continue
  95. # clip starts after playlist ends, finish
  96. if clip['start_time'] > playlist_end:
  97. break
  98. playlist_lines.append(f"file '{os.path.join(self.cache_dir,clip['path'])}'")
  99. # if this is the starting clip, add an inpoint
  100. if clip['start_time'] < playlist_start:
  101. playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}")
  102. # if this is the ending clip, add an outpoint
  103. if clip['start_time']+clip['duration'] > playlist_end:
  104. playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}")
  105. clip_name = f"{camera}-{event_data['id']}"
  106. ffmpeg_cmd = [
  107. 'ffmpeg',
  108. '-y',
  109. '-protocol_whitelist',
  110. 'pipe,file',
  111. '-f',
  112. 'concat',
  113. '-safe',
  114. '0',
  115. '-i',
  116. '-',
  117. '-c',
  118. 'copy',
  119. f"{os.path.join(self.clips_dir, clip_name)}.mp4"
  120. ]
  121. p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
  122. if p.returncode != 0:
  123. logger.error(p.stderr)
  124. return
  125. with open(f"{os.path.join(self.clips_dir, clip_name)}.json", 'w') as outfile:
  126. json.dump({
  127. 'id': event_data['id'],
  128. 'label': event_data['label'],
  129. 'camera': camera,
  130. 'start_time': event_data['start_time'],
  131. 'end_time': event_data['end_time'],
  132. 'top_score': event_data['top_score'],
  133. 'false_positive': event_data['false_positive'],
  134. 'zones': list(event_data['entered_zones'])
  135. }, outfile)
  136. def run(self):
  137. while True:
  138. if self.stop_event.is_set():
  139. logger.info(f"Exiting event processor...")
  140. break
  141. try:
  142. event_type, camera, event_data = self.event_queue.get(timeout=10)
  143. except queue.Empty:
  144. if not self.stop_event.is_set():
  145. self.refresh_cache()
  146. continue
  147. self.refresh_cache()
  148. save_clips_config = self.config.cameras[camera].save_clips
  149. # if save clips is not enabled for this camera, just continue
  150. if not save_clips_config.enabled:
  151. continue
  152. # if specific objects are listed for this camera, only save clips for them
  153. # TODO: default to all tracked objects rather than checking for None
  154. if save_clips_config.objects:
  155. if not event_data['label'] in save_clips_config.objects:
  156. continue
  157. if event_type == 'start':
  158. self.events_in_process[event_data['id']] = event_data
  159. if event_type == 'end':
  160. Event.create(
  161. id=event_data['id'],
  162. label=event_data['label'],
  163. camera=camera,
  164. start_time=event_data['start_time'],
  165. end_time=event_data['end_time'],
  166. top_score=event_data['top_score'],
  167. false_positive=event_data['false_positive'],
  168. zones=list(event_data['entered_zones'])
  169. )
  170. if len(self.cached_clips) > 0 and not event_data['false_positive']:
  171. self.create_clip(camera, event_data, save_clips_config.pre_capture)
  172. del self.events_in_process[event_data['id']]