events.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. import datetime
  2. import logging
  3. import os
  4. import queue
  5. import threading
  6. import time
  7. from pathlib import Path
  8. from peewee import fn
  9. from frigate.config import EventsConfig, FrigateConfig, RecordConfig
  10. from frigate.const import CLIPS_DIR
  11. from frigate.models import Event
  12. logger = logging.getLogger(__name__)
  13. class EventProcessor(threading.Thread):
  14. def __init__(
  15. self, config, camera_processes, event_queue, event_processed_queue, stop_event
  16. ):
  17. threading.Thread.__init__(self)
  18. self.name = "event_processor"
  19. self.config = config
  20. self.camera_processes = camera_processes
  21. self.cached_clips = {}
  22. self.event_queue = event_queue
  23. self.event_processed_queue = event_processed_queue
  24. self.events_in_process = {}
  25. self.stop_event = stop_event
  26. def run(self):
  27. while not self.stop_event.is_set():
  28. try:
  29. event_type, camera, event_data = self.event_queue.get(timeout=10)
  30. except queue.Empty:
  31. continue
  32. logger.debug(f"Event received: {event_type} {camera} {event_data['id']}")
  33. if event_type == "start":
  34. self.events_in_process[event_data["id"]] = event_data
  35. if event_type == "end":
  36. event_config: EventsConfig = self.config.cameras[camera].record.events
  37. if event_data["has_clip"] or event_data["has_snapshot"]:
  38. Event.create(
  39. id=event_data["id"],
  40. label=event_data["label"],
  41. camera=camera,
  42. start_time=event_data["start_time"] - event_config.pre_capture,
  43. end_time=event_data["end_time"] + event_config.post_capture,
  44. top_score=event_data["top_score"],
  45. false_positive=event_data["false_positive"],
  46. zones=list(event_data["entered_zones"]),
  47. thumbnail=event_data["thumbnail"],
  48. region=event_data["region"],
  49. box=event_data["box"],
  50. area=event_data["area"],
  51. has_clip=event_data["has_clip"],
  52. has_snapshot=event_data["has_snapshot"],
  53. )
  54. del self.events_in_process[event_data["id"]]
  55. self.event_processed_queue.put((event_data["id"], camera))
  56. logger.info(f"Exiting event processor...")
  57. class EventCleanup(threading.Thread):
  58. def __init__(self, config: FrigateConfig, stop_event):
  59. threading.Thread.__init__(self)
  60. self.name = "event_cleanup"
  61. self.config = config
  62. self.stop_event = stop_event
  63. self.camera_keys = list(self.config.cameras.keys())
  64. def expire(self, media_type):
  65. ## Expire events from unlisted cameras based on the global config
  66. if media_type == "clips":
  67. retain_config = self.config.record.events.retain
  68. file_extension = "mp4"
  69. update_params = {"has_clip": False}
  70. else:
  71. retain_config = self.config.snapshots.retain
  72. file_extension = "jpg"
  73. update_params = {"has_snapshot": False}
  74. distinct_labels = (
  75. Event.select(Event.label)
  76. .where(Event.camera.not_in(self.camera_keys))
  77. .distinct()
  78. )
  79. # loop over object types in db
  80. for l in distinct_labels:
  81. # get expiration time for this label
  82. expire_days = retain_config.objects.get(l.label, retain_config.default)
  83. expire_after = (
  84. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  85. ).timestamp()
  86. # grab all events after specific time
  87. expired_events = Event.select().where(
  88. Event.camera.not_in(self.camera_keys),
  89. Event.start_time < expire_after,
  90. Event.label == l.label,
  91. )
  92. # delete the media from disk
  93. for event in expired_events:
  94. media_name = f"{event.camera}-{event.id}"
  95. media_path = Path(
  96. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  97. )
  98. media_path.unlink(missing_ok=True)
  99. if file_extension == "jpg":
  100. media_path = Path(
  101. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  102. )
  103. media_path.unlink(missing_ok=True)
  104. # update the clips attribute for the db entry
  105. update_query = Event.update(update_params).where(
  106. Event.camera.not_in(self.camera_keys),
  107. Event.start_time < expire_after,
  108. Event.label == l.label,
  109. )
  110. update_query.execute()
  111. ## Expire events from cameras based on the camera config
  112. for name, camera in self.config.cameras.items():
  113. if media_type == "clips":
  114. retain_config = camera.record.events.retain
  115. else:
  116. retain_config = camera.snapshots.retain
  117. # get distinct objects in database for this camera
  118. distinct_labels = (
  119. Event.select(Event.label).where(Event.camera == name).distinct()
  120. )
  121. # loop over object types in db
  122. for l in distinct_labels:
  123. # get expiration time for this label
  124. expire_days = retain_config.objects.get(l.label, retain_config.default)
  125. expire_after = (
  126. datetime.datetime.now() - datetime.timedelta(days=expire_days)
  127. ).timestamp()
  128. # grab all events after specific time
  129. expired_events = Event.select().where(
  130. Event.camera == name,
  131. Event.start_time < expire_after,
  132. Event.label == l.label,
  133. )
  134. # delete the grabbed clips from disk
  135. for event in expired_events:
  136. media_name = f"{event.camera}-{event.id}"
  137. media_path = Path(
  138. f"{os.path.join(CLIPS_DIR, media_name)}.{file_extension}"
  139. )
  140. media_path.unlink(missing_ok=True)
  141. if file_extension == "jpg":
  142. media_path = Path(
  143. f"{os.path.join(CLIPS_DIR, media_name)}-clean.png"
  144. )
  145. media_path.unlink(missing_ok=True)
  146. # update the clips attribute for the db entry
  147. update_query = Event.update(update_params).where(
  148. Event.camera == name,
  149. Event.start_time < expire_after,
  150. Event.label == l.label,
  151. )
  152. update_query.execute()
  153. def purge_duplicates(self):
  154. duplicate_query = """with grouped_events as (
  155. select id,
  156. label,
  157. camera,
  158. has_snapshot,
  159. has_clip,
  160. row_number() over (
  161. partition by label, camera, round(start_time/5,0)*5
  162. order by end_time-start_time desc
  163. ) as copy_number
  164. from event
  165. )
  166. select distinct id, camera, has_snapshot, has_clip from grouped_events
  167. where copy_number > 1;"""
  168. duplicate_events = Event.raw(duplicate_query)
  169. for event in duplicate_events:
  170. logger.debug(f"Removing duplicate: {event.id}")
  171. media_name = f"{event.camera}-{event.id}"
  172. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.jpg")
  173. media_path.unlink(missing_ok=True)
  174. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}-clean.png")
  175. media_path.unlink(missing_ok=True)
  176. media_path = Path(f"{os.path.join(CLIPS_DIR, media_name)}.mp4")
  177. media_path.unlink(missing_ok=True)
  178. (
  179. Event.delete()
  180. .where(Event.id << [event.id for event in duplicate_events])
  181. .execute()
  182. )
  183. def run(self):
  184. # only expire events every 5 minutes
  185. while not self.stop_event.wait(300):
  186. self.expire("clips")
  187. self.expire("snapshots")
  188. self.purge_duplicates()
  189. # drop events from db where has_clip and has_snapshot are false
  190. delete_query = Event.delete().where(
  191. Event.has_clip == False, Event.has_snapshot == False
  192. )
  193. delete_query.execute()
  194. logger.info(f"Exiting event cleanup...")