# events.py (7.5 KB)
  1. import os
  2. import time
  3. import psutil
  4. import threading
  5. from collections import defaultdict
  6. import json
  7. import datetime
  8. import subprocess as sp
  9. import queue
  10. from tinydb import TinyDB
  11. class EventProcessor(threading.Thread):
  12. def __init__(self, config, camera_processes, cache_dir, clip_dir, event_queue, stop_event):
  13. threading.Thread.__init__(self)
  14. self.config = config
  15. self.camera_processes = camera_processes
  16. self.cache_dir = cache_dir
  17. self.clip_dir = clip_dir
  18. self.cached_clips = {}
  19. self.event_queue = event_queue
  20. self.events_in_process = {}
  21. self.stop_event = stop_event
  22. self.db = TinyDB(f"{os.path.join(self.clip_dir, 'events')}.json")
  23. def refresh_cache(self):
  24. cached_files = os.listdir(self.cache_dir)
  25. files_in_use = []
  26. for process_data in self.camera_processes.values():
  27. try:
  28. ffmpeg_process = psutil.Process(pid=process_data['ffmpeg_pid'].value)
  29. flist = ffmpeg_process.open_files()
  30. if flist:
  31. for nt in flist:
  32. if nt.path.startswith(self.cache_dir):
  33. files_in_use.append(nt.path.split('/')[-1])
  34. except:
  35. continue
  36. for f in cached_files:
  37. if f in files_in_use or f in self.cached_clips:
  38. continue
  39. camera = '-'.join(f.split('-')[:-1])
  40. start_time = datetime.datetime.strptime(f.split('-')[-1].split('.')[0], '%Y%m%d%H%M%S')
  41. ffprobe_cmd = " ".join([
  42. 'ffprobe',
  43. '-v',
  44. 'error',
  45. '-show_entries',
  46. 'format=duration',
  47. '-of',
  48. 'default=noprint_wrappers=1:nokey=1',
  49. f"{os.path.join(self.cache_dir,f)}"
  50. ])
  51. p = sp.Popen(ffprobe_cmd, stdout=sp.PIPE, shell=True)
  52. (output, err) = p.communicate()
  53. p_status = p.wait()
  54. if p_status == 0:
  55. duration = float(output.decode('utf-8').strip())
  56. else:
  57. print(f"bad file: {f}")
  58. os.remove(os.path.join(self.cache_dir,f))
  59. continue
  60. self.cached_clips[f] = {
  61. 'path': f,
  62. 'camera': camera,
  63. 'start_time': start_time.timestamp(),
  64. 'duration': duration
  65. }
  66. if len(self.events_in_process) > 0:
  67. earliest_event = min(self.events_in_process.values(), key=lambda x:x['start_time'])['start_time']
  68. else:
  69. earliest_event = datetime.datetime.now().timestamp()
  70. # if the earliest event exceeds the max seconds, cap it
  71. max_seconds = self.config.get('save_clips', {}).get('max_seconds', 300)
  72. if datetime.datetime.now().timestamp()-earliest_event > max_seconds:
  73. earliest_event = datetime.datetime.now().timestamp()-max_seconds
  74. for f, data in list(self.cached_clips.items()):
  75. if earliest_event-90 > data['start_time']+data['duration']:
  76. del self.cached_clips[f]
  77. os.remove(os.path.join(self.cache_dir,f))
  78. def create_clip(self, camera, event_data, pre_capture):
  79. # get all clips from the camera with the event sorted
  80. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  81. while sorted_clips[-1]['start_time'] + sorted_clips[-1]['duration'] < event_data['end_time']:
  82. time.sleep(5)
  83. self.refresh_cache()
  84. # get all clips from the camera with the event sorted
  85. sorted_clips = sorted([c for c in self.cached_clips.values() if c['camera'] == camera], key = lambda i: i['start_time'])
  86. playlist_start = event_data['start_time']-pre_capture
  87. playlist_end = event_data['end_time']+5
  88. playlist_lines = []
  89. for clip in sorted_clips:
  90. # clip ends before playlist start time, skip
  91. if clip['start_time']+clip['duration'] < playlist_start:
  92. continue
  93. # clip starts after playlist ends, finish
  94. if clip['start_time'] > playlist_end:
  95. break
  96. playlist_lines.append(f"file '{os.path.join(self.cache_dir,clip['path'])}'")
  97. # if this is the starting clip, add an inpoint
  98. if clip['start_time'] < playlist_start:
  99. playlist_lines.append(f"inpoint {int(playlist_start-clip['start_time'])}")
  100. # if this is the ending clip, add an outpoint
  101. if clip['start_time']+clip['duration'] > playlist_end:
  102. playlist_lines.append(f"outpoint {int(playlist_end-clip['start_time'])}")
  103. clip_name = f"{camera}-{event_data['id']}"
  104. ffmpeg_cmd = [
  105. 'ffmpeg',
  106. '-y',
  107. '-protocol_whitelist',
  108. 'pipe,file',
  109. '-f',
  110. 'concat',
  111. '-safe',
  112. '0',
  113. '-i',
  114. '-',
  115. '-c',
  116. 'copy',
  117. f"{os.path.join(self.clip_dir, clip_name)}.mp4"
  118. ]
  119. p = sp.run(ffmpeg_cmd, input="\n".join(playlist_lines), encoding='ascii', capture_output=True)
  120. if p.returncode != 0:
  121. print(p.stderr)
  122. return
  123. with open(f"{os.path.join(self.clip_dir, clip_name)}.json", 'w') as outfile:
  124. json.dump({
  125. 'id': event_data['id'],
  126. 'label': event_data['label'],
  127. 'camera': camera,
  128. 'start_time': event_data['start_time'],
  129. 'end_time': event_data['end_time'],
  130. 'top_score': event_data['top_score'],
  131. 'false_positive': event_data['false_positive'],
  132. 'zones': list(event_data['entered_zones'])
  133. }, outfile)
  134. def run(self):
  135. while True:
  136. if self.stop_event.is_set():
  137. print(f"Exiting event processor...")
  138. break
  139. try:
  140. event_type, camera, event_data = self.event_queue.get(timeout=10)
  141. except queue.Empty:
  142. if not self.stop_event.is_set():
  143. self.refresh_cache()
  144. continue
  145. self.refresh_cache()
  146. save_clips_config = self.config['cameras'][camera].get('save_clips', {})
  147. # if save clips is not enabled for this camera, just continue
  148. if not save_clips_config.get('enabled', False):
  149. continue
  150. # if specific objects are listed for this camera, only save clips for them
  151. if 'objects' in save_clips_config:
  152. if not event_data['label'] in save_clips_config['objects']:
  153. continue
  154. if event_type == 'start':
  155. self.events_in_process[event_data['id']] = event_data
  156. if event_type == 'end':
  157. self.db.insert({
  158. 'id': event_data['id'],
  159. 'label': event_data['label'],
  160. 'camera': camera,
  161. 'start_time': event_data['start_time'],
  162. 'end_time': event_data['end_time'],
  163. 'top_score': event_data['top_score'],
  164. 'false_positive': event_data['false_positive'],
  165. 'zones': list(event_data['entered_zones'])
  166. })
  167. if len(self.cached_clips) > 0 and not event_data['false_positive']:
  168. self.create_clip(camera, event_data, save_clips_config.get('pre_capture', 30))
  169. del self.events_in_process[event_data['id']]