events.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. import datetime
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from pathlib import Path
  8. from frigate.config import FrigateConfig, RecordConfig
  9. from frigate.const import CLIPS_DIR
  10. from frigate.models import Event, Recordings
  11. from peewee import fn
  12. logger = logging.getLogger(__name__)
  13. class EventProcessor(threading.Thread):
  14. def __init__(
  15. self, config, camera_processes, event_queue, event_processed_queue, stop_event
  16. ):
  17. threading.Thread.__init__(self)
  18. self.name = "event_processor"
  19. self.config = config
  20. self.camera_processes = camera_processes
  21. self.cached_clips = {}
  22. self.event_queue = event_queue
  23. self.event_processed_queue = event_processed_queue
  24. self.events_in_process = {}
  25. self.stop_event = stop_event
  26. def should_create_clip(self, camera, event_data):
  27. if event_data["false_positive"]:
  28. return False
  29. record_config: RecordConfig = self.config.cameras[camera].record
  30. # Recording clips is disabled
  31. if not record_config.enabled or (
  32. record_config.retain_days == 0 and not record_config.events.enabled
  33. ):
  34. return False
  35. # If there are required zones and there is no overlap
  36. required_zones = record_config.events.required_zones
  37. if len(required_zones) > 0 and not set(event_data["entered_zones"]) & set(
  38. required_zones
  39. ):
  40. logger.debug(
  41. f"Not creating clip for {event_data['id']} because it did not enter required zones"
  42. )
  43. return False
  44. # If the required objects are not present
  45. if (
  46. record_config.events.objects is not None
  47. and event_data["label"] not in record_config.events.objects
  48. ):
  49. logger.debug(
  50. f"Not creating clip for {event_data['id']} because it did not contain required objects"
  51. )
  52. return False
  53. return True
  54. def verify_clip(self, camera, end_time):
  55. # check every 5 seconds for the last required recording
  56. for _ in range(4):
  57. recordings_count = (
  58. Recordings.select()
  59. .where(Recordings.camera == camera, Recordings.end_time > end_time)
  60. .limit(1)
  61. .count()
  62. )
  63. if recordings_count > 0:
  64. return True
  65. logger.debug(f"Missing recording for {camera} clip. Waiting...")
  66. time.sleep(5)
  67. logger.warning(
  68. f"Unable to verify clip for {camera}. There were no recordings for this camera."
  69. )
  70. return False
  71. def run(self):
  72. while not self.stop_event.is_set():
  73. try:
  74. event_type, camera, event_data = self.event_queue.get(timeout=10)
  75. except queue.Empty:
  76. # if not self.stop_event.is_set():
  77. # self.refresh_cache()
  78. continue
  79. logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
  80. # self.refresh_cache()
  81. if event_type == "start":
  82. self.events_in_process[event_data["id"]] = event_data
  83. if event_type == "end":
  84. record_config: RecordConfig = self.config.cameras[camera].record
  85. has_clip = self.should_create_clip(camera, event_data)
  86. # Wait for recordings to be ready
  87. if has_clip:
  88. has_clip = self.verify_clip(
  89. camera,
  90. event_data["end_time"] + record_config.events.post_capture,
  91. )
  92. if has_clip or event_data["has_snapshot"]:
  93. Event.create(
  94. id=event_data["id"],
  95. label=event_data["label"],
  96. camera=camera,
  97. start_time=event_data["start_time"],
  98. end_time=event_data["end_time"],
  99. top_score=event_data["top_score"],
  100. false_positive=event_data["false_positive"],
  101. zones=list(event_data["entered_zones"]),
  102. thumbnail=event_data["thumbnail"],
  103. has_clip=has_clip,
  104. has_snapshot=event_data["has_snapshot"],
  105. )
  106. del self.events_in_process[event_data["id"]]
  107. self.event_processed_queue.put((event_data["id"], camera, has_clip))
  108. logger.info(f"Exiting event processor...")
  109. class EventCleanup(threading.Thread):
  110. def __init__(self, config: FrigateConfig, stop_event):
  111. threading.Thread.__init__(self)
  112. self.name = "event_cleanup"
  113. self.config = config
  114. self.stop_event = stop_event
  115. self.camera_keys = list(self.config.cameras.keys())
  116. def expire(self, media_type):
  117. ## Expire events from unlisted cameras based on the global config
  118. if media_type == "clips":
  119. retain_config = self.config.clips.retain
  120. file_extension = "mp4"
  121. update_params = {"has_clip": False}
  122. else:
  123. retain_config = self.config.snapshots.retain
  124. file_extension = "jpg"
  125. update_params = {"has_snapshot": False}
  126. distinct_labels = (
  127. Event.select(Event.label)
  128. .where(Event.camera.not_in(self.camera_keys))
  129. .distinct()
  130. )
  131. # loop over object types in db
  132. for l in distinct_labels:
  133. # get expiration time for this label
  134. expire_days = retain_config.objects.get(l.label, retain_config.default)
  135. expire_after = (
  136. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  137. ).timestamp()
  138. # grab all events after specific time
  139. expired_events = Event.select().where(
  140. Event.camera.not_in(self.camera_keys),
  141. Event.start_time < expire_after,
  142. Event.label == l.label,
  143. )
  144. # delete the media from disk
  145. for event in expired_events:
  146. media_name = f"{event.camera}-{event.id}"
  147. media_path = Path(
  148. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  149. )
  150. media_path.unlink(missing_ok=True)
  151. if file_extension == "jpg":
  152. media_path = Path(
  153. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  154. )
  155. media_path.unlink(missing_ok=True)
  156. # update the clips attribute for the db entry
  157. update_query = Event.update(update_params).where(
  158. Event.camera.not_in(self.camera_keys),
  159. Event.start_time < expire_after,
  160. Event.label == l.label,
  161. )
  162. update_query.execute()
  163. ## Expire events from cameras based on the camera config
  164. for name, camera in self.config.cameras.items():
  165. if media_type == "clips":
  166. retain_config = camera.clips.retain
  167. else:
  168. retain_config = camera.snapshots.retain
  169. # get distinct objects in database for this camera
  170. distinct_labels = (
  171. Event.select(Event.label).where(Event.camera == name).distinct()
  172. )
  173. # loop over object types in db
  174. for l in distinct_labels:
  175. # get expiration time for this label
  176. expire_days = retain_config.objects.get(l.label, retain_config.default)
  177. expire_after = (
  178. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  179. ).timestamp()
  180. # grab all events after specific time
  181. expired_events = Event.select().where(
  182. Event.camera == name,
  183. Event.start_time < expire_after,
  184. Event.label == l.label,
  185. )
  186. # delete the grabbed clips from disk
  187. for event in expired_events:
  188. media_name = f"{event.camera}-{event.id}"
  189. media_path = Path(
  190. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  191. )
  192. media_path.unlink(missing_ok=True)
  193. if file_extension == "jpg":
  194. media_path = Path(
  195. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  196. )
  197. media_path.unlink(missing_ok=True)
  198. # update the clips attribute for the db entry
  199. update_query = Event.update(update_params).where(
  200. Event.camera == name,
  201. Event.start_time < expire_after,
  202. Event.label == l.label,
  203. )
  204. update_query.execute()
  205. def purge_duplicates(self):
  206. duplicate_query = """with grouped_events as (
  207. select id,
  208. label,
  209. camera,
  210. has_snapshot,
  211. has_clip,
  212. row_number() over (
  213. partition by label, camera, round(start_time/5,0)*5
  214. order by end_time-start_time desc
  215. ) as copy_number
  216. from event
  217. )
  218. select distinct id, camera, has_snapshot, has_clip from grouped_events
  219. where copy_number > 1;"""
  220. duplicate_events = Event.raw(duplicate_query)
  221. for event in duplicate_events:
  222. logger.debug(f"Removing duplicate: {event.id}")
  223. media_name = f"{event.camera}-{event.id}"
  224. if event.has_snapshot:
  225. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
  226. media_path.unlink(missing_ok=True)
  227. if event.has_clip:
  228. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
  229. media_path.unlink(missing_ok=True)
  230. (
  231. Event.delete()
  232. .where(Event.id << [event.id for event in duplicate_events])
  233. .execute()
  234. )
  235. def run(self):
  236. # only expire events every 5 minutes
  237. while not self.stop_event.wait(300):
  238. self.expire("clips")
  239. self.expire("snapshots")
  240. self.purge_duplicates()
  241. # drop events from db where has_clip and has_snapshot are false
  242. delete_query = Event.delete().where(
  243. Event.has_clip == False, Event.has_snapshot == False
  244. )
  245. delete_query.execute()
  246. logger.info(f"Exiting event cleanup...")